Commit
CLI: Dump only sealed process nodes (#6591)
Previously, no checks were applied to the state of a process when dumping its data to disk via `verdi process dump`.
Now, by default, only `sealed` processes are dumped; this check can be disabled with the new `--dump-unsealed` flag. In
addition, an `--incremental` dumping option is added, with which an incomplete existing dump output directory can be
gradually filled with data, e.g. while the process is still running, in conjunction with the `--dump-unsealed` flag.
Previously, only the `--overwrite` flag was available to handle an already existing directory, but it wipes the entire
top-level directory. Lastly, the `prepare_dump_path` function (previously the `_validate_make_dump_path` method) was
refactored out of the `ProcessDumper` class and placed as a standalone function in `utils.py` in the `tools/dumping`
directory.
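The new default behavior can be sketched as follows. This is a minimal, self-contained sketch of the check this commit describes; `check_dump_allowed` and the stand-in exception class are hypothetical names, not the actual aiida-core API (the real check lives inside `ProcessDumper.dump`):

```python
class ExportValidationError(Exception):
    """Stand-in for `aiida.tools.archive.exceptions.ExportValidationError`."""


def check_dump_allowed(pk: int, is_sealed: bool, dump_unsealed: bool = False) -> None:
    """Refuse to dump an unsealed process unless explicitly allowed."""
    if not is_sealed and not dump_unsealed:
        raise ExportValidationError(
            f'Process `{pk}` must be sealed before it can be dumped, '
            'or `dump_unsealed` set to True.'
        )


check_dump_allowed(42, is_sealed=True)                       # sealed: allowed
check_dump_allowed(42, is_sealed=False, dump_unsealed=True)  # explicitly allowed
try:
    check_dump_allowed(42, is_sealed=False)
except ExportValidationError as exc:
    print(exc)
```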
GeigerJ2 authored Oct 28, 2024
1 parent 867353c commit 7057238
Showing 7 changed files with 219 additions and 95 deletions.
31 changes: 18 additions & 13 deletions docs/source/howto/data.rst
Original file line number Diff line number Diff line change
@@ -129,19 +129,24 @@ top-level process. Further, numbered subdirectories are created for each step of
``aiida.out`` of the ``ArithmeticAddCalculation`` are placed in ``inputs`` and ``outputs``. In addition, these also
contain the submission script ``_aiidasubmit.sh``, as well as the scheduler stdout and stderr, ``_scheduler-stdout.txt``
and ``_scheduler-stderr.txt``, respectively. Lastly, the source code of the ``multiply`` ``calcfunction`` presenting the
first step of the workflow is contained in the ``source_file``.

Upon having a closer look at the directory, we also find the hidden ``.aiida_node_metadata.yaml`` files, which are
created for every ``ProcessNode`` and contain additional information about the ``Node``, the ``User``, and the
``Computer``, as well as the ``.aiida`` subdirectory with machine-readable AiiDA-internal data in JSON format.

Since child processes are explored recursively, arbitrarily complex, nested workflows can be dumped. As already seen
above, the ``-p`` flag allows to specify a custom dumping path. If none is provided, it is automatically generated from
the ``process_label`` (or ``process_type``) and the ``pk``. In addition, the command provides the ``-o`` flag to
overwrite existing directories, the ``-f`` flag to dump all files for each ``CalculationNode`` of the workflow in a flat
directory structure, and the ``--include-inputs/--exclude-inputs`` (``--include-outputs/--exclude-outputs``) flags to
also dump additional node inputs (outputs) of each ``CalculationNode`` of the workflow into ``node_inputs``
(``node_outputs``) subdirectories. For a full list of available options, call :code:`verdi process dump --help`.
first step of the workflow is contained in the ``source_file``. Since child processes are explored recursively,
arbitrarily complex, nested workflows can be dumped. Upon having a closer look at the directory, we also find the hidden
``.aiida_node_metadata.yaml`` files, which are created for every ``ProcessNode`` and contain additional information
about the ``Node``, the ``User``, and the ``Computer``, as well as the ``.aiida`` subdirectory with machine-readable
AiiDA-internal data in JSON format.

As already seen above, the ``-p`` flag allows one to specify a custom dumping path. If none is provided, it is
automatically generated from the ``process_label`` (or ``process_type``) and the ``pk``. In addition, the command
provides the ``-o/--overwrite`` flag to fully overwrite an existing dumping directory, as well as the ``--incremental``
flag, with which files are gradually added to an existing directory (this is the default behavior). By default, only
sealed process nodes can be dumped; this behavior can be changed with the ``--dump-unsealed`` flag, which is useful in
conjunction with ``--incremental`` to gradually obtain data while a process is running. Furthermore, the ``-f/--flat``
flag can be used to dump all files for each ``CalculationNode`` of the workflow in a flat directory structure, and the
``--include-inputs/--exclude-inputs`` (``--include-outputs/--exclude-outputs``) flags to also dump additional node
inputs (outputs) of each ``CalculationNode`` of the workflow into ``node_inputs`` (``node_outputs``) subdirectories.

For a full list of available options, call :code:`verdi process dump --help`.

.. _how-to:data:import:provenance:

16 changes: 16 additions & 0 deletions src/aiida/cmdline/commands/cmd_process.py
@@ -581,8 +581,17 @@ def process_repair(manager, broker, dry_run):
'--flat',
is_flag=True,
default=False,
show_default=True,
help='Dump files in a flat directory for every step of the workflow.',
)
@click.option(
'--dump-unsealed',
is_flag=True,
default=False,
show_default=True,
help='Also allow the dumping of unsealed process nodes.',
)
@options.INCREMENTAL()
def process_dump(
process,
path,
@@ -592,6 +601,8 @@
include_attributes,
include_extras,
flat,
dump_unsealed,
incremental,
) -> None:
"""Dump process input and output files to disk.
@@ -609,6 +620,7 @@
node data for further inspection.
"""

from aiida.tools.archive.exceptions import ExportValidationError
from aiida.tools.dumping.processes import ProcessDumper

process_dumper = ProcessDumper(
@@ -618,6 +630,8 @@
include_extras=include_extras,
overwrite=overwrite,
flat=flat,
dump_unsealed=dump_unsealed,
incremental=incremental,
)

try:
@@ -626,6 +640,8 @@
echo.echo_critical(
'Dumping directory exists and overwrite is False. Set overwrite to True, or delete directory manually.'
)
except ExportValidationError as e:
echo.echo_critical(f'{e!s}')
except Exception as e:
echo.echo_critical(f'Unexpected error while dumping {process.__class__.__name__} <{process.pk}>:\n ({e!s}).')

13 changes: 11 additions & 2 deletions src/aiida/cmdline/params/options/main.py
@@ -68,6 +68,7 @@
'GROUP_CLEAR',
'HOSTNAME',
'IDENTIFIER',
'INCREMENTAL',
'INPUT_FORMAT',
'INPUT_PLUGIN',
'LABEL',
@@ -765,12 +766,12 @@ def set_log_level(ctx, _param, value):
)

OVERWRITE = OverridableOption(
'--overwrite',
'-o',
'--overwrite',
is_flag=True,
default=False,
show_default=True,
help='Overwrite file/directory if writing to disk.',
help='Overwrite file/directory when writing to disk.',
)

SORT = OverridableOption(
@@ -781,3 +782,11 @@ def set_log_level(ctx, _param, value):
help='Sort the keys of the output YAML.',
show_default=True,
)

INCREMENTAL = OverridableOption(
'--incremental/--no-incremental',
is_flag=True,
default=True,
show_default=True,
help="Incremental dumping of data to disk. Doesn't require using overwrite to clean previous directories.",
)
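The ``--incremental/--no-incremental`` definition above uses click's paired boolean-flag syntax, which generates both an on and an off switch for a single option. The stdlib equivalent (shown here so the sketch runs without click installed) is ``argparse.BooleanOptionalAction``:

```python
import argparse

# Paired on/off flag with a True default, mirroring the
# '--incremental/--no-incremental' click option above
# (argparse.BooleanOptionalAction is available since Python 3.9).
parser = argparse.ArgumentParser()
parser.add_argument(
    '--incremental',
    action=argparse.BooleanOptionalAction,
    default=True,
    help="Incremental dumping of data to disk.",
)

print(parser.parse_args([]).incremental)                    # True (default)
print(parser.parse_args(['--no-incremental']).incremental)  # False
```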
59 changes: 15 additions & 44 deletions src/aiida/tools/dumping/processes.py
@@ -30,6 +30,8 @@
WorkFunctionNode,
)
from aiida.orm.utils import LinkTriple
from aiida.tools.archive.exceptions import ExportValidationError
from aiida.tools.dumping.utils import prepare_dump_path

LOGGER = logging.getLogger(__name__)

@@ -43,13 +45,17 @@ def __init__(
include_extras: bool = True,
overwrite: bool = False,
flat: bool = False,
dump_unsealed: bool = False,
incremental: bool = True,
) -> None:
self.include_inputs = include_inputs
self.include_outputs = include_outputs
self.include_attributes = include_attributes
self.include_extras = include_extras
self.overwrite = overwrite
self.flat = flat
self.dump_unsealed = dump_unsealed
self.incremental = incremental

@staticmethod
def _generate_default_dump_path(process_node: ProcessNode) -> Path:
@@ -178,12 +184,18 @@ def dump(
:param output_path: The output path where the directory tree will be created.
:param io_dump_paths: Subdirectories created for each `CalculationNode`.
Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
:raises: ExportValidationError if the node is not sealed and dump_unsealed is False.
"""

if not process_node.is_sealed and not self.dump_unsealed:
raise ExportValidationError(
f'Process `{process_node.pk}` must be sealed before it can be dumped, or `dump_unsealed` set to True.'
)

if output_path is None:
output_path = self._generate_default_dump_path(process_node=process_node)

self._validate_make_dump_path(validate_path=output_path)
prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental)

if isinstance(process_node, CalculationNode):
self._dump_calculation(
@@ -213,7 +225,7 @@ def _dump_workflow(
:param io_dump_paths: Custom subdirectories for `CalculationNode` s, defaults to None
"""

self._validate_make_dump_path(validate_path=output_path)
prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental)
self._dump_node_yaml(process_node=workflow_node, output_path=output_path)

called_links = workflow_node.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
@@ -254,7 +266,7 @@ def _dump_calculation(
Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
"""

self._validate_make_dump_path(validate_path=output_path)
prepare_dump_path(path_to_validate=output_path, overwrite=self.overwrite, incremental=self.incremental)
self._dump_node_yaml(process_node=calculation_node, output_path=output_path)

io_dump_mapping = self._generate_calculation_io_mapping(io_dump_paths=io_dump_paths)
@@ -303,47 +315,6 @@ def _dump_calculation_io(self, parent_path: Path, link_triples: LinkManager | Li

link_triple.node.base.repository.copy_tree(linked_node_path.resolve())

def _validate_make_dump_path(self, validate_path: Path, safeguard_file: str = '.aiida_node_metadata.yaml') -> Path:
"""Create default dumping directory for a given process node and return it as absolute path.
:param validate_path: Path to validate for dumping.
:param safeguard_file: Dumping-specific file to avoid deleting wrong directory.
Default: `.aiida_node_metadata.yaml`
:return: The absolute created dump path.
"""
import shutil

if validate_path.is_dir():
# Existing, empty directory -> OK
if not any(validate_path.iterdir()):
pass

# Existing, non-empty directory and overwrite False -> FileExistsError
elif not self.overwrite:
raise FileExistsError(f'Path `{validate_path}` already exists and overwrite set to False.')

# Existing, non-empty directory and overwrite True
# Check for safeguard file ('.aiida_node_metadata.yaml') for safety
# If present -> Remove directory
elif (validate_path / safeguard_file).is_file():
LOGGER.info(f'Overwrite set to true, will overwrite directory `{validate_path}`.')
shutil.rmtree(validate_path)

# Existing and non-empty directory and overwrite True
# Check for safeguard file ('.aiida_node_metadata.yaml') for safety
# If absent -> Don't remove directory as to not accidentally remove a wrong one
else:
raise Exception(
f"Path `{validate_path}` already exists and doesn't contain safeguard file {safeguard_file}."
f' Not removing for safety reasons.'
)

# Not included in if-else as to avoid having to repeat the `mkdir` call.
# `exist_ok=True` as checks implemented above
validate_path.mkdir(exist_ok=True, parents=True)

return validate_path.resolve()

def _generate_calculation_io_mapping(self, io_dump_paths: List[str | Path] | None = None) -> SimpleNamespace:
"""Helper function to generate mapping for entities dumped for each `CalculationNode`.
75 changes: 75 additions & 0 deletions src/aiida/tools/dumping/utils.py
@@ -0,0 +1,75 @@
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Utility functions for dumping features."""

from __future__ import annotations

import logging
import shutil
from pathlib import Path

__all__ = ['prepare_dump_path']

logger = logging.getLogger(__name__)


def prepare_dump_path(
path_to_validate: Path,
overwrite: bool = False,
incremental: bool = True,
safeguard_file: str = '.aiida_node_metadata.yaml',
) -> None:
"""Create and prepare the dumping directory for a given process node.

:param path_to_validate: Path to validate for dumping.
:param overwrite: If True, an existing non-empty directory (guarded by `safeguard_file`) is deleted first.
:param incremental: If True, an existing directory is kept and gradually filled with new data.
:param safeguard_file: Dumping-specific file that indicates that the directory indeed originated from a `verdi ...
    dump` command, to avoid accidentally deleting the wrong directory.
    Default: `.aiida_node_metadata.yaml`
:raises ValueError: If both `overwrite` and `incremental` are set to True.
:raises FileExistsError: If a file or non-empty directory exists at the given path and neither `overwrite` nor
    `incremental` is enabled.
:raises FileNotFoundError: If no `safeguard_file` is found in a non-empty directory that should be overwritten."""

if overwrite and incremental:
raise ValueError('Both overwrite and incremental set to True. Only specify one.')

if path_to_validate.is_file():
raise FileExistsError(f'A file at the given path `{path_to_validate}` already exists.')

# Handle existing directory
if path_to_validate.is_dir():
is_empty = not any(path_to_validate.iterdir())

# Case 1: Non-empty directory and overwrite is False
if not is_empty and not overwrite:
if incremental:
logger.info('Incremental dumping selected. Will keep directory.')
else:
raise FileExistsError(
f'Path `{path_to_validate}` already exists, and neither overwrite nor incremental is enabled.'
)

# Case 2: Non-empty directory, overwrite is True
if not is_empty and overwrite:
safeguard_exists = (path_to_validate / safeguard_file).is_file()

if safeguard_exists:
logger.info(f'Overwriting directory `{path_to_validate}`.')
shutil.rmtree(path_to_validate)

else:
raise FileNotFoundError(
f'Path `{path_to_validate}` exists without safeguard file '
f'`{safeguard_file}`. Not removing because path might be a directory not created by AiiDA.'
)

# Create directory if it doesn't exist or was removed
path_to_validate.mkdir(exist_ok=True, parents=True)
(path_to_validate / safeguard_file).touch()
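The decision logic above can be exercised end-to-end with a self-contained sketch. The function is re-implemented here in simplified form so the snippet runs without AiiDA installed; the authoritative version is the one added in this file:

```python
import shutil
import tempfile
from pathlib import Path

# Simplified re-implementation of `prepare_dump_path` (illustrative only).
def prepare_dump_path(path, overwrite=False, incremental=True,
                      safeguard_file='.aiida_node_metadata.yaml'):
    if overwrite and incremental:
        raise ValueError('Both overwrite and incremental set to True. Only specify one.')
    if path.is_file():
        raise FileExistsError(f'A file at the given path `{path}` already exists.')
    if path.is_dir() and any(path.iterdir()):
        if overwrite:
            if (path / safeguard_file).is_file():
                shutil.rmtree(path)  # safeguard present -> safe to wipe
            else:
                raise FileNotFoundError(
                    f'Path `{path}` exists without safeguard file `{safeguard_file}`.'
                )
        elif not incremental:
            raise FileExistsError(
                f'Path `{path}` already exists, and neither overwrite nor incremental is enabled.'
            )
    path.mkdir(exist_ok=True, parents=True)
    (path / safeguard_file).touch()


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / 'dump'
    prepare_dump_path(target)                      # fresh directory is created
    (target / 'data.txt').write_text('step 1')
    prepare_dump_path(target)                      # incremental default: contents kept
    assert (target / 'data.txt').exists()
    prepare_dump_path(target, overwrite=True, incremental=False)  # wiped and recreated
    assert not (target / 'data.txt').exists()
```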
6 changes: 3 additions & 3 deletions tests/cmdline/commands/test_process.py
@@ -365,13 +365,13 @@ def test_process_dump(self, run_cli_command, tmp_path, generate_workchain_multip
assert result.exception is None, result.output
assert 'Success:' in result.output

# Trying to run the dumping again in the same path but without overwrite=True should raise exception
options = [str(node.pk), '-p', str(test_path)]
# Trying to run the dumping again in the same path but with overwrite=False should raise exception
options = [str(node.pk), '-p', str(test_path), '--no-incremental']
result = run_cli_command(cmd_process.process_dump, options, raises=True)
assert result.exit_code is ExitCode.CRITICAL

# Works fine when using overwrite=True
options = [str(node.pk), '-p', str(test_path), '-o']
options = [str(node.pk), '-p', str(test_path), '-o', '--no-incremental']
result = run_cli_command(cmd_process.process_dump, options)
assert result.exception is None, result.output
assert 'Success:' in result.output
