Merge branch 'main' into issue/303/312
delucchi-cmu committed Jul 18, 2024
2 parents 4ac8906 + 72a7626 commit 7009664
Showing 35 changed files with 837 additions and 408 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/add-issue-to-project-tracker.yml
@@ -10,7 +10,7 @@ jobs:
name: Add issue to project
runs-on: ubuntu-latest
steps:
- uses: actions/add-to-project@v1.0.1
- uses: actions/add-to-project@v1.0.2
with:
project-url: https://github.com/orgs/astronomy-commons/projects/7
github-token: ${{ secrets.ADD_TO_PROJECT_PAT }}
42 changes: 39 additions & 3 deletions docs/catalogs/arguments.rst
@@ -97,7 +97,6 @@ Reading input files

Catalog import reads through a list of files and converts them into a hipscatted catalog.


Which files?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -129,9 +128,10 @@ along to the map/reduce stages. We've provided reference implementations for
reading CSV, FITS, and Parquet input files, but you can subclass the reader
type to suit whatever input files you've got.
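
For illustration only (not part of this change), a custom reader might subclass
``InputReader`` and yield pandas DataFrame chunks from its ``read`` method. The
class below is a hypothetical sketch: the ``.dat`` format and column handling are
made up, and the base class may require additional methods (such as
``provenance_info``) beyond what is shown.

.. code-block:: python

    import pandas as pd

    from hipscat_import.catalog.file_readers import InputReader


    class HypotheticalDatReader(InputReader):
        """Sketch: read a made-up whitespace-delimited ``.dat`` format in chunks."""

        def __init__(self, chunksize=500_000):
            self.chunksize = chunksize

        def read(self, input_file, read_columns=None):
            # Fail fast if the path is missing or points at a directory.
            self.regular_file_exists(input_file)
            # pandas yields an iterator of DataFrames when ``chunksize`` is set.
            for chunk in pd.read_csv(input_file, sep=r"\s+", chunksize=self.chunksize):
                yield chunk[read_columns] if read_columns else chunk

        def provenance_info(self) -> dict:
            return {"input_reader_type": "HypotheticalDatReader", "chunksize": self.chunksize}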

You only need to provide the ``file_reader`` argument if you are using a custom file reader
You only need to provide an object for the ``file_reader`` argument if you are using a custom file reader
or passing parameters to the file reader. For example, you might use ``file_reader=CsvReader(sep="\s+")``
to parse a whitespace separated file.
to parse a whitespace separated file. Otherwise, you can use a short string to
specify an existing file reader type, e.g. ``file_reader="csv"``.
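
Both forms resolve to a reader object. As a small illustration (the configuration
values here are only examples), the factory function ``get_file_reader`` shows the
equivalence:

.. code-block:: python

    from hipscat_import.catalog.file_readers import CsvReader, get_file_reader

    # Explicit reader object, configured for whitespace-separated files.
    whitespace_reader = CsvReader(sep=r"\s+")

    # Short-string form: resolves to a stock CSV reader with default settings.
    plain_csv_reader = get_file_reader("csv")

    # Either object can be passed as the ``file_reader`` import argument.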

You can find the full API documentation for
:py:class:`hipscat_import.catalog.file_readers.InputReader`
@@ -171,6 +171,36 @@ You can find the full API documentation for
If you're reading from cloud storage, or otherwise have some filesystem credential
dict, put those in ``input_storage_options``.

Indexed batching strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you have many small files (think 400k+ CSV files with a few rows each), you
may benefit from "indexed" file readers. These allow you to explicitly create
batches for tasks by providing a set of index files, where each file is a
text file that contains only paths to data files. A sketch of generating such
index files follows the lists below.

Benefits:

1. If you have 400k+ input files, you don't want to create 400k+ dask tasks
to process these files.
2. If the files are very small, batching them in this way allows the import
process to *combine* several small files into a single chunk for processing.
This will result in fewer intermediate files during the ``splitting`` stage.
3. If you have parquet files over a slow networked file system, we support
pyarrow's readahead protocol through indexed readers.

Warnings:

1. If you have 20 dask workers in your pool, you may be tempted to create
20 index files. This is not always an efficient use of resources!
You'd be better served by 200 index files, so that:

a. dask can spread the load if some lists of files take longer to process
than others
b. if the pipeline dies after successfully processing 15 lists, the retry
only needs to process the remaining 5 lists, and most of your 20 workers
would sit idle.
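
A minimal sketch of generating index files (all paths here are hypothetical):
write one data-file path per line into each index file, and prefer many small
index files over one per worker, per the warnings above.

.. code-block:: python

    from pathlib import Path

    # Hypothetical inputs: ~400k tiny CSV files, spread across 200 index files.
    data_files = sorted(Path("/data/tiny_csvs").glob("*.csv"))
    num_indexes = 200

    index_dir = Path("/data/index_files")
    index_dir.mkdir(parents=True, exist_ok=True)
    for i in range(num_indexes):
        batch = data_files[i::num_indexes]
        # One path per line; blank lines are ignored when the index is read back.
        (index_dir / f"index_{i:03d}.txt").write_text("\n".join(str(f) for f in batch))

The resulting index files become the pipeline's input files, read with
``file_reader="indexed_csv"`` (or ``"indexed_parquet"`` for parquet data).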

Which fields?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -245,6 +275,12 @@ reporting to look like the following:
Reducing : 100%|██████████| 10895/10895 [7:46:07<00:00, 2.57s/it]
Finishing: 100%|██████████| 6/6 [08:03<00:00, 80.65s/it]
``tqdm`` will try to make a guess about the type of output to provide: plain
text as for a command line, or a pretty ipywidget. If it tries to use a pretty
widget but your execution environment can't support the widget, you can
force the pipeline to use a simple progress bar with the ``simple_progress_bar``
argument.

For very long-running pipelines (e.g. multi-TB inputs), you can get an
email notification when the pipeline completes using the
``completion_email_address`` argument. This will send a brief email,
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -18,7 +18,7 @@ dependencies = [
"dask[complete]>=2024.3.0", # Includes dask expressions.
"deprecated",
"healpy",
"hipscat >=0.3.4",
"hipscat >=0.3.5",
"ipykernel", # Support for Jupyter notebooks
"numpy",
"pandas",
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
git+https://github.com/astronomy-commons/hipscat.git@main
133 changes: 129 additions & 4 deletions src/hipscat_import/catalog/file_readers.py
@@ -3,6 +3,9 @@
import abc
from typing import Any, Dict, Union

import pandas as pd
import pyarrow
import pyarrow.dataset
import pyarrow.parquet as pq
from astropy.io import ascii as ascii_reader
from astropy.table import Table
@@ -30,8 +33,16 @@ def get_file_reader(
includes `.csv.gz` and other compressed csv files
- `fits`, flexible image transport system. often used for astropy tables.
- `parquet`, compressed columnar data format
- `ecsv`, astropy's enhanced CSV
- `indexed_csv`, "index" style reader, that accepts a file with a list
of csv files that are appended in-memory
- `indexed_parquet`, "index" style reader, that accepts a file with a list
of parquet files that are appended in-memory
chunksize (int): number of rows to read in a single iteration.
for single-file readers, large files are split into batches based on this value.
for index-style readers, we read files until we reach this chunksize and
create a single batch in-memory.
schema_file (str): path to a parquet schema file. if provided, header names
and column types will be pulled from the parquet schema metadata.
column_names (list[str]): for CSV files, the names of columns if no header
@@ -59,7 +70,16 @@ def get_file_reader(
)
if file_format == "parquet":
return ParquetReader(chunksize=chunksize, **kwargs)

if file_format == "indexed_csv":
return IndexedCsvReader(
chunksize=chunksize,
schema_file=schema_file,
column_names=column_names,
type_map=type_map,
**kwargs,
)
if file_format == "indexed_parquet":
return IndexedParquetReader(chunksize=chunksize, **kwargs)
raise NotImplementedError(f"File Format: {file_format} not supported")
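
A usage sketch (illustrative, not part of this diff; the index path below is made
up): the factory returns either a plain or an indexed reader, and the ``chunksize``
semantics described in the docstring apply to both.

from hipscat_import.catalog.file_readers import get_file_reader

# Plain CSV reader: each large file is split into ~250k-row batches.
csv_reader = get_file_reader("csv", chunksize=250_000)

# Indexed CSV reader: small files listed in an index file are combined
# until a batch reaches roughly 250k rows.
indexed_reader = get_file_reader("indexed_csv", chunksize=250_000)

# Readers yield pandas DataFrames.
for frame in indexed_reader.read("/data/index_files/index_000.txt"):
    print(len(frame))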


@@ -96,14 +116,28 @@ def provenance_info(self) -> dict:
def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
"""Check that the `input_file` points to a single regular file
Raises
Raises:
FileNotFoundError: if nothing exists at path, or directory found.
"""
if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file, storage_options=storage_options):
raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")

def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs):
"""Read an "indexed" file.
This should contain a list of paths to files to be read and batched.
Raises:
FileNotFoundError: if nothing exists at path, or directory found.
"""
self.regular_file_exists(input_file, **kwargs)
file_names = file_io.load_text_file(input_file, storage_options=storage_options)
file_names = [f.strip() for f in file_names]
file_names = [f for f in file_names if f]
return file_names


class CsvReader(InputReader):
"""CSV reader for the most common CSV reading arguments.
@@ -158,7 +192,7 @@ def __init__(
if self.column_names:
self.kwargs["names"] = self.column_names
elif not self.header and schema_parquet is not None:
self.kwargs["names"] = schema_parquet.columns
self.kwargs["names"] = list(schema_parquet.columns)

if self.type_map:
self.kwargs["dtype"] = self.type_map
@@ -180,6 +214,37 @@ def read(self, input_file, read_columns=None):
yield from reader


class IndexedCsvReader(CsvReader):
"""Reads an index file, containing paths to CSV files to be read and batched
See CsvReader for additional configuration for reading CSV files.
"""

def read(self, input_file, read_columns=None):
file_names = self.read_index_file(input_file=input_file, **self.kwargs)

batch_size = 0
batch_frames = []
for file in file_names:
for single_frame in super().read(file, read_columns=read_columns):
if batch_size + len(single_frame) >= self.chunksize:
# We've hit our chunksize, send the batch off to the task.
if len(batch_frames) == 0:
yield single_frame
batch_size = 0
else:
yield pd.concat(batch_frames, ignore_index=True)
batch_frames = []
batch_frames.append(single_frame)
batch_size = len(single_frame)
else:
batch_frames.append(single_frame)
batch_size += len(single_frame)

if len(batch_frames) > 0:
yield pd.concat(batch_frames, ignore_index=True)
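
To make the batching concrete, an illustrative round trip with synthetic data (not
part of this diff): four 40-row CSV files and ``chunksize=100``. Appending a file
that would reach the chunk size flushes the accumulated frames first, so the reader
yields two batches of 80 rows each.

import tempfile
from pathlib import Path

import pandas as pd

from hipscat_import.catalog.file_readers import IndexedCsvReader

# Build four tiny CSV files and an index file listing their paths.
workdir = Path(tempfile.mkdtemp())
paths = []
for i in range(4):
    part = workdir / f"part_{i}.csv"
    pd.DataFrame({"ra": range(40), "dec": range(40)}).to_csv(part, index=False)
    paths.append(str(part))
index_file = workdir / "index.txt"
index_file.write_text("\n".join(paths))

reader = IndexedCsvReader(chunksize=100)
print([len(frame) for frame in reader.read(str(index_file))])  # [80, 80]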


class AstropyEcsvReader(InputReader):
"""Reads astropy ascii .ecsv files.
@@ -241,7 +306,7 @@ def read(self, input_file, read_columns=None):
table = Table.read(input_file, memmap=True, **self.kwargs)
if read_columns:
table.keep_columns(read_columns)
if self.column_names:
elif self.column_names:
table.keep_columns(self.column_names)
elif self.skip_column_names:
table.remove_columns(self.skip_column_names)
@@ -291,3 +356,63 @@ def read(self, input_file, read_columns=None):
batch_size=self.chunksize, columns=columns, use_pandas_metadata=True
):
yield smaller_table.to_pandas()


class IndexedParquetReader(InputReader):
"""Reads an index file, containing paths to parquet files to be read and batched
Attributes:
chunksize (int): maximum number of rows to process at once.
Large files will be processed in chunks. Small files will be concatenated.
Also passed to pyarrow.dataset.Dataset.to_batches as `batch_size`.
batch_readahead (int): number of batches to read ahead.
Passed to pyarrow.dataset.Dataset.to_batches.
fragment_readahead (int): number of fragments to read ahead.
Passed to pyarrow.dataset.Dataset.to_batches.
use_threads (bool): whether to use multiple threads for reading.
Passed to pyarrow.dataset.Dataset.to_batches.
column_names (list[str] or None): Names of columns to use from the input dataset.
If None, use all columns.
kwargs: additional arguments to pass along to InputReader.read_index_file.
"""

def __init__(
self,
chunksize=500_000,
batch_readahead=16,
fragment_readahead=4,
use_threads=True,
column_names=None,
**kwargs,
):
self.chunksize = chunksize
self.batch_readahead = batch_readahead
self.fragment_readahead = fragment_readahead
self.use_threads = use_threads
self.column_names = column_names
self.kwargs = kwargs

def read(self, input_file, read_columns=None):
columns = read_columns or self.column_names
file_names = self.read_index_file(input_file=input_file, **self.kwargs)
(_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs)

batches, nrows = [], 0
for batch in input_dataset.to_batches(
batch_size=self.chunksize,
batch_readahead=self.batch_readahead,
fragment_readahead=self.fragment_readahead,
use_threads=self.use_threads,
columns=columns,
):
if nrows + batch.num_rows > self.chunksize:
# We've hit the chunksize so load to a DataFrame and yield.
# There should always be at least one batch in here since batch_size == self.chunksize above.
yield pyarrow.Table.from_batches(batches).to_pandas()
batches, nrows = [], 0

batches.append(batch)
nrows += batch.num_rows

if len(batches) > 0:
yield pyarrow.Table.from_batches(batches).to_pandas()
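
A hedged usage sketch (not part of this diff; the index path and column names are
examples): the readahead settings are forwarded directly to
pyarrow.dataset.Dataset.to_batches, which is what helps over slow networked filesystems.

from hipscat_import.catalog.file_readers import IndexedParquetReader

reader = IndexedParquetReader(
    chunksize=1_000_000,   # max rows per yielded DataFrame; also pyarrow's batch_size
    batch_readahead=32,    # prefetch more record batches
    fragment_readahead=8,  # prefetch more files (fragments) concurrently
    column_names=["ra", "dec"],
)
for frame in reader.read("/data/index_files/parquet_index_000.txt"):
    print(frame.shape)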
24 changes: 17 additions & 7 deletions src/hipscat_import/catalog/map_reduce.py
@@ -1,5 +1,6 @@
"""Import a set of non-hipscat files using dask for parallelization"""

import pickle
from typing import Any, Dict, Union

import healpy as hp
@@ -11,7 +12,6 @@
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure
@@ -35,14 +35,16 @@ def _has_named_index(dataframe):

def _iterate_input_file(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
highest_order,
ra_column,
dec_column,
use_hipscat_index=False,
read_columns=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
with open(pickled_reader_file, "rb") as pickle_file:
file_reader = pickle.load(pickle_file)
if not file_reader:
raise NotImplementedError("No file reader implemented")

@@ -66,7 +68,7 @@

def map_to_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
resume_path: FilePointer,
mapping_key,
highest_order,
@@ -103,7 +105,13 @@ def map_to_pixels(
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
input_file,
pickled_reader_file,
highest_order,
ra_column,
dec_column,
use_hipscat_index,
read_columns,
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

@@ -120,14 +128,14 @@

def split_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
splitting_key,
highest_order,
ra_column,
dec_column,
cache_shard_path: FilePointer,
resume_path: FilePointer,
alignment=None,
alignment_file=None,
use_hipscat_index=False,
):
"""Map a file of input objects to their healpix pixels and split into shards.
Expand All @@ -149,8 +157,10 @@ def split_pixels(
FileNotFoundError: if the file does not exist, or is a directory
"""
try:
with open(alignment_file, "rb") as pickle_file:
alignment = pickle.load(pickle_file)
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
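
The worker-side loads above expect the reader and alignment to have been pickled
ahead of time. A minimal sketch of the driver-side counterpart (the real plumbing
lives in the resume plan / runner, which is not shown in this diff; the path is
hypothetical):

import pickle

from hipscat_import.catalog.file_readers import get_file_reader

pickled_reader_file = "/tmp/hipscat_import_resume/reader.pickle"

# Serialize the configured reader once; each dask task then reloads it by path
# instead of receiving the reader object as a task argument.
file_reader = get_file_reader("indexed_csv", chunksize=250_000)
with open(pickled_reader_file, "wb") as pickle_file:
    pickle.dump(file_reader, pickle_file)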