Merge branch 'main' into issue/308/indexed
delucchi-cmu authored Jun 11, 2024
2 parents db62e1e + 04c1a8f commit daf153a
Showing 33 changed files with 1,228 additions and 775 deletions.
docs/catalogs/arguments.rst (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@ A minimal arguments block will look something like:

.. code-block:: python
from hipscat_import.pipeline.catalog.arguments import ImportArguments
from hipscat_import.catalog.arguments import ImportArguments
args = ImportArguments(
sort_columns="ObjectID",
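For context, a minimal arguments block using the corrected import path might look like the sketch below. Only the import and `sort_columns` come from the diff above; the other parameter names and values are illustrative placeholders, not confirmed against this version of the package.

```python
# Hypothetical minimal arguments block with the corrected import path.
# Only the import and sort_columns come from the diff above; the remaining
# parameter names and values are illustrative placeholders.
from hipscat_import.catalog.arguments import ImportArguments

args = ImportArguments(
    sort_columns="ObjectID",
    input_path="./my_input_files",       # placeholder
    output_path="./my_output_catalogs",  # placeholder
    output_artifact_name="my_catalog",   # placeholder
)
```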
pyproject.toml (5 changes: 3 additions & 2 deletions)
@@ -18,13 +18,14 @@ dependencies = [
"dask[complete]>=2024.3.0", # Includes dask expressions.
"deprecated",
"healpy",
"hipscat >=0.3.0",
"hipscat >=0.3.4",
"ipykernel", # Support for Jupyter notebooks
"numpy",
"pandas",
"pyarrow",
"pyyaml",
"scipy",
"tqdm",
"numpy",
]

# On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes)
requirements.txt (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
git+https://github.com/astronomy-commons/hipscat.git@main
src/hipscat_import/catalog/file_readers.py (7 changes: 7 additions & 0 deletions)
@@ -101,9 +101,16 @@ def read(self, input_file, read_columns=None):
def provenance_info(self) -> dict:
"""Create dictionary of parameters for provenance tracking.
If any `storage_options` have been provided as kwargs, we will replace the
value with ``REDACTED`` for the purpose of writing to provenance info, as it
may contain user names or API keys.
Returns:
dictionary with all argument_name -> argument_value as key -> value pairs.
"""
all_args = vars(self)
if "kwargs" in all_args and "storage_options" in all_args["kwargs"]:
all_args["kwargs"]["storage_options"] = "REDACTED"
return {"input_reader_type": type(self).__name__, **vars(self)}

def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
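The redaction logic added above can be exercised in isolation. A minimal, self-contained sketch (the `DemoReader` class is a hypothetical stand-in, not part of the repository):

```python
# Minimal sketch of the redaction pattern: any storage_options passed through
# kwargs are replaced with "REDACTED" before being written to provenance info.
# DemoReader is a hypothetical stand-in for an input reader class.
class DemoReader:
    def __init__(self, chunksize=50_000, **kwargs):
        self.chunksize = chunksize
        self.kwargs = kwargs

    def provenance_info(self) -> dict:
        all_args = vars(self)
        if "kwargs" in all_args and "storage_options" in all_args["kwargs"]:
            all_args["kwargs"]["storage_options"] = "REDACTED"
        return {"input_reader_type": type(self).__name__, **vars(self)}


reader = DemoReader(storage_options={"key": "SECRET_API_KEY"})
print(reader.provenance_info())
# {'input_reader_type': 'DemoReader', 'chunksize': 50000,
#  'kwargs': {'storage_options': 'REDACTED'}}
```

Note that `vars(self)` returns the live instance `__dict__`, so the replacement overwrites the reader's own `storage_options` entry rather than a copy.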
src/hipscat_import/catalog/map_reduce.py (255 changes: 145 additions & 110 deletions)
@@ -1,5 +1,6 @@
"""Import a set of non-hipscat files using dask for parallelization"""

import pickle
from typing import Any, Dict, Union

import healpy as hp
@@ -11,10 +12,9 @@
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure

# pylint: disable=too-many-locals,too-many-arguments

@@ -35,14 +35,16 @@ def _has_named_index(dataframe):

def _iterate_input_file(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
highest_order,
ra_column,
dec_column,
use_hipscat_index=False,
read_columns=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
with open(pickled_reader_file, "rb") as pickle_file:
file_reader = pickle.load(pickle_file)
if not file_reader:
raise NotImplementedError("No file reader implemented")

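The mapping and splitting tasks now receive a path to a pickled reader (`pickled_reader_file`) and load it inside the worker, as shown in `_iterate_input_file` above. A minimal sketch of that pickle round trip, using a hypothetical `DemoReader` and a temporary file rather than the project's resume-plan paths:

```python
# Sketch of the pickle round trip: serialize an object to a file up front,
# then load it back inside the task. All names here are hypothetical.
import pickle
import tempfile


class DemoReader:
    """Stand-in for an input reader with potentially heavy configuration."""

    def __init__(self, chunksize=50_000):
        self.chunksize = chunksize


def run_task(pickled_reader_file: str):
    # Mirrors the pattern in _iterate_input_file: load the reader from disk.
    with open(pickled_reader_file, "rb") as pickle_file:
        file_reader = pickle.load(pickle_file)
    if not file_reader:
        raise NotImplementedError("No file reader implemented")
    return file_reader.chunksize


with tempfile.NamedTemporaryFile(suffix=".pickle", delete=False) as handle:
    pickle.dump(DemoReader(), handle)

print(run_task(handle.name))  # 50000
```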
@@ -66,7 +68,7 @@

def map_to_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
resume_path: FilePointer,
mapping_key,
highest_order,
@@ -94,34 +96,46 @@ def map_to_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
histo = SparseHistogram.make_empty(highest_order)
try:
histo = SparseHistogram.make_empty(highest_order)

if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
histo.add(partial)
if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file,
pickled_reader_file,
highest_order,
ra_column,
dec_column,
use_hipscat_index,
read_columns,
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

partial = SparseHistogram.make_from_counts(
mapped_pixel, count_at_pixel, healpix_order=highest_order
)
histo.add(partial)

histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
raise exception

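Each stage body is now wrapped in a broad try/except that reports the failing input through `print_task_failure` before re-raising. A stripped-down sketch of that wrap-and-report pattern (the `print_task_failure` below is a hypothetical stand-in, not the helper imported from `hipscat_import.pipeline_resume_plan`):

```python
# Simplified wrap-and-report pattern: announce which input failed, then
# re-raise so the scheduler still sees the original error. Stand-ins only.
def print_task_failure(custom_message: str, exception: Exception):
    print(custom_message)
    print(exception)


def map_one_file(input_file: str):
    try:
        if not input_file.endswith(".parquet"):
            raise ValueError(f"unsupported input file: {input_file}")
        # ... real mapping work would happen here ...
    except Exception as exception:  # pylint: disable=broad-exception-caught
        print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
        raise exception


map_one_file("catalog_shard_0.parquet")  # succeeds quietly
# map_one_file("catalog_shard_0.csv")    # would print the failure message, then raise
```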

def split_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
splitting_key,
highest_order,
ra_column,
dec_column,
cache_shard_path: FilePointer,
resume_path: FilePointer,
alignment=None,
alignment_file=None,
use_hipscat_index=False,
):
"""Map a file of input objects to their healpix pixels and split into shards.
@@ -142,30 +156,36 @@ def split_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

for unique_index, [order, pixel, _] in enumerate(unique_pixels):
mapped_indexes = np.where(unique_inverse == unique_index)
data_indexes = data.index[mapped_indexes[0].tolist()]

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
)
if _has_named_index(filtered_data):
filtered_data.to_parquet(output_file, index=True)
else:
filtered_data.to_parquet(output_file, index=False)
del filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
try:
with open(alignment_file, "rb") as pickle_file:
alignment = pickle.load(pickle_file)
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

for unique_index, [order, pixel, _] in enumerate(unique_pixels):
mapped_indexes = np.where(unique_inverse == unique_index)
data_indexes = data.index[mapped_indexes[0].tolist()]

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
)
if _has_named_index(filtered_data):
filtered_data.to_parquet(output_file, index=True)
else:
filtered_data.to_parquet(output_file, index=False)
del filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(f"Failed SPLITTING stage with file {input_file}", exception)
raise exception

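The splitting loop above groups rows by their aligned destination pixel using `np.unique(..., return_inverse=True)`. A simplified illustration of that grouping step, with plain integer pixel ids standing in for the (order, pixel) entries produced by the real alignment:

```python
# Simplified grouping step from split_pixels: map each row to a destination
# pixel id, then emit one shard of rows per unique destination. Integer ids
# stand in for the (order, pixel) alignment entries of the real pipeline.
import numpy as np
import pandas as pd

data = pd.DataFrame({"ra": [10.0, 11.0, 200.0, 201.0], "dec": [-5.0, -5.1, 40.0, 41.0]})
mapped_pixels = np.array([7, 7, 42, 42])  # destination pixel per input row

unique_pixels, unique_inverse = np.unique(mapped_pixels, return_inverse=True)
for unique_index, pixel in enumerate(unique_pixels):
    mapped_indexes = np.where(unique_inverse == unique_index)
    data_indexes = data.index[mapped_indexes[0].tolist()]
    filtered_data = data.filter(items=data_indexes, axis=0)
    print(pixel, len(filtered_data))  # each shard would be written to parquet
# 7 2
# 42 2
```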

def reduce_pixel_shards(
@@ -227,84 +247,99 @@ def reduce_pixel_shards(
ValueError: if the number of rows written doesn't equal provided
`destination_pixel_size`
"""
destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
try:
destination_dir = paths.pixel_directory(
output_path, destination_pixel_order, destination_pixel_number
)
file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)

destination_file = paths.pixel_catalog_file(
output_path, destination_pixel_order, destination_pixel_number
)
destination_file = paths.pixel_catalog_file(
output_path, destination_pixel_order, destination_pixel_number
)

schema = None
if use_schema_file:
schema = file_io.read_parquet_metadata(
use_schema_file, storage_options=storage_options
).schema.to_arrow_schema()
schema = None
if use_schema_file:
schema = file_io.read_parquet_metadata(
use_schema_file, storage_options=storage_options
).schema.to_arrow_schema()

tables = []
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
tables = []
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
else:
tables.append(pq.read_table(pixel_dir))
if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
else:
tables.append(pq.read_table(pixel_dir))

merged_table = pa.concat_tables(tables)
merged_table = pa.concat_tables(tables)

rows_written = len(merged_table)
rows_written = len(merged_table)

if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)
if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)

dataframe = merged_table.to_pandas()
if sort_columns:
dataframe = dataframe.sort_values(sort_columns.split(","))
if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
if _has_named_index(dataframe):
dataframe = dataframe.reset_index()
dataframe = merged_table.to_pandas()
if sort_columns:
dataframe = dataframe.sort_values(sort_columns.split(","))
if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
if _has_named_index(dataframe):
dataframe = dataframe.reset_index()

dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
dataframe[dec_column].values,
)
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()

dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
dataframe[dec_column].values,
)
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
# Adjust the schema to make sure that the _hipscat_index will
# be saved as a uint64
elif use_hipscat_index:
if dataframe.index.name != HIPSCAT_ID_COLUMN:
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
dataframe = dataframe.sort_index()

dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

# Adjust the schema to make sure that the _hipscat_index will
# be saved as a uint64
if schema:
pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
elif use_hipscat_index:
if dataframe.index.name != HIPSCAT_ID_COLUMN:
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
dataframe = dataframe.sort_index()

dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

if schema:
schema = (
schema.append(pa.field("Norder", pa.uint8()))
.append(pa.field("Dir", pa.uint64()))
.append(pa.field("Npix", pa.uint64()))
)
dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
else:
dataframe.to_parquet(destination_file, storage_options=storage_options)
schema = _modify_arrow_schema(schema, add_hipscat_index)
dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
else:
dataframe.to_parquet(destination_file, storage_options=storage_options)

del dataframe, merged_table, tables
del dataframe, merged_table, tables

if delete_input_files:
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
if delete_input_files:
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(
f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
exception,
)
raise exception

file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
def _modify_arrow_schema(schema, add_hipscat_index):
if add_hipscat_index:
pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
schema = (
schema.append(pa.field("Norder", pa.uint8()))
.append(pa.field("Dir", pa.uint64()))
.append(pa.field("Npix", pa.uint64()))
)

return schema
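The new `_modify_arrow_schema` helper consolidates schema edits that were previously inlined in `reduce_pixel_shards`. A small self-contained sketch of the same pyarrow operations on a toy schema (the `ra`/`dec` fields are made up; the hipscat column names come from the diff above):

```python
# Toy demonstration of the schema edits: drop the pandas "__index_level_0__"
# field, put _hipscat_index first as uint64, then append the Norder/Dir/Npix
# partition columns. The ra/dec fields are illustrative only.
import pyarrow as pa

schema = pa.schema(
    [
        pa.field("__index_level_0__", pa.int64()),
        pa.field("ra", pa.float64()),
        pa.field("dec", pa.float64()),
    ]
)

pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
    schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field("_hipscat_index", pa.uint64()))

schema = (
    schema.append(pa.field("Norder", pa.uint8()))
    .append(pa.field("Dir", pa.uint64()))
    .append(pa.field("Npix", pa.uint64()))
)

print(schema.names)
# ['_hipscat_index', 'ra', 'dec', 'Norder', 'Dir', 'Npix']
```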