Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove temporary directory when all intermediate files are deleted #382

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ class ResumePlan(PipelineResumePlan):
HISTOGRAMS_DIR = "histograms"
ALIGNMENT_FILE = "alignment.pickle"

# pylint: disable=too-many-arguments
def __init__(
self,
resume: bool = True,
progress_bar: bool = True,
simple_progress_bar: bool = False,
input_paths=None,
tmp_path=None,
tmp_base_path: FilePointer | None = None,
delete_resume_log_files: bool = True,
delete_intermediate_parquet_files: bool = True,
output_storage_options: dict | None = None,
run_stages: List[str] | None = None,
import_args=None,
):
Expand All @@ -58,7 +62,10 @@ def __init__(
progress_bar=import_args.progress_bar,
simple_progress_bar=import_args.simple_progress_bar,
tmp_path=import_args.resume_tmp,
tmp_base_path=import_args.tmp_base_path,
delete_resume_log_files=import_args.delete_resume_log_files,
delete_intermediate_parquet_files=import_args.delete_intermediate_parquet_files,
output_storage_options=import_args.output_storage_options,
)
if import_args.debug_stats_only:
run_stages = ["mapping", "finishing"]
Expand All @@ -69,7 +76,10 @@ def __init__(
progress_bar=progress_bar,
simple_progress_bar=simple_progress_bar,
tmp_path=tmp_path,
tmp_base_path=tmp_base_path,
delete_resume_log_files=delete_resume_log_files,
delete_intermediate_parquet_files=delete_intermediate_parquet_files,
output_storage_options=output_storage_options,
)
self.input_paths = input_paths
self.gather_plan(run_stages)
Expand Down
3 changes: 3 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def __init__(self, args: MarginCacheArguments):
progress_bar=args.progress_bar,
simple_progress_bar=args.simple_progress_bar,
tmp_path=args.tmp_path,
tmp_base_path=args.tmp_base_path,
delete_resume_log_files=args.delete_resume_log_files,
delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
output_storage_options=args.output_storage_options,
)
self._gather_plan(args)

Expand Down
23 changes: 18 additions & 5 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class PipelineResumePlan:

tmp_path: FilePointer
"""path for any intermediate files"""
tmp_base_path: FilePointer | None = None
"""temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user"""
output_storage_options: dict | None = None
"""optional dictionary of abstract filesystem credentials for the output."""
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run pipeline where we left off"""
Expand All @@ -34,6 +38,9 @@ class PipelineResumePlan:
"""should we delete task-level done files once each stage is complete?
if False, we will keep all sub-histograms from the mapping stage, and all
done marker files at the end of the pipeline."""
delete_intermediate_parquet_files: bool = True
"""should we delete the smaller intermediate parquet files generated in the
mapping stage, once the relevant reducing stage is complete?"""
camposandro marked this conversation as resolved.
Show resolved Hide resolved

ORIGINAL_INPUT_PATHS = "input_paths.txt"

Expand All @@ -45,10 +52,12 @@ def safe_to_resume(self):
"""
if file_io.directory_has_contents(self.tmp_path):
if not self.resume:
self.clean_resume_files()
file_io.remove_directory(
self.tmp_path, ignore_errors=True, storage_options=self.output_storage_options
)
else:
print(f"tmp_path ({self.tmp_path}) contains intermediate files; resuming prior progress.")
file_io.make_directory(self.tmp_path, exist_ok=True)
file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options)

def done_file_exists(self, stage_name):
"""Is there a file at a given path?
Expand Down Expand Up @@ -117,9 +126,13 @@ def get_keys_from_file_names(directory, extension):
return keys

def clean_resume_files(self):
    """Remove the intermediate directory created in execution if the user decided
    to erase all the stage log files as well as intermediate parquet.

    If either flag is False, some intermediate artifacts must survive, so the
    directory is left in place.
    """
    if not (self.delete_resume_log_files and self.delete_intermediate_parquet_files):
        return
    # Prefer the user-provided temporary base directory when present;
    # otherwise remove the intermediate directory nested under the output.
    target = self.tmp_path if self.tmp_base_path is None else self.tmp_base_path
    file_io.remove_directory(target, ignore_errors=True, storage_options=self.output_storage_options)

def wait_for_futures(self, futures, stage_name, fail_fast=False):
"""Wait for collected futures to complete.
Expand Down
8 changes: 6 additions & 2 deletions src/hipscat_import/runtime_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class RuntimeArguments:
tmp_path: FilePointer | None = None
"""constructed temp path - defaults to tmp_dir, then dask_tmp, but will create
a new temp directory under catalog_path if no other options are provided"""
tmp_base_path: FilePointer | None = None
"""temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user"""

def __post_init__(self):
    """Validate the dataclass fields immediately after construction.

    Delegates to `_check_arguments`, which verifies paths and derives
    `tmp_path` / `tmp_base_path` from the user-provided directories.
    """
    self._check_arguments()
Expand All @@ -82,18 +84,20 @@ def _check_arguments(self):
)
file_io.make_directory(self.catalog_path, exist_ok=True, storage_options=self.output_storage_options)

if self.tmp_dir:
if self.tmp_dir and str(self.tmp_dir) != str(self.output_path):
if not file_io.does_file_or_directory_exist(self.tmp_dir):
raise FileNotFoundError(f"tmp_dir ({self.tmp_dir}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.tmp_dir, self.output_artifact_name, "intermediate"
)
elif self.dask_tmp:
self.tmp_base_path = self.tmp_dir
elif self.dask_tmp and str(self.dask_tmp) != str(self.output_path):
if not file_io.does_file_or_directory_exist(self.dask_tmp):
raise FileNotFoundError(f"dask_tmp ({self.dask_tmp}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.dask_tmp, self.output_artifact_name, "intermediate"
)
self.tmp_base_path = self.dask_tmp
else:
self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate")
file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options)
Expand Down
3 changes: 3 additions & 0 deletions src/hipscat_import/soap/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def __init__(self, args: SoapArguments):
progress_bar=args.progress_bar,
simple_progress_bar=args.simple_progress_bar,
tmp_path=args.tmp_path,
tmp_base_path=args.tmp_base_path,
delete_resume_log_files=args.delete_resume_log_files,
delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
output_storage_options=args.output_storage_options,
)
self.gather_plan(args)

Expand Down
94 changes: 84 additions & 10 deletions tests/hipscat_import/catalog/test_run_round_trip.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,6 @@ def test_import_constant_healpix_order(
assert np.logical_and(ids >= 700, ids < 832).all()


def assert_directory_contains(dir_name, expected_contents):
    """Assert `dir_name` exists and holds exactly the entries in `expected_contents`.

    `expected_contents` is compared against the directory listing in sorted
    order, so it must itself be provided sorted.
    """
    assert os.path.exists(dir_name)
    npt.assert_array_equal(sorted(os.listdir(dir_name)), expected_contents)


@pytest.mark.dask
def test_import_keep_intermediate_files(
dask_client,
Expand All @@ -274,16 +267,87 @@ def test_import_keep_intermediate_files(
delete_intermediate_parquet_files=False,
delete_resume_log_files=False,
)

runner.run(args, dask_client)

# Check that the catalog metadata file exists
catalog = Catalog.read_from_hipscat(args.catalog_path)
assert catalog.on_disk
assert catalog.catalog_path == args.catalog_path

## Check that stage-level done files are still around.
# Check that both stage level and intermediate parquet files exist
base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate"
assert_stage_level_files_exist(base_intermediate_dir)
assert_intermediate_parquet_files_exist(base_intermediate_dir)


@pytest.mark.dask
def test_import_delete_provided_temp_directory(
    dask_client,
    small_sky_parts_dir,
    tmp_path_factory,
):
    """Test that ALL intermediate files (and temporary base directory) are deleted
    after successful import, when both `delete_intermediate_parquet_files` and
    `delete_resume_log_files` are set to True."""
    output_dir = tmp_path_factory.mktemp("small_sky_object_catalog")
    # Provided temporary directory, outside `output_dir`
    temp = tmp_path_factory.mktemp("intermediate_files")
    base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate"

    def make_args(**flags):
        # Builds the shared import arguments; `flags` supplies only the
        # delete/resume switches that vary between the three runs below.
        return ImportArguments(
            output_artifact_name="small_sky_object_catalog",
            input_path=small_sky_parts_dir,
            file_reader="csv",
            output_path=output_dir,
            tmp_path=temp,
            dask_tmp=temp,
            progress_bar=False,
            highest_healpix_order=2,
            **flags,
        )

    # When at least one of the delete flags is set to False we do
    # not delete the provided temporary base directory.
    runner.run(
        make_args(delete_intermediate_parquet_files=True, delete_resume_log_files=False),
        dask_client,
    )
    assert_stage_level_files_exist(base_intermediate_dir)

    runner.run(
        make_args(delete_intermediate_parquet_files=False, delete_resume_log_files=True, resume=False),
        dask_client,
    )
    assert_intermediate_parquet_files_exist(base_intermediate_dir)

    # With both flags enabled, the temporary directory is deleted.
    runner.run(
        make_args(delete_intermediate_parquet_files=True, delete_resume_log_files=True, resume=False),
        dask_client,
    )
    assert not os.path.exists(temp)


def assert_stage_level_files_exist(base_intermediate_dir):
# Check that stage-level done files are still around for the import of
# `small_sky_object_catalog` at order 0.
expected_contents = [
"alignment.pickle",
"histograms", # directory containing sub-histograms
Expand Down Expand Up @@ -312,7 +376,10 @@ def test_import_keep_intermediate_files(
checking_dir = base_intermediate_dir / "reducing"
assert_directory_contains(checking_dir, ["0_11_done"])

# Check that all of the intermediate parquet shards are still around.

def assert_intermediate_parquet_files_exist(base_intermediate_dir):
# Check that all the intermediate parquet shards are still around for the
# import of `small_sky_object_catalog` at order 0.
checking_dir = base_intermediate_dir / "order_0" / "dir_0" / "pixel_11"
assert_directory_contains(
checking_dir,
Expand All @@ -326,6 +393,13 @@ def test_import_keep_intermediate_files(
)


def assert_directory_contains(dir_name, expected_contents):
    """Assert that directory `dir_name` exists and contains exactly `expected_contents`.

    The actual directory listing is sorted before comparison, so
    `expected_contents` must be provided in sorted order.
    """
    assert os.path.exists(dir_name)
    actual_contents = os.listdir(dir_name)
    actual_contents.sort()
    npt.assert_array_equal(actual_contents, expected_contents)


@pytest.mark.dask
def test_import_lowest_healpix_order(
dask_client,
Expand Down