Commit 0922f47

Merge branch 'main' into issue/312/subset

delucchi-cmu committed May 29, 2024
2 parents e306dbb + d27f650
Showing 13 changed files with 418 additions and 284 deletions.
pyproject.toml (3 changes: 2 additions & 1 deletion)
@@ -20,11 +20,12 @@ dependencies = [
"healpy",
"hipscat >=0.3.4",
"ipykernel", # Support for Jupyter notebooks
"numpy",
"pandas",
"pyarrow",
"pyyaml",
"scipy",
"tqdm",
"numpy",
]

# On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes)
src/hipscat_import/catalog/file_readers.py (7 changes: 7 additions & 0 deletions)
@@ -81,9 +81,16 @@ def read(self, input_file, read_columns=None):
    def provenance_info(self) -> dict:
        """Create dictionary of parameters for provenance tracking.

        If any `storage_options` have been provided as kwargs, we will replace the
        value with ``REDACTED`` for the purpose of writing to provenance info, as it
        may contain user names or API keys.

        Returns:
            dictionary with all argument_name -> argument_value as key -> value pairs.
        """
        all_args = vars(self)
        if "kwargs" in all_args and "storage_options" in all_args["kwargs"]:
            all_args["kwargs"]["storage_options"] = "REDACTED"
        return {"input_reader_type": type(self).__name__, **vars(self)}

    def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
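As a quick illustration of the redaction added above: a self-contained toy reader that follows the same provenance_info() pattern. ToyReader, its chunksize argument, and the example storage_options value are invented for this sketch and are not part of hipscat_import.

# Illustrative only: mirrors the provenance_info() redaction in this diff.
class ToyReader:
    def __init__(self, chunksize=500_000, **kwargs):
        self.chunksize = chunksize
        self.kwargs = kwargs

    def provenance_info(self) -> dict:
        # Replace storage_options before recording provenance, since it may
        # contain user names or API keys.
        all_args = vars(self)
        if "kwargs" in all_args and "storage_options" in all_args["kwargs"]:
            all_args["kwargs"]["storage_options"] = "REDACTED"
        return {"input_reader_type": type(self).__name__, **vars(self)}


reader = ToyReader(storage_options={"key": "SECRET_API_KEY"})
print(reader.provenance_info())
# {'input_reader_type': 'ToyReader', 'chunksize': 500000, 'kwargs': {'storage_options': 'REDACTED'}}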
src/hipscat_import/catalog/map_reduce.py (231 changes: 128 additions & 103 deletions)
@@ -14,7 +14,7 @@
from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure

# pylint: disable=too-many-locals,too-many-arguments

@@ -94,22 +94,28 @@ def map_to_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
histo = SparseHistogram.make_empty(highest_order)
try:
histo = SparseHistogram.make_empty(highest_order)

if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
read_columns = [ra_column, dec_column]
if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
histo.add(partial)
partial = SparseHistogram.make_from_counts(
mapped_pixel, count_at_pixel, healpix_order=highest_order
)
histo.add(partial)

histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
raise exception


def split_pixels(
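The change to map_to_pixels above, and to split_pixels and reduce_pixel_shards below, is the same error-reporting wrapper: the stage body moves into a try block, any exception is reported with print_task_failure (naming the input file or shard), and the exception is re-raised so the pipeline still registers the failure. Because the resume-plan "done" markers are only written at the end of the try block, a failed task is not marked complete and can be retried on resume. A minimal sketch of the pattern, with a stand-in print_task_failure and a placeholder stage body rather than the library code:

# Sketch of the wrap-and-report pattern used by the MAPPING, SPLITTING,
# and REDUCING stages in this diff. print_task_failure here is a stand-in
# for hipscat_import.pipeline_resume_plan.print_task_failure.
def print_task_failure(custom_message, exception):
    # Surface enough context to identify the failing task in worker logs.
    print(custom_message)
    print(exception)


def run_mapping_stage(input_file, stage_body):
    """Run stage_body(input_file), reporting and re-raising any failure."""
    try:
        return stage_body(input_file)
    except Exception as exception:  # pylint: disable=broad-exception-caught
        print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
        raise exception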
@@ -142,30 +148,34 @@ def split_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

for unique_index, [order, pixel, _] in enumerate(unique_pixels):
mapped_indexes = np.where(unique_inverse == unique_index)
data_indexes = data.index[mapped_indexes[0].tolist()]

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
)
if _has_named_index(filtered_data):
filtered_data.to_parquet(output_file, index=True)
else:
filtered_data.to_parquet(output_file, index=False)
del filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
try:
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

for unique_index, [order, pixel, _] in enumerate(unique_pixels):
mapped_indexes = np.where(unique_inverse == unique_index)
data_indexes = data.index[mapped_indexes[0].tolist()]

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
)
if _has_named_index(filtered_data):
filtered_data.to_parquet(output_file, index=True)
else:
filtered_data.to_parquet(output_file, index=False)
del filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(f"Failed SPLITTING stage with file {input_file}", exception)
raise exception


def reduce_pixel_shards(
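The routing step in split_pixels above relies on numpy.unique with return_inverse=True: every row's aligned pixel maps to an index into the array of unique destination pixels, and np.where recovers which row positions belong to each destination. A standalone example of that grouping idiom, simplified to scalar pixel ids rather than the [order, pixel, count] rows produced by the real alignment:

import numpy as np

# Hypothetical aligned pixel ids for six rows of one input chunk.
aligned_pixels = np.array([11, 42, 11, 7, 42, 11])

unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
print(unique_pixels.tolist())  # [7, 11, 42]

for unique_index, pixel in enumerate(unique_pixels):
    # Positions of the rows that belong in this pixel's shard.
    mapped_indexes = np.where(unique_inverse == unique_index)
    print(pixel, mapped_indexes[0].tolist())
# 7 [3]
# 11 [0, 2, 5]
# 42 [1, 4]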
@@ -227,84 +237,99 @@ def reduce_pixel_shards(
ValueError: if the number of rows written doesn't equal provided
`destination_pixel_size`
"""
destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
try:
destination_dir = paths.pixel_directory(
output_path, destination_pixel_order, destination_pixel_number
)
file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)

destination_file = paths.pixel_catalog_file(
output_path, destination_pixel_order, destination_pixel_number
)
destination_file = paths.pixel_catalog_file(
output_path, destination_pixel_order, destination_pixel_number
)

schema = None
if use_schema_file:
schema = file_io.read_parquet_metadata(
use_schema_file, storage_options=storage_options
).schema.to_arrow_schema()
schema = None
if use_schema_file:
schema = file_io.read_parquet_metadata(
use_schema_file, storage_options=storage_options
).schema.to_arrow_schema()

tables = []
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
tables = []
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
else:
tables.append(pq.read_table(pixel_dir))
if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
else:
tables.append(pq.read_table(pixel_dir))

merged_table = pa.concat_tables(tables)
merged_table = pa.concat_tables(tables)

rows_written = len(merged_table)
rows_written = len(merged_table)

if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)
if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)

dataframe = merged_table.to_pandas()
if sort_columns:
dataframe = dataframe.sort_values(sort_columns.split(","))
if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
if _has_named_index(dataframe):
dataframe = dataframe.reset_index()
dataframe = merged_table.to_pandas()
if sort_columns:
dataframe = dataframe.sort_values(sort_columns.split(","))
if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
if _has_named_index(dataframe):
dataframe = dataframe.reset_index()

dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
dataframe[dec_column].values,
)
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()

dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
dataframe[dec_column].values,
)
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
# Adjust the schema to make sure that the _hipscat_index will
# be saved as a uint64
elif use_hipscat_index:
if dataframe.index.name != HIPSCAT_ID_COLUMN:
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
dataframe = dataframe.sort_index()

dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

# Adjust the schema to make sure that the _hipscat_index will
# be saved as a uint64
if schema:
pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
elif use_hipscat_index:
if dataframe.index.name != HIPSCAT_ID_COLUMN:
dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
dataframe = dataframe.sort_index()

dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

if schema:
schema = (
schema.append(pa.field("Norder", pa.uint8()))
.append(pa.field("Dir", pa.uint64()))
.append(pa.field("Npix", pa.uint64()))
)
dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
else:
dataframe.to_parquet(destination_file, storage_options=storage_options)
schema = _modify_arrow_schema(schema, add_hipscat_index)
dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
else:
dataframe.to_parquet(destination_file, storage_options=storage_options)

del dataframe, merged_table, tables
del dataframe, merged_table, tables

if delete_input_files:
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
if delete_input_files:
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
except Exception as exception: # pylint: disable=broad-exception-caught
print_task_failure(
f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
exception,
)
raise exception


def _modify_arrow_schema(schema, add_hipscat_index):
if add_hipscat_index:
pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
schema = (
schema.append(pa.field("Norder", pa.uint8()))
.append(pa.field("Dir", pa.uint64()))
.append(pa.field("Npix", pa.uint64()))
)

ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
return schema
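The new _modify_arrow_schema helper gathers the schema edits that were previously inline in reduce_pixel_shards: drop the pandas __index_level_0__ field when present, insert _hipscat_index as a uint64, and append the Norder/Dir/Npix columns. A small pyarrow-only sketch of the same operations on a made-up input schema:

import pyarrow as pa

# Made-up schema standing in for the one read from the parquet metadata.
schema = pa.schema(
    [
        pa.field("__index_level_0__", pa.int64()),
        pa.field("ra", pa.float64()),
        pa.field("dec", pa.float64()),
    ]
)

# Drop the pandas index field and put _hipscat_index first as a uint64.
pandas_index_column = schema.get_field_index("__index_level_0__")
if pandas_index_column != -1:
    schema = schema.remove(pandas_index_column)
schema = schema.insert(0, pa.field("_hipscat_index", pa.uint64()))

# Append the partitioning columns written alongside the data.
schema = (
    schema.append(pa.field("Norder", pa.uint8()))
    .append(pa.field("Dir", pa.uint64()))
    .append(pa.field("Npix", pa.uint64()))
)
print(schema.names)
# ['_hipscat_index', 'ra', 'dec', 'Norder', 'Dir', 'Npix']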
(Diffs for the remaining 10 changed files were not loaded on this page.)

0 comments on commit 0922f47
