From bc64f9b9237008d98cc9c63a0c51b91d512a0050 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Thu, 22 Aug 2024 11:29:06 -0400 Subject: [PATCH 1/2] Remove temp folder if outside output dir --- src/hipscat_import/catalog/resume_plan.py | 10 ++ .../margin_cache/margin_cache_resume_plan.py | 3 + src/hipscat_import/pipeline_resume_plan.py | 23 ++++- src/hipscat_import/runtime_arguments.py | 8 +- src/hipscat_import/soap/resume_plan.py | 3 + .../catalog/test_run_round_trip.py | 94 +++++++++++++++++-- 6 files changed, 124 insertions(+), 17 deletions(-) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 1c18873..0f8a668 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -41,6 +41,7 @@ class ResumePlan(PipelineResumePlan): HISTOGRAMS_DIR = "histograms" ALIGNMENT_FILE = "alignment.pickle" + # pylint: disable=too-many-arguments def __init__( self, resume: bool = True, @@ -48,7 +49,10 @@ def __init__( simple_progress_bar: bool = False, input_paths=None, tmp_path=None, + tmp_base_path: FilePointer | None = None, delete_resume_log_files: bool = True, + delete_intermediate_parquet_files: bool = True, + output_storage_options: dict | None = None, run_stages: List[str] | None = None, import_args=None, ): @@ -58,7 +62,10 @@ def __init__( progress_bar=import_args.progress_bar, simple_progress_bar=import_args.simple_progress_bar, tmp_path=import_args.resume_tmp, + tmp_base_path=import_args.tmp_base_path, delete_resume_log_files=import_args.delete_resume_log_files, + delete_intermediate_parquet_files=import_args.delete_intermediate_parquet_files, + output_storage_options=import_args.output_storage_options, ) if import_args.debug_stats_only: run_stages = ["mapping", "finishing"] @@ -69,7 +76,10 @@ def __init__( progress_bar=progress_bar, simple_progress_bar=simple_progress_bar, tmp_path=tmp_path, + tmp_base_path=tmp_base_path, delete_resume_log_files=delete_resume_log_files, + delete_intermediate_parquet_files=delete_intermediate_parquet_files, + output_storage_options=output_storage_options, ) self.input_paths = input_paths self.gather_plan(run_stages) diff --git a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py index aa0fc68..f8105c0 100644 --- a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py +++ b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py @@ -34,7 +34,10 @@ def __init__(self, args: MarginCacheArguments): progress_bar=args.progress_bar, simple_progress_bar=args.simple_progress_bar, tmp_path=args.tmp_path, + tmp_base_path=args.tmp_base_path, delete_resume_log_files=args.delete_resume_log_files, + delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, + output_storage_options=args.output_storage_options, ) self._gather_plan(args) diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 8b271ba..833930f 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -20,6 +20,10 @@ class PipelineResumePlan: tmp_path: FilePointer """path for any intermediate files""" + tmp_base_path: FilePointer | None = None + """temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user""" + output_storage_options: dict | None = None + """optional dictionary of abstract filesystem credentials for the output.""" resume: bool = True """if there are existing intermediate resume files, should we read those and continue to run pipeline where we left off""" @@ -34,6 +38,9 @@ class PipelineResumePlan: """should we delete task-level done files once each stage is complete? if False, we will keep all sub-histograms from the mapping stage, and all done marker files at the end of the pipeline.""" + delete_intermediate_parquet_files: bool = True + """should we delete the smaller intermediate parquet files generated in the + mapping stage, once the relevant reducing stage is complete?""" ORIGINAL_INPUT_PATHS = "input_paths.txt" @@ -45,10 +52,12 @@ def safe_to_resume(self): """ if file_io.directory_has_contents(self.tmp_path): if not self.resume: - self.clean_resume_files() + file_io.remove_directory( + self.tmp_path, ignore_errors=True, storage_options=self.output_storage_options + ) else: print(f"tmp_path ({self.tmp_path}) contains intermediate files; resuming prior progress.") - file_io.make_directory(self.tmp_path, exist_ok=True) + file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options) def done_file_exists(self, stage_name): """Is there a file at a given path? @@ -117,9 +126,13 @@ def get_keys_from_file_names(directory, extension): return keys def clean_resume_files(self): - """Remove all intermediate files created in execution.""" - if self.delete_resume_log_files: - file_io.remove_directory(self.tmp_path, ignore_errors=True) + """Remove the intermediate directory created in execution if the user decided + to erase all the stage log files as well as intermediate parquet.""" + if self.delete_resume_log_files and self.delete_intermediate_parquet_files: + # Use the temporary directory base path if the user provided it, or + # delete the intermediate directory from inside the output directory + path = self.tmp_base_path if self.tmp_base_path is not None else self.tmp_path + file_io.remove_directory(path, ignore_errors=True, storage_options=self.output_storage_options) def wait_for_futures(self, futures, stage_name, fail_fast=False): """Wait for collected futures to complete. diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py index b8d1c81..b9d54a5 100644 --- a/src/hipscat_import/runtime_arguments.py +++ b/src/hipscat_import/runtime_arguments.py @@ -58,6 +58,8 @@ class RuntimeArguments: tmp_path: FilePointer | None = None """constructed temp path - defaults to tmp_dir, then dask_tmp, but will create a new temp directory under catalog_path if no other options are provided""" + tmp_base_path: FilePointer | None = None + """temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user""" def __post_init__(self): self._check_arguments() @@ -82,18 +84,20 @@ def _check_arguments(self): ) file_io.make_directory(self.catalog_path, exist_ok=True, storage_options=self.output_storage_options) - if self.tmp_dir: + if self.tmp_dir and str(self.tmp_dir) != str(self.output_path): if not file_io.does_file_or_directory_exist(self.tmp_dir): raise FileNotFoundError(f"tmp_dir ({self.tmp_dir}) not found on local storage") self.tmp_path = file_io.append_paths_to_pointer( self.tmp_dir, self.output_artifact_name, "intermediate" ) - elif self.dask_tmp: + self.tmp_base_path = self.tmp_dir + elif self.dask_tmp and str(self.dask_tmp) != str(self.output_path): if not file_io.does_file_or_directory_exist(self.dask_tmp): raise FileNotFoundError(f"dask_tmp ({self.dask_tmp}) not found on local storage") self.tmp_path = file_io.append_paths_to_pointer( self.dask_tmp, self.output_artifact_name, "intermediate" ) + self.tmp_base_path = self.dask_tmp else: self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate") file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options) diff --git a/src/hipscat_import/soap/resume_plan.py b/src/hipscat_import/soap/resume_plan.py index c1b7e97..18e98ec 100644 --- a/src/hipscat_import/soap/resume_plan.py +++ b/src/hipscat_import/soap/resume_plan.py @@ -40,7 +40,10 @@ def __init__(self, args: SoapArguments): progress_bar=args.progress_bar, simple_progress_bar=args.simple_progress_bar, tmp_path=args.tmp_path, + tmp_base_path=args.tmp_base_path, delete_resume_log_files=args.delete_resume_log_files, + delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, + output_storage_options=args.output_storage_options, ) self.gather_plan(args) diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 3680d34..b488f32 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -244,13 +244,6 @@ def test_import_constant_healpix_order( assert np.logical_and(ids >= 700, ids < 832).all() -def assert_directory_contains(dir_name, expected_contents): - assert os.path.exists(dir_name) - actual_contents = os.listdir(dir_name) - actual_contents.sort() - npt.assert_array_equal(actual_contents, expected_contents) - - @pytest.mark.dask def test_import_keep_intermediate_files( dask_client, @@ -274,7 +267,6 @@ def test_import_keep_intermediate_files( delete_intermediate_parquet_files=False, delete_resume_log_files=False, ) - runner.run(args, dask_client) # Check that the catalog metadata file exists @@ -282,8 +274,80 @@ def test_import_keep_intermediate_files( assert catalog.on_disk assert catalog.catalog_path == args.catalog_path - ## Check that stage-level done files are still around. + # Check that both stage level and intermediate parquet files exist + base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate" + assert_stage_level_files_exist(base_intermediate_dir) + assert_intermediate_parquet_files_exist(base_intermediate_dir) + + +@pytest.mark.dask +def test_import_delete_provided_temp_directory( + dask_client, + small_sky_parts_dir, + tmp_path_factory, +): + """Test that ALL intermediate files (and temporary base directory) are deleted + after successful import, when both `delete_intermediate_parquet_files` and + `delete_resume_log_files` are set to True.""" + output_dir = tmp_path_factory.mktemp("small_sky_object_catalog") + # Provided temporary directory, outside `output_dir` + temp = tmp_path_factory.mktemp("intermediate_files") base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate" + + # When at least one of the delete flags is set to False we do + # not delete the provided temporary base directory. + args = ImportArguments( + output_artifact_name="small_sky_object_catalog", + input_path=small_sky_parts_dir, + file_reader="csv", + output_path=output_dir, + tmp_path=temp, + dask_tmp=temp, + progress_bar=False, + highest_healpix_order=2, + delete_intermediate_parquet_files=True, + delete_resume_log_files=False, + ) + runner.run(args, dask_client) + assert_stage_level_files_exist(base_intermediate_dir) + + args = ImportArguments( + output_artifact_name="small_sky_object_catalog", + input_path=small_sky_parts_dir, + file_reader="csv", + output_path=output_dir, + tmp_path=temp, + dask_tmp=temp, + progress_bar=False, + highest_healpix_order=2, + delete_intermediate_parquet_files=False, + delete_resume_log_files=True, + resume=False, + ) + runner.run(args, dask_client) + assert_intermediate_parquet_files_exist(base_intermediate_dir) + + # The temporary directory is deleted. + args = ImportArguments( + output_artifact_name="small_sky_object_catalog", + input_path=small_sky_parts_dir, + file_reader="csv", + output_path=output_dir, + tmp_path=temp, + dask_tmp=temp, + progress_bar=False, + highest_healpix_order=2, + delete_intermediate_parquet_files=True, + delete_resume_log_files=True, + resume=False, + ) + runner.run(args, dask_client) + assert not os.path.exists(temp) + + +def assert_stage_level_files_exist(base_intermediate_dir): + # Check that stage-level done files are still around for the import of + # `small_sky_object_catalog` at order 0. expected_contents = [ "alignment.pickle", "histograms", # directory containing sub-histograms @@ -312,7 +376,10 @@ def test_import_keep_intermediate_files( checking_dir = base_intermediate_dir / "reducing" assert_directory_contains(checking_dir, ["0_11_done"]) - # Check that all of the intermediate parquet shards are still around. + +def assert_intermediate_parquet_files_exist(base_intermediate_dir): + # Check that all the intermediate parquet shards are still around for the + # import of `small_sky_object_catalog` at order 0. checking_dir = base_intermediate_dir / "order_0" / "dir_0" / "pixel_11" assert_directory_contains( checking_dir, @@ -326,6 +393,13 @@ def test_import_keep_intermediate_files( ) +def assert_directory_contains(dir_name, expected_contents): + assert os.path.exists(dir_name) + actual_contents = os.listdir(dir_name) + actual_contents.sort() + npt.assert_array_equal(actual_contents, expected_contents) + + @pytest.mark.dask def test_import_lowest_healpix_order( dask_client, From 42a9877a6c00715769a5244cf8774764c5265583 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Mon, 26 Aug 2024 11:09:21 -0400 Subject: [PATCH 2/2] Update src/hipscat_import/pipeline_resume_plan.py Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> --- src/hipscat_import/pipeline_resume_plan.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 833930f..5216155 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -39,8 +39,8 @@ class PipelineResumePlan: if False, we will keep all sub-histograms from the mapping stage, and all done marker files at the end of the pipeline.""" delete_intermediate_parquet_files: bool = True - """should we delete the smaller intermediate parquet files generated in the - mapping stage, once the relevant reducing stage is complete?""" + """should we delete any smaller intermediate parquet files that may be + generated by the pipeline?""" ORIGINAL_INPUT_PATHS = "input_paths.txt"