From 1525da247fb213066463f8961e556e6430b497c9 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 16 Aug 2023 23:41:57 -0400 Subject: [PATCH] Invert default on resume check. --- src/hipscat_import/catalog/arguments.py | 2 +- src/hipscat_import/pipeline_resume_plan.py | 10 ++++++---- src/hipscat_import/soap/arguments.py | 2 +- .../hipscat_import/catalog/test_run_import.py | 19 +------------------ tests/hipscat_import/soap/conftest.py | 1 - .../test_pipeline_resume_plan.py | 2 +- 6 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py index 41b0c576..6dda8ddd 100644 --- a/src/hipscat_import/catalog/arguments.py +++ b/src/hipscat_import/catalog/arguments.py @@ -47,7 +47,7 @@ class ImportArguments(RuntimeArguments): use_schema_file: str | None = None """path to a parquet file with schema metadata. this will be used for column metadata when writing the files, if specified""" - resume: bool = False + resume: bool = True """if there are existing intermediate resume files, should we read those and continue to create a new catalog where we left off""" constant_healpix_order: int = -1 diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 747e9c40..192ccbd7 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -18,7 +18,7 @@ class PipelineResumePlan: tmp_path: FilePointer """path for any intermediate files""" - resume: bool = False + resume: bool = True """if there are existing intermediate resume files, should we read those and continue to run pipeline where we left off""" progress_bar: bool = True @@ -31,13 +31,15 @@ def safe_to_resume(self): Raises: ValueError: if the tmp_path already exists and contains some files. """ - if not self.resume: - if file_io.directory_has_contents(self.tmp_path): + if file_io.directory_has_contents(self.tmp_path): + if not self.resume: raise ValueError( f"tmp_path ({self.tmp_path}) contains intermediate files." " choose a different directory or use --resume flag" ) - file_io.make_directory(self.tmp_path, exist_ok=True) + print(f"tmp_path ({self.tmp_path}) contains intermediate files. resuming prior progress.") + else: + file_io.make_directory(self.tmp_path, exist_ok=True) def done_file_exists(self, stage_name): """Is there a file at a given path? diff --git a/src/hipscat_import/soap/arguments.py b/src/hipscat_import/soap/arguments.py index 2c5c6a39..aa57f245 100644 --- a/src/hipscat_import/soap/arguments.py +++ b/src/hipscat_import/soap/arguments.py @@ -18,7 +18,7 @@ class SoapArguments(RuntimeArguments): source_catalog_dir: str = "" source_object_id_column: str = "" - resume: bool = False + resume: bool = True """if there are existing intermediate resume files, should we read those and continue to run the pipeline where we left off""" diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index 98cfd162..be5b7cb6 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -41,7 +41,7 @@ def test_resume_dask_runner( os.path.join(resume_dir, "intermediate"), temp_path, ) - plan = ResumePlan(tmp_path=temp_path, progress_bar=False, resume=True) + plan = ResumePlan(tmp_path=temp_path, progress_bar=False) histogram = hist.empty_histogram(0) histogram[11] = 131 empty = hist.empty_histogram(0) @@ -59,21 +59,6 @@ def test_resume_dask_runner( os.path.join(tmp_path, "resume", "Norder=0"), ) - with pytest.raises(ValueError, match="resume"): - ## Check that we fail if there are some existing intermediate files - ImportArguments( - output_catalog_name="resume", - input_path=small_sky_parts_dir, - input_format="csv", - output_path=tmp_path, - dask_tmp=tmp_path, - tmp_dir=tmp_path, - overwrite=True, - highest_healpix_order=0, - pixel_threshold=1000, - progress_bar=False, - ) - args = ImportArguments( output_catalog_name="resume", input_path=small_sky_parts_dir, @@ -82,7 +67,6 @@ def test_resume_dask_runner( dask_tmp=tmp_path, tmp_dir=tmp_path, overwrite=True, - resume=True, highest_healpix_order=0, pixel_threshold=1000, progress_bar=False, @@ -124,7 +108,6 @@ def test_resume_dask_runner( dask_tmp=tmp_path, tmp_dir=tmp_path, overwrite=True, - resume=True, highest_healpix_order=0, pixel_threshold=1000, progress_bar=False, diff --git a/tests/hipscat_import/soap/conftest.py b/tests/hipscat_import/soap/conftest.py index d76ccd32..205b2e48 100644 --- a/tests/hipscat_import/soap/conftest.py +++ b/tests/hipscat_import/soap/conftest.py @@ -37,7 +37,6 @@ def small_sky_soap_args(small_sky_object_catalog, small_sky_source_catalog, tmp_ source_object_id_column="object_id", output_catalog_name="small_sky_association", output_path=tmp_path, - resume=True, overwrite=True, progress_bar=False, ) diff --git a/tests/hipscat_import/test_pipeline_resume_plan.py b/tests/hipscat_import/test_pipeline_resume_plan.py index 7b18f023..76cce377 100644 --- a/tests/hipscat_import/test_pipeline_resume_plan.py +++ b/tests/hipscat_import/test_pipeline_resume_plan.py @@ -53,7 +53,7 @@ def test_done_file(tmp_path): def test_safe_to_resume(tmp_path): """Check that we throw errors when it's not safe to resume.""" - plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False) + plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False, resume=False) plan.safe_to_resume() ## check is idempotent - intermediate directory exists but does not