Enable "index file" reads for catalog import #334

Merged · 6 commits · Jun 12, 2024
36 changes: 33 additions & 3 deletions docs/catalogs/arguments.rst
@@ -97,7 +97,6 @@ Reading input files

Catalog import reads through a list of files and converts them into a hipscatted catalog.


Which files?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -129,9 +128,10 @@ along to the map/reduce stages. We've provided reference implementations for
reading CSV, FITS, and Parquet input files, but you can subclass the reader
type to suit whatever input files you've got.

You only need to provide the ``file_reader`` argument as a reader object if you are using a custom
file reader or passing parameters to the file reader. For example you might use
``file_reader=CsvReader(sep="\s+")`` to parse a whitespace separated file. Otherwise, you can use a
short string to specify an existing file reader type, e.g. ``file_reader="csv"``.
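
For example, a minimal sketch of both forms (the paths and catalog name here are illustrative)::

    from hipscat_import.catalog.arguments import ImportArguments
    from hipscat_import.catalog.file_readers import CsvReader

    # Short string: use the built-in CSV reader with its default settings.
    args = ImportArguments(
        output_artifact_name="my_catalog",
        input_file_list=["data/part_0.csv"],
        output_path="catalogs",
        file_reader="csv",
    )

    # Reader object: pass reader-specific options, e.g. whitespace-separated columns.
    args = ImportArguments(
        output_artifact_name="my_catalog",
        input_file_list=["data/part_0.csv"],
        output_path="catalogs",
        file_reader=CsvReader(sep=r"\s+"),
    )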

You can find the full API documentation for
:py:class:`hipscat_import.catalog.file_readers.InputReader`
@@ -171,6 +171,36 @@ You can find the full API documentation for
If you're reading from cloud storage, or otherwise have some filesystem credential
dict, put those in ``input_storage_options``.

Indexed batching strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you have many small files (think 400k+ CSV files with a few rows each), you
may benefit from "indexed" file readers. These allow you to explicitly create
batches for tasks by providing a set of index files, where each file is a
text file that contains only paths to data files.

Benefits:

1. If you have 400k+ input files, you don't want to create 400k+ dask tasks
to process these files.
2. If the files are very small, batching them in this way allows the import
process to *combine* several small files into a single chunk for processing.
This will result in fewer intermediate files during the ``splitting`` stage.
3. If you have parquet files over a slow networked file system, we support pyarrow's readahead protocol through indexed readers.

Warnings:

1. If you have 20 dask workers in your pool, you may be tempted to create
20 index files. This is not always an efficient use of resources!
You'd be better served by 200 index files, so that:

a. dask can spread the load if some lists of files take longer to process
than others
b. if the pipeline dies partway through, a retry still has enough lists to keep
the whole pool busy. With only 20 index files, a failure after successfully
processing 15 lists would leave just 5 lists for those same 20 workers, and
many workers would sit idle.
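
As a minimal sketch (the index-file paths and catalog name are illustrative), each index file is a
plain-text file listing one data file path per line, and the import is configured with the
``indexed_csv`` reader::

    from hipscat_import.catalog.arguments import ImportArguments

    args = ImportArguments(
        output_artifact_name="indexed_catalog",
        input_file_list=[
            "file_lists/batch_001.txt",
            "file_lists/batch_002.txt",
        ],
        output_path="catalogs",
        file_reader="indexed_csv",
    )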

Which fields?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

129 changes: 127 additions & 2 deletions src/hipscat_import/catalog/file_readers.py
@@ -3,6 +3,9 @@
import abc
from typing import Any, Dict, Union

import pandas as pd
import pyarrow
import pyarrow.dataset
import pyarrow.parquet as pq
from astropy.io import ascii as ascii_reader
from astropy.table import Table
@@ -30,8 +33,16 @@ def get_file_reader(
includes `.csv.gz` and other compressed csv files
- `fits`, flexible image transport system. often used for astropy tables.
- `parquet`, compressed columnar data format
- `ecsv`, astropy's enhanced CSV
- `indexed_csv`, "index" style reader, that accepts a file with a list
of csv files that are appended in-memory
- `indexed_parquet`, "index" style reader, that accepts a file with a list
of parquet files that are appended in-memory

chunksize (int): number of rows to read in a single iteration.
for single-file readers, large files are split into batches based on this value.
for index-style readers, we read files until we reach this chunksize and
create a single batch in-memory.
schema_file (str): path to a parquet schema file. if provided, header names
and column types will be pulled from the parquet schema metadata.
column_names (list[str]): for CSV files, the names of columns if no header
@@ -59,7 +70,16 @@ def get_file_reader(
)
if file_format == "parquet":
return ParquetReader(chunksize=chunksize, **kwargs)

if file_format == "indexed_csv":
return IndexedCsvReader(
chunksize=chunksize,
schema_file=schema_file,
column_names=column_names,
type_map=type_map,
**kwargs,
)
if file_format == "indexed_parquet":
return IndexedParquetReader(chunksize=chunksize, **kwargs)
raise NotImplementedError(f"File Format: {file_format} not supported")
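
As a usage sketch of the new factory options (the index-file path and chunk size here are illustrative):

    from hipscat_import.catalog.file_readers import get_file_reader

    # Both readers consume an "index" file: a plain-text list of data file paths.
    csv_reader = get_file_reader("indexed_csv", chunksize=60)
    parquet_reader = get_file_reader("indexed_parquet", chunksize=60)

    for frame in csv_reader.read("file_lists/csv_batch_1.txt"):
        print(len(frame))  # each yielded pandas DataFrame holds at most ~chunksize rows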


@@ -96,14 +116,28 @@ def provenance_info(self) -> dict:
def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
"""Check that the `input_file` points to a single regular file

Raises:
FileNotFoundError: if nothing exists at path, or directory found.
"""
if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file, storage_options=storage_options):
raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")

def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs):
"""Read an "indexed" file.

This should contain a list of paths to files to be read and batched.

Raises:
FileNotFoundError: if nothing exists at path, or directory found.
"""
self.regular_file_exists(input_file, **kwargs)
file_names = file_io.load_text_file(input_file, storage_options=storage_options)
file_names = [f.strip() for f in file_names]
file_names = [f for f in file_names if f]
return file_names
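
For illustration (the file names are hypothetical), given an index file containing:

    data/catalog_00.csv
    data/catalog_01.csv

    data/catalog_02.csv

read_index_file returns ["data/catalog_00.csv", "data/catalog_01.csv", "data/catalog_02.csv"];
surrounding whitespace and blank lines are stripped.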


class CsvReader(InputReader):
"""CSV reader for the most common CSV reading arguments.
@@ -180,6 +214,37 @@ def read(self, input_file, read_columns=None):
yield from reader


class IndexedCsvReader(CsvReader):
"""Reads an index file, containing paths to CSV files to be read and batched

See CsvReader for additional configuration for reading CSV files.
"""

def read(self, input_file, read_columns=None):
file_names = self.read_index_file(input_file=input_file, **self.kwargs)

batch_size = 0
batch_frames = []
for file in file_names:
for single_frame in super().read(file, read_columns=read_columns):
if batch_size + len(single_frame) >= self.chunksize:
# We've hit our chunksize, send the batch off to the task.
if len(batch_frames) == 0:
yield single_frame
batch_size = 0
else:
yield pd.concat(batch_frames, ignore_index=True)
batch_frames = []
batch_frames.append(single_frame)
batch_size = len(single_frame)
else:
batch_frames.append(single_frame)
batch_size += len(single_frame)

if len(batch_frames) > 0:
yield pd.concat(batch_frames, ignore_index=True)


class AstropyEcsvReader(InputReader):
"""Reads astropy ascii .ecsv files.

@@ -291,3 +356,63 @@ def read(self, input_file, read_columns=None):
batch_size=self.chunksize, columns=columns, use_pandas_metadata=True
):
yield smaller_table.to_pandas()


class IndexedParquetReader(InputReader):
"""Reads an index file, containing paths to parquet files to be read and batched

Attributes:
chunksize (int): maximum number of rows to process at once.
Large files will be processed in chunks. Small files will be concatenated.
Also passed to pyarrow.dataset.Dataset.to_batches as `batch_size`.
batch_readahead (int): number of batches to read ahead.
Passed to pyarrow.dataset.Dataset.to_batches.
fragment_readahead (int): number of fragments to read ahead.
Passed to pyarrow.dataset.Dataset.to_batches.
use_threads (bool): whether to use multiple threads for reading.
Passed to pyarrow.dataset.Dataset.to_batches.
column_names (list[str] or None): Names of columns to use from the input dataset.
If None, use all columns.
kwargs: additional arguments to pass along to InputReader.read_index_file.
"""

def __init__(
self,
chunksize=500_000,
batch_readahead=16,
fragment_readahead=4,
use_threads=True,
column_names=None,
**kwargs,
):
self.chunksize = chunksize
self.batch_readahead = batch_readahead
self.fragment_readahead = fragment_readahead
self.use_threads = use_threads
self.column_names = column_names
self.kwargs = kwargs

def read(self, input_file, read_columns=None):
columns = read_columns or self.column_names
file_names = self.read_index_file(input_file=input_file, **self.kwargs)
(_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs)

batches, nrows = [], 0
for batch in input_dataset.to_batches(
batch_size=self.chunksize,
batch_readahead=self.batch_readahead,
fragment_readahead=self.fragment_readahead,
use_threads=self.use_threads,
columns=columns,
):
if nrows + batch.num_rows > self.chunksize:
# We've hit the chunksize so load to a DataFrame and yield.
# There should always be at least one batch in here since batch_size == self.chunksize above.
yield pyarrow.Table.from_batches(batches).to_pandas()
batches, nrows = [], 0

batches.append(batch)
nrows += batch.num_rows

if len(batches) > 0:
yield pyarrow.Table.from_batches(batches).to_pandas()
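
A similar sketch for the parquet-side reader (the path is illustrative; the readahead values shown are the defaults):

    from hipscat_import.catalog.file_readers import IndexedParquetReader

    reader = IndexedParquetReader(chunksize=5, batch_readahead=16, fragment_readahead=4)
    for frame in reader.read("indexed_files/parquet_list_single.txt"):
        # pyarrow reads each listed file as a dataset fragment; record batches are
        # accumulated until they would exceed `chunksize`, then yielded as a DataFrame.
        print(len(frame))
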
63 changes: 62 additions & 1 deletion tests/hipscat_import/catalog/test_file_readers.py
@@ -8,7 +8,14 @@
import pytest
from hipscat.catalog.catalog import CatalogInfo

from hipscat_import.catalog.file_readers import CsvReader, FitsReader, ParquetReader, get_file_reader
from hipscat_import.catalog.file_readers import (
CsvReader,
FitsReader,
IndexedCsvReader,
IndexedParquetReader,
ParquetReader,
get_file_reader,
)


# pylint: disable=redefined-outer-name
@@ -233,6 +240,32 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info):
assert "SECRETS" not in data


def test_indexed_csv_reader(indexed_files_dir):
# Chunksize covers all the inputs.
total_chunks = 0
for frame in IndexedCsvReader(chunksize=10_000).read(indexed_files_dir / "csv_list_single.txt"):
total_chunks += 1
assert len(frame) == 131

assert total_chunks == 1

# Chunksize requires splitting into just a few batches.
total_chunks = 0
for frame in IndexedCsvReader(chunksize=60).read(indexed_files_dir / "csv_list_single.txt"):
total_chunks += 1
assert len(frame) < 60

assert total_chunks == 3

# Requesting a very small chunksize. This will split up reads on the CSV.
total_chunks = 0
for frame in IndexedCsvReader(chunksize=5).read(indexed_files_dir / "csv_list_single.txt"):
total_chunks += 1
assert len(frame) <= 5

assert total_chunks == 29


def test_parquet_reader(parquet_shards_shard_44_0):
"""Verify we can read the parquet file into a single data frame."""
total_chunks = 0
@@ -252,6 +285,34 @@ def test_parquet_reader_chunked(parquet_shards_shard_44_0):
assert total_chunks == 7


def test_indexed_parquet_reader(indexed_files_dir):
# Chunksize covers all the inputs.
total_chunks = 0
for frame in get_file_reader("indexed_parquet", chunksize=10_000).read(
indexed_files_dir / "parquet_list_single.txt"
):
total_chunks += 1
assert len(frame) == 131

assert total_chunks == 1

# Chunksize requires splitting into just a few batches.
total_chunks = 0
for frame in IndexedParquetReader(chunksize=60).read(indexed_files_dir / "parquet_list_single.txt"):
total_chunks += 1
assert len(frame) < 60

assert total_chunks == 3

# Requesting a very small chunksize. This will split up reads on the parquet files.
total_chunks = 0
for frame in IndexedParquetReader(chunksize=5).read(indexed_files_dir / "parquet_list_single.txt"):
total_chunks += 1
assert len(frame) <= 5

assert total_chunks == 29


def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info):
"""Test that we get some provenance info and it is parseable into JSON."""
reader = ParquetReader(chunksize=1)
47 changes: 47 additions & 0 deletions tests/hipscat_import/catalog/test_run_round_trip.py
@@ -673,3 +673,50 @@ def test_gaia_ecsv(
assert schema.equals(expected_parquet_schema, check_metadata=False)
schema = pds.dataset(args.catalog_path, format="parquet").schema
assert schema.equals(expected_parquet_schema, check_metadata=False)


@pytest.mark.dask
def test_import_indexed_csv(
dask_client,
indexed_files_dir,
tmp_path,
):
"""Use indexed-style CSV reads. There are two index files, and we expect
to have two batches worth of intermediate files."""
temp = tmp_path / "intermediate_files"
os.makedirs(temp)

args = ImportArguments(
output_artifact_name="indexed_csv",
input_file_list=[
indexed_files_dir / "csv_list_double_1_of_2.txt",
indexed_files_dir / "csv_list_double_2_of_2.txt",
],
output_path=tmp_path,
file_reader="indexed_csv",
sort_columns="id",
tmp_dir=temp,
dask_tmp=temp,
highest_healpix_order=2,
delete_intermediate_parquet_files=False,
delete_resume_log_files=False,
pixel_threshold=3_000,
progress_bar=False,
)

runner.run(args, dask_client)

# Check that the catalog metadata file exists
catalog = Catalog.read_from_hipscat(args.catalog_path)
assert catalog.on_disk
assert catalog.catalog_path == args.catalog_path
assert len(catalog.get_healpix_pixels()) == 1

# Check that there are TWO intermediate parquet files (two index files).
assert_directory_contains(
temp / "indexed_csv" / "intermediate" / "order_0" / "dir_0" / "pixel_11",
[
"shard_split_0_0.parquet",
"shard_split_1_0.parquet",
],
)
5 changes: 5 additions & 0 deletions tests/hipscat_import/conftest.py
@@ -121,6 +121,11 @@ def formats_pandasindex(test_data_dir):
return test_data_dir / "test_formats" / "pandasindex.parquet"


@pytest.fixture
def indexed_files_dir(test_data_dir):
return test_data_dir / "indexed_files"


@pytest.fixture
def small_sky_parts_dir(test_data_dir):
return test_data_dir / "small_sky_parts"
@@ -0,0 +1,3 @@
tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv

@@ -0,0 +1,3 @@
tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv
6 changes: 6 additions & 0 deletions tests/hipscat_import/data/indexed_files/csv_list_single.txt
@@ -0,0 +1,6 @@
tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv
tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv
