Skip to content

Commit

Permalink
Merge pull request #382 from astronomy-commons/issue/298/erase-empty-temp-dir
Browse files Browse the repository at this point in the history

Remove temporary directory when all intermediate files are deleted
  • Loading branch information
camposandro authored Aug 26, 2024
2 parents a4ab612 + 42a9877 commit c61c1a7
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 17 deletions.
10 changes: 10 additions & 0 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ class ResumePlan(PipelineResumePlan):
HISTOGRAMS_DIR = "histograms"
ALIGNMENT_FILE = "alignment.pickle"

# pylint: disable=too-many-arguments
def __init__(
self,
resume: bool = True,
progress_bar: bool = True,
simple_progress_bar: bool = False,
input_paths=None,
tmp_path=None,
tmp_base_path: FilePointer | None = None,
delete_resume_log_files: bool = True,
delete_intermediate_parquet_files: bool = True,
output_storage_options: dict | None = None,
run_stages: List[str] | None = None,
import_args=None,
):
Expand All @@ -58,7 +62,10 @@ def __init__(
progress_bar=import_args.progress_bar,
simple_progress_bar=import_args.simple_progress_bar,
tmp_path=import_args.resume_tmp,
tmp_base_path=import_args.tmp_base_path,
delete_resume_log_files=import_args.delete_resume_log_files,
delete_intermediate_parquet_files=import_args.delete_intermediate_parquet_files,
output_storage_options=import_args.output_storage_options,
)
if import_args.debug_stats_only:
run_stages = ["mapping", "finishing"]
Expand All @@ -69,7 +76,10 @@ def __init__(
progress_bar=progress_bar,
simple_progress_bar=simple_progress_bar,
tmp_path=tmp_path,
tmp_base_path=tmp_base_path,
delete_resume_log_files=delete_resume_log_files,
delete_intermediate_parquet_files=delete_intermediate_parquet_files,
output_storage_options=output_storage_options,
)
self.input_paths = input_paths
self.gather_plan(run_stages)
Expand Down
3 changes: 3 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def __init__(self, args: MarginCacheArguments):
progress_bar=args.progress_bar,
simple_progress_bar=args.simple_progress_bar,
tmp_path=args.tmp_path,
tmp_base_path=args.tmp_base_path,
delete_resume_log_files=args.delete_resume_log_files,
delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
output_storage_options=args.output_storage_options,
)
self._gather_plan(args)

Expand Down
23 changes: 18 additions & 5 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class PipelineResumePlan:

tmp_path: FilePointer
"""path for any intermediate files"""
tmp_base_path: FilePointer | None = None
"""temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user"""
output_storage_options: dict | None = None
"""optional dictionary of abstract filesystem credentials for the output."""
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run pipeline where we left off"""
Expand All @@ -34,6 +38,9 @@ class PipelineResumePlan:
"""should we delete task-level done files once each stage is complete?
if False, we will keep all sub-histograms from the mapping stage, and all
done marker files at the end of the pipeline."""
delete_intermediate_parquet_files: bool = True
"""should we delete any smaller intermediate parquet files that may be
generated by the pipeline?"""

ORIGINAL_INPUT_PATHS = "input_paths.txt"

Expand All @@ -45,10 +52,12 @@ def safe_to_resume(self):
"""
if file_io.directory_has_contents(self.tmp_path):
if not self.resume:
self.clean_resume_files()
file_io.remove_directory(
self.tmp_path, ignore_errors=True, storage_options=self.output_storage_options
)
else:
print(f"tmp_path ({self.tmp_path}) contains intermediate files; resuming prior progress.")
file_io.make_directory(self.tmp_path, exist_ok=True)
file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options)

def done_file_exists(self, stage_name):
"""Is there a file at a given path?
Expand Down Expand Up @@ -117,9 +126,13 @@ def get_keys_from_file_names(directory, extension):
return keys

def clean_resume_files(self):
    """Delete the entire intermediate directory, but only when the user asked
    to erase both the stage log files and the intermediate parquet files."""
    if not (self.delete_resume_log_files and self.delete_intermediate_parquet_files):
        return
    # Prefer the user-provided temporary base directory; otherwise fall back
    # to the intermediate directory nested under the output directory.
    target = self.tmp_path if self.tmp_base_path is None else self.tmp_base_path
    file_io.remove_directory(target, ignore_errors=True, storage_options=self.output_storage_options)

def wait_for_futures(self, futures, stage_name, fail_fast=False):
"""Wait for collected futures to complete.
Expand Down
8 changes: 6 additions & 2 deletions src/hipscat_import/runtime_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class RuntimeArguments:
tmp_path: FilePointer | None = None
"""constructed temp path - defaults to tmp_dir, then dask_tmp, but will create
a new temp directory under catalog_path if no other options are provided"""
tmp_base_path: FilePointer | None = None
"""temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user"""

def __post_init__(self):
self._check_arguments()
Expand All @@ -82,18 +84,20 @@ def _check_arguments(self):
)
file_io.make_directory(self.catalog_path, exist_ok=True, storage_options=self.output_storage_options)

if self.tmp_dir:
if self.tmp_dir and str(self.tmp_dir) != str(self.output_path):
if not file_io.does_file_or_directory_exist(self.tmp_dir):
raise FileNotFoundError(f"tmp_dir ({self.tmp_dir}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.tmp_dir, self.output_artifact_name, "intermediate"
)
elif self.dask_tmp:
self.tmp_base_path = self.tmp_dir
elif self.dask_tmp and str(self.dask_tmp) != str(self.output_path):
if not file_io.does_file_or_directory_exist(self.dask_tmp):
raise FileNotFoundError(f"dask_tmp ({self.dask_tmp}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.dask_tmp, self.output_artifact_name, "intermediate"
)
self.tmp_base_path = self.dask_tmp
else:
self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate")
file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options)
Expand Down
3 changes: 3 additions & 0 deletions src/hipscat_import/soap/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def __init__(self, args: SoapArguments):
progress_bar=args.progress_bar,
simple_progress_bar=args.simple_progress_bar,
tmp_path=args.tmp_path,
tmp_base_path=args.tmp_base_path,
delete_resume_log_files=args.delete_resume_log_files,
delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
output_storage_options=args.output_storage_options,
)
self.gather_plan(args)

Expand Down
94 changes: 84 additions & 10 deletions tests/hipscat_import/catalog/test_run_round_trip.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,6 @@ def test_import_constant_healpix_order(
assert np.logical_and(ids >= 700, ids < 832).all()


def assert_directory_contains(dir_name, expected_contents):
    """Assert that `dir_name` exists and that its sorted directory listing
    exactly equals `expected_contents`."""
    assert os.path.exists(dir_name)
    actual_contents = os.listdir(dir_name)
    actual_contents.sort()
    npt.assert_array_equal(actual_contents, expected_contents)


@pytest.mark.dask
def test_import_keep_intermediate_files(
dask_client,
Expand All @@ -274,16 +267,87 @@ def test_import_keep_intermediate_files(
delete_intermediate_parquet_files=False,
delete_resume_log_files=False,
)

runner.run(args, dask_client)

# Check that the catalog metadata file exists
catalog = Catalog.read_from_hipscat(args.catalog_path)
assert catalog.on_disk
assert catalog.catalog_path == args.catalog_path

## Check that stage-level done files are still around.
# Check that both stage level and intermediate parquet files exist
base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate"
assert_stage_level_files_exist(base_intermediate_dir)
assert_intermediate_parquet_files_exist(base_intermediate_dir)


@pytest.mark.dask
def test_import_delete_provided_temp_directory(
    dask_client,
    small_sky_parts_dir,
    tmp_path_factory,
):
    """After a successful import, the user-provided temporary base directory is
    removed only when BOTH `delete_intermediate_parquet_files` and
    `delete_resume_log_files` are set to True."""
    output_dir = tmp_path_factory.mktemp("small_sky_object_catalog")
    # Provided temporary directory, outside `output_dir`
    temp = tmp_path_factory.mktemp("intermediate_files")
    base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate"

    def run_import(**delete_flags):
        # One import run over the same inputs/outputs, varying only the
        # deletion-related flags.
        args = ImportArguments(
            output_artifact_name="small_sky_object_catalog",
            input_path=small_sky_parts_dir,
            file_reader="csv",
            output_path=output_dir,
            tmp_path=temp,
            dask_tmp=temp,
            progress_bar=False,
            highest_healpix_order=2,
            **delete_flags,
        )
        runner.run(args, dask_client)

    # When at least one of the delete flags is set to False we do
    # not delete the provided temporary base directory.
    run_import(
        delete_intermediate_parquet_files=True,
        delete_resume_log_files=False,
    )
    assert_stage_level_files_exist(base_intermediate_dir)

    run_import(
        delete_intermediate_parquet_files=False,
        delete_resume_log_files=True,
        resume=False,
    )
    assert_intermediate_parquet_files_exist(base_intermediate_dir)

    # With both flags True, the temporary directory is deleted.
    run_import(
        delete_intermediate_parquet_files=True,
        delete_resume_log_files=True,
        resume=False,
    )
    assert not os.path.exists(temp)


def assert_stage_level_files_exist(base_intermediate_dir):
# Check that stage-level done files are still around for the import of
# `small_sky_object_catalog` at order 0.
expected_contents = [
"alignment.pickle",
"histograms", # directory containing sub-histograms
Expand Down Expand Up @@ -312,7 +376,10 @@ def test_import_keep_intermediate_files(
checking_dir = base_intermediate_dir / "reducing"
assert_directory_contains(checking_dir, ["0_11_done"])

# Check that all of the intermediate parquet shards are still around.

def assert_intermediate_parquet_files_exist(base_intermediate_dir):
# Check that all the intermediate parquet shards are still around for the
# import of `small_sky_object_catalog` at order 0.
checking_dir = base_intermediate_dir / "order_0" / "dir_0" / "pixel_11"
assert_directory_contains(
checking_dir,
Expand All @@ -326,6 +393,13 @@ def test_import_keep_intermediate_files(
)


def assert_directory_contains(dir_name, expected_contents):
    """Assert that `dir_name` exists and that its entries match
    `expected_contents` exactly (entries are compared in sorted order)."""
    assert os.path.exists(dir_name)
    npt.assert_array_equal(sorted(os.listdir(dir_name)), expected_contents)


@pytest.mark.dask
def test_import_lowest_healpix_order(
dask_client,
Expand Down

0 comments on commit c61c1a7

Please sign in to comment.