fix max_page
app.py
CHANGED
@@ -74,24 +74,25 @@ def index(dataset: str, config: str, split: str) -> Tuple[np.ndarray, RowGroupRe
         raise AppError(f"Invalid config {config}. Available configs are: {', '.join(get_parquet_configs(dataset))}.")
     else:
         raise AppError(f"Invalid split {split}. Available splits are: {', '.join(get_parquet_splits(dataset, config))}.")
-    …
+    desc = f"{dataset}/{config}/{split}"
+    all_pf: List[pq.ParquetFile] = thread_map(partial(pq.ParquetFile, filesystem=fs), sources, desc=desc, unit="pq")
     features = Features.from_arrow_schema(all_pf[0].schema.to_arrow_schema())
     columns = [col for col in features if all(bad_type not in str(features[col]) for bad_type in ["Image(", "Audio(", "'binary'"])]
     info = "" if len(columns) == len(features) else f"Some columns are not supported yet: {sorted(set(features) - set(columns))}"
     rg_offsets = np.cumsum([pf.metadata.row_group(i).num_rows for pf in all_pf for i in range(pf.metadata.num_row_groups)])
     rg_readers = [partial(pf.read_row_group, i, columns=columns) for pf in all_pf for i in range(pf.metadata.num_row_groups)]
-    max_page = rg_offsets[-1] // PAGE_SIZE
+    max_page = 1 + (rg_offsets[-1] - 1) // PAGE_SIZE
     return rg_offsets, rg_readers, max_page, info


 def query(page: int, page_size: int, rg_offsets: np.ndarray, rg_readers: RowGroupReaders) -> pd.DataFrame:
-    start_row, end_row = (page - 1) * page_size, page * page_size
-    start_rg…
-    …
-    …
+    start_row, end_row = (page - 1) * page_size, min(page * page_size, rg_offsets[-1] - 1)  # both included
+    # rg_offsets[start_rg - 1] <= start_row < rg_offsets[start_rg]
+    # rg_offsets[end_rg - 1] <= end_row < rg_offsets[end_rg]
+    start_rg, end_rg = np.searchsorted(rg_offsets, [start_row, end_row], side="right")  # both included
     pa_table = pa.concat_tables([rg_readers[i]() for i in range(start_rg, end_rg + 1)])
-    offset = start_row - rg_offsets[start_rg - 1] if start_rg else 0
-    pa_table = pa_table.slice(offset, …
+    offset = start_row - (rg_offsets[start_rg - 1] if start_rg > 0 else 0)
+    pa_table = pa_table.slice(offset, page_size)
     return pa_table.to_pandas()

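For intuition, here is a minimal standalone sketch (toy row-group sizes, not tied to any real dataset; PAGE_SIZE = 100 is assumed) of how the rewritten query locates row groups: rg_offsets holds cumulative row counts, so np.searchsorted(..., side="right") returns the group rg satisfying rg_offsets[rg - 1] <= row < rg_offsets[rg].

import numpy as np

PAGE_SIZE = 100  # assumed value for the demo

# Toy layout: three row groups of 80, 120 and 50 rows.
rg_offsets = np.cumsum([80, 120, 50])  # -> [80, 200, 250]

page = 2  # pages are 1-indexed; page 2 covers global rows 100..199
start_row, end_row = (page - 1) * PAGE_SIZE, min(page * PAGE_SIZE, rg_offsets[-1] - 1)

# side="right" returns the first group whose cumulative offset exceeds the row,
# i.e. the group containing it: rg_offsets[rg - 1] <= row < rg_offsets[rg]
start_rg, end_rg = np.searchsorted(rg_offsets, [start_row, end_row], side="right")
assert (start_rg, end_rg) == (1, 2)  # page 2 starts in group 1

# where start_row falls inside the concatenation of groups start_rg..end_rg
offset = start_row - (rg_offsets[start_rg - 1] if start_rg > 0 else 0)
assert offset == 20  # row 100 is the 21st row of group 1 (which starts at row 80)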
@@ -99,6 +100,8 @@ def query(page: int, page_size: int, rg_offsets: np.ndarray, rg_readers: RowGrou
 def get_page(dataset: str, config: str, split: str, page: str) -> Tuple[str, int, str]:
     dataset, config, split, page = sanitize_inputs(dataset, config, split, page)
     rg_offsets, rg_readers, max_page, info = index(dataset, config, split)
+    if page > max_page:
+        raise AppError(f"Page {page} does not exist")
     df = query(page, PAGE_SIZE, rg_offsets=rg_offsets, rg_readers=rg_readers)
     buf = StringIO()
     df.to_json(buf, lines=True, orient="records")
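The new guard works together with the max_page change in index. A quick comparison of the old floor division against the new ceiling division (illustrative row counts; PAGE_SIZE = 100 is assumed here) shows what the old formula got wrong:

PAGE_SIZE = 100

for total_rows in (50, 1000, 1001):
    old_max_page = total_rows // PAGE_SIZE            # floor
    new_max_page = 1 + (total_rows - 1) // PAGE_SIZE  # ceiling, for total_rows >= 1
    print(total_rows, old_max_page, new_max_page)
# 50   -> old 0,  new 1   (a dataset smaller than one page still has one page)
# 1000 -> old 10, new 10  (exact multiples agree)
# 1001 -> old 10, new 11  (the trailing partial page is now counted)

With the floor formula, a dataset smaller than PAGE_SIZE reported max_page = 0 and a trailing partial page was never counted; the ceiling formula fixes both, and the new guard can then reject out-of-range pages with a clean AppError instead of letting query index past the last row group.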
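Finally, an end-to-end sketch of the fixed pagination path, with small in-memory Arrow tables standing in for parquet row groups (in the Space the readers are pq.ParquetFile.read_row_group partials; everything here is toy data):

from functools import partial

import numpy as np
import pandas as pd
import pyarrow as pa

PAGE_SIZE = 3  # tiny page size for the demo

# Stand-ins for three row groups of 4, 5 and 1 rows.
tables = [pa.table({"x": list(range(lo, hi))}) for lo, hi in [(0, 4), (4, 9), (9, 10)]]
rg_readers = [lambda t=t: t for t in tables]          # each "reader" returns one table
rg_offsets = np.cumsum([t.num_rows for t in tables])  # -> [4, 9, 10]

def query(page: int, page_size: int) -> pd.DataFrame:
    start_row, end_row = (page - 1) * page_size, min(page * page_size, rg_offsets[-1] - 1)
    start_rg, end_rg = np.searchsorted(rg_offsets, [start_row, end_row], side="right")
    pa_table = pa.concat_tables([rg_readers[i]() for i in range(start_rg, end_rg + 1)])
    offset = start_row - (rg_offsets[start_rg - 1] if start_rg > 0 else 0)
    return pa_table.slice(offset, page_size).to_pandas()

print(query(2, PAGE_SIZE)["x"].tolist())  # [3, 4, 5]: page 2 spans row groups 0 and 1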