Re-raise instead of swallowing error.
delucchi-cmu committed Jun 15, 2023
1 parent ef051b9 commit 9105d84
Showing 9 changed files with 54 additions and 64 deletions.
4 changes: 2 additions & 2 deletions docs/notebooks/intro_notebook.ipynb
@@ -65,9 +65,9 @@
 " # Calculate the cosine of each value of X\n",
 " z = np.cos(x)\n",
 " # Plot the sine wave in blue, using degrees rather than radians on the X axis\n",
-" pl.plot(xdeg, y, color='blue', label='Sine wave')\n",
+" pl.plot(xdeg, y, color=\"blue\", label=\"Sine wave\")\n",
 " # Plot the cos wave in green, using degrees rather than radians on the X axis\n",
-" pl.plot(xdeg, z, color='green', label='Cosine wave')\n",
+" pl.plot(xdeg, z, color=\"green\", label=\"Cosine wave\")\n",
 " pl.xlabel(\"Degrees\")\n",
 " # More sensible X axis values\n",
 " pl.xticks(np.arange(0, 361, 45))\n",
3 changes: 1 addition & 2 deletions src/hipscat_import/association/run_association.py
@@ -9,8 +9,7 @@
 from tqdm import tqdm
 
 from hipscat_import.association.arguments import AssociationArguments
-from hipscat_import.association.map_reduce import (map_association,
-                                                   reduce_association)
+from hipscat_import.association.map_reduce import map_association, reduce_association
 
 
 def run(args):
4 changes: 2 additions & 2 deletions src/hipscat_import/catalog/map_reduce.py
@@ -98,7 +98,7 @@ def map_to_pixels(
     Args:
         input_file (FilePointer): file to read for catalog data.
-        file_reader (hipscat_import.catalog.file_readers.InputReader): instance of input
+        file_reader (hipscat_import.catalog.file_readers.InputReader): instance of input
             reader that specifies arguments necessary for reading from the input file.
         shard_suffix (str): unique counter for this input file, used
             when creating intermediate files
@@ -137,7 +137,7 @@ def split_pixels(
     Args:
         input_file (FilePointer): file to read for catalog data.
-        file_reader (hipscat_import.catalog.file_readers.InputReader): instance
+        file_reader (hipscat_import.catalog.file_readers.InputReader): instance
             of input reader that specifies arguments necessary for reading from the input file.
         shard_suffix (str): unique counter for this input file, used
             when creating intermediate files
47 changes: 22 additions & 25 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -6,14 +6,15 @@
 
 # pylint: disable=too-many-locals,too-many-arguments
 
+
 def map_pixel_shards(
     partition_file,
     margin_pairs,
     margin_threshold,
     output_path,
     margin_order,
     ra_column,
-    dec_column
+    dec_column,
 ):
     """Creates margin cache shards from a source partition file."""
     data = pd.read_parquet(partition_file)
@@ -38,7 +38,10 @@ def map_pixel_shards(
             dec_column=dec_column,
         )
 
-def _to_pixel_shard(data, margin_threshold, output_path, margin_order, ra_column, dec_column):
+
+def _to_pixel_shard(
+    data, margin_threshold, output_path, margin_order, ra_column, dec_column
+):
     """Do boundary checking for the cached partition and then output remaining data."""
     order, pix = data["partition_order"].iloc[0], data["partition_pixel"].iloc[0]
     source_order, source_pix = data["Norder"].iloc[0], data["Npix"].iloc[0]
@@ -60,9 +64,7 @@ def _to_pixel_shard(data, margin_threshold, output_path, margin_order, ra_column
         )
     else:
         data["margin_check"] = pixel_math.check_margin_bounds(
-            data[ra_column].values,
-            data[dec_column].values,
-            bounding_polygons
+            data[ra_column].values, data[dec_column].values, bounding_polygons
         )
 
     # pylint: disable-next=singleton-comparison
@@ -74,28 +76,27 @@ def _to_pixel_shard(data, margin_threshold, output_path, margin_order, ra_column
     # generate a file name for our margin shard
     partition_file = paths.pixel_catalog_file(output_path, order, pix)
     partition_dir = f"{partition_file[:-8]}/"
-    shard_dir = paths.pixel_directory(
-        partition_dir, source_order, source_pix
-    )
+    shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix)
 
     file_io.make_directory(shard_dir, exist_ok=True)
 
-    shard_path = paths.pixel_catalog_file(
-        partition_dir, source_order, source_pix
-    )
+    shard_path = paths.pixel_catalog_file(partition_dir, source_order, source_pix)
 
-    final_df = margin_data.drop(columns=[
-        "partition_order",
-        "partition_pixel",
-        "margin_check",
-        "margin_pixel"
-    ])
+    final_df = margin_data.drop(
+        columns=[
+            "partition_order",
+            "partition_pixel",
+            "margin_check",
+            "margin_pixel",
+        ]
+    )
 
     if is_polar:
         final_df = final_df.drop(columns=["is_trunc"])
 
     final_df.to_parquet(shard_path)
 
+
 def _margin_filter_polar(
     data,
     order,
@@ -104,13 +105,11 @@ def _margin_filter_polar(
     margin_threshold,
     ra_column,
     dec_column,
-    bounding_polygons
+    bounding_polygons,
 ):
     """Filter out margin data around the poles."""
     trunc_pix = pixel_math.get_truncated_margin_pixels(
-        order=order,
-        pix=pix,
-        margin_order=margin_order
+        order=order, pix=pix, margin_order=margin_order
     )
     data["is_trunc"] = np.isin(data["margin_pixel"], trunc_pix)
 
@@ -124,14 +123,12 @@ def _margin_filter_polar(
             order,
             pix,
             margin_order,
-            margin_threshold
+            margin_threshold,
         )
         data.loc[data["is_trunc"] == True, "margin_check"] = trunc_check
 
         no_trunc_check = pixel_math.check_margin_bounds(
-            other_data[ra_column].values,
-            other_data[dec_column].values,
-            bounding_polygons
+            other_data[ra_column].values, other_data[dec_column].values, bounding_polygons
         )
         data.loc[data["is_trunc"] == False, "margin_check"] = no_trunc_check
     # pylint: enable=singleton-comparison
2 changes: 1 addition & 1 deletion src/hipscat_import/pipeline.py
@@ -55,7 +55,7 @@ def pipeline_with_client(args: RuntimeArguments, client: Client):
 
 def _send_failure_email(args: RuntimeArguments, exception: Exception):
     if not args.completion_email_address:
-        return
+        raise exception
     message = EmailMessage()
     message["Subject"] = "hipscat-import failure."
     message["To"] = args.completion_email_address
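This hunk carries the substantive change named in the commit message: when no completion_email_address is configured, a pipeline failure is now re-raised to the caller instead of being swallowed by the early return. A minimal runnable sketch of the new control flow, using a hypothetical _StubArgs stand-in (not the library's real RuntimeArguments class):

class _StubArgs:
    # Hypothetical stand-in for RuntimeArguments; only the field used here.
    completion_email_address = ""


def _send_failure_email(args, exception):
    # After this commit: with no address configured, the error propagates.
    if not args.completion_email_address:
        raise exception
    ...  # otherwise, compose and send the failure notification email


try:
    _send_failure_email(_StubArgs(), RuntimeError("pipeline failed"))
except RuntimeError as err:
    print(f"caller now sees the original error: {err}")

Previously, the bare return meant a failed run with no email configured ended without ever surfacing the underlying exception.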
3 changes: 1 addition & 2 deletions tests/hipscat_import/association/test_run_association.py
@@ -6,8 +6,7 @@
 import numpy.testing as npt
 import pandas as pd
 import pytest
-from hipscat.catalog.association_catalog.association_catalog import \
-    AssociationCatalog
+from hipscat.catalog.association_catalog.association_catalog import AssociationCatalog
 
 import hipscat_import.association.run_association as runner
 from hipscat_import.association.arguments import AssociationArguments
3 changes: 1 addition & 2 deletions tests/hipscat_import/catalog/test_argument_validation.py
@@ -3,8 +3,7 @@
 
 import pytest
 
-from hipscat_import.catalog.arguments import (ImportArguments,
-                                              check_healpix_order_range)
+from hipscat_import.catalog.arguments import ImportArguments, check_healpix_order_range
 
 # pylint: disable=protected-access
 
4 changes: 1 addition & 3 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -194,9 +194,7 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
     assert_parquet_file_ids(output_file, "id", expected_ids)
 
 
-def test_reduce_hipscat_index(
-    parquet_shards_dir, assert_parquet_file_ids, tmp_path
-):
+def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
     """Test reducing with or without a _hipscat_index field"""
     mr.reduce_pixel_shards(
         cache_path=parquet_shards_dir,
48 changes: 23 additions & 25 deletions tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -6,19 +6,17 @@
 
 from hipscat_import.margin_cache import margin_cache_map_reduce
 
-keep_cols = [
-    "weird_ra",
-    "weird_dec"
-]
+keep_cols = ["weird_ra", "weird_dec"]
 
 drop_cols = [
-    "partition_order",
-    "partition_pixel",
-    "margin_check",
+    "partition_order",
+    "partition_pixel",
+    "margin_check",
     "margin_pixel",
-    "is_trunc"
+    "is_trunc",
 ]
 
+
 def validate_result_dataframe(df_path, expected_len):
     res_df = pd.read_parquet(df_path)
 
@@ -32,10 +30,11 @@ def validate_result_dataframe(df_path, expected_len):
     for col in drop_cols:
         assert col not in cols
 
+
 @pytest.mark.timeout(5)
 def test_to_pixel_shard_equator(tmp_path):
-    ras = np.arange(0.,360.)
-    dec = np.full(360, 0.)
+    ras = np.arange(0.0, 360.0)
+    dec = np.full(360, 0.0)
     ppix = np.full(360, 21)
     porder = np.full(360, 1)
     norder = np.full(360, 1)
@@ -44,21 +43,21 @@
     test_df = pd.DataFrame(
         data=zip(ras, dec, ppix, porder, norder, npix),
         columns=[
-            "weird_ra",
+            "weird_ra",
             "weird_dec",
             "partition_pixel",
             "partition_order",
             "Norder",
-            "Npix"
-        ]
+            "Npix",
+        ],
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
         2**3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
-        nest=True
+        nest=True,
     )
 
     margin_cache_map_reduce._to_pixel_shard(
@@ -67,21 +66,21 @@
         output_path=tmp_path,
         margin_order=3,
         ra_column="weird_ra",
-        dec_column="weird_dec"
+        dec_column="weird_dec",
     )
 
     path = file_io.append_paths_to_pointer(
-        tmp_path,
-        "Norder=1/Dir=0/Npix=21/Norder=1/Dir=0/Npix=0.parquet"
+        tmp_path, "Norder=1/Dir=0/Npix=21/Norder=1/Dir=0/Npix=0.parquet"
     )
 
     assert file_io.does_file_or_directory_exist(path)
 
     validate_result_dataframe(path, 46)
 
+
 @pytest.mark.timeout(5)
 def test_to_pixel_shard_polar(tmp_path):
-    ras = np.arange(0.,360.)
+    ras = np.arange(0.0, 360.0)
     dec = np.full(360, 89.9)
     ppix = np.full(360, 15)
     porder = np.full(360, 2)
@@ -91,21 +90,21 @@ def test_to_pixel_shard_polar(tmp_path):
     test_df = pd.DataFrame(
         data=zip(ras, dec, ppix, porder, norder, npix),
         columns=[
-            "weird_ra",
+            "weird_ra",
            "weird_dec",
            "partition_pixel",
            "partition_order",
            "Norder",
-            "Npix"
-        ]
+            "Npix",
+        ],
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
         2**3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
-        nest=True
+        nest=True,
     )
 
     margin_cache_map_reduce._to_pixel_shard(
@@ -114,12 +113,11 @@
         output_path=tmp_path,
         margin_order=3,
         ra_column="weird_ra",
-        dec_column="weird_dec"
+        dec_column="weird_dec",
     )
 
     path = file_io.append_paths_to_pointer(
-        tmp_path,
-        "Norder=2/Dir=0/Npix=15/Norder=2/Dir=0/Npix=0.parquet"
+        tmp_path, "Norder=2/Dir=0/Npix=15/Norder=2/Dir=0/Npix=0.parquet"
     )
 
     assert file_io.does_file_or_directory_exist(path)
