Print file name for mapping/splitting failures. #318

Merged (5 commits, May 24, 2024)
Changes from 3 commits
src/hipscat_import/catalog/map_reduce.py: 230 changes (128 additions, 102 deletions)
@@ -6,6 +6,7 @@
 import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
+from dask.distributed import print as dask_print
 from hipscat import pixel_math
 from hipscat.io import FilePointer, file_io, paths
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
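The added import is Dask's distributed-aware `print`: it behaves like the builtin, but when called inside a task the output is also forwarded to the client, so the failure messages introduced below appear in the driver's output instead of only in worker logs. A minimal sketch of the pattern the stages now follow (the `flaky_task` function and the file name are hypothetical, used only for illustration):

```python
from dask.distributed import Client, print as dask_print


def flaky_task(input_file):
    # Stand-in for a mapping/splitting stage that fails while processing a file.
    try:
        raise RuntimeError("corrupt row group")  # simulate a read failure
    except Exception as exception:
        # dask_print runs on the worker, and the message is forwarded to the client.
        dask_print("Failed MAPPING stage with file ", input_file)
        dask_print(exception)
        raise exception


if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(flaky_task, "catalog_part_000.csv")
    try:
        future.result()
    except RuntimeError:
        pass  # the file name was already printed on the client side
    client.close()
```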
@@ -94,22 +95,29 @@ def map_to_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    histo = SparseHistogram.make_empty(highest_order)
+    try:
+        histo = SparseHistogram.make_empty(highest_order)

-    if use_hipscat_index:
-        read_columns = [HIPSCAT_ID_COLUMN]
-    else:
-        read_columns = [ra_column, dec_column]
+        if use_hipscat_index:
+            read_columns = [HIPSCAT_ID_COLUMN]
+        else:
+            read_columns = [ra_column, dec_column]

-    for _, _, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
-    ):
-        mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
+        for _, _, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
+        ):
+            mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

-        partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
-        histo.add(partial)
+            partial = SparseHistogram.make_from_counts(
+                mapped_pixel, count_at_pixel, healpix_order=highest_order
+            )
+            histo.add(partial)

-    histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+        histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        dask_print("Failed MAPPING stage with file ", input_file)
+        dask_print(exception)
+        raise exception


 def split_pixels(
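The `try` block above wraps the existing mapping logic unchanged: each chunk's pixel assignments are collapsed into per-pixel counts with `numpy.unique`, and those counts feed `SparseHistogram.make_from_counts`. A tiny standalone illustration of just the counting step (the pixel values are made up):

```python
import numpy as np

# Hypothetical HEALPix pixel assignments for a handful of rows in one chunk.
mapped_pixels = np.array([42, 42, 7, 42, 7, 1089])

# Collapse to (pixel, count) pairs, the same call used in map_to_pixels.
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

print(mapped_pixel)    # [   7   42 1089]
print(count_at_pixel)  # [2 3 1]
```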
@@ -142,30 +150,35 @@ def split_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    for chunk_number, data, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
-    ):
-        aligned_pixels = alignment[mapped_pixels]
-        unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
+    try:
+        for chunk_number, data, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
+        ):
+            aligned_pixels = alignment[mapped_pixels]
+            unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

-        for unique_index, [order, pixel, _] in enumerate(unique_pixels):
-            mapped_indexes = np.where(unique_inverse == unique_index)
-            data_indexes = data.index[mapped_indexes[0].tolist()]
+            for unique_index, [order, pixel, _] in enumerate(unique_pixels):
+                mapped_indexes = np.where(unique_inverse == unique_index)
+                data_indexes = data.index[mapped_indexes[0].tolist()]

-            filtered_data = data.filter(items=data_indexes, axis=0)
+                filtered_data = data.filter(items=data_indexes, axis=0)

-            pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
-            file_io.make_directory(pixel_dir, exist_ok=True)
-            output_file = file_io.append_paths_to_pointer(
-                pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
-            )
-            if _has_named_index(filtered_data):
-                filtered_data.to_parquet(output_file, index=True)
-            else:
-                filtered_data.to_parquet(output_file, index=False)
-            del filtered_data, data_indexes
+                pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
+                file_io.make_directory(pixel_dir, exist_ok=True)
+                output_file = file_io.append_paths_to_pointer(
+                    pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
+                )
+                if _has_named_index(filtered_data):
+                    filtered_data.to_parquet(output_file, index=True)
+                else:
+                    filtered_data.to_parquet(output_file, index=False)
+                del filtered_data, data_indexes

-    ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+        ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        dask_print("Failed SPLITTING stage with file ", input_file)
+        dask_print(exception)
+        raise exception


 def reduce_pixel_shards(
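As before, the splitting loop groups a chunk's rows by destination pixel using the `return_inverse` output of `numpy.unique`; only the error reporting around it is new. A toy version of that grouping, with invented pixel values (plain integers here, where the real code uses `[order, pixel, _]` entries from the alignment):

```python
import numpy as np
import pandas as pd

# Hypothetical chunk: three rows mapped to two destination pixels.
data = pd.DataFrame({"ra": [10.0, 20.0, 30.0], "dec": [-5.0, 0.0, 5.0]})
aligned_pixels = np.array([42, 7, 42])

unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

for unique_index, pixel in enumerate(unique_pixels):
    # Select the rows whose aligned pixel matches this unique value.
    mapped_indexes = np.where(unique_inverse == unique_index)
    data_indexes = data.index[mapped_indexes[0].tolist()]
    shard = data.filter(items=data_indexes, axis=0)
    print(pixel, len(shard))  # prints "7 1", then "42 2"
```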
@@ -227,84 +240,97 @@ def reduce_pixel_shards(
         ValueError: if the number of rows written doesn't equal provided
             `destination_pixel_size`
     """
-    destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
-    file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
+    try:
+        destination_dir = paths.pixel_directory(
+            output_path, destination_pixel_order, destination_pixel_number
+        )
+        file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)

-    destination_file = paths.pixel_catalog_file(
-        output_path, destination_pixel_order, destination_pixel_number
-    )
+        destination_file = paths.pixel_catalog_file(
+            output_path, destination_pixel_order, destination_pixel_number
+        )

-    schema = None
-    if use_schema_file:
-        schema = file_io.read_parquet_metadata(
-            use_schema_file, storage_options=storage_options
-        ).schema.to_arrow_schema()
+        schema = None
+        if use_schema_file:
+            schema = file_io.read_parquet_metadata(
+                use_schema_file, storage_options=storage_options
+            ).schema.to_arrow_schema()

-    tables = []
-    healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
-    pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+        tables = []
+        healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
+        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

-    if schema:
-        tables.append(pq.read_table(pixel_dir, schema=schema))
-    else:
-        tables.append(pq.read_table(pixel_dir))
+        if schema:
+            tables.append(pq.read_table(pixel_dir, schema=schema))
+        else:
+            tables.append(pq.read_table(pixel_dir))

-    merged_table = pa.concat_tables(tables)
+        merged_table = pa.concat_tables(tables)

-    rows_written = len(merged_table)
+        rows_written = len(merged_table)

-    if rows_written != destination_pixel_size:
-        raise ValueError(
-            "Unexpected number of objects at pixel "
-            f"({healpix_pixel})."
-            f" Expected {destination_pixel_size}, wrote {rows_written}"
-        )
+        if rows_written != destination_pixel_size:
+            raise ValueError(
+                "Unexpected number of objects at pixel "
+                f"({healpix_pixel})."
+                f" Expected {destination_pixel_size}, wrote {rows_written}"
+            )

-    dataframe = merged_table.to_pandas()
-    if sort_columns:
-        dataframe = dataframe.sort_values(sort_columns.split(","))
-    if add_hipscat_index:
-        ## If we had a meaningful index before, preserve it as a column.
-        if _has_named_index(dataframe):
-            dataframe = dataframe.reset_index()
+        dataframe = merged_table.to_pandas()
+        if sort_columns:
+            dataframe = dataframe.sort_values(sort_columns.split(","))
+        if add_hipscat_index:
+            ## If we had a meaningful index before, preserve it as a column.
+            if _has_named_index(dataframe):
+                dataframe = dataframe.reset_index()

-        dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
-            dataframe[ra_column].values,
-            dataframe[dec_column].values,
-        )
-        dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
+            dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
+                dataframe[ra_column].values,
+                dataframe[dec_column].values,
+            )
+            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()

-        # Adjust the schema to make sure that the _hipscat_index will
-        # be saved as a uint64
-        if schema:
-            pandas_index_column = schema.get_field_index("__index_level_0__")
-            if pandas_index_column != -1:
-                schema = schema.remove(pandas_index_column)
-            schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
-    elif use_hipscat_index:
-        if dataframe.index.name != HIPSCAT_ID_COLUMN:
-            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
-        dataframe = dataframe.sort_index()
+        # Adjust the schema to make sure that the _hipscat_index will
+        # be saved as a uint64
+        elif use_hipscat_index:
+            if dataframe.index.name != HIPSCAT_ID_COLUMN:
+                dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
+            dataframe = dataframe.sort_index()

-    dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
-    dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
-    dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
+        dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
+        dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
+        dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

-    if schema:
-        schema = (
-            schema.append(pa.field("Norder", pa.uint8()))
-            .append(pa.field("Dir", pa.uint64()))
-            .append(pa.field("Npix", pa.uint64()))
-        )
-        dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
-    else:
-        dataframe.to_parquet(destination_file, storage_options=storage_options)
+        if schema:
+            schema = _modify_arrow_schema(schema, add_hipscat_index)
+            dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
+        else:
+            dataframe.to_parquet(destination_file, storage_options=storage_options)

-    del dataframe, merged_table, tables
+        del dataframe, merged_table, tables

-    if delete_input_files:
-        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+        if delete_input_files:
+            pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

-        file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
+            file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

-    ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+        ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        dask_print("Failed REDUCING stage for shard: ", destination_pixel_order, destination_pixel_number)
+        dask_print(exception)
+        raise exception
+
+
+def _modify_arrow_schema(schema, add_hipscat_index):
+    if add_hipscat_index:
+        pandas_index_column = schema.get_field_index("__index_level_0__")
+        if pandas_index_column != -1:
+            schema = schema.remove(pandas_index_column)
+        schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
+    schema = (
+        schema.append(pa.field("Norder", pa.uint8()))
+        .append(pa.field("Dir", pa.uint64()))
+        .append(pa.field("Npix", pa.uint64()))
+    )
+
+    return schema
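The new `_modify_arrow_schema` helper collects the pyarrow schema edits that were previously inlined in `reduce_pixel_shards`. A small standalone demonstration of the same pyarrow calls (the `ra`/`dec` column names are invented, and `_hipscat_index` stands in for the `HIPSCAT_ID_COLUMN` constant):

```python
import pyarrow as pa

schema = pa.schema(
    [
        pa.field("__index_level_0__", pa.int64()),  # pandas' unnamed-index column
        pa.field("ra", pa.float64()),
        pa.field("dec", pa.float64()),
    ]
)

# Drop the pandas index column and put a uint64 _hipscat_index first.
idx = schema.get_field_index("__index_level_0__")
if idx != -1:
    schema = schema.remove(idx)
schema = schema.insert(0, pa.field("_hipscat_index", pa.uint64()))

# Then append the partition columns, mirroring what _modify_arrow_schema does.
schema = (
    schema.append(pa.field("Norder", pa.uint8()))
    .append(pa.field("Dir", pa.uint64()))
    .append(pa.field("Npix", pa.uint64()))
)

print(schema.names)
# ['_hipscat_index', 'ra', 'dec', 'Norder', 'Dir', 'Npix']
```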
src/hipscat_import/pipeline_resume_plan.py: 16 changes (1 addition, 15 deletions)
@@ -134,22 +134,8 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False):
         ):
             if future.status == "error":
                 some_error = True
-                exception = future.exception()
-                trace_strs = [
-                    f"{stage_name} task {future.key} failed with message:",
-                    f"  {type(exception).__name__}: {exception}",
-                    "  Traceback (most recent call last):",
-                ]
-                stack_trace = exception.__traceback__
-                while stack_trace is not None:
-                    filename = stack_trace.tb_frame.f_code.co_filename
-                    method_name = stack_trace.tb_frame.f_code.co_name
-                    line_number = stack_trace.tb_lineno
-                    trace_strs.append(f'  File "{filename}", line {line_number}, in {method_name}')
-                    stack_trace = stack_trace.tb_next
-                print("\n".join(trace_strs))
-                if fail_fast:
-                    raise exception
+                raise future.exception()

         if some_error:
             raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")