Print file name for mapping/splitting failures. #318

Merged · 5 commits · May 24, 2024
Changes from all commits
src/hipscat_import/catalog/map_reduce.py (231 changes: 128 additions & 103 deletions)
@@ -14,7 +14,7 @@
 from hipscat_import.catalog.file_readers import InputReader
 from hipscat_import.catalog.resume_plan import ResumePlan
 from hipscat_import.catalog.sparse_histogram import SparseHistogram
-from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
+from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure

 # pylint: disable=too-many-locals,too-many-arguments

@@ -94,22 +94,28 @@ def map_to_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    histo = SparseHistogram.make_empty(highest_order)
-
-    if use_hipscat_index:
-        read_columns = [HIPSCAT_ID_COLUMN]
-    else:
-        read_columns = [ra_column, dec_column]
-
-    for _, _, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
-    ):
-        mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
-
-        partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
-        histo.add(partial)
-
-    histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+    try:
+        histo = SparseHistogram.make_empty(highest_order)
+
+        if use_hipscat_index:
+            read_columns = [HIPSCAT_ID_COLUMN]
+        else:
+            read_columns = [ra_column, dec_column]
+
+        for _, _, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
+        ):
+            mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
+
+            partial = SparseHistogram.make_from_counts(
+                mapped_pixel, count_at_pixel, healpix_order=highest_order
+            )
+            histo.add(partial)
+
+        histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
+        raise exception


 def split_pixels(
@@ -142,30 +148,34 @@ def split_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    for chunk_number, data, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
-    ):
-        aligned_pixels = alignment[mapped_pixels]
-        unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
-
-        for unique_index, [order, pixel, _] in enumerate(unique_pixels):
-            mapped_indexes = np.where(unique_inverse == unique_index)
-            data_indexes = data.index[mapped_indexes[0].tolist()]
-
-            filtered_data = data.filter(items=data_indexes, axis=0)
-
-            pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
-            file_io.make_directory(pixel_dir, exist_ok=True)
-            output_file = file_io.append_paths_to_pointer(
-                pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
-            )
-            if _has_named_index(filtered_data):
-                filtered_data.to_parquet(output_file, index=True)
-            else:
-                filtered_data.to_parquet(output_file, index=False)
-            del filtered_data, data_indexes
-
-    ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+    try:
+        for chunk_number, data, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
+        ):
+            aligned_pixels = alignment[mapped_pixels]
+            unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
+
+            for unique_index, [order, pixel, _] in enumerate(unique_pixels):
+                mapped_indexes = np.where(unique_inverse == unique_index)
+                data_indexes = data.index[mapped_indexes[0].tolist()]
+
+                filtered_data = data.filter(items=data_indexes, axis=0)
+
+                pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
+                file_io.make_directory(pixel_dir, exist_ok=True)
+                output_file = file_io.append_paths_to_pointer(
+                    pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
+                )
+                if _has_named_index(filtered_data):
+                    filtered_data.to_parquet(output_file, index=True)
+                else:
+                    filtered_data.to_parquet(output_file, index=False)
+                del filtered_data, data_indexes
+
+        ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed SPLITTING stage with file {input_file}", exception)
+        raise exception


 def reduce_pixel_shards(
@@ -227,84 +237,99 @@ def reduce_pixel_shards(
         ValueError: if the number of rows written doesn't equal provided
             `destination_pixel_size`
     """
-    destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
-    file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
-
-    destination_file = paths.pixel_catalog_file(
-        output_path, destination_pixel_order, destination_pixel_number
-    )
-
-    schema = None
-    if use_schema_file:
-        schema = file_io.read_parquet_metadata(
-            use_schema_file, storage_options=storage_options
-        ).schema.to_arrow_schema()
-
-    tables = []
-    healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
-    pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
-
-    if schema:
-        tables.append(pq.read_table(pixel_dir, schema=schema))
-    else:
-        tables.append(pq.read_table(pixel_dir))
-
-    merged_table = pa.concat_tables(tables)
-
-    rows_written = len(merged_table)
-
-    if rows_written != destination_pixel_size:
-        raise ValueError(
-            "Unexpected number of objects at pixel "
-            f"({healpix_pixel})."
-            f" Expected {destination_pixel_size}, wrote {rows_written}"
-        )
-
-    dataframe = merged_table.to_pandas()
-    if sort_columns:
-        dataframe = dataframe.sort_values(sort_columns.split(","))
-    if add_hipscat_index:
-        ## If we had a meaningful index before, preserve it as a column.
-        if _has_named_index(dataframe):
-            dataframe = dataframe.reset_index()
-
-        dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
-            dataframe[ra_column].values,
-            dataframe[dec_column].values,
-        )
-        dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
-
-        # Adjust the schema to make sure that the _hipscat_index will
-        # be saved as a uint64
-        if schema:
-            pandas_index_column = schema.get_field_index("__index_level_0__")
-            if pandas_index_column != -1:
-                schema = schema.remove(pandas_index_column)
-            schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
-    elif use_hipscat_index:
-        if dataframe.index.name != HIPSCAT_ID_COLUMN:
-            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
-        dataframe = dataframe.sort_index()
-
-    dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
-    dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
-    dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
-
-    if schema:
-        schema = (
-            schema.append(pa.field("Norder", pa.uint8()))
-            .append(pa.field("Dir", pa.uint64()))
-            .append(pa.field("Npix", pa.uint64()))
-        )
-        dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
-    else:
-        dataframe.to_parquet(destination_file, storage_options=storage_options)
-
-    del dataframe, merged_table, tables
-
-    if delete_input_files:
-        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
-
-        file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
-
-    ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+    try:
+        destination_dir = paths.pixel_directory(
+            output_path, destination_pixel_order, destination_pixel_number
+        )
+        file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
+
+        destination_file = paths.pixel_catalog_file(
+            output_path, destination_pixel_order, destination_pixel_number
+        )
+
+        schema = None
+        if use_schema_file:
+            schema = file_io.read_parquet_metadata(
+                use_schema_file, storage_options=storage_options
+            ).schema.to_arrow_schema()
+
+        tables = []
+        healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
+        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+
+        if schema:
+            tables.append(pq.read_table(pixel_dir, schema=schema))
+        else:
+            tables.append(pq.read_table(pixel_dir))
+
+        merged_table = pa.concat_tables(tables)
+
+        rows_written = len(merged_table)
+
+        if rows_written != destination_pixel_size:
+            raise ValueError(
+                "Unexpected number of objects at pixel "
+                f"({healpix_pixel})."
+                f" Expected {destination_pixel_size}, wrote {rows_written}"
+            )
+
+        dataframe = merged_table.to_pandas()
+        if sort_columns:
+            dataframe = dataframe.sort_values(sort_columns.split(","))
+        if add_hipscat_index:
+            ## If we had a meaningful index before, preserve it as a column.
+            if _has_named_index(dataframe):
+                dataframe = dataframe.reset_index()
+
+            dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
+                dataframe[ra_column].values,
+                dataframe[dec_column].values,
+            )
+            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
+            # Adjust the schema to make sure that the _hipscat_index will
+            # be saved as a uint64
+        elif use_hipscat_index:
+            if dataframe.index.name != HIPSCAT_ID_COLUMN:
+                dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
+            dataframe = dataframe.sort_index()
+
+        dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
+        dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
+        dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
+
+        if schema:
+            schema = _modify_arrow_schema(schema, add_hipscat_index)
+            dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
+        else:
+            dataframe.to_parquet(destination_file, storage_options=storage_options)
+
+        del dataframe, merged_table, tables
+
+        if delete_input_files:
+            pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+
+            file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
+
+        ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(
+            f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
+            exception,
+        )
+        raise exception
+
+
+def _modify_arrow_schema(schema, add_hipscat_index):
+    if add_hipscat_index:
+        pandas_index_column = schema.get_field_index("__index_level_0__")
+        if pandas_index_column != -1:
+            schema = schema.remove(pandas_index_column)
+        schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
+
+    schema = (
+        schema.append(pa.field("Norder", pa.uint8()))
+        .append(pa.field("Dir", pa.uint64()))
+        .append(pa.field("Npix", pa.uint64()))
+    )
+
+    return schema
src/hipscat_import/pipeline_resume_plan.py (37 changes: 21 additions & 16 deletions)
@@ -6,7 +6,8 @@
 from dataclasses import dataclass
 from pathlib import Path

-from dask.distributed import as_completed
+from dask.distributed import as_completed, get_worker
+from dask.distributed import print as dask_print
 from hipscat.io import FilePointer, file_io
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
 from tqdm.auto import tqdm
@@ -134,22 +135,8 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False):
         ):
             if future.status == "error":
                 some_error = True
-                exception = future.exception()
-                trace_strs = [
-                    f"{stage_name} task {future.key} failed with message:",
-                    f" {type(exception).__name__}: {exception}",
-                    " Traceback (most recent call last):",
-                ]
-                stack_trace = exception.__traceback__
-                while stack_trace is not None:
-                    filename = stack_trace.tb_frame.f_code.co_filename
-                    method_name = stack_trace.tb_frame.f_code.co_name
-                    line_number = stack_trace.tb_lineno
-                    trace_strs.append(f' File "{filename}", line {line_number}, in {method_name}')
-                    stack_trace = stack_trace.tb_next
-                print("\n".join(trace_strs))
                 if fail_fast:
-                    raise exception
+                    raise future.exception()

         if some_error:
             raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
@@ -220,3 +207,21 @@ def get_pixel_cache_directory(cache_path, pixel: HealpixPixel):
     return file_io.append_paths_to_pointer(
         cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}"
     )
+
+
+def print_task_failure(custom_message, exception):
+    """Use dask's distributed print feature to print the exception message to the task's logs
+    and to the controller job's logs.
+
+    Optionally print the worker address if a worker is found.
+
+    Args:
+        custom_message (str): some custom message for the task that might help with debugging
+        exception (Exception): error raised in execution of the task.
+    """
+    dask_print(custom_message)
+    try:
+        dask_print(" worker address:", get_worker().address)
+    except Exception:  # pylint: disable=broad-exception-caught
+        pass
+    dask_print(exception)
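
For reference, here is a minimal standalone sketch (not part of this PR) of the pattern the change introduces: wrap the body of a Dask task, print a file-specific message with `dask.distributed.print` so it reaches both the worker log and the client/controller log, optionally report the worker address, then re-raise so `wait_for_futures` still sees the failed future. The task name and input file name below are hypothetical.

# Hypothetical sketch of the failure-reporting pattern; assumes a local dask.distributed cluster.
from dask.distributed import Client, get_worker
from dask.distributed import print as dask_print


def hypothetical_map_task(input_file):
    """Stand-in for a mapping/splitting task that fails on a bad input file."""
    try:
        raise ValueError("could not parse row")  # stand-in for the real mapping work
    except Exception as exception:  # pylint: disable=broad-exception-caught
        # Same shape as print_task_failure: custom message, worker address, exception.
        dask_print(f"Failed MAPPING stage with file {input_file}")
        try:
            dask_print(" worker address:", get_worker().address)
        except Exception:  # pylint: disable=broad-exception-caught
            pass  # not running on a dask worker
        dask_print(exception)
        raise


if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(hypothetical_map_task, "hypothetical_input_000.csv")
    try:
        future.result()
    except ValueError:
        pass  # the offending file name was already printed to the logs
    client.close()

Because the message is printed from inside the task and re-raised, the controller still fails the stage, but the logs now identify which input file caused the failure.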