From ce7a7f93f83731ba0319603641295dfe7b6cbdb3 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Fri, 24 May 2024 11:31:24 -0400 Subject: [PATCH 1/5] Indexed CSV reads. --- src/hipscat_import/catalog/file_readers.py | 98 ++++++++++++++++++- .../catalog/test_file_readers.py | 34 ++++++- .../catalog/test_run_round_trip.py | 47 +++++++++ tests/hipscat_import/conftest.py | 8 +- .../indexed_files/csv_list_double_1_of_2.txt | 3 + .../indexed_files/csv_list_double_2_of_2.txt | 3 + .../data/indexed_files/csv_list_single.txt | 6 ++ 7 files changed, 195 insertions(+), 4 deletions(-) create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_single.txt diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 9119f630..7de46930 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -3,6 +3,7 @@ import abc from typing import Any, Dict, Union +import pandas as pd import pyarrow.parquet as pq from astropy.io import ascii as ascii_reader from astropy.table import Table @@ -30,8 +31,16 @@ def get_file_reader( includes `.csv.gz` and other compressed csv files - `fits`, flexible image transport system. often used for astropy tables. - `parquet`, compressed columnar data format + - `ecsv`, astropy's enhanced CSV + - `indexed_csv`, "index" style reader, that accepts a file with a list + of csv files that are appended in-memory + - `indexed_parquet`, "index" style reader, that accepts a file with a list + of parquet files that are appended in-memory chunksize (int): number of rows to read in a single iteration. + for single-file readers, large files are split into batches based on this value. + for index-style readers, we read files until we reach this chunksize and + create a single batch in-memory. schema_file (str): path to a parquet schema file. if provided, header names and column types will be pulled from the parquet schema metadata. column_names (list[str]): for CSV files, the names of columns if no header @@ -59,7 +68,16 @@ def get_file_reader( ) if file_format == "parquet": return ParquetReader(chunksize=chunksize, **kwargs) - + if file_format == "indexed_csv": + return IndexedCsvReader( + chunksize=chunksize, + schema_file=schema_file, + column_names=column_names, + type_map=type_map, + **kwargs, + ) + if file_format == "indexed_parquet": + return IndexedParquetReader(chunksize=chunksize, **kwargs) raise NotImplementedError(f"File Format: {file_format} not supported") @@ -89,7 +107,7 @@ def provenance_info(self) -> dict: def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs): """Check that the `input_file` points to a single regular file - Raises + Raises: FileNotFoundError: if nothing exists at path, or directory found. """ if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options): @@ -97,6 +115,20 @@ def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], if not file_io.is_regular_file(input_file, storage_options=storage_options): raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}") + def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs): + """Read an "indexed" file. 
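+
+        For example, an index file's contents might look like (hypothetical paths)::
+
+            /data/input/csv_shard_00.csv
+            /data/input/csv_shard_01.csv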
+ + This should contain a list of paths to files to be read and batched. + + Raises: + FileNotFoundError: if nothing exists at path, or directory found. + """ + self.regular_file_exists(input_file, **kwargs) + file_names = file_io.load_text_file(input_file, storage_options=storage_options) + file_names = [f.strip() for f in file_names] + file_names = [f for f in file_names if f] + return file_names + class CsvReader(InputReader): """CSV reader for the most common CSV reading arguments. @@ -173,6 +205,37 @@ def read(self, input_file, read_columns=None): yield from reader +class IndexedCsvReader(CsvReader): + """Reads an index file, containing paths to CSV files to be read and batched + + See CsvReader for additional configuration for reading CSV files. + """ + + def read(self, input_file, read_columns=None): + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + + batch_size = 0 + batch_frames = [] + for file in file_names: + for single_frame in super().read(file, read_columns=read_columns): + if batch_size + len(single_frame) >= self.chunksize: + # We've hit our chunksize, send the batch off to the task. + if len(batch_frames) == 0: + yield single_frame + batch_size = 0 + else: + yield pd.concat(batch_frames, ignore_index=True) + batch_frames = [] + batch_frames.append(single_frame) + batch_size = len(single_frame) + else: + batch_frames.append(single_frame) + batch_size += len(single_frame) + + if len(batch_frames) > 0: + yield pd.concat(batch_frames, ignore_index=True) + + class AstropyEcsvReader(InputReader): """Reads astropy ascii .ecsv files. @@ -284,3 +347,34 @@ def read(self, input_file, read_columns=None): batch_size=self.chunksize, columns=columns, use_pandas_metadata=True ): yield smaller_table.to_pandas() + + +class IndexedParquetReader(ParquetReader): + """Reads an index file, containing paths to parquet files to be read and batched + + See ParquetReader for additional configuration for reading parquet files. + """ + + def read(self, input_file, read_columns=None): + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + + batch_size = 0 + batch_frames = [] + for file in file_names: + for single_frame in super().read(file, read_columns=read_columns): + if batch_size + len(single_frame) >= self.chunksize: + # We've hit our chunksize, send the batch off to the task. 
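+                    # If nothing is batched yet, this single frame alone reaches the
+                    # chunksize, so yield it directly; otherwise yield the accumulated
+                    # frames and start a new batch with this frame.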
+ if len(batch_frames) == 0: + yield single_frame + batch_size = 0 + else: + yield pd.concat(batch_frames, ignore_index=True) + batch_frames = [] + batch_frames.append(single_frame) + batch_size = len(single_frame) + else: + batch_frames.append(single_frame) + batch_size += len(single_frame) + + if len(batch_frames) > 0: + yield pd.concat(batch_frames, ignore_index=True) diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index c7f76176..7d62684f 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -10,7 +10,13 @@ import pytest from hipscat.catalog.catalog import CatalogInfo -from hipscat_import.catalog.file_readers import CsvReader, FitsReader, ParquetReader, get_file_reader +from hipscat_import.catalog.file_readers import ( + CsvReader, + FitsReader, + IndexedCsvReader, + ParquetReader, + get_file_reader, +) # pylint: disable=redefined-outer-name @@ -227,6 +233,32 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info): io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info) +def test_indexed_csv_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=10_000).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=60).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=5).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader(parquet_shards_shard_44_0): """Verify we can read the parquet file into a single data frame.""" total_chunks = 0 diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 11e8aaf7..e0267fe5 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -670,3 +670,50 @@ def test_gaia_ecsv( assert schema.equals(expected_parquet_schema, check_metadata=False) schema = pds.dataset(args.catalog_path, format="parquet").schema assert schema.equals(expected_parquet_schema, check_metadata=False) + + +@pytest.mark.dask +def test_import_indexed_csv( + dask_client, + indexed_files_dir, + tmp_path, +): + """Use indexed-style CSV reads. 
There are two index files, and we expect + to have two batches worth of intermediate files.""" + temp = tmp_path / "intermediate_files" + os.makedirs(temp) + + args = ImportArguments( + output_artifact_name="indexed_csv", + input_file_list=[ + indexed_files_dir / "csv_list_double_1_of_2.txt", + indexed_files_dir / "csv_list_double_2_of_2.txt", + ], + output_path=tmp_path, + file_reader="indexed_csv", + sort_columns="id", + tmp_dir=temp, + dask_tmp=temp, + highest_healpix_order=2, + delete_intermediate_parquet_files=False, + delete_resume_log_files=False, + pixel_threshold=3_000, + progress_bar=False, + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that there are TWO intermediate parquet file (two index files). + assert_directory_contains( + temp / "indexed_csv" / "intermediate" / "order_0" / "dir_0" / "pixel_11", + [ + "shard_split_0_0.parquet", + "shard_split_1_0.parquet", + ], + ) diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 303144bc..00959c04 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -2,6 +2,7 @@ import os import re +from pathlib import Path import healpy as hp import numpy as np @@ -52,7 +53,7 @@ def test_long_running(): @pytest.fixture def test_data_dir(): - return os.path.join(TEST_DIR, "data") + return Path(TEST_DIR) / "data" @pytest.fixture @@ -120,6 +121,11 @@ def formats_pandasindex(test_data_dir): return os.path.join(test_data_dir, "test_formats", "pandasindex.parquet") +@pytest.fixture +def indexed_files_dir(test_data_dir): + return test_data_dir / "indexed_files" + + @pytest.fixture def small_sky_parts_dir(test_data_dir): return os.path.join(test_data_dir, "small_sky_parts") diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt new file mode 100644 index 00000000..8e9c9d54 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv + diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt new file mode 100644 index 00000000..352c08ea --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv \ No newline at end of file diff --git a/tests/hipscat_import/data/indexed_files/csv_list_single.txt b/tests/hipscat_import/data/indexed_files/csv_list_single.txt new file mode 100644 index 00000000..04817f83 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_single.txt @@ -0,0 +1,6 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv + From 37ed8571d08cd8a25972e360eeecbf39078ce411 Mon Sep 17 00:00:00 2001 
From: Melissa DeLucchi Date: Fri, 24 May 2024 11:37:45 -0400 Subject: [PATCH 2/5] Indexed parquet reads. --- .../catalog/test_file_readers.py | 29 +++++++++++++++++++ .../indexed_files/parquet_list_single.txt | 5 ++++ 2 files changed, 34 insertions(+) create mode 100644 tests/hipscat_import/data/indexed_files/parquet_list_single.txt diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index 7d62684f..0a1ef19f 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -14,6 +14,7 @@ CsvReader, FitsReader, IndexedCsvReader, + IndexedParquetReader, ParquetReader, get_file_reader, ) @@ -278,6 +279,34 @@ def test_parquet_reader_chunked(parquet_shards_shard_44_0): assert total_chunks == 7 +def test_indexed_parquet_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in get_file_reader("indexed_parquet", chunksize=10_000).read( + indexed_files_dir / "parquet_list_single.txt" + ): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedParquetReader(chunksize=60).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. + total_chunks = 0 + for frame in IndexedParquetReader(chunksize=5).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info): """Test that we get some provenance info and it is parseable into JSON.""" reader = ParquetReader(chunksize=1) diff --git a/tests/hipscat_import/data/indexed_files/parquet_list_single.txt b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt new file mode 100644 index 00000000..63e5b84f --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt @@ -0,0 +1,5 @@ +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_0_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_1_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_2_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_3_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_4_0.parquet From c4b6732e5da6514abd28bc5c477261eab161a5a8 Mon Sep 17 00:00:00 2001 From: Troy Raen Date: Sat, 8 Jun 2024 00:02:23 -0700 Subject: [PATCH 3/5] implement dataset.to_batches method in IndexedParquetReader --- src/hipscat_import/catalog/file_readers.py | 73 +++++++++++++++------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 7de46930..7bb3aafe 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -4,6 +4,8 @@ from typing import Any, Dict, Union import pandas as pd +import pyarrow +import pyarrow.dataset import pyarrow.parquet as pq from astropy.io import ascii as ascii_reader from astropy.table import Table @@ -349,32 +351,61 @@ def read(self, input_file, read_columns=None): yield smaller_table.to_pandas() -class IndexedParquetReader(ParquetReader): +class IndexedParquetReader(InputReader): """Reads an index file, containing 
paths to parquet files to be read and batched - See ParquetReader for additional configuration for reading parquet files. + Attributes: + chunksize (int): maximum number of rows to process at once. + Large files will be processed in chunks. Small files will be concatenated. + Also passed to pyarrow.dataset.Dataset.to_batches as `batch_size`. + batch_readahead (int): number of batches to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + fragment_readahead (int): number of fragments to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + use_threads (bool): whether to use multiple threads for reading. + Passed to pyarrow.dataset.Dataset.to_batches. + column_names (list[str] or None): Names of columns to use from the input dataset. + If None, use all columns. + kwargs: additional arguments to pass along to InputReader.read_index_file. """ + def __init__( + self, + chunksize=500_000, + batch_readahead=16, + fragment_readahead=4, + use_threads=True, + column_names=None, + **kwargs, + ): + self.chunksize = chunksize + self.batch_readahead = batch_readahead + self.fragment_readahead = fragment_readahead + self.use_threads = use_threads + self.column_names = column_names + self.kwargs = kwargs + def read(self, input_file, read_columns=None): + columns = read_columns or self.column_names file_names = self.read_index_file(input_file=input_file, **self.kwargs) + input_dataset = pyarrow.dataset.dataset(file_names) + + batches, nrows = [], 0 + for batch in input_dataset.to_batches( + batch_size=self.chunksize, + batch_readahead=self.batch_readahead, + fragment_readahead=self.fragment_readahead, + use_threads=self.use_threads, + columns=columns, + ): + if nrows + batch.num_rows > self.chunksize: + # We've hit the chunksize so load to a DataFrame and yield. + # There should always be at least one batch in here since batch_size == self.chunksize above. + yield pyarrow.Table.from_batches(batches).to_pandas() + batches, nrows = [], 0 - batch_size = 0 - batch_frames = [] - for file in file_names: - for single_frame in super().read(file, read_columns=read_columns): - if batch_size + len(single_frame) >= self.chunksize: - # We've hit our chunksize, send the batch off to the task. 
- if len(batch_frames) == 0: - yield single_frame - batch_size = 0 - else: - yield pd.concat(batch_frames, ignore_index=True) - batch_frames = [] - batch_frames.append(single_frame) - batch_size = len(single_frame) - else: - batch_frames.append(single_frame) - batch_size += len(single_frame) + batches.append(batch) + nrows += batch.num_rows - if len(batch_frames) > 0: - yield pd.concat(batch_frames, ignore_index=True) + if len(batches) > 0: + yield pyarrow.Table.from_batches(batches).to_pandas() From db62e1e9cad6de8a13de825320da7b2aeab5bb12 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Mon, 10 Jun 2024 14:53:12 -0400 Subject: [PATCH 4/5] Use file_io dataset read for cloud URIs --- src/hipscat_import/catalog/file_readers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 7bb3aafe..43d50be6 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -388,7 +388,7 @@ def __init__( def read(self, input_file, read_columns=None): columns = read_columns or self.column_names file_names = self.read_index_file(input_file=input_file, **self.kwargs) - input_dataset = pyarrow.dataset.dataset(file_names) + (_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs) batches, nrows = [], 0 for batch in input_dataset.to_batches( From c45a6dc492af6baff00c5e015e68dfdc8e66d5b8 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Wed, 12 Jun 2024 13:25:42 -0400 Subject: [PATCH 5/5] Add documentation on index batching --- docs/catalogs/arguments.rst | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/docs/catalogs/arguments.rst b/docs/catalogs/arguments.rst index ca86302b..86e378f8 100644 --- a/docs/catalogs/arguments.rst +++ b/docs/catalogs/arguments.rst @@ -97,7 +97,6 @@ Reading input files Catalog import reads through a list of files and converts them into a hipscatted catalog. - Which files? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -129,9 +128,10 @@ along to the map/reduce stages. We've provided reference implementations for reading CSV, FITS, and Parquet input files, but you can subclass the reader type to suit whatever input files you've got. -You only need to provide the ``file_reader`` argument if you are using a custom file reader +You only need to provide an object ``file_reader`` argument if you are using a custom file reader or passing parameters to the file reader. For example you might use ``file_reader=CsvReader(sep="\s+")`` -to parse a whitespace separated file. +to parse a whitespace separated file. Otherwise, you can use a short string to +specify an existing file reader type e.g. ``file_reader="csv"``. You can find the full API documentation for :py:class:`hipscat_import.catalog.file_readers.InputReader` @@ -171,6 +171,36 @@ You can find the full API documentation for If you're reading from cloud storage, or otherwise have some filesystem credential dict, put those in ``input_storage_options``. +Indexed batching strategy +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you have many small files (think 400k+ CSV files with a few rows each), you +may benefit from "indexed" file readers. These allow you to explicitly create +batches for tasks by providing a set of index files, where each file is a +text file that contains only paths to data files. + +Benefits: + +1. 
If you have 400k+ input files, you don't want to create 400k+ dask tasks
+   to process these files.
+2. If the files are very small, batching them in this way allows the import
+   process to *combine* several small files into a single chunk for processing.
+   This will result in fewer intermediate files during the ``splitting`` stage.
+3. If you have parquet files on a slow networked file system, the indexed
+   parquet reader supports pyarrow's readahead options.
+
+Warnings:
+
+1. If you have 20 dask workers in your pool, you may be tempted to create
+   20 index files. This is not always an efficient use of resources!
+   You'd be better served by 200 index files, so that:
+
+   a. dask can spread the load if some lists of files take longer to process
+      than others
+   b. if the pipeline dies partway through (say, after 15 lists have been
+      processed successfully), the retry can still spread the remaining lists
+      across all of your workers, instead of leaving most of them sitting idle.
+
 Which fields?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
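
A minimal sketch of how the indexed readers above might be wired into an import job,
combining the ``ImportArguments`` usage from ``test_import_indexed_csv`` with the
``IndexedParquetReader`` options added later in the series. The paths and tuning values
are hypothetical, and the import path for ``ImportArguments`` is assumed from the
package layout.

.. code-block:: python

    from hipscat_import.catalog.arguments import ImportArguments
    from hipscat_import.catalog.file_readers import IndexedParquetReader

    # Each index file is a plain-text list of data-file paths, one per line.
    args = ImportArguments(
        output_artifact_name="indexed_parquet_catalog",
        input_file_list=[
            "/data/indexed_files/parquet_list_1_of_2.txt",
            "/data/indexed_files/parquet_list_2_of_2.txt",
        ],
        # A reader instance allows tuning chunksize and readahead;
        # file_reader="indexed_parquet" would use the defaults instead.
        file_reader=IndexedParquetReader(chunksize=250_000, batch_readahead=16),
        output_path="/data/output",
        sort_columns="id",
        highest_healpix_order=7,
    )

As in the round-trip test, each index file becomes one reading task, so two index files
yield two sets of intermediate shard files for each resulting pixel.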