From e88eb00b6c383e593b0e1bfb63d72655602ba693 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Fri, 4 Feb 2022 10:05:40 +0200
Subject: [PATCH 1/3] Read Parquet Dataset Essay

---
 essays/read_parquet_dataset.py | 57 ++++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 essays/read_parquet_dataset.py

diff --git a/essays/read_parquet_dataset.py b/essays/read_parquet_dataset.py
new file mode 100644
index 00000000..5e03c762
--- /dev/null
+++ b/essays/read_parquet_dataset.py
@@ -0,0 +1,57 @@
+import argparse
+from pathlib import Path
+from pprint import pprint
+
+import dask.array as da
+import numpy as np
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+from daskms.patterns import LazyProxy
+
+def create_parser():
+    p = argparse.ArgumentParser()
+    p.add_argument("ms")
+    return p
+
+def read_column(file_proxy, column):
+    chunks = file_proxy[0].read(columns=[column]).column(column).chunks
+    assert len(chunks) == 1
+    zero_copy = chunks[0].type not in (pa.string(), pa.bool_())
+    return chunks[0].to_numpy(zero_copy_only=zero_copy)
+
+def column(path, column):
+    fragments = list(sorted(path.rglob("*.parquet")))
+    proxies = np.asarray([LazyProxy(pq.ParquetFile, f) for f in fragments])
+    # NOTE: instantiates ParquetFile's in the graph construction process
+    # for purposes of reading metadata
+    rows = np.asarray([p.metadata.num_rows for p in proxies])
+
+    # Get the table schema from the first file, this should
+    # be the same for all files
+    schema = proxies[0].metadata.schema.to_arrow_schema()
+    fields = {n: schema.field(n) for n in schema.names}
+
+    try:
+        field = fields[column]
+    except KeyError:
+        raise ValueError(f"Parquet dataset has no column {column}")
+
+
+    dask_proxies = da.from_array(proxies, chunks=1)
+    dask_rows = da.from_array(rows, chunks=1)
+
+
+    data = da.blockwise(read_column, ("row",),
+                        dask_proxies, ("row",),
+                        column, None,
+                        adjust_chunks={"row": tuple(rows.tolist())},
+                        meta=np.empty((0,), dtype=field.type.to_pandas_dtype()))
+
+    return data
+
+if __name__ == "__main__":
+    args = create_parser().parse_args()
+    data = column(Path(args.ms), "TIME")
+    print(data.compute())

From 7f618c47827f91d5f97c3ed89b5bf3c8db45e7e6 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Fri, 4 Feb 2022 10:18:15 +0200
Subject: [PATCH 2/3] WIP

---
 essays/read_parquet_dataset.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/essays/read_parquet_dataset.py b/essays/read_parquet_dataset.py
index 5e03c762..c4323546 100644
--- a/essays/read_parquet_dataset.py
+++ b/essays/read_parquet_dataset.py
@@ -9,10 +9,12 @@
 import pyarrow.parquet as pq
 
 from daskms.patterns import LazyProxy
+import daskms.experimental.arrow.extension_types
 
 def create_parser():
     p = argparse.ArgumentParser()
     p.add_argument("ms")
+    p.add_argument("-c", "--column", default="TIME")
     return p
 
 def read_column(file_proxy, column):
@@ -53,5 +55,5 @@ def column(path, column):
 
 if __name__ == "__main__":
     args = create_parser().parse_args()
-    data = column(Path(args.ms), "TIME")
-    print(data.compute())
+    data = column(Path(args.ms), args.column)
+    print(data, data.compute())
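A note on the pattern the first two patches establish: LazyProxy records a
factory and its arguments, and only constructs the wrapped object on first
attribute access, after which lookups are forwarded to it. This keeps file
handles out of the serialised graph while still allowing metadata to be read
eagerly during graph construction, as the NOTE in column() points out. A
minimal sketch of the behaviour, using a placeholder fragment path:

    import pyarrow.parquet as pq
    from daskms.patterns import LazyProxy

    # No file is opened here: the proxy only stores the factory and its argument
    proxy = LazyProxy(pq.ParquetFile, "part-0/data-0.parquet")

    # First attribute access constructs pq.ParquetFile(...) behind the proxy
    # and forwards the lookup to the resulting object
    print(proxy.metadata.num_rows)

Since da.from_array(proxies, chunks=1) yields one-element object-array blocks,
read_column receives such a block and indexes file_proxy[0] to recover the
proxy inside each task.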
From 8a3d4bbb2121c8084b57c81b5012e83911a71262 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Fri, 4 Feb 2022 12:21:45 +0200
Subject: [PATCH 3/3] Pushdown predicates and Tensor reading

---
 essays/read_parquet_dataset.py | 43 +++++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/essays/read_parquet_dataset.py b/essays/read_parquet_dataset.py
index c4323546..acb6d1ca 100644
--- a/essays/read_parquet_dataset.py
+++ b/essays/read_parquet_dataset.py
@@ -1,4 +1,5 @@
 import argparse
+import json
 from pathlib import Path
 from pprint import pprint
 
@@ -10,6 +11,7 @@
 
 from daskms.patterns import LazyProxy
 import daskms.experimental.arrow.extension_types
+from daskms.experimental.arrow.arrow_schema import DASKMS_METADATA
 
 def create_parser():
     p = argparse.ArgumentParser()
@@ -17,22 +19,32 @@ def create_parser():
     p.add_argument("-c", "--column", default="TIME")
     return p
 
-def read_column(file_proxy, column):
-    chunks = file_proxy[0].read(columns=[column]).column(column).chunks
+def read_column(table_proxy, column):
+    arrow_col = table_proxy.item().column(column)
+    chunks = arrow_col.chunks
+
+    if not chunks:
+        return np.array([])
+
     assert len(chunks) == 1
     zero_copy = chunks[0].type not in (pa.string(), pa.bool_())
     return chunks[0].to_numpy(zero_copy_only=zero_copy)
 
 def column(path, column):
     fragments = list(sorted(path.rglob("*.parquet")))
-    proxies = np.asarray([LazyProxy(pq.ParquetFile, f) for f in fragments])
-    # NOTE: instantiates ParquetFile's in the graph construction process
+    proxies = np.asarray([LazyProxy(pq.read_table, f, filters=[
+        [("ANTENNA1", "<=", 3),
+         ("ANTENNA2", ">=", 5)]]) for f in fragments])
+    # NOTE: instantiates Tables's in the graph construction process
     # for purposes of reading metadata
-    rows = np.asarray([p.metadata.num_rows for p in proxies])
+    rows = np.asarray([p.num_rows for p in proxies])
 
     # Get the table schema from the first file, this should
     # be the same for all files
-    schema = proxies[0].metadata.schema.to_arrow_schema()
+    schema = proxies[0].schema
+    with open("schema.txt", "w") as f:
+        f.write(str(schema))
+
     fields = {n: schema.field(n) for n in schema.names}
 
     try:
@@ -40,20 +52,29 @@ def column(path, column):
     except KeyError:
         raise ValueError(f"Parquet dataset has no column {column}")
 
+    field_metadata = field.metadata[DASKMS_METADATA.encode()]
+    field_metadata = json.loads(field_metadata)
+    dims = tuple(field_metadata["dims"])
+    shape = (rows,) + field.type.shape
 
-    dask_proxies = da.from_array(proxies, chunks=1)
-    dask_rows = da.from_array(rows, chunks=1)
+    assert len(shape) == len(dims)
+    meta = np.empty((0,)*len(dims), field.type.to_pandas_dtype())
+    new_axes = {d: s for d, s in zip(dims[1:], shape[1:])}
 
+    dask_proxies = da.from_array(proxies, chunks=1)
+    # dask_rows = da.from_array(rows, chunks=1)
 
-    data = da.blockwise(read_column, ("row",),
+    data = da.blockwise(read_column, dims,
                         dask_proxies, ("row",),
                         column, None,
+                        new_axes=new_axes,
                         adjust_chunks={"row": tuple(rows.tolist())},
-                        meta=np.empty((0,), dtype=field.type.to_pandas_dtype()))
+                        meta=meta)
 
     return data
 
 if __name__ == "__main__":
     args = create_parser().parse_args()
     data = column(Path(args.ms), args.column)
-    print(data, data.compute())
+    print(data, data.compute(scheduler="sync"))
+    None
\ No newline at end of file
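A note on the pushdown predicate introduced in patch 3: pq.read_table accepts
filters in disjunctive normal form, where the (column, op, value) tuples in
each inner list are AND-ed together and the inner lists themselves are OR-ed.
The hard-coded filter above therefore keeps only rows satisfying
ANTENNA1 <= 3 AND ANTENNA2 >= 5. A short sketch with placeholder paths and
values:

    import pyarrow.parquet as pq

    # One conjunction: ANTENNA1 <= 3 AND ANTENNA2 >= 5
    t = pq.read_table("part-0/data-0.parquet",
                      filters=[[("ANTENNA1", "<=", 3),
                                ("ANTENNA2", ">=", 5)]])

    # Two OR-ed conjunctions: ANTENNA1 == 0 OR ANTENNA2 == 7
    t = pq.read_table("part-0/data-0.parquet",
                      filters=[[("ANTENNA1", "==", 0)],
                               [("ANTENNA2", "==", 7)]])

Because rows is now taken from the filtered tables via p.num_rows, the
adjust_chunks row sizes stay consistent with the arrays read_column actually
returns per block.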