Print task failures, and pylint.
delucchi-cmu committed May 29, 2024
1 parent 0ddaf56 commit 92e2478
Showing 4 changed files with 112 additions and 51 deletions.
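This commit wraps the bodies of the margin-cache map and reduce tasks in try/except blocks, so a failing task prints a labeled message plus the underlying exception before re-raising. The print_task_failure helper is imported from hipscat_import/pipeline_resume_plan.py and its implementation is not part of this diff; the following is only a minimal sketch of what such a helper could look like, assuming it writes both the message and the traceback to stdout (the new tests below assert on captured stdout):

    import sys
    import traceback


    def print_task_failure(custom_message, exception):
        """Hypothetical sketch of the helper imported in the diff below; the
        real implementation lives in pipeline_resume_plan.py and is not shown
        in this commit. Printing to stdout (rather than stderr) is an
        assumption, based on the tests asserting on capsys.readouterr().out."""
        print(custom_message)
        # Print the full traceback of the given exception to stdout.
        traceback.print_exception(
            type(exception), exception, exception.__traceback__, file=sys.stdout
        )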
106 changes: 59 additions & 47 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -10,7 +10,7 @@
 from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
 
 from hipscat_import.margin_cache.margin_cache_resume_plan import MarginCachePlan
-from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
+from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure
 
 
 def map_pixel_shards(
@@ -25,29 +25,33 @@ def map_pixel_shards(
     dec_column,
 ):
     """Creates margin cache shards from a source partition file."""
-    data = file_io.load_parquet_to_pandas(partition_file, storage_options=input_storage_options)
-
-    data["margin_pixel"] = hp.ang2pix(
-        2**margin_order,
-        data[ra_column].values,
-        data[dec_column].values,
-        lonlat=True,
-        nest=True,
-    )
-
-    margin_pairs = pd.read_csv(margin_pair_file)
-    constrained_data = data.reset_index().merge(margin_pairs, on="margin_pixel")
-
-    if len(constrained_data):
-        constrained_data.groupby(["partition_order", "partition_pixel"]).apply(
-            _to_pixel_shard,
-            margin_threshold=margin_threshold,
-            output_path=output_path,
-            ra_column=ra_column,
-            dec_column=dec_column,
-        )
-
-    MarginCachePlan.mapping_key_done(output_path, mapping_key)
+    try:
+        data = file_io.load_parquet_to_pandas(partition_file, storage_options=input_storage_options)
+
+        data["margin_pixel"] = hp.ang2pix(
+            2**margin_order,
+            data[ra_column].values,
+            data[dec_column].values,
+            lonlat=True,
+            nest=True,
+        )
+
+        margin_pairs = pd.read_csv(margin_pair_file)
+        constrained_data = data.reset_index().merge(margin_pairs, on="margin_pixel")
+
+        if len(constrained_data):
+            constrained_data.groupby(["partition_order", "partition_pixel"]).apply(
+                _to_pixel_shard,
+                margin_threshold=margin_threshold,
+                output_path=output_path,
+                ra_column=ra_column,
+                dec_column=dec_column,
+            )
+
+        MarginCachePlan.mapping_key_done(output_path, mapping_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed MAPPING stage for pixel: {mapping_key}", exception)
+        raise exception
 
 
 def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column):
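On a Dask cluster, an exception raised inside a worker task may not surface until results are gathered; printing the failure where it happens makes the responsible pixel visible in the worker logs, while the re-raise still fails the pipeline and keeps the resume plan from marking the key done. The commit applies the pattern inline in each task; purely for illustration, the same behavior could be factored into a decorator (a hypothetical helper, not part of this commit):

    import functools


    def print_failures(stage_message):
        """Illustrative decorator: print a task failure before re-raising.
        Not part of this commit, which inlines the try/except instead."""

        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except Exception as exception:  # pylint: disable=broad-exception-caught
                    print_task_failure(stage_message, exception)
                    raise
            return wrapper

        return decorator

One advantage of the inline form used in the diff is that the printed message can interpolate the task's mapping_key or reducing_key, which a generic decorator cannot see without extra plumbing.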
@@ -117,29 +121,37 @@ def reduce_margin_shards(
     input_storage_options,
 ):
     """Reduce all partition pixel directories into a single file"""
-    shard_dir = get_pixel_cache_directory(
-        intermediate_directory, HealpixPixel(partition_order, partition_pixel)
-    )
-    if file_io.does_file_or_directory_exist(shard_dir):
-        data = ds.dataset(shard_dir, format="parquet")
-        full_df = data.to_table().to_pandas()
-        margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
-        file_io.make_directory(margin_cache_dir, exist_ok=True, storage_options=output_storage_options)
-
-        if len(full_df):
-            schema = file_io.read_parquet_metadata(
-                original_catalog_metadata, storage_options=input_storage_options
-            ).schema.to_arrow_schema()
-
-            schema = (
-                schema.append(pa.field("margin_Norder", pa.uint8()))
-                .append(pa.field("margin_Dir", pa.uint64()))
-                .append(pa.field("margin_Npix", pa.uint64()))
-            )
-
-            margin_cache_file_path = paths.pixel_catalog_file(output_path, partition_order, partition_pixel)
-
-            full_df.to_parquet(margin_cache_file_path, schema=schema, storage_options=output_storage_options)
-            file_io.remove_directory(shard_dir)
-
-    MarginCachePlan.reducing_key_done(intermediate_directory, reducing_key)
+    try:
+        shard_dir = get_pixel_cache_directory(
+            intermediate_directory, HealpixPixel(partition_order, partition_pixel)
+        )
+        if file_io.does_file_or_directory_exist(shard_dir):
+            data = ds.dataset(shard_dir, format="parquet")
+            full_df = data.to_table().to_pandas()
+            margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
+            file_io.make_directory(margin_cache_dir, exist_ok=True, storage_options=output_storage_options)
+
+            if len(full_df):
+                schema = file_io.read_parquet_metadata(
+                    original_catalog_metadata, storage_options=input_storage_options
+                ).schema.to_arrow_schema()
+
+                schema = (
+                    schema.append(pa.field("margin_Norder", pa.uint8()))
+                    .append(pa.field("margin_Dir", pa.uint64()))
+                    .append(pa.field("margin_Npix", pa.uint64()))
+                )
+
+                margin_cache_file_path = paths.pixel_catalog_file(
+                    output_path, partition_order, partition_pixel
+                )
+
+                full_df.to_parquet(
+                    margin_cache_file_path, schema=schema, storage_options=output_storage_options
+                )
+                file_io.remove_directory(shard_dir)
+
+        MarginCachePlan.reducing_key_done(intermediate_directory, reducing_key)
+    except Exception as exception:  # pylint: disable=broad-exception-caught
+        print_task_failure(f"Failed REDUCING stage for pixel: {reducing_key}", exception)
+        raise exception
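Aside from the error handling, the reduce step above also shows the schema handling: the original catalog's Arrow schema is extended with the three margin partitioning columns before writing. A self-contained illustration of that append pattern (the ra/dec fields are stand-ins; the margin_* fields match the diff above):

    import pyarrow as pa

    # Stand-in for the schema read from the original catalog's metadata.
    schema = pa.schema([pa.field("ra", pa.float64()), pa.field("dec", pa.float64())])

    # Append the margin partitioning fields, as in reduce_margin_shards.
    schema = (
        schema.append(pa.field("margin_Norder", pa.uint8()))
        .append(pa.field("margin_Dir", pa.uint64()))
        .append(pa.field("margin_Npix", pa.uint64()))
    )

    print(schema.names)
    # ['ra', 'dec', 'margin_Norder', 'margin_Dir', 'margin_Npix']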
@@ -7,7 +7,7 @@
 
 import pandas as pd
 from hipscat import pixel_math
-from hipscat.io import file_io, paths
+from hipscat.io import file_io
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
 from tqdm.auto import tqdm
54 changes: 52 additions & 2 deletions tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -70,8 +70,27 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
     validate_result_dataframe(path, 360)
 
 
-@pytest.mark.dask
-def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
+def test_map_pixel_shards_error(tmp_path, capsys):
+    """Test error behavior on the map stage, e.g. by not creating the original
+    catalog parquet files."""
+    with pytest.raises(FileNotFoundError):
+        margin_cache_map_reduce.map_pixel_shards(
+            paths.pixel_catalog_file(tmp_path, 1, 0),
+            mapping_key="1_21",
+            input_storage_options=None,
+            margin_pair_file="",
+            margin_threshold=10,
+            output_path=tmp_path,
+            margin_order=4,
+            ra_column="ra",
+            dec_column="dec",
+        )
+
+    captured = capsys.readouterr()
+    assert "No such file or directory" in captured.out
+
+
+def test_reduce_margin_shards(tmp_path):
     intermediate_dir = os.path.join(tmp_path, "intermediate")
     partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21))
     shard_dir = paths.pixel_directory(partition_dir, 1, 21)
@@ -132,3 +151,34 @@ def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
 
     validate_result_dataframe(result_path, 720)
     assert not os.path.exists(shard_dir)
+
+
+def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys):
+    """Test error behavior on the reduce stage, e.g. by not creating the original
+    catalog metadata."""
+    intermediate_dir = os.path.join(tmp_path, "intermediate")
+    partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21))
+    shard_dir = paths.pixel_directory(partition_dir, 1, 21)
+    os.makedirs(shard_dir)
+    os.makedirs(os.path.join(intermediate_dir, "reducing"))
+
+    # Don't write anything at the metadata path!
+    schema_path = os.path.join(tmp_path, "metadata.parquet")
+
+    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 0))
+    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 1))
+
+    with pytest.raises(FileNotFoundError):
+        margin_cache_map_reduce.reduce_margin_shards(
+            intermediate_dir,
+            "1_21",
+            tmp_path,
+            None,
+            1,
+            21,
+            original_catalog_metadata=schema_path,
+            input_storage_options=None,
+        )
+
+    captured = capsys.readouterr()
+    assert "No such file or directory" in captured.out
@@ -1,7 +1,6 @@
 import numpy as np
 import numpy.testing as npt
 import pytest
-from hipscat.io import file_io
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
 
 from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments
