From 76938d9f75963eb6b988a3af2ccb316f85ed8688 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Tue, 22 Oct 2024 14:22:31 -0400
Subject: [PATCH 1/3] Keep track of expected number of margin rows written.

---
 src/hats_import/margin_cache/margin_cache.py  | 11 +++++-
 .../margin_cache/margin_cache_map_reduce.py   |  9 +++--
 .../margin_cache/margin_cache_resume_plan.py  | 28 ++++++++++++++--
 src/hats_import/pipeline_resume_plan.py       | 36 ++++++++++++++++++-
 tests/data/markers/mapping/map_001_done       |  1 +
 tests/data/markers/mapping/map_002_done       |  1 +
 .../margin_cache/test_margin_cache.py         | 16 +++++++++
 .../test_margin_cache_resume_plan.py          | 19 ++++++++++
 .../hats_import/test_pipeline_resume_plan.py  |  6 ++++
 9 files changed, 120 insertions(+), 7 deletions(-)
 create mode 100644 tests/data/markers/mapping/map_001_done
 create mode 100644 tests/data/markers/mapping/map_002_done

diff --git a/src/hats_import/margin_cache/margin_cache.py b/src/hats_import/margin_cache/margin_cache.py
index 4042cde1..ace4af4f 100644
--- a/src/hats_import/margin_cache/margin_cache.py
+++ b/src/hats_import/margin_cache/margin_cache.py
@@ -39,6 +39,11 @@ def generate_margin_cache(args, client):
             )
         resume_plan.wait_for_mapping(futures)
 
+    with resume_plan.print_progress(total=1, stage_name="Binning") as step_progress:
+        total_rows = resume_plan.get_mapping_total()
+        if not total_rows:
+            raise ValueError("Margin cache contains no rows. Increase margin size and re-run.")
+
     if not resume_plan.is_reducing_done():
         futures = []
         for reducing_key, pix in resume_plan.get_remaining_reduce_keys():
@@ -57,7 +62,11 @@ def generate_margin_cache(args, client):
         resume_plan.wait_for_reducing(futures)
 
     with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress:
-        total_rows = parquet_metadata.write_parquet_metadata(args.catalog_path)
+        metadata_total_rows = parquet_metadata.write_parquet_metadata(args.catalog_path)
+        if metadata_total_rows != total_rows:
+            raise ValueError(
+                f"Wrote unexpected number of rows ({total_rows} expected, {metadata_total_rows} written)"
+            )
         step_progress.update(1)
         metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
         partition_info = PartitionInfo.read_from_file(metadata_path)
diff --git a/src/hats_import/margin_cache/margin_cache_map_reduce.py b/src/hats_import/margin_cache/margin_cache_map_reduce.py
index 931dc2f3..625046c1 100644
--- a/src/hats_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hats_import/margin_cache/margin_cache_map_reduce.py
@@ -55,12 +55,13 @@ def map_pixel_shards(
         # For every possible output pixel, find the full margin_order pixel filter list,
         # perform the filter, and pass along to helper method to compute fine filter
         # and write out shard file.
+        num_rows = 0
         for partition_key, data_filter in margin_pixel_filter.groupby(["partition_order", "partition_pixel"]):
             data_filter = np.unique(data_filter["filter_value"]).tolist()
             pixel = HealpixPixel(partition_key[0], partition_key[1])
 
             filtered_data = data.iloc[data_filter]
-            _to_pixel_shard(
+            num_rows += _to_pixel_shard(
                 filtered_data=filtered_data,
                 pixel=pixel,
                 margin_threshold=margin_threshold,
@@ -71,7 +72,7 @@ def map_pixel_shards(
                 fine_filtering=fine_filtering,
             )
 
-        MarginCachePlan.mapping_key_done(output_path, mapping_key)
+        MarginCachePlan.mapping_key_done(output_path, mapping_key, num_rows)
     except Exception as exception:  # pylint: disable=broad-exception-caught
         print_task_failure(f"Failed MAPPING stage for pixel: {mapping_key}", exception)
         raise exception
@@ -101,7 +102,8 @@ def _to_pixel_shard(
     else:
         margin_data = filtered_data
 
-    if len(margin_data):
+    num_rows = len(margin_data)
+    if num_rows:
         # generate a file name for our margin shard, that uses both sets of Norder/Npix
         partition_dir = get_pixel_cache_directory(output_path, pixel)
         shard_dir = paths.pixel_directory(partition_dir, source_pixel.order, source_pixel.pixel)
@@ -132,6 +134,7 @@ def _to_pixel_shard(
         margin_data = margin_data.sort_index()
 
         margin_data.to_parquet(shard_path.path, filesystem=shard_path.fs)
+    return num_rows
 
 
 def reduce_margin_shards(
diff --git a/src/hats_import/margin_cache/margin_cache_resume_plan.py b/src/hats_import/margin_cache/margin_cache_resume_plan.py
index f1239d23..e17bdd0c 100644
--- a/src/hats_import/margin_cache/margin_cache_resume_plan.py
+++ b/src/hats_import/margin_cache/margin_cache_resume_plan.py
@@ -25,6 +25,7 @@ class MarginCachePlan(PipelineResumePlan):
     MAPPING_STAGE = "mapping"
     REDUCING_STAGE = "reducing"
     MARGIN_PAIR_FILE = "margin_pair.csv"
+    MAPPING_TOTAL_FILE = "mapping_total"
 
     def __init__(self, args: MarginCacheArguments):
         if not args.tmp_path:  # pragma: no cover (not reachable, but required for mypy)
@@ -64,14 +65,14 @@ def _gather_plan(self, args):
             step_progress.update(1)
 
     @classmethod
-    def mapping_key_done(cls, tmp_path, mapping_key: str):
+    def mapping_key_done(cls, tmp_path, mapping_key: str, num_rows: int):
         """Mark a single mapping task as done
 
         Args:
             tmp_path (str): where to write intermediate resume files.
             mapping_key (str): unique string for each mapping task (e.g. "map_1_24")
         """
-        cls.touch_key_done_file(tmp_path, cls.MAPPING_STAGE, mapping_key)
+        cls.write_marker_file(tmp_path, cls.MAPPING_STAGE, mapping_key, str(num_rows))
 
     def wait_for_mapping(self, futures):
         """Wait for mapping stage futures to complete."""
@@ -87,6 +88,22 @@ def is_mapping_done(self) -> bool:
         """Are there sources left to count?"""
         return self.done_file_exists(self.MAPPING_STAGE)
 
+    def get_mapping_total(self) -> int:
+        """Find the total number of rows, based on the intermediate markers."""
+        if not self.is_mapping_done():
+            raise ValueError("mapping stage is not done yet.")
+
+        total_marker_file = file_io.append_paths_to_pointer(self.tmp_path, self.MAPPING_TOTAL_FILE)
+
+        if file_io.does_file_or_directory_exist(total_marker_file):
+            marker_value = file_io.load_text_file(total_marker_file)
+            return _marker_value_to_int(marker_value)
+
+        markers = self.read_markers(self.MAPPING_STAGE)
+        total_marker_value = sum(_marker_value_to_int(value) for value in markers.values())
+        file_io.write_string_to_file(total_marker_file, str(total_marker_value))
+        return total_marker_value
+
     def get_remaining_map_keys(self):
         """Fetch a tuple for each pixel/partition left to map."""
         map_keys = set(self.read_done_keys(self.MAPPING_STAGE))
@@ -163,3 +180,10 @@ def _find_partition_margin_pixel_pairs(combined_pixels, margin_order):
         columns=["partition_order", "partition_pixel", "margin_pixel"],
     ).sort_values("margin_pixel")
     return margin_pairs_df
+
+
+def _marker_value_to_int(marker_value: List[str]) -> int:
+    """Convenience method to parse the contents of a marker file."""
+    if len(marker_value) != 1:
+        raise ValueError("Marker file should contain only one integer value.")
+    return int(marker_value[0])
diff --git a/src/hats_import/pipeline_resume_plan.py b/src/hats_import/pipeline_resume_plan.py
index 9e49b15f..9664cb5e 100644
--- a/src/hats_import/pipeline_resume_plan.py
+++ b/src/hats_import/pipeline_resume_plan.py
@@ -85,12 +85,46 @@ def touch_key_done_file(cls, tmp_path, stage_name, key):
         """Touch (create) a done file for a single key, within a pipeline stage.
 
         Args:
-            stage_name(str): name of the stage (e.g. mapping, reducing)
+            tmp_path: where to write pipeline intermediate files
             stage_name(str): name of the stage (e.g. mapping, reducing)
             key (str): unique string for each task (e.g. "map_57")
         """
         Path(file_io.append_paths_to_pointer(tmp_path, stage_name, f"{key}_done")).touch()
 
+    @classmethod
+    def write_marker_file(cls, tmp_path, stage_name, key, value):
+        """Create a marker file for a single key, within a pipeline stage.
+
+        Similar to a "done" file, but contains some value inside the file to be read later.
+
+        Args:
+            tmp_path: where to write pipeline intermediate files
+            stage_name(str): name of the stage (e.g. mapping, reducing)
+            key (str): unique string for each task (e.g. "map_57")
+            value (str): value for the marker.
+        """
+        file_io.write_string_to_file(
+            file_io.append_paths_to_pointer(tmp_path, stage_name, f"{key}_done"), value
+        )
+
+    def read_markers(self, stage_name):
+        """Inspect the stage's directory of marker files, fetching the key value pairs
+        from marker file names and contents.
+
+        Args:
+            stage_name(str): name of the stage (e.g. mapping, reducing)
+        Return:
+            List[str] - all keys found in done directory
+        """
+        prefix = file_io.append_paths_to_pointer(self.tmp_path, stage_name)
+        result = {}
+        result_files = file_io.find_files_matching_path(prefix, "*_done")
+        for file_path in result_files:
+            match = re.match(r"(.*)_done", str(file_path.name))
+            key = match.group(1)
+            result[key] = file_io.load_text_file(file_path)
+        return result
+
     def read_done_keys(self, stage_name):
         """Inspect the stage's directory of done files, fetching the keys from done
         file names.
diff --git a/tests/data/markers/mapping/map_001_done b/tests/data/markers/mapping/map_001_done
new file mode 100644
index 00000000..7d373862
--- /dev/null
+++ b/tests/data/markers/mapping/map_001_done
@@ -0,0 +1 @@
+45
\ No newline at end of file
diff --git a/tests/data/markers/mapping/map_002_done b/tests/data/markers/mapping/map_002_done
new file mode 100644
index 00000000..d95ced81
--- /dev/null
+++ b/tests/data/markers/mapping/map_002_done
@@ -0,0 +1 @@
+zippy
\ No newline at end of file
diff --git a/tests/hats_import/margin_cache/test_margin_cache.py b/tests/hats_import/margin_cache/test_margin_cache.py
index ebe11558..ab8c8853 100644
--- a/tests/hats_import/margin_cache/test_margin_cache.py
+++ b/tests/hats_import/margin_cache/test_margin_cache.py
@@ -97,3 +97,19 @@ def test_margin_cache_gen_negative_pixels(small_sky_source_catalog, tmp_path, da
     negative_data = pd.read_parquet(negative_test_file)
 
     assert len(negative_data) > 0
+
+
+@pytest.mark.dask(timeout=150)
+def test_margin_too_small(small_sky_object_catalog, tmp_path, dask_client):
+    """Test that margin cache generation fails when the margin size yields no rows."""
+    args = MarginCacheArguments(
+        margin_threshold=10.0,
+        input_catalog_path=small_sky_object_catalog,
+        output_path=tmp_path,
+        output_artifact_name="catalog_cache",
+        margin_order=8,
+        progress_bar=False,
+    )
+
+    with pytest.raises(ValueError, match="Margin cache contains no rows"):
+        mc.generate_margin_cache(args, dask_client)
diff --git a/tests/hats_import/margin_cache/test_margin_cache_resume_plan.py b/tests/hats_import/margin_cache/test_margin_cache_resume_plan.py
index f9b3a3a2..a535f43c 100644
--- a/tests/hats_import/margin_cache/test_margin_cache_resume_plan.py
+++ b/tests/hats_import/margin_cache/test_margin_cache_resume_plan.py
@@ -99,6 +99,25 @@ def test_some_reducing_task_failures(small_sky_margin_args, dask_client):
         plan.wait_for_reducing(futures)
 
 
+def test_mapping_total(small_sky_margin_args):
+    plan = MarginCachePlan(small_sky_margin_args)
+
+    MarginCachePlan.mapping_key_done(plan.tmp_path, "map_001", 45)
+    MarginCachePlan.mapping_key_done(plan.tmp_path, "map_002", 9)
+    plan.touch_stage_done_file(MarginCachePlan.MAPPING_STAGE)
+
+    markers = plan.read_markers("mapping")
+    assert markers == {"map_001": ["45"], "map_002": ["9"]}
+
+    mapping_total = plan.get_mapping_total()
+    assert mapping_total == 54
+
+    # We'll just return the previously-computed total
+    MarginCachePlan.mapping_key_done(plan.tmp_path, "map_002", 10)
+    mapping_total = plan.get_mapping_total()
+    assert mapping_total == 54
+
+
 def test_partition_margin_pixel_pairs(small_sky_source_catalog):
     """Ensure partition_margin_pixel_pairs can generate main partition pixels."""
     source_catalog = Catalog.read_hats(small_sky_source_catalog)
diff --git a/tests/hats_import/test_pipeline_resume_plan.py b/tests/hats_import/test_pipeline_resume_plan.py
index b1bfeac7..ab74db56 100644
--- a/tests/hats_import/test_pipeline_resume_plan.py
+++ b/tests/hats_import/test_pipeline_resume_plan.py
@@ -185,3 +185,9 @@ def test_check_original_input_paths(tmp_path, mixed_schema_csv_dir):
 
     round_trip_files = plan.check_original_input_paths(checked_files)
     npt.assert_array_equal(checked_files, round_trip_files)
+
+
+def test_read_markers(test_data_dir):
+    plan = PipelineResumePlan(tmp_path=test_data_dir / "markers", progress_bar=False)
+    markers = plan.read_markers("mapping")
+    assert markers == {"map_001": ["45"], "map_002": ["zippy"]}

From be64d688fb8c7486450e549ca7dcb6ea3b83b6da Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Tue, 22 Oct 2024 15:32:17 -0400
Subject: [PATCH 2/3] Apply suggestions from code review

Co-authored-by: Derek T. Jones
---
 src/hats_import/margin_cache/margin_cache_resume_plan.py | 2 +-
 src/hats_import/pipeline_resume_plan.py                  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/hats_import/margin_cache/margin_cache_resume_plan.py b/src/hats_import/margin_cache/margin_cache_resume_plan.py
index e17bdd0c..a26608b4 100644
--- a/src/hats_import/margin_cache/margin_cache_resume_plan.py
+++ b/src/hats_import/margin_cache/margin_cache_resume_plan.py
@@ -182,7 +182,7 @@ def _find_partition_margin_pixel_pairs(combined_pixels, margin_order):
     return margin_pairs_df
 
 
-def _marker_value_to_int(marker_value: List[str]) -> int:
+def _marker_value_to_int(marker_value: list[str]) -> int:
     """Convenience method to parse the contents of a marker file."""
     if len(marker_value) != 1:
         raise ValueError("Marker file should contain only one integer value.")
diff --git a/src/hats_import/pipeline_resume_plan.py b/src/hats_import/pipeline_resume_plan.py
index 9664cb5e..159b71c6 100644
--- a/src/hats_import/pipeline_resume_plan.py
+++ b/src/hats_import/pipeline_resume_plan.py
@@ -107,14 +107,14 @@ def write_marker_file(cls, tmp_path, stage_name, key, value):
             file_io.append_paths_to_pointer(tmp_path, stage_name, f"{key}_done"), value
         )
 
-    def read_markers(self, stage_name):
+    def read_markers(self, stage_name: str) -> list[str]:
         """Inspect the stage's directory of marker files, fetching the key value pairs
         from marker file names and contents.
 
         Args:
             stage_name(str): name of the stage (e.g. mapping, reducing)
         Return:
-            List[str] - all keys found in done directory
+            dict[str, list[str]] - all keys found in done directory, mapped to marker file contents
         """
         prefix = file_io.append_paths_to_pointer(self.tmp_path, stage_name)
         result = {}

From ace36477a5b0fe70ca2ff1d26e0bea9c783d9178 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Wed, 23 Oct 2024 08:48:07 -0400
Subject: [PATCH 3/3] Mypy issue

---
 src/hats_import/pipeline_resume_plan.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/hats_import/pipeline_resume_plan.py b/src/hats_import/pipeline_resume_plan.py
index 159b71c6..d7ded581 100644
--- a/src/hats_import/pipeline_resume_plan.py
+++ b/src/hats_import/pipeline_resume_plan.py
@@ -107,7 +107,7 @@ def write_marker_file(cls, tmp_path, stage_name, key, value):
             file_io.append_paths_to_pointer(tmp_path, stage_name, f"{key}_done"), value
         )
 
-    def read_markers(self, stage_name: str) -> list[str]:
+    def read_markers(self, stage_name: str) -> dict[str, list[str]]:
         """Inspect the stage's directory of marker files, fetching the key value pairs
         from marker file names and contents.
 
@@ -121,6 +121,8 @@ def read_markers(self, stage_name: str) -> list[str]:
         result_files = file_io.find_files_matching_path(prefix, "*_done")
         for file_path in result_files:
             match = re.match(r"(.*)_done", str(file_path.name))
+            if not match:
+                raise ValueError(f"Unexpected file found: {file_path.name}")
             key = match.group(1)
             result[key] = file_io.load_text_file(file_path)
         return result
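
For context beyond the patches themselves, the bookkeeping they introduce can be summarized with a minimal standalone sketch. This is an illustrative rework only: mark_done and stage_total are hypothetical stand-ins for mapping_key_done/write_marker_file and get_mapping_total, and pathlib stands in for the hats file_io helpers.

    from pathlib import Path

    def mark_done(tmp_path: Path, stage: str, key: str, num_rows: int) -> None:
        """Record a finished task by writing its row count into a <key>_done marker."""
        stage_dir = tmp_path / stage
        stage_dir.mkdir(parents=True, exist_ok=True)
        (stage_dir / f"{key}_done").write_text(str(num_rows))

    def stage_total(tmp_path: Path, stage: str) -> int:
        """Sum the per-task counts, caching the total so reruns skip the directory scan."""
        total_file = tmp_path / f"{stage}_total"
        if total_file.exists():
            # As in get_mapping_total: once the total is cached, later marker
            # writes no longer change the result.
            return int(total_file.read_text())
        total = sum(int(p.read_text()) for p in (tmp_path / stage).glob("*_done"))
        total_file.write_text(str(total))
        return total

The cached total is what the "Finishing" stage compares against the row count reported by write_parquet_metadata, so a mismatch between shards written and metadata rows fails the pipeline instead of silently producing a short catalog.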