Improve code coverage
delucchi-cmu committed Dec 14, 2023
1 parent 7eadbc5 commit ff39e53
Showing 4 changed files with 48 additions and 64 deletions.
31 changes: 1 addition & 30 deletions src/hipscat_import/catalog/resume_plan.py
@@ -32,8 +32,6 @@ class ResumePlan(PipelineResumePlan):
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

ORIGINAL_INPUT_PATHS = "input_paths.txt"

HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
HISTOGRAMS_DIR = "histograms"

@@ -63,15 +61,7 @@ def gather_plan(self):
step_progress.update(1)

## Validate that we're operating on the same file set as the previous instance.
unique_file_paths = set(self.input_paths)
self.input_paths = list(unique_file_paths)
self.input_paths.sort()
original_input_paths = self.get_original_paths()
if not original_input_paths:
self.save_original_paths()
else:
if original_input_paths != unique_file_paths:
raise ValueError("Different file set from resumed pipeline execution.")
self.input_paths = self.check_original_input_paths(self.input_paths)
step_progress.update(1)

## Gather keys for execution.
@@ -97,25 +87,6 @@ def gather_plan(self):
)
step_progress.update(1)

def get_original_paths(self):
"""Get all input file paths from the first pipeline attempt."""
file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
try:
with open(file_path, "r", encoding="utf-8") as file_handle:
contents = file_handle.readlines()
contents = [path.strip() for path in contents]
original_input_paths = set(contents)
return original_input_paths
except FileNotFoundError:
return []

def save_original_paths(self):
"""Save input file paths from the first pipeline attempt."""
file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
with open(file_path, "w", encoding="utf-8") as file_handle:
for path in self.input_paths:
file_handle.write(f"{path}\n")

def get_remaining_map_keys(self):
"""Gather remaining keys, dropping successful mapping tasks from histogram names.
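The inline dedupe/sort/validate block and the two helper methods deleted above are replaced by one call to the shared helper now defined on PipelineResumePlan (shown in full in the last file of this diff). A minimal sketch of the resulting behavior, with hypothetical file names:

    # Sketch only; paths are hypothetical. The shared helper deduplicates,
    # sorts, and persists the file set on the first attempt.
    plan.input_paths = ["/data/part_2.csv", "/data/part_1.csv", "/data/part_1.csv"]
    plan.input_paths = plan.check_original_input_paths(plan.input_paths)
    # -> ["/data/part_1.csv", "/data/part_2.csv"]
    # A resumed run with a different file set raises
    # ValueError("Different file set from resumed pipeline execution.")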
11 changes: 7 additions & 4 deletions src/hipscat_import/cross_match/macauff_map_reduce.py
@@ -7,10 +7,11 @@
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort

from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan

# pylint: disable=too-many-arguments,too-many-locals


def split_associations(
input_file,
file_reader,
@@ -69,7 +70,7 @@ def split_associations(
filtered_data.to_parquet(output_file, index=False)
del data, filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)
MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)


def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
@@ -78,7 +79,9 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel)

if not file_io.directory_has_contents(inputs):
ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}")
MacauffResumePlan.reducing_key_done(
tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}"
)
print(f"Warning: no input data for pixel {left_pixel}")
return
destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel)
@@ -100,4 +103,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index()
writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema))

ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
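Both done-markers in this module now go through MacauffResumePlan instead of the catalog's ResumePlan, so the cross-match pipeline no longer leans on the catalog plan class for its bookkeeping. The call shape, with a hypothetical scratch directory (signatures taken from the diff itself):

    # Sketch; tmp_path and key values are hypothetical.
    from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan

    tmp_path = "/tmp/macauff_run"
    MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key="split_0")
    MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key="0_11")  # f"{order}_{pixel}"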
32 changes: 2 additions & 30 deletions src/hipscat_import/cross_match/macauff_resume_plan.py
@@ -13,6 +13,7 @@

# pylint: disable=duplicate-code


@dataclass
class MacauffResumePlan(PipelineResumePlan):
"""Container class for holding the state of each file in the pipeline plan."""
@@ -27,8 +28,6 @@ class MacauffResumePlan(PipelineResumePlan):
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

ORIGINAL_INPUT_PATHS = "input_paths.txt"

def __init__(self, args: MacauffArguments, left_pixels):
if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy)
raise ValueError("tmp_path is required")
@@ -50,15 +49,7 @@ def gather_plan(self, left_pixels):
raise ValueError("splitting must be complete before reducing")

Codecov / codecov/patch warning: added line #L49 in src/hipscat_import/cross_match/macauff_resume_plan.py was not covered by tests.

## Validate that we're operating on the same file set as the previous instance.
unique_file_paths = set(self.input_paths)
self.input_paths = list(unique_file_paths)
self.input_paths.sort()
original_input_paths = self.get_original_paths()
if not original_input_paths:
self.save_original_paths()
else:
if original_input_paths != unique_file_paths:
raise ValueError("Different file set from resumed pipeline execution.")
self.input_paths = self.check_original_input_paths(self.input_paths)

## Gather keys for execution.
if not splitting_done:
@@ -75,25 +66,6 @@ def gather_plan(self, left_pixels):
exist_ok=True,
)

def get_original_paths(self):
"""Get all input file paths from the first pipeline attempt."""
file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
try:
with open(file_path, "r", encoding="utf-8") as file_handle:
contents = file_handle.readlines()
contents = [path.strip() for path in contents]
original_input_paths = set(contents)
return original_input_paths
except FileNotFoundError:
return []

def save_original_paths(self):
"""Save input file paths from the first pipeline attempt."""
file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
with open(file_path, "w", encoding="utf-8") as file_handle:
for path in self.input_paths:
file_handle.write(f"{path}\n")

def get_remaining_split_keys(self):
"""Gather remaining keys, dropping successful split tasks from done file names.
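This mirrors the catalog resume plan: the duplicated constant and helpers are deleted, and both subclasses resolve the shared implementation from the base class. A quick sketch of what the deduplication means at the class level (standard Python method resolution assumed):

    from hipscat_import.catalog.resume_plan import ResumePlan
    from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan
    from hipscat_import.pipeline_resume_plan import PipelineResumePlan

    # One shared implementation instead of two copies.
    assert ResumePlan.check_original_input_paths is PipelineResumePlan.check_original_input_paths
    assert MacauffResumePlan.check_original_input_paths is PipelineResumePlan.check_original_input_paths
    assert PipelineResumePlan.ORIGINAL_INPUT_PATHS == "input_paths.txt"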
38 changes: 38 additions & 0 deletions src/hipscat_import/pipeline_resume_plan.py
@@ -24,6 +24,8 @@ class PipelineResumePlan:
"""if true, a tqdm progress bar will be displayed for user
feedback of planning progress"""

ORIGINAL_INPUT_PATHS = "input_paths.txt"

def safe_to_resume(self):
"""Check that we are ok to resume an in-progress pipeline, if one exists.
@@ -148,3 +150,39 @@ def get_formatted_stage_name(stage_name) -> str:
stage_name = "progress"

return f"{stage_name.capitalize(): <10}"

def check_original_input_paths(self, input_paths):
    """Validate that we're operating on the same file set as the original pipeline,
    or save the inputs as the originals if not found.

    Args:
        input_paths (list[str]): input paths that will be processed by a pipeline.

    Raises:
        ValueError if the retrieved file set differs from `input_paths`.
    """
    unique_file_paths = set(input_paths)

    original_input_paths = []

    file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
    try:
        with open(file_path, "r", encoding="utf-8") as file_handle:
            contents = file_handle.readlines()
            contents = [path.strip() for path in contents]
            original_input_paths = set(contents)
    except FileNotFoundError:
        pass

    if len(original_input_paths) == 0:
        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
        with open(file_path, "w", encoding="utf-8") as file_handle:
            for path in input_paths:
                file_handle.write(f"{path}\n")
    else:
        if original_input_paths != unique_file_paths:
            raise ValueError("Different file set from resumed pipeline execution.")

    input_paths = list(unique_file_paths)
    input_paths.sort()
    return input_paths
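
Given the commit's goal of improving coverage, the new helper is easy to exercise end to end. A hedged test sketch (assuming PipelineResumePlan can be constructed directly with tmp_path and progress_bar; pytest's tmp_path fixture supplies the scratch directory):

    import pytest

    from hipscat_import.pipeline_resume_plan import PipelineResumePlan

    def test_check_original_input_paths(tmp_path):
        plan = PipelineResumePlan(tmp_path=str(tmp_path), progress_bar=False)

        # First attempt: nothing saved yet, so the inputs are recorded as the originals.
        first = plan.check_original_input_paths(["b.csv", "a.csv", "a.csv"])
        assert first == ["a.csv", "b.csv"]  # deduplicated and sorted

        # Resume with the same set (any order): passes validation.
        assert plan.check_original_input_paths(["a.csv", "b.csv"]) == first

        # Resume with a different set: fails fast.
        with pytest.raises(ValueError, match="Different file set"):
            plan.check_original_input_paths(["a.csv", "c.csv"])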
