Apply read_columns kwarg in parquet reader (#315)
* make ParquetReader.read use read_columns kwarg. add kwarg to init.

* add test_parquet_reader_columns
troyraen authored May 20, 2024
1 parent 06b8379 commit 80991e9
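
A short usage sketch of the new column-selection options (the file path below is hypothetical; the import path, parameter names, and column names come from the diff and test on this page):

from hipscat_import.catalog.file_readers import ParquetReader

# Instance-level selection: every chunk this reader yields contains only these columns.
reader = ParquetReader(chunksize=100_000, column_names=["id", "dec"])
for chunk in reader.read("catalog_shard.parquet"):  # hypothetical file
    print(list(chunk.columns))  # ['id', 'dec']

# Per-call selection: read_columns overrides the instance-level column_names.
for chunk in reader.read("catalog_shard.parquet", read_columns=["id"]):
    print(list(chunk.columns))  # ['id']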
Showing 2 changed files with 21 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/hipscat_import/catalog/file_readers.py
@@ -291,18 +291,24 @@ class ParquetReader(InputReader):
         chunksize (int): number of rows of the file to process at once.
             For large files, this can prevent loading the entire file
             into memory at once.
+        column_names (list[str] or None): Names of columns to use from the input dataset.
+            If None, use all columns.
         kwargs: arguments to pass along to pyarrow.parquet.ParquetFile.
             See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
     """
 
-    def __init__(self, chunksize=500_000, **kwargs):
+    def __init__(self, chunksize=500_000, column_names=None, **kwargs):
         self.chunksize = chunksize
+        self.column_names = column_names
         self.kwargs = kwargs
 
     def read(self, input_file, read_columns=None):
         self.regular_file_exists(input_file, **self.kwargs)
+        columns = read_columns or self.column_names
         parquet_file = pq.ParquetFile(input_file, **self.kwargs)
-        for smaller_table in parquet_file.iter_batches(batch_size=self.chunksize, use_pandas_metadata=True):
+        for smaller_table in parquet_file.iter_batches(
+            batch_size=self.chunksize, columns=columns, use_pandas_metadata=True
+        ):
             yield smaller_table.to_pandas()
 
     def provenance_info(self) -> dict:
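
Note on precedence: read passes `read_columns or self.column_names` to pyarrow, so the per-call argument wins whenever it is truthy, and None (or an empty list) falls back to the instance setting; when both are None, pyarrow reads every column. A minimal standalone sketch of that rule (the helper name is illustrative, not part of the code):

def effective_columns(read_columns, column_names):
    # Mirrors `columns = read_columns or self.column_names` above:
    # `x or y` yields y when x is None or empty.
    return read_columns or column_names

assert effective_columns(["id"], ["id", "dec"]) == ["id"]
assert effective_columns(None, ["id", "dec"]) == ["id", "dec"]
assert effective_columns(None, None) is None  # pyarrow then reads all columns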
13 changes: 13 additions & 0 deletions tests/hipscat_import/catalog/test_file_readers.py
@@ -255,6 +255,19 @@ def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info):
     io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)
 
 
+def test_parquet_reader_columns(parquet_shards_shard_44_0):
+    """Verify we can read a subset of columns."""
+    column_subset = ["id", "dec"]
+
+    # test column_names class property
+    for frame in ParquetReader(column_names=column_subset).read(parquet_shards_shard_44_0):
+        assert set(frame.columns) == set(column_subset)
+
+    # test read_columns kwarg
+    for frame in ParquetReader().read(parquet_shards_shard_44_0, read_columns=column_subset):
+        assert set(frame.columns) == set(column_subset)
+
+
 def test_read_fits(formats_fits):
     """Success case - fits file that exists being read as fits"""
     total_chunks = 0
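
The new test can be run on its own with pytest's node-id syntax (assuming the parquet_shards_shard_44_0 fixture is provided by the suite's conftest, as the test signature implies):

pytest tests/hipscat_import/catalog/test_file_readers.py::test_parquet_reader_columns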
