Commit

Print file name for mapping/splitting failures. (#318)
* Use dask print for worker failures

* Pylint.

* Improve test coverage

* Print worker address (if inside worker)

* Remove unneeded line.
delucchi-cmu committed May 24, 2024
1 parent 793212f commit c3fda23
Showing 6 changed files with 278 additions and 207 deletions.
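The building block behind these changes is dask.distributed.print, which emits a message from inside a worker task and also forwards it to the client/controller logs. A minimal standalone sketch of that behavior (illustrative only, not part of this commit; the task and variable names are made up):

# Minimal sketch: output printed with dask.distributed.print inside a task is
# forwarded to the client's stdout in addition to the worker's own logs.
from dask.distributed import Client, print as dask_print


def noisy_task(value):
    dask_print("processing value:", value)  # visible on the worker AND the client
    return value * 2


if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    print(client.gather(client.submit(noisy_task, 21)))  # prints 42
    client.close()
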
src/hipscat_import/catalog/map_reduce.py (231 changes: 128 additions & 103 deletions)
@@ -14,7 +14,7 @@
 from hipscat_import.catalog.file_readers import InputReader
 from hipscat_import.catalog.resume_plan import ResumePlan
 from hipscat_import.catalog.sparse_histogram import SparseHistogram
-from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
+from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure

 # pylint: disable=too-many-locals,too-many-arguments

@@ -94,22 +94,28 @@ def map_to_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    histo = SparseHistogram.make_empty(highest_order)
+    try:
+        histo = SparseHistogram.make_empty(highest_order)

-    if use_hipscat_index:
-        read_columns = [HIPSCAT_ID_COLUMN]
-    else:
-        read_columns = [ra_column, dec_column]
+        if use_hipscat_index:
+            read_columns = [HIPSCAT_ID_COLUMN]
+        else:
+            read_columns = [ra_column, dec_column]

-    for _, _, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
-    ):
-        mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
+        for _, _, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
+        ):
+            mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

-        partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
-        histo.add(partial)
+            partial = SparseHistogram.make_from_counts(
+                mapped_pixel, count_at_pixel, healpix_order=highest_order
+            )
+            histo.add(partial)

-    histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+        histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed MAPPING stage with file {input_file}", exception)
+        raise exception


 def split_pixels(
@@ -142,30 +148,34 @@ def split_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    for chunk_number, data, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
-    ):
-        aligned_pixels = alignment[mapped_pixels]
-        unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
-
-        for unique_index, [order, pixel, _] in enumerate(unique_pixels):
-            mapped_indexes = np.where(unique_inverse == unique_index)
-            data_indexes = data.index[mapped_indexes[0].tolist()]
-
-            filtered_data = data.filter(items=data_indexes, axis=0)
-
-            pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
-            file_io.make_directory(pixel_dir, exist_ok=True)
-            output_file = file_io.append_paths_to_pointer(
-                pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
-            )
-            if _has_named_index(filtered_data):
-                filtered_data.to_parquet(output_file, index=True)
-            else:
-                filtered_data.to_parquet(output_file, index=False)
-            del filtered_data, data_indexes
-
-    ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+    try:
+        for chunk_number, data, mapped_pixels in _iterate_input_file(
+            input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
+        ):
+            aligned_pixels = alignment[mapped_pixels]
+            unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
+
+            for unique_index, [order, pixel, _] in enumerate(unique_pixels):
+                mapped_indexes = np.where(unique_inverse == unique_index)
+                data_indexes = data.index[mapped_indexes[0].tolist()]
+
+                filtered_data = data.filter(items=data_indexes, axis=0)
+
+                pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
+                file_io.make_directory(pixel_dir, exist_ok=True)
+                output_file = file_io.append_paths_to_pointer(
+                    pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
+                )
+                if _has_named_index(filtered_data):
+                    filtered_data.to_parquet(output_file, index=True)
+                else:
+                    filtered_data.to_parquet(output_file, index=False)
+                del filtered_data, data_indexes
+
+        ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed SPLITTING stage with file {input_file}", exception)
+        raise exception


 def reduce_pixel_shards(
@@ -227,84 +237,99 @@ def reduce_pixel_shards(
         ValueError: if the number of rows written doesn't equal provided
             `destination_pixel_size`
     """
-    destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
-    file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
-
-    destination_file = paths.pixel_catalog_file(
-        output_path, destination_pixel_order, destination_pixel_number
-    )
-
-    schema = None
-    if use_schema_file:
-        schema = file_io.read_parquet_metadata(
-            use_schema_file, storage_options=storage_options
-        ).schema.to_arrow_schema()
-
-    tables = []
-    healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
-    pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
-
-    if schema:
-        tables.append(pq.read_table(pixel_dir, schema=schema))
-    else:
-        tables.append(pq.read_table(pixel_dir))
-
-    merged_table = pa.concat_tables(tables)
-
-    rows_written = len(merged_table)
-
-    if rows_written != destination_pixel_size:
-        raise ValueError(
-            "Unexpected number of objects at pixel "
-            f"({healpix_pixel})."
-            f" Expected {destination_pixel_size}, wrote {rows_written}"
-        )
-
-    dataframe = merged_table.to_pandas()
-    if sort_columns:
-        dataframe = dataframe.sort_values(sort_columns.split(","))
-    if add_hipscat_index:
-        ## If we had a meaningful index before, preserve it as a column.
-        if _has_named_index(dataframe):
-            dataframe = dataframe.reset_index()
-
-        dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
-            dataframe[ra_column].values,
-            dataframe[dec_column].values,
-        )
-        dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
-
-        # Adjust the schema to make sure that the _hipscat_index will
-        # be saved as a uint64
-        if schema:
-            pandas_index_column = schema.get_field_index("__index_level_0__")
-            if pandas_index_column != -1:
-                schema = schema.remove(pandas_index_column)
-            schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
-    elif use_hipscat_index:
-        if dataframe.index.name != HIPSCAT_ID_COLUMN:
-            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
-        dataframe = dataframe.sort_index()
-
-    dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
-    dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
-    dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
-
-    if schema:
-        schema = (
-            schema.append(pa.field("Norder", pa.uint8()))
-            .append(pa.field("Dir", pa.uint64()))
-            .append(pa.field("Npix", pa.uint64()))
-        )
-        dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
-    else:
-        dataframe.to_parquet(destination_file, storage_options=storage_options)
-
-    del dataframe, merged_table, tables
-
-    if delete_input_files:
-        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
-
-        file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
-
-    ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+    try:
+        destination_dir = paths.pixel_directory(
+            output_path, destination_pixel_order, destination_pixel_number
+        )
+        file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
+
+        destination_file = paths.pixel_catalog_file(
+            output_path, destination_pixel_order, destination_pixel_number
+        )
+
+        schema = None
+        if use_schema_file:
+            schema = file_io.read_parquet_metadata(
+                use_schema_file, storage_options=storage_options
+            ).schema.to_arrow_schema()
+
+        tables = []
+        healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
+        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+
+        if schema:
+            tables.append(pq.read_table(pixel_dir, schema=schema))
+        else:
+            tables.append(pq.read_table(pixel_dir))
+
+        merged_table = pa.concat_tables(tables)
+
+        rows_written = len(merged_table)
+
+        if rows_written != destination_pixel_size:
+            raise ValueError(
+                "Unexpected number of objects at pixel "
+                f"({healpix_pixel})."
+                f" Expected {destination_pixel_size}, wrote {rows_written}"
+            )
+
+        dataframe = merged_table.to_pandas()
+        if sort_columns:
+            dataframe = dataframe.sort_values(sort_columns.split(","))
+        if add_hipscat_index:
+            ## If we had a meaningful index before, preserve it as a column.
+            if _has_named_index(dataframe):
+                dataframe = dataframe.reset_index()
+
+            dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
+                dataframe[ra_column].values,
+                dataframe[dec_column].values,
+            )
+            dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
+
+            # Adjust the schema to make sure that the _hipscat_index will
+            # be saved as a uint64
+        elif use_hipscat_index:
+            if dataframe.index.name != HIPSCAT_ID_COLUMN:
+                dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN)
+            dataframe = dataframe.sort_index()
+
+        dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
+        dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
+        dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
+
+        if schema:
+            schema = _modify_arrow_schema(schema, add_hipscat_index)
+            dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options)
+        else:
+            dataframe.to_parquet(destination_file, storage_options=storage_options)
+
+        del dataframe, merged_table, tables
+
+        if delete_input_files:
+            pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
+
+            file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
+
+        ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(
+            f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
+            exception,
+        )
+        raise exception
+
+
+def _modify_arrow_schema(schema, add_hipscat_index):
+    if add_hipscat_index:
+        pandas_index_column = schema.get_field_index("__index_level_0__")
+        if pandas_index_column != -1:
+            schema = schema.remove(pandas_index_column)
+        schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64()))
+    schema = (
+        schema.append(pa.field("Norder", pa.uint8()))
+        .append(pa.field("Dir", pa.uint64()))
+        .append(pa.field("Npix", pa.uint64()))
+    )
+
+    return schema
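Each of the three stage functions above now repeats the same try / print_task_failure / re-raise pattern. A hypothetical alternative, shown only as a sketch and not what this commit does, would be to factor that pattern into a decorator:

# Hypothetical sketch (not in this commit): the wrap-and-report pattern as a
# reusable decorator; `report_failures` is an invented name.
import functools

from hipscat_import.pipeline_resume_plan import print_task_failure


def report_failures(stage_name):
    """Print any exception from the wrapped stage via dask before re-raising it."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exception:  # pylint: disable=broad-exception-caught
                print_task_failure(f"Failed {stage_name} stage", exception)
                raise
        return wrapper

    return decorator
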
src/hipscat_import/pipeline_resume_plan.py (37 changes: 21 additions & 16 deletions)
@@ -6,7 +6,8 @@
 from dataclasses import dataclass
 from pathlib import Path

-from dask.distributed import as_completed
+from dask.distributed import as_completed, get_worker
+from dask.distributed import print as dask_print
 from hipscat.io import FilePointer, file_io
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
 from tqdm.auto import tqdm
@@ -134,22 +135,8 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False):
         ):
             if future.status == "error":
                 some_error = True
-                exception = future.exception()
-                trace_strs = [
-                    f"{stage_name} task {future.key} failed with message:",
-                    f" {type(exception).__name__}: {exception}",
-                    " Traceback (most recent call last):",
-                ]
-                stack_trace = exception.__traceback__
-                while stack_trace is not None:
-                    filename = stack_trace.tb_frame.f_code.co_filename
-                    method_name = stack_trace.tb_frame.f_code.co_name
-                    line_number = stack_trace.tb_lineno
-                    trace_strs.append(f' File "{filename}", line {line_number}, in {method_name}')
-                    stack_trace = stack_trace.tb_next
-                print("\n".join(trace_strs))
                 if fail_fast:
-                    raise exception
+                    raise future.exception()

         if some_error:
             raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
@@ -220,3 +207,21 @@ def get_pixel_cache_directory(cache_path, pixel: HealpixPixel):
     return file_io.append_paths_to_pointer(
         cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}"
     )
+
+
+def print_task_failure(custom_message, exception):
+    """Use dask's distributed print feature to print the exception message to the task's logs
+    and to the controller job's logs.
+
+    Optionally print the worker address if a worker is found.
+
+    Args:
+        custom_message (str): some custom message for the task that might help with debugging
+        exception (Exception): error raised in execution of the task.
+    """
+    dask_print(custom_message)
+    try:
+        dask_print(" worker address:", get_worker().address)
+    except Exception:  # pylint: disable=broad-exception-caught
+        pass
+    dask_print(exception)
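The broad except around get_worker() exists because get_worker() raises when the current thread is not attached to a dask worker (for example, in synchronous or local test runs), so the worker address is only printed when one is available. A small illustration of that guard (the helper name is invented):

# Illustrative only: get_worker() raises outside a dask worker thread, which is
# why print_task_failure() wraps it in a try/except.
from dask.distributed import get_worker


def current_worker_address():
    """Return the address of the current dask worker, or None when not in a worker."""
    try:
        return get_worker().address
    except ValueError:  # no worker is attached to this thread
        return None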