From 33d70e79516009e14dee3d60d02d6a7b08171e31 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:54:42 -0400 Subject: [PATCH] Enable "index file" reads for catalog import (#334) * Indexed CSV reads. * Indexed parquet reads. * implement dataset.to_batches method in IndexedParquetReader * Use file_io dataset read for cloud URIs * Add documentation on index batching --------- Co-authored-by: Troy Raen --- docs/catalogs/arguments.rst | 36 ++++- src/hipscat_import/catalog/file_readers.py | 129 +++++++++++++++++- .../catalog/test_file_readers.py | 63 ++++++++- .../catalog/test_run_round_trip.py | 47 +++++++ tests/hipscat_import/conftest.py | 5 + .../indexed_files/csv_list_double_1_of_2.txt | 3 + .../indexed_files/csv_list_double_2_of_2.txt | 3 + .../data/indexed_files/csv_list_single.txt | 6 + .../indexed_files/parquet_list_single.txt | 5 + 9 files changed, 291 insertions(+), 6 deletions(-) create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt create mode 100644 tests/hipscat_import/data/indexed_files/csv_list_single.txt create mode 100644 tests/hipscat_import/data/indexed_files/parquet_list_single.txt diff --git a/docs/catalogs/arguments.rst b/docs/catalogs/arguments.rst index ca86302b..86e378f8 100644 --- a/docs/catalogs/arguments.rst +++ b/docs/catalogs/arguments.rst @@ -97,7 +97,6 @@ Reading input files Catalog import reads through a list of files and converts them into a hipscatted catalog. - Which files? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -129,9 +128,10 @@ along to the map/reduce stages. We've provided reference implementations for reading CSV, FITS, and Parquet input files, but you can subclass the reader type to suit whatever input files you've got. -You only need to provide the ``file_reader`` argument if you are using a custom file reader +You only need to provide an object ``file_reader`` argument if you are using a custom file reader or passing parameters to the file reader. For example you might use ``file_reader=CsvReader(sep="\s+")`` -to parse a whitespace separated file. +to parse a whitespace separated file. Otherwise, you can use a short string to +specify an existing file reader type e.g. ``file_reader="csv"``. You can find the full API documentation for :py:class:`hipscat_import.catalog.file_readers.InputReader` @@ -171,6 +171,36 @@ You can find the full API documentation for If you're reading from cloud storage, or otherwise have some filesystem credential dict, put those in ``input_storage_options``. +Indexed batching strategy +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you have many small files (think 400k+ CSV files with a few rows each), you +may benefit from "indexed" file readers. These allow you to explicitly create +batches for tasks by providing a set of index files, where each file is a +text file that contains only paths to data files. + +Benefits: + +1. If you have 400k+ input files, you don't want to create 400k+ dask tasks + to process these files. +2. If the files are very small, batching them in this way allows the import + process to *combine* several small files into a single chunk for processing. + This will result in fewer intermediate files during the ``splitting`` stage. +3. 
If you have parquet files over a slow networked file system, we support
+   pyarrow's readahead through indexed readers.
+
+Warnings:
+
+1. If you have 20 dask workers in your pool, you may be tempted to create
+   20 index files. This is not always an efficient use of resources!
+   You'd be better served by 200 index files, so that:
+
+   a. dask can spread the load if some lists of files take longer to process
+      than others
+   b. if the pipeline dies partway through, the retry still has enough
+      remaining lists to keep all of your workers busy (with only 20 lists,
+      a retry after 15 successes would leave 15 of your 20 workers sitting idle).
+
 Which fields?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py
index f3f27bf0..e0202ddb 100644
--- a/src/hipscat_import/catalog/file_readers.py
+++ b/src/hipscat_import/catalog/file_readers.py
@@ -3,6 +3,9 @@
 import abc
 from typing import Any, Dict, Union
 
+import pandas as pd
+import pyarrow
+import pyarrow.dataset
 import pyarrow.parquet as pq
 from astropy.io import ascii as ascii_reader
 from astropy.table import Table
@@ -30,8 +33,16 @@ def get_file_reader(
               includes `.csv.gz` and other compressed csv files
             - `fits`, flexible image transport system. often used for astropy tables.
             - `parquet`, compressed columnar data format
+            - `ecsv`, astropy's enhanced CSV
+            - `indexed_csv`, "index" style reader that accepts a file with a list
+              of csv files that are appended in memory
+            - `indexed_parquet`, "index" style reader that accepts a file with a list
+              of parquet files that are appended in memory
         chunksize (int): number of rows to read in a single iteration.
+            for single-file readers, large files are split into batches based on this value.
+            for index-style readers, we read files until we reach this chunksize and
+            create a single batch in memory.
         schema_file (str): path to a parquet schema file. if provided, header names
             and column types will be pulled from the parquet schema metadata.
         column_names (list[str]): for CSV files, the names of columns if no header
@@ -59,7 +70,16 @@ def get_file_reader(
         )
     if file_format == "parquet":
         return ParquetReader(chunksize=chunksize, **kwargs)
-
+    if file_format == "indexed_csv":
+        return IndexedCsvReader(
+            chunksize=chunksize,
+            schema_file=schema_file,
+            column_names=column_names,
+            type_map=type_map,
+            **kwargs,
+        )
+    if file_format == "indexed_parquet":
+        return IndexedParquetReader(chunksize=chunksize, **kwargs)
     raise NotImplementedError(f"File Format: {file_format} not supported")
@@ -96,7 +116,7 @@ def provenance_info(self) -> dict:
     def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
         """Check that the `input_file` points to a single regular file
 
-        Raises
+        Raises:
             FileNotFoundError: if nothing exists at path, or directory found.
         """
         if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options):
@@ -104,6 +124,20 @@ def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any],
         if not file_io.is_regular_file(input_file, storage_options=storage_options):
             raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")
 
+    def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs):
+        """Read an "indexed" file.
+
+        This should contain a list of paths to files to be read and batched.
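+
+        For example, an index file might contain paths like the following
+        (illustrative only, mirroring the test data added in this change):
+
+            tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv
+            tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv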
+ + Raises: + FileNotFoundError: if nothing exists at path, or directory found. + """ + self.regular_file_exists(input_file, **kwargs) + file_names = file_io.load_text_file(input_file, storage_options=storage_options) + file_names = [f.strip() for f in file_names] + file_names = [f for f in file_names if f] + return file_names + class CsvReader(InputReader): """CSV reader for the most common CSV reading arguments. @@ -180,6 +214,37 @@ def read(self, input_file, read_columns=None): yield from reader +class IndexedCsvReader(CsvReader): + """Reads an index file, containing paths to CSV files to be read and batched + + See CsvReader for additional configuration for reading CSV files. + """ + + def read(self, input_file, read_columns=None): + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + + batch_size = 0 + batch_frames = [] + for file in file_names: + for single_frame in super().read(file, read_columns=read_columns): + if batch_size + len(single_frame) >= self.chunksize: + # We've hit our chunksize, send the batch off to the task. + if len(batch_frames) == 0: + yield single_frame + batch_size = 0 + else: + yield pd.concat(batch_frames, ignore_index=True) + batch_frames = [] + batch_frames.append(single_frame) + batch_size = len(single_frame) + else: + batch_frames.append(single_frame) + batch_size += len(single_frame) + + if len(batch_frames) > 0: + yield pd.concat(batch_frames, ignore_index=True) + + class AstropyEcsvReader(InputReader): """Reads astropy ascii .ecsv files. @@ -291,3 +356,63 @@ def read(self, input_file, read_columns=None): batch_size=self.chunksize, columns=columns, use_pandas_metadata=True ): yield smaller_table.to_pandas() + + +class IndexedParquetReader(InputReader): + """Reads an index file, containing paths to parquet files to be read and batched + + Attributes: + chunksize (int): maximum number of rows to process at once. + Large files will be processed in chunks. Small files will be concatenated. + Also passed to pyarrow.dataset.Dataset.to_batches as `batch_size`. + batch_readahead (int): number of batches to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + fragment_readahead (int): number of fragments to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + use_threads (bool): whether to use multiple threads for reading. + Passed to pyarrow.dataset.Dataset.to_batches. + column_names (list[str] or None): Names of columns to use from the input dataset. + If None, use all columns. + kwargs: additional arguments to pass along to InputReader.read_index_file. + """ + + def __init__( + self, + chunksize=500_000, + batch_readahead=16, + fragment_readahead=4, + use_threads=True, + column_names=None, + **kwargs, + ): + self.chunksize = chunksize + self.batch_readahead = batch_readahead + self.fragment_readahead = fragment_readahead + self.use_threads = use_threads + self.column_names = column_names + self.kwargs = kwargs + + def read(self, input_file, read_columns=None): + columns = read_columns or self.column_names + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + (_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs) + + batches, nrows = [], 0 + for batch in input_dataset.to_batches( + batch_size=self.chunksize, + batch_readahead=self.batch_readahead, + fragment_readahead=self.fragment_readahead, + use_threads=self.use_threads, + columns=columns, + ): + if nrows + batch.num_rows > self.chunksize: + # We've hit the chunksize so load to a DataFrame and yield. 
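+                # The current `batch` is not part of this yield; it is appended below and starts the next group.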
+ # There should always be at least one batch in here since batch_size == self.chunksize above. + yield pyarrow.Table.from_batches(batches).to_pandas() + batches, nrows = [], 0 + + batches.append(batch) + nrows += batch.num_rows + + if len(batches) > 0: + yield pyarrow.Table.from_batches(batches).to_pandas() diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index fbd056cc..5438f750 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -8,7 +8,14 @@ import pytest from hipscat.catalog.catalog import CatalogInfo -from hipscat_import.catalog.file_readers import CsvReader, FitsReader, ParquetReader, get_file_reader +from hipscat_import.catalog.file_readers import ( + CsvReader, + FitsReader, + IndexedCsvReader, + IndexedParquetReader, + ParquetReader, + get_file_reader, +) # pylint: disable=redefined-outer-name @@ -233,6 +240,32 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info): assert "SECRETS" not in data +def test_indexed_csv_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=10_000).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=60).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=5).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader(parquet_shards_shard_44_0): """Verify we can read the parquet file into a single data frame.""" total_chunks = 0 @@ -252,6 +285,34 @@ def test_parquet_reader_chunked(parquet_shards_shard_44_0): assert total_chunks == 7 +def test_indexed_parquet_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in get_file_reader("indexed_parquet", chunksize=10_000).read( + indexed_files_dir / "parquet_list_single.txt" + ): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedParquetReader(chunksize=60).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. 
+ total_chunks = 0 + for frame in IndexedParquetReader(chunksize=5).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info): """Test that we get some provenance info and it is parseable into JSON.""" reader = ParquetReader(chunksize=1) diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index d9862067..d80b3619 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -673,3 +673,50 @@ def test_gaia_ecsv( assert schema.equals(expected_parquet_schema, check_metadata=False) schema = pds.dataset(args.catalog_path, format="parquet").schema assert schema.equals(expected_parquet_schema, check_metadata=False) + + +@pytest.mark.dask +def test_import_indexed_csv( + dask_client, + indexed_files_dir, + tmp_path, +): + """Use indexed-style CSV reads. There are two index files, and we expect + to have two batches worth of intermediate files.""" + temp = tmp_path / "intermediate_files" + os.makedirs(temp) + + args = ImportArguments( + output_artifact_name="indexed_csv", + input_file_list=[ + indexed_files_dir / "csv_list_double_1_of_2.txt", + indexed_files_dir / "csv_list_double_2_of_2.txt", + ], + output_path=tmp_path, + file_reader="indexed_csv", + sort_columns="id", + tmp_dir=temp, + dask_tmp=temp, + highest_healpix_order=2, + delete_intermediate_parquet_files=False, + delete_resume_log_files=False, + pixel_threshold=3_000, + progress_bar=False, + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that there are TWO intermediate parquet file (two index files). 
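+    # Each index file is handled by its own splitting task, so we expect one shard per index file.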
+ assert_directory_contains( + temp / "indexed_csv" / "intermediate" / "order_0" / "dir_0" / "pixel_11", + [ + "shard_split_0_0.parquet", + "shard_split_1_0.parquet", + ], + ) diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 7ef2fc20..e16e9fab 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -121,6 +121,11 @@ def formats_pandasindex(test_data_dir): return test_data_dir / "test_formats" / "pandasindex.parquet" +@pytest.fixture +def indexed_files_dir(test_data_dir): + return test_data_dir / "indexed_files" + + @pytest.fixture def small_sky_parts_dir(test_data_dir): return test_data_dir / "small_sky_parts" diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt new file mode 100644 index 00000000..8e9c9d54 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv + diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt new file mode 100644 index 00000000..352c08ea --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv \ No newline at end of file diff --git a/tests/hipscat_import/data/indexed_files/csv_list_single.txt b/tests/hipscat_import/data/indexed_files/csv_list_single.txt new file mode 100644 index 00000000..04817f83 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_single.txt @@ -0,0 +1,6 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv + diff --git a/tests/hipscat_import/data/indexed_files/parquet_list_single.txt b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt new file mode 100644 index 00000000..63e5b84f --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt @@ -0,0 +1,5 @@ +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_0_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_1_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_2_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_3_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_4_0.parquet