Skip to content

Commit

Permalink
Invert default on resume check.
Browse files Browse the repository at this point in the history
  • Loading branch information
delucchi-cmu committed Aug 17, 2023
1 parent 450d826 commit 1525da2
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ImportArguments(RuntimeArguments):
use_schema_file: str | None = None
"""path to a parquet file with schema metadata. this will be used for column
metadata when writing the files, if specified"""
resume: bool = False
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to create a new catalog where we left off"""
constant_healpix_order: int = -1
Expand Down
10 changes: 6 additions & 4 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class PipelineResumePlan:

tmp_path: FilePointer
"""path for any intermediate files"""
resume: bool = False
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run pipeline where we left off"""
progress_bar: bool = True
Expand All @@ -31,13 +31,15 @@ def safe_to_resume(self):
Raises:
ValueError: if the tmp_path already exists and contains some files.
"""
if not self.resume:
if file_io.directory_has_contents(self.tmp_path):
if file_io.directory_has_contents(self.tmp_path):
if not self.resume:
raise ValueError(
f"tmp_path ({self.tmp_path}) contains intermediate files."
" choose a different directory or use --resume flag"
)
file_io.make_directory(self.tmp_path, exist_ok=True)
print(f"tmp_path ({self.tmp_path}) contains intermediate files. resuming prior progress.")
else:
file_io.make_directory(self.tmp_path, exist_ok=True)

def done_file_exists(self, stage_name):
"""Is there a file at a given path?
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/soap/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SoapArguments(RuntimeArguments):
source_catalog_dir: str = ""
source_object_id_column: str = ""

resume: bool = False
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run the pipeline where we left off"""

Expand Down
19 changes: 1 addition & 18 deletions tests/hipscat_import/catalog/test_run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_resume_dask_runner(
os.path.join(resume_dir, "intermediate"),
temp_path,
)
plan = ResumePlan(tmp_path=temp_path, progress_bar=False, resume=True)
plan = ResumePlan(tmp_path=temp_path, progress_bar=False)
histogram = hist.empty_histogram(0)
histogram[11] = 131
empty = hist.empty_histogram(0)
Expand All @@ -59,21 +59,6 @@ def test_resume_dask_runner(
os.path.join(tmp_path, "resume", "Norder=0"),
)

with pytest.raises(ValueError, match="resume"):
## Check that we fail if there are some existing intermediate files
ImportArguments(
output_catalog_name="resume",
input_path=small_sky_parts_dir,
input_format="csv",
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
overwrite=True,
highest_healpix_order=0,
pixel_threshold=1000,
progress_bar=False,
)

args = ImportArguments(
output_catalog_name="resume",
input_path=small_sky_parts_dir,
Expand All @@ -82,7 +67,6 @@ def test_resume_dask_runner(
dask_tmp=tmp_path,
tmp_dir=tmp_path,
overwrite=True,
resume=True,
highest_healpix_order=0,
pixel_threshold=1000,
progress_bar=False,
Expand Down Expand Up @@ -124,7 +108,6 @@ def test_resume_dask_runner(
dask_tmp=tmp_path,
tmp_dir=tmp_path,
overwrite=True,
resume=True,
highest_healpix_order=0,
pixel_threshold=1000,
progress_bar=False,
Expand Down
1 change: 0 additions & 1 deletion tests/hipscat_import/soap/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def small_sky_soap_args(small_sky_object_catalog, small_sky_source_catalog, tmp_
source_object_id_column="object_id",
output_catalog_name="small_sky_association",
output_path=tmp_path,
resume=True,
overwrite=True,
progress_bar=False,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/hipscat_import/test_pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_done_file(tmp_path):

def test_safe_to_resume(tmp_path):
"""Check that we throw errors when it's not safe to resume."""
plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False)
plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False, resume=False)
plan.safe_to_resume()

## check is idempotent - intermediate directory exists but does not
Expand Down

0 comments on commit 1525da2

Please sign in to comment.