diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py
index e73d6a4f..c554f32b 100644
--- a/src/hipscat_import/catalog/resume_plan.py
+++ b/src/hipscat_import/catalog/resume_plan.py
@@ -32,8 +32,6 @@ class ResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    ORIGINAL_INPUT_PATHS = "input_paths.txt"
-
     HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
     HISTOGRAMS_DIR = "histograms"
 
@@ -63,15 +61,7 @@ def gather_plan(self):
             step_progress.update(1)
 
             ## Validate that we're operating on the same file set as the previous instance.
-            unique_file_paths = set(self.input_paths)
-            self.input_paths = list(unique_file_paths)
-            self.input_paths.sort()
-            original_input_paths = self.get_original_paths()
-            if not original_input_paths:
-                self.save_original_paths()
-            else:
-                if original_input_paths != unique_file_paths:
-                    raise ValueError("Different file set from resumed pipeline execution.")
+            self.input_paths = self.check_original_input_paths(self.input_paths)
             step_progress.update(1)
 
             ## Gather keys for execution.
@@ -97,25 +87,6 @@ def gather_plan(self):
             )
             step_progress.update(1)
 
-    def get_original_paths(self):
-        """Get all input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        try:
-            with open(file_path, "r", encoding="utf-8") as file_handle:
-                contents = file_handle.readlines()
-                contents = [path.strip() for path in contents]
-                original_input_paths = set(contents)
-            return original_input_paths
-        except FileNotFoundError:
-            return []
-
-    def save_original_paths(self):
-        """Save input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        with open(file_path, "w", encoding="utf-8") as file_handle:
-            for path in self.input_paths:
-                file_handle.write(f"{path}\n")
-
     def get_remaining_map_keys(self):
         """Gather remaining keys, dropping successful mapping tasks from histogram names.
 
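For reference, here is a minimal standalone sketch of the check that replaces the inline validation above. It is a simplified re-implementation for illustration only, using os.path in place of the repo's file_io helpers; the real method is added to PipelineResumePlan in the last hunk of this diff.

import os

def check_original_input_paths(tmp_path, input_paths):
    # Record of the first attempt's inputs (the ORIGINAL_INPUT_PATHS file).
    record = os.path.join(tmp_path, "input_paths.txt")
    unique = set(input_paths)
    try:
        with open(record, "r", encoding="utf-8") as handle:
            original = {line.strip() for line in handle}
    except FileNotFoundError:
        original = set()
    if not original:
        # First attempt: persist the inputs for later resume validation.
        with open(record, "w", encoding="utf-8") as handle:
            handle.writelines(f"{path}\n" for path in input_paths)
    elif original != unique:
        raise ValueError("Different file set from resumed pipeline execution.")
    # Callers always get back a deduplicated, deterministically ordered list.
    return sorted(unique)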
diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py
index a83b7c07..27a4b157 100644
--- a/src/hipscat_import/cross_match/macauff_map_reduce.py
+++ b/src/hipscat_import/cross_match/macauff_map_reduce.py
@@ -7,10 +7,11 @@
 from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort
 
 from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file
-from hipscat_import.catalog.resume_plan import ResumePlan
+from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan
 
 # pylint: disable=too-many-arguments,too-many-locals
 
+
 def split_associations(
     input_file,
     file_reader,
@@ -69,7 +70,7 @@ def split_associations(
     filtered_data.to_parquet(output_file, index=False)
     del data, filtered_data, data_indexes
 
-    ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)
+    MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)
 
 
 def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
@@ -78,7 +79,9 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
     inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel)
 
     if not file_io.directory_has_contents(inputs):
-        ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}")
+        MacauffResumePlan.reducing_key_done(
+            tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}"
+        )
         print(f"Warning: no input data for pixel {left_pixel}")
         return
     destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel)
@@ -100,4 +103,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
             join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index()
             writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema))
 
-    ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
+    MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py
index cf481cbd..dfc3d871 100644
--- a/src/hipscat_import/cross_match/macauff_resume_plan.py
+++ b/src/hipscat_import/cross_match/macauff_resume_plan.py
@@ -13,6 +13,7 @@
 
 # pylint: disable=duplicate-code
 
+
 @dataclass
 class MacauffResumePlan(PipelineResumePlan):
     """Container class for holding the state of each file in the pipeline plan."""
@@ -27,8 +28,6 @@ class MacauffResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    ORIGINAL_INPUT_PATHS = "input_paths.txt"
-
     def __init__(self, args: MacauffArguments, left_pixels):
         if not args.tmp_path:  # pragma: no cover (not reachable, but required for mypy)
             raise ValueError("tmp_path is required")
@@ -50,15 +49,7 @@ def gather_plan(self, left_pixels):
             raise ValueError("splitting must be complete before reducing")
 
         ## Validate that we're operating on the same file set as the previous instance.
-        unique_file_paths = set(self.input_paths)
-        self.input_paths = list(unique_file_paths)
-        self.input_paths.sort()
-        original_input_paths = self.get_original_paths()
-        if not original_input_paths:
-            self.save_original_paths()
-        else:
-            if original_input_paths != unique_file_paths:
-                raise ValueError("Different file set from resumed pipeline execution.")
+        self.input_paths = self.check_original_input_paths(self.input_paths)
 
         ## Gather keys for execution.
         if not splitting_done:
@@ -75,25 +66,6 @@ def gather_plan(self, left_pixels):
                 exist_ok=True,
             )
 
-    def get_original_paths(self):
-        """Get all input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        try:
-            with open(file_path, "r", encoding="utf-8") as file_handle:
-                contents = file_handle.readlines()
-                contents = [path.strip() for path in contents]
-                original_input_paths = set(contents)
-            return original_input_paths
-        except FileNotFoundError:
-            return []
-
-    def save_original_paths(self):
-        """Save input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        with open(file_path, "w", encoding="utf-8") as file_handle:
-            for path in self.input_paths:
-                file_handle.write(f"{path}\n")
-
     def get_remaining_split_keys(self):
         """Gather remaining keys, dropping successful split tasks from done file names.
 
diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py
index 38d727f7..91759f7c 100644
--- a/src/hipscat_import/pipeline_resume_plan.py
+++ b/src/hipscat_import/pipeline_resume_plan.py
@@ -24,6 +24,8 @@ class PipelineResumePlan:
 
     """if true, a tqdm progress bar will be displayed for user feedback of planning progress"""
 
+    ORIGINAL_INPUT_PATHS = "input_paths.txt"
+
     def safe_to_resume(self):
         """Check that we are ok to resume an in-progress pipeline, if one exists.
 
@@ -148,3 +150,39 @@ def get_formatted_stage_name(stage_name) -> str:
             stage_name = "progress"
 
         return f"{stage_name.capitalize(): <10}"
+
+    def check_original_input_paths(self, input_paths):
+        """Validate that we're operating on the same file set as the original pipeline,
+        or save the inputs as the originals if not found.
+
+        Args:
+            input_paths (list[str]): input paths that will be processed by a pipeline.
+
+        Raises:
+            ValueError if the retrieved file set differs from `input_paths`.
+        """
+        unique_file_paths = set(input_paths)
+
+        original_input_paths = []
+
+        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
+        try:
+            with open(file_path, "r", encoding="utf-8") as file_handle:
+                contents = file_handle.readlines()
+                contents = [path.strip() for path in contents]
+                original_input_paths = set(contents)
+        except FileNotFoundError:
+            pass
+
+        if len(original_input_paths) == 0:
+            file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
+            with open(file_path, "w", encoding="utf-8") as file_handle:
+                for path in input_paths:
+                    file_handle.write(f"{path}\n")
+        else:
+            if original_input_paths != unique_file_paths:
+                raise ValueError("Different file set from resumed pipeline execution.")
+
+        input_paths = list(unique_file_paths)
+        input_paths.sort()
+        return input_paths
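To illustrate the resume semantics end to end, here is a usage sketch based on the standalone version given earlier. The tempfile directory and free-function form are illustrative assumptions; in the pipeline the method is invoked as self.check_original_input_paths(...) from each plan's gather_plan.

import tempfile

tmp = tempfile.mkdtemp()
# First attempt: inputs are recorded, then returned deduplicated and sorted.
first = check_original_input_paths(tmp, ["b.csv", "a.csv", "a.csv"])
assert first == ["a.csv", "b.csv"]
# Resuming with the same file set passes validation.
assert check_original_input_paths(tmp, ["a.csv", "b.csv"]) == first
# Resuming with a different file set fails fast.
try:
    check_original_input_paths(tmp, ["a.csv", "c.csv"])
except ValueError as err:
    print(err)  # Different file set from resumed pipeline execution.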