From 70572380bf05998f27ea54df3653ec357e44fc69 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 28 Oct 2024 13:35:27 +0100 Subject: [PATCH] CLI: Dump only `sealed` process nodes (#6591) Previously, no checks were applied on the state of a process when dumping its data to disk via `verdi process dump`. Now, by default, only `sealed` processes are dumped; this check can be disabled with the new `--dump-unsealed` flag. In addition, an `--incremental` dumping option is added, with which an incomplete existing dump directory can be gradually filled up with data, e.g. while the process is still running, in conjunction with the `--dump-unsealed` flag. Before, only the `--overwrite` flag was available to handle an already existing directory; however, it cleans the entire top-level directory. Lastly, the `prepare_dump_path` function (previously the `_validate_make_dump_path` method) was refactored out of the `ProcessDumper` class and moved as a plain function into `utils.py` in the `tools/dumping` directory. --- docs/source/howto/data.rst | 31 +++--- src/aiida/cmdline/commands/cmd_process.py | 16 +++ src/aiida/cmdline/params/options/main.py | 13 ++- src/aiida/tools/dumping/processes.py | 59 +++-------- src/aiida/tools/dumping/utils.py | 75 ++++++++++++++ tests/cmdline/commands/test_process.py | 6 +- tests/tools/dumping/test_processes.py | 114 +++++++++++++++------- 7 files changed, 219 insertions(+), 95 deletions(-) create mode 100644 src/aiida/tools/dumping/utils.py diff --git a/docs/source/howto/data.rst b/docs/source/howto/data.rst index 33c9c33aba..19b8391db0 100644 --- a/docs/source/howto/data.rst +++ b/docs/source/howto/data.rst @@ -129,19 +129,24 @@ top-level process. Further, numbered subdirectories are created for each step of ``aiida.out`` of the ``ArithmeticAddCalculation`` are placed in ``inputs`` and ``outputs``. In addition, these also contain the submission script ``_aiidasubmit.sh``, as well as the scheduler stdout and stderr, ``_scheduler-stdout.txt`` and ``_scheduler-stderr.txt``, respectively. Lastly, the source code of the ``multiply`` ``calcfunction`` presenting the -first step of the workflow is contained in the ``source_file``. - -Upon having a closer look at the directory, we also find the hidden ``.aiida_node_metadata.yaml`` files, which are -created for every ``ProcessNode`` and contain additional information about the ``Node``, the ``User``, and the -``Computer``, as well as the ``.aiida`` subdirectory with machine-readable AiiDA-internal data in JSON format. - -Since child processes are explored recursively, arbitrarily complex, nested workflows can be dumped. As already seen -above, the ``-p`` flag allows to specify a custom dumping path. If none is provided, it is automatically generated from -the ``process_label`` (or ``process_type``) and the ``pk``. In addition, the command provides the ``-o`` flag to -overwrite existing directories, the ``-f`` flag to dump all files for each ``CalculationNode`` of the workflow in a flat -directory structure, and the ``--include-inputs/--exclude-inputs`` (``--include-outputs/--exclude-outputs``) flags to -also dump additional node inputs (outputs) of each ``CalculationNode`` of the workflow into ``node_inputs`` -(``node_outputs``) subdirectories. For a full list of available options, call :code:`verdi process dump --help`. +first step of the workflow is contained in the ``source_file``. Since child processes are explored recursively, +arbitrarily complex, nested workflows can be dumped.
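To make the new options concrete, the following is a minimal sketch of the Python API that this patch extends (the pk ``1234`` and the target directory are placeholder values, and a configured AiiDA profile is assumed):

.. code-block:: python

    from pathlib import Path

    from aiida import load_profile, orm
    from aiida.tools.dumping.processes import ProcessDumper

    load_profile()

    # Placeholder pk of a (possibly still running) process of your own
    process_node = orm.load_node(1234)

    # Only sealed nodes are dumped by default; dump_unsealed=True lifts that check,
    # and incremental=True (the default) tops up an existing dump directory instead
    # of requiring it to be overwritten.
    dumper = ProcessDumper(dump_unsealed=True, incremental=True)
    dumper.dump(process_node=process_node, output_path=Path('placeholder-dump-dir'))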
Upon having a closer look at the directory, we also find the hidden +``.aiida_node_metadata.yaml`` files, which are created for every ``ProcessNode`` and contain additional information +about the ``Node``, the ``User``, and the ``Computer``, as well as the ``.aiida`` subdirectory with machine-readable +AiiDA-internal data in JSON format. + +As already seen above, the ``-p`` flag allows to specify a custom dumping path. If none is provided, it is automatically +generated from the ``process_label`` (or ``process_type``) and the ``pk``. In addition, the command provides the +``-o/--overwrite`` flag to fully overwrite an existing dumping directory, as well as the ``--incremental`` flag, with +which files are gradually added to an existing directory (this is the default behavior). By default, only sealed process +nodes can be dumped, however, the behavior can be changed with the ``--dump-unsealed`` flag, which can be useful in +conjunction with ``--incremental`` to gradually obtain data while a process is running. Furthermore, the ``-f/--flat`` +flag can be used to dump all files for each ``CalculationNode`` of the workflow in a flat directory structure, and the +``--include-inputs/--exclude-inputs`` (``--include-outputs/--exclude-outputs``) flags are used to also dump additional +node inputs (outputs) of each ``CalculationNode`` of the workflow into ``node_inputs`` (``node_outputs``) +subdirectories. + +For a full list of available options, call :code:`verdi process dump --help`. .. _how-to:data:import:provenance: diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index c9c492ae14..e203bdddfc 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -581,8 +581,17 @@ def process_repair(manager, broker, dry_run): '--flat', is_flag=True, default=False, + show_default=True, help='Dump files in a flat directory for every step of the workflow.', ) +@click.option( + '--dump-unsealed', + is_flag=True, + default=False, + show_default=True, + help='Also allow the dumping of unsealed process nodes.', +) +@options.INCREMENTAL() def process_dump( process, path, @@ -592,6 +601,8 @@ def process_dump( include_attributes, include_extras, flat, + dump_unsealed, + incremental, ) -> None: """Dump process input and output files to disk. @@ -609,6 +620,7 @@ def process_dump( node data for further inspection. """ + from aiida.tools.archive.exceptions import ExportValidationError from aiida.tools.dumping.processes import ProcessDumper process_dumper = ProcessDumper( @@ -618,6 +630,8 @@ def process_dump( include_extras=include_extras, overwrite=overwrite, flat=flat, + dump_unsealed=dump_unsealed, + incremental=incremental, ) try: @@ -626,6 +640,8 @@ def process_dump( echo.echo_critical( 'Dumping directory exists and overwrite is False. Set overwrite to True, or delete directory manually.' 
) + except ExportValidationError as e: + echo.echo_critical(f'{e!s}') except Exception as e: echo.echo_critical(f'Unexpected error while dumping {process.__class__.__name__} <{process.pk}>:\n ({e!s}).') diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py index 381199d199..3bf05e4191 100644 --- a/src/aiida/cmdline/params/options/main.py +++ b/src/aiida/cmdline/params/options/main.py @@ -68,6 +68,7 @@ 'GROUP_CLEAR', 'HOSTNAME', 'IDENTIFIER', + 'INCREMENTAL', 'INPUT_FORMAT', 'INPUT_PLUGIN', 'LABEL', @@ -765,12 +766,12 @@ def set_log_level(ctx, _param, value): ) OVERWRITE = OverridableOption( - '--overwrite', '-o', + '--overwrite', is_flag=True, default=False, show_default=True, - help='Overwrite file/directory if writing to disk.', + help='Overwrite file/directory when writing to disk.', ) SORT = OverridableOption( @@ -781,3 +782,11 @@ def set_log_level(ctx, _param, value): help='Sort the keys of the output YAML.', show_default=True, ) + +INCREMENTAL = OverridableOption( + '--incremental/--no-incremental', + is_flag=True, + default=True, + show_default=True, + help="Incremental dumping of data to disk. Doesn't require using overwrite to clean previous directories.", +) diff --git a/src/aiida/tools/dumping/processes.py b/src/aiida/tools/dumping/processes.py index 3d970c421c..794b1fcab2 100644 --- a/src/aiida/tools/dumping/processes.py +++ b/src/aiida/tools/dumping/processes.py @@ -30,6 +30,8 @@ WorkFunctionNode, ) from aiida.orm.utils import LinkTriple +from aiida.tools.archive.exceptions import ExportValidationError +from aiida.tools.dumping.utils import prepare_dump_path LOGGER = logging.getLogger(__name__) @@ -43,6 +45,8 @@ def __init__( include_extras: bool = True, overwrite: bool = False, flat: bool = False, + dump_unsealed: bool = False, + incremental: bool = True, ) -> None: self.include_inputs = include_inputs self.include_outputs = include_outputs @@ -50,6 +54,8 @@ def __init__( self.include_extras = include_extras self.overwrite = overwrite self.flat = flat + self.dump_unsealed = dump_unsealed + self.incremental = incremental @staticmethod def _generate_default_dump_path(process_node: ProcessNode) -> Path: @@ -178,12 +184,18 @@ def dump( :param output_path: The output path where the directory tree will be created. :param io_dump_paths: Subdirectories created for each `CalculationNode`. Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs'] + :raises: ExportValidationError if the node is not sealed and dump_unsealed is False. """ + if not process_node.is_sealed and not self.dump_unsealed: + raise ExportValidationError( + f'Process `{process_node.pk} must be sealed before it can be dumped, or `dump_unsealed` set to True.' 
+ ) + if output_path is None: output_path = self._generate_default_dump_path(process_node=process_node) - self._validate_make_dump_path(validate_path=output_path) + prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental) if isinstance(process_node, CalculationNode): self._dump_calculation( @@ -213,7 +225,7 @@ def _dump_workflow( :param io_dump_paths: Custom subdirectories for `CalculationNode` s, defaults to None """ - self._validate_make_dump_path(validate_path=output_path) + prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental) self._dump_node_yaml(process_node=workflow_node, output_path=output_path) called_links = workflow_node.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all() @@ -254,7 +266,7 @@ def _dump_calculation( Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs'] """ - self._validate_make_dump_path(validate_path=output_path) + prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental) self._dump_node_yaml(process_node=calculation_node, output_path=output_path) io_dump_mapping = self._generate_calculation_io_mapping(io_dump_paths=io_dump_paths) @@ -303,47 +315,6 @@ def _dump_calculation_io(self, parent_path: Path, link_triples: LinkManager | Li link_triple.node.base.repository.copy_tree(linked_node_path.resolve()) - def _validate_make_dump_path(self, validate_path: Path, safeguard_file: str = '.aiida_node_metadata.yaml') -> Path: - """Create default dumping directory for a given process node and return it as absolute path. - - :param validate_path: Path to validate for dumping. - :param safeguard_file: Dumping-specific file to avoid deleting wrong directory. - Default: `.aiida_node_metadata.yaml` - :return: The absolute created dump path. - """ - import shutil - - if validate_path.is_dir(): - # Existing, empty directory -> OK - if not any(validate_path.iterdir()): - pass - - # Existing, non-empty directory and overwrite False -> FileExistsError - elif not self.overwrite: - raise FileExistsError(f'Path `{validate_path}` already exists and overwrite set to False.') - - # Existing, non-empty directory and overwrite True - # Check for safeguard file ('.aiida_node_metadata.yaml') for safety - # If present -> Remove directory - elif (validate_path / safeguard_file).is_file(): - LOGGER.info(f'Overwrite set to true, will overwrite directory `{validate_path}`.') - shutil.rmtree(validate_path) - - # Existing and non-empty directory and overwrite True - # Check for safeguard file ('.aiida_node_metadata.yaml') for safety - # If absent -> Don't remove directory as to not accidentally remove a wrong one - else: - raise Exception( - f"Path `{validate_path}` already exists and doesn't contain safeguard file {safeguard_file}." - f' Not removing for safety reasons.' - ) - - # Not included in if-else as to avoid having to repeat the `mkdir` call. - # `exist_ok=True` as checks implemented above - validate_path.mkdir(exist_ok=True, parents=True) - - return validate_path.resolve() - def _generate_calculation_io_mapping(self, io_dump_paths: List[str | Path] | None = None) -> SimpleNamespace: """Helper function to generate mapping for entities dumped for each `CalculationNode`. 
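The path handling that was removed from ``ProcessDumper`` above now lives in the new ``utils.py`` module shown next. As a short sketch of the three modes of ``prepare_dump_path`` implemented there (only the directory name is a placeholder):

.. code-block:: python

    from pathlib import Path

    from aiida.tools.dumping.utils import prepare_dump_path

    target = Path('placeholder-dump-dir')

    # Default: incremental=True keeps an existing directory and its files, and creates
    # the directory (plus the safeguard file) if it does not exist yet.
    prepare_dump_path(path_to_validate=target)

    # overwrite=True wipes a non-empty directory, but only if the safeguard file
    # `.aiida_node_metadata.yaml` is present; otherwise a FileNotFoundError is raised.
    prepare_dump_path(path_to_validate=target, overwrite=True, incremental=False)

    # Requesting both modes at once raises a ValueError:
    # prepare_dump_path(path_to_validate=target, overwrite=True, incremental=True)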
diff --git a/src/aiida/tools/dumping/utils.py b/src/aiida/tools/dumping/utils.py new file mode 100644 index 0000000000..a631ac25e5 --- /dev/null +++ b/src/aiida/tools/dumping/utils.py @@ -0,0 +1,75 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Utility functions for dumping features.""" + +from __future__ import annotations + +import logging +import shutil +from pathlib import Path + +__all__ = ['prepare_dump_path'] + +logger = logging.getLogger(__name__) + + +def prepare_dump_path( + path_to_validate: Path, + overwrite: bool = False, + incremental: bool = True, + safeguard_file: str = '.aiida_node_metadata.yaml', +) -> None: + """Create and prepare the dump directory at the given path, handling the overwrite and incremental modes. + + :param path_to_validate: Path to validate and prepare for dumping. + :param overwrite: If True, remove an existing, non-empty directory, provided the safeguard file is present. + :param incremental: If True (default), keep an existing directory and its contents, so that it can be filled up + gradually. + :param safeguard_file: Dumping-specific file that indicates that the directory indeed originated from a `verdi ... + dump` command to avoid accidentally deleting the wrong directory. + Default: `.aiida_node_metadata.yaml` + :raises ValueError: If both `overwrite` and `incremental` are set to True. + :raises FileExistsError: If a file or non-empty directory exists at the given path and none of `overwrite` or + `incremental` are enabled. + :raises FileNotFoundError: If the directory is non-empty, `overwrite` is True, but no `safeguard_file` is found.""" + + if overwrite and incremental: + raise ValueError('Both overwrite and incremental set to True. Only specify one.') + + if path_to_validate.is_file(): + raise FileExistsError(f'A file at the given path `{path_to_validate}` already exists.') + + # Handle existing directory + if path_to_validate.is_dir(): + is_empty = not any(path_to_validate.iterdir()) + + # Case 1: Non-empty directory and overwrite is False + if not is_empty and not overwrite: + if incremental: + logger.info('Incremental dumping selected. Will keep directory.') + else: + raise FileExistsError( + f'Path `{path_to_validate}` already exists, and neither overwrite nor incremental is enabled.' + ) + + # Case 2: Non-empty directory, overwrite is True + if not is_empty and overwrite: + safeguard_exists = (path_to_validate / safeguard_file).is_file() + + if safeguard_exists: + logger.info(f'Overwriting directory `{path_to_validate}`.') + shutil.rmtree(path_to_validate) + + else: + raise FileNotFoundError( + f'Path `{path_to_validate}` exists without safeguard file ' + f'`{safeguard_file}`. Not removing because path might be a directory not created by AiiDA.'
+ ) + + # Create directory if it doesn't exist or was removed + path_to_validate.mkdir(exist_ok=True, parents=True) + (path_to_validate / safeguard_file).touch() diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py index fae9957f80..b70991aa4d 100644 --- a/tests/cmdline/commands/test_process.py +++ b/tests/cmdline/commands/test_process.py @@ -365,13 +365,13 @@ def test_process_dump(self, run_cli_command, tmp_path, generate_workchain_multip assert result.exception is None, result.output assert 'Success:' in result.output - # Trying to run the dumping again in the same path but without overwrite=True should raise exception - options = [str(node.pk), '-p', str(test_path)] + # Trying to run the dumping again in the same path but with overwrite=False should raise exception + options = [str(node.pk), '-p', str(test_path), '--no-incremental'] result = run_cli_command(cmd_process.process_dump, options, raises=True) assert result.exit_code is ExitCode.CRITICAL # Works fine when using overwrite=True - options = [str(node.pk), '-p', str(test_path), '-o'] + options = [str(node.pk), '-p', str(test_path), '-o', '--no-incremental'] result = run_cli_command(cmd_process.process_dump, options) assert result.exception is None, result.output assert 'Success:' in result.output diff --git a/tests/tools/dumping/test_processes.py b/tests/tools/dumping/test_processes.py index 82e704f4e2..be05d50441 100644 --- a/tests/tools/dumping/test_processes.py +++ b/tests/tools/dumping/test_processes.py @@ -11,6 +11,7 @@ from __future__ import annotations import io +import shutil from pathlib import Path import pytest @@ -114,11 +115,19 @@ def _generate_workchain_node_io(cj_nodes, store_all: bool = True): # Only test top-level actions, like path and README creation # Other things tested via `_dump_workflow` and `_dump_calculation` def test_dump(generate_calculation_node_io, generate_workchain_node_io, tmp_path): + from aiida.tools.archive.exceptions import ExportValidationError + dump_parent_path = tmp_path / 'wc-dump-test-io' process_dumper = ProcessDumper() # Don't attach outputs, as it would require storing the calculation_node and then it cannot be used in the workchain cj_nodes = [generate_calculation_node_io(attach_outputs=False), generate_calculation_node_io(attach_outputs=False)] wc_node = generate_workchain_node_io(cj_nodes=cj_nodes) + + # Raises if ProcessNode not sealed + with pytest.raises(ExportValidationError): + return_path = process_dumper.dump(process_node=wc_node, output_path=dump_parent_path) + + wc_node.seal() return_path = process_dumper.dump(process_node=wc_node, output_path=dump_parent_path) assert dump_parent_path.is_dir() @@ -266,14 +275,31 @@ def test_dump_calculation_flat(tmp_path, generate_calculation_node_io): # Here, in principle, test only non-default arguments, as defaults tested above -# @pytest.mark.parametrize('overwrite', (True, False)) -def test_dump_calculation_overwrite(tmp_path, generate_calculation_node_io): +def test_dump_calculation_overwr_incr(tmp_path, generate_calculation_node_io): + """Tests the ProcessDumper for the overwrite and incremental option.""" dump_parent_path = tmp_path / 'cj-dump-test-overwrite' - process_dumper = ProcessDumper(overwrite=False) + process_dumper = ProcessDumper(overwrite=False, incremental=False) calculation_node = generate_calculation_node_io() - process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + calculation_node.seal() + # Create safeguard file to mock existing 
dump directory + dump_parent_path.mkdir() + # we create safeguard file so the dumping works + (dump_parent_path / '.aiida_node_metadata.yaml').touch() with pytest.raises(FileExistsError): process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + # With overwrite option true no error is raised and the dumping can run through. + process_dumper = ProcessDumper(overwrite=True, incremental=False) + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + assert (dump_parent_path / inputs_relpath / filename).is_file() + + shutil.rmtree(dump_parent_path) + + # Incremental also does work + dump_parent_path.mkdir() + (dump_parent_path / '.aiida_node_metadata.yaml').touch() + process_dumper = ProcessDumper(overwrite=False, incremental=True) + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + assert (dump_parent_path / inputs_relpath / filename).is_file() # With both inputs and outputs being dumped is the standard test case above, so only test without inputs here @@ -303,43 +329,65 @@ def test_dump_calculation_add(tmp_path, generate_calculation_node_add): # Tests for helper methods @pytest.mark.usefixtures('chdir_tmp_path') -def test_validate_make_dump_path(tmp_path): +def test_prepare_dump_path(tmp_path): + from aiida.tools.dumping.utils import prepare_dump_path + + test_dir = tmp_path / Path('test-dir') + test_file = test_dir / filename safeguard_file = node_metadata_file + safeguard_file_path = test_dir / safeguard_file - # Path must be provided - process_dumper = ProcessDumper(overwrite=False) - with pytest.raises(TypeError): - process_dumper._validate_make_dump_path() + # Cannot set both overwrite and incremental to True + with pytest.raises(ValueError): + prepare_dump_path(path_to_validate=test_dir, overwrite=True, incremental=True) + + # Check that fails if file with same name as output dir + test_dir.touch() + with pytest.raises(FileExistsError): + prepare_dump_path(path_to_validate=test_dir) + test_dir.unlink() # Check if path created if non-existent - test_dir = tmp_path / Path('test-dir') - test_dir.mkdir() - output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) - assert output_path == test_dir + prepare_dump_path(path_to_validate=test_dir) + assert test_dir.exists() + assert safeguard_file_path.is_file() - # Empty path is fine -> No error and full path returned - output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) - assert output_path == test_dir + # Directory exists, but empty -> is fine + safeguard_file_path.unlink() + prepare_dump_path(path_to_validate=test_dir) + assert test_dir.exists() + assert safeguard_file_path.is_file() # Fails if directory not empty, safeguard file existent, and overwrite set to False - (test_dir / filename).touch() - (test_dir / safeguard_file).touch() + test_file.touch() + safeguard_file_path.touch() with pytest.raises(FileExistsError): - output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) - assert (test_dir / filename).is_file() - - # Works if directory not empty, but overwrite=True and safeguard_file (e.g. 
`.aiida_node_metadata.yaml`) contained - process_dumper = ProcessDumper(overwrite=True) - output_path = process_dumper._validate_make_dump_path(validate_path=test_dir, safeguard_file=safeguard_file) - assert output_path == test_dir - assert not (test_dir / safeguard_file).is_file() - - # Fails if directory not empty and overwrite set to True, but safeguard_file not found (for safety reasons) - # Could define new Exception for this... - (test_dir / filename).touch() - with pytest.raises(Exception): - output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) - assert (test_dir / filename).is_file() + prepare_dump_path(path_to_validate=test_dir, overwrite=False, incremental=False) + + # Fails if directory not empty, overwrite set to True, but safeguard_file not found (for safety reasons) + safeguard_file_path.unlink() + test_file.touch() + with pytest.raises(FileNotFoundError): + prepare_dump_path(path_to_validate=test_dir, overwrite=True, incremental=False) + + # Works if directory not empty, overwrite set to True and safeguard_file contained + # -> After function call, test_file is deleted, and safeguard_file again created + safeguard_file_path.touch() + prepare_dump_path( + path_to_validate=test_dir, + safeguard_file=safeguard_file, + overwrite=True, + incremental=False, + ) + assert not test_file.is_file() + assert safeguard_file_path.is_file() + + # Works if directory not empty, but incremental=True and safeguard_file (e.g. `.aiida_node_metadata.yaml`) contained + # -> After function call, test file and safeguard_file still there + test_file.touch() + prepare_dump_path(path_to_validate=test_dir, safeguard_file=safeguard_file, incremental=True) + assert safeguard_file_path.is_file() + assert test_file.is_file() def test_generate_default_dump_path(