From 6291accf0538eafe7426e89bc4c1e9eb90ce0385 Mon Sep 17 00:00:00 2001
From: Julian Geiger
Date: Mon, 27 May 2024 13:04:08 +0200
Subject: [PATCH] CLI: Add `verdi process dump` and the `ProcessDumper` (#6276)

This commit adds functionality to write all files involved in the execution
of a workflow to disk. This is achieved via the new `ProcessDumper` class,
which exposes the top-level `dump` method, while `verdi process dump`
provides a wrapper for access via the CLI.

The available dumping options are set when instantiating the `ProcessDumper`
class. These are the `-o/--overwrite` option, the `--io-dump-paths` option
which can be used to provide custom subdirectories for the folders created
for each `CalculationNode` (the dumped data being the `CalculationNode`
repository, its `retrieved` outputs, as well as the linked node inputs and
outputs), the `-f/--flat` option that disables the creation of these
subdirectories, thus creating all files in a flat hierarchy (for each step
of the workflow), and the `--include-inputs/--exclude-inputs`
(`--include-outputs/--exclude-outputs`) options to enable/disable the
dumping of linked inputs (outputs) for each `CalculationNode`. In addition,
a `README` is created in the parent dumping directory, as well as
`.aiida_node_metadata.yaml` files with the `Node`, `User`, and `Computer`
information in the subdirectories created for each `ProcessNode`.

Nested workchains with considerable file I/O were needed for meaningful
testing of this feature, so the `generate_calculation_node` fixture of
`conftest.py` was extended. Moreover, the `generate_calculation_node_add`
and `generate_workchain_multiply_add` fixtures that actually run the
`ArithmeticAddCalculation` and `MultiplyAddWorkchain` were added. These
could in the future be used to reduce code duplication where the objects
are constructed manually in other parts of the test suite (benchmarking of
manually constructing the `ProcessNode`s vs. running the `Process` still
has to be conducted). Lastly, the `generate_calculation_node_io` and
`generate_workchain_node_io` fixtures were added in `test_processes.py`;
these create the `CalculationNode`s and `WorkflowNode`s that are used for
the tests of the dumping functionality.

Co-Authored-By: Junfeng Qiao
---
 docs/source/howto/data.rst                |  64 +++
 docs/source/reference/command_line.rst    |   1 +
 src/aiida/cmdline/commands/cmd_process.py |  84 ++++
 src/aiida/cmdline/params/options/main.py  |  21 +
 src/aiida/engine/daemon/execmanager.py    |   4 +-
 src/aiida/tools/__init__.py               |   1 +
 src/aiida/tools/dumping/__init__.py       |  11 +
 src/aiida/tools/dumping/processes.py      | 455 +++++++++++++++++++++
 tests/cmdline/commands/test_process.py    |  32 ++
 tests/conftest.py                         |  88 +++-
 tests/tools/dumping/test_processes.py     | 468 ++++++++++++++++++++++
 11 files changed, 1222 insertions(+), 7 deletions(-)
 create mode 100644 src/aiida/tools/dumping/__init__.py
 create mode 100644 src/aiida/tools/dumping/processes.py
 create mode 100644 tests/tools/dumping/test_processes.py

diff --git a/docs/source/howto/data.rst b/docs/source/howto/data.rst
index 504192cfda..33c9c33aba 100644
--- a/docs/source/howto/data.rst
+++ b/docs/source/howto/data.rst
@@ -78,6 +78,70 @@ Ways to find and retrieve data that have previously been imported are described
 If none of the currently available data types, as listed by ``verdi plugin list``, seem to fit your needs, you can also create your own custom type. For details refer to the next section :ref:`"How to add support for custom data types"`.
+.. _how-to:data:dump:
+
+Dumping data to disk
+--------------------
+
+.. versionadded:: 2.6
+
+It is now possible to dump your executed workflows to disk in a hierarchical directory tree structure. This can be
+particularly useful if one is not yet familiar with the ``QueryBuilder`` or wants to quickly explore input/output files
+using existing shell scripts or common terminal utilities, such as ``grep``. The dumping can be achieved with the command:
+
+.. code-block:: shell
+
+    verdi process dump
+
+For our beloved ``MultiplyAddWorkChain``, we obtain the following:
+
+.. code-block:: shell
+
+    $ verdi process dump -p dump-multiply_add
+    Success: Raw files for WorkChainNode dumped into folder `dump-multiply_add`.
+
+.. code-block:: shell
+
+    $ tree -a dump-multiply_add
+    dump-multiply_add
+    ├── README.md
+    ├── .aiida_node_metadata.yaml
+    ├── 01-multiply
+    │   ├── .aiida_node_metadata.yaml
+    │   └── inputs
+    │       └── source_file
+    └── 02-ArithmeticAddCalculation
+        ├── .aiida_node_metadata.yaml
+        ├── inputs
+        │   ├── .aiida
+        │   │   ├── calcinfo.json
+        │   │   └── job_tmpl.json
+        │   ├── _aiidasubmit.sh
+        │   └── aiida.in
+        └── outputs
+            ├── _scheduler-stderr.txt
+            ├── _scheduler-stdout.txt
+            └── aiida.out
+
+The ``README.md`` file provides a description of the directory structure, as well as useful information about the
+top-level process. Further, numbered subdirectories are created for each step of the workflow, resulting in the
+``01-multiply`` and ``02-ArithmeticAddCalculation`` folders. The raw calculation input and output files ``aiida.in`` and
+``aiida.out`` of the ``ArithmeticAddCalculation`` are placed in ``inputs`` and ``outputs``. In addition, these also
+contain the submission script ``_aiidasubmit.sh``, as well as the scheduler stdout and stderr, ``_scheduler-stdout.txt``
+and ``_scheduler-stderr.txt``, respectively. Lastly, the source code of the ``multiply`` ``calcfunction`` representing
+the first step of the workflow is contained in the ``source_file``.
+
+Taking a closer look at the directory, we also find the hidden ``.aiida_node_metadata.yaml`` files, which are
+created for every ``ProcessNode`` and contain additional information about the ``Node``, the ``User``, and the
+``Computer``, as well as the ``.aiida`` subdirectory with machine-readable AiiDA-internal data in JSON format.
+
+Since child processes are explored recursively, arbitrarily complex, nested workflows can be dumped. As already seen
+above, the ``-p`` flag allows specifying a custom dumping path. If none is provided, it is automatically generated from
+the ``process_label`` (or ``process_type``) and the ``pk``. In addition, the command provides the ``-o`` flag to
+overwrite existing directories, the ``-f`` flag to dump all files for each ``CalculationNode`` of the workflow in a flat
+directory structure, and the ``--include-inputs/--exclude-inputs`` (``--include-outputs/--exclude-outputs``) flags to
+also dump additional node inputs (outputs) of each ``CalculationNode`` of the workflow into ``node_inputs``
+(``node_outputs``) subdirectories. For a full list of available options, call :code:`verdi process dump --help`.
+
 .. _how-to:data:import:provenance:

diff --git a/docs/source/reference/command_line.rst b/docs/source/reference/command_line.rst
index 6822df9f0f..3553f953dd 100644
--- a/docs/source/reference/command_line.rst
+++ b/docs/source/reference/command_line.rst
@@ -367,6 +367,7 @@ Below is a list with all available subcommands.

 Commands:
   call-root  Show root process of the call stack for the given processes.
+ dump Dump process input and output files to disk. kill Kill running processes. list Show a list of running or terminated processes. pause Pause running processes. diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index 3f37645e5e..52b286e795 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -481,3 +481,87 @@ def process_repair(manager, broker, dry_run): if pid not in set_process_tasks: process_controller.continue_process(pid) echo.echo_report(f'Revived process `{pid}`') + + +@verdi_process.command('dump') +@arguments.PROCESS() +@options.PATH() +@options.OVERWRITE() +@click.option( + '--include-inputs/--exclude-inputs', + default=True, + show_default=True, + help='Include the linked input nodes of the `CalculationNode`(s).', +) +@click.option( + '--include-outputs/--exclude-outputs', + default=False, + show_default=True, + help='Include the linked output nodes of the `CalculationNode`(s).', +) +@click.option( + '--include-attributes/--exclude-attributes', + default=True, + show_default=True, + help='Include attributes in the `.aiida_node_metadata.yaml` written for every `ProcessNode`.', +) +@click.option( + '--include-extras/--exclude-extras', + default=True, + show_default=True, + help='Include extras in the `.aiida_node_metadata.yaml` written for every `ProcessNode`.', +) +@click.option( + '-f', + '--flat', + is_flag=True, + default=False, + help='Dump files in a flat directory for every step of the workflow.', +) +def process_dump( + process, + path, + overwrite, + include_inputs, + include_outputs, + include_attributes, + include_extras, + flat, +) -> None: + """Dump process input and output files to disk. + + Child calculations/workflows (also called `CalcJob`s/`CalcFunction`s and `WorkChain`s/`WorkFunction`s in AiiDA + jargon) run by the parent workflow are contained in the directory tree as sub-folders and are sorted by their + creation time. The directory tree thus mirrors the logical execution of the workflow, which can also be queried by + running `verdi process status ` on the command line. + + By default, input and output files of each calculation can be found in the corresponding "inputs" and + "outputs" directories (the former also contains the hidden ".aiida" folder with machine-readable job execution + settings). Additional input and output files (depending on the type of calculation) are placed in the "node_inputs" + and "node_outputs", respectively. + + Lastly, every folder also contains a hidden, human-readable `.aiida_node_metadata.yaml` file with the relevant AiiDA + node data for further inspection. + """ + + from aiida.tools.dumping.processes import ProcessDumper + + process_dumper = ProcessDumper( + include_inputs=include_inputs, + include_outputs=include_outputs, + include_attributes=include_attributes, + include_extras=include_extras, + overwrite=overwrite, + flat=flat, + ) + + try: + dump_path = process_dumper.dump(process_node=process, output_path=path) + except FileExistsError: + echo.echo_critical( + 'Dumping directory exists and overwrite is False. Set overwrite to True, or delete directory manually.' 
+ ) + except Exception as e: + echo.echo_critical(f'Unexpected error while dumping {process.__class__.__name__} <{process.pk}>:\n ({e!s}).') + + echo.echo_success(f'Raw files for {process.__class__.__name__} <{process.pk}> dumped into folder `{dump_path}`.') diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py index 72545b2a9f..85b3090ad5 100644 --- a/src/aiida/cmdline/params/options/main.py +++ b/src/aiida/cmdline/params/options/main.py @@ -8,6 +8,8 @@ ########################################################################### """Module with pre-defined reusable commandline options that can be used as `click` decorators.""" +import pathlib + import click from aiida.brokers.rabbitmq.defaults import BROKER_DEFAULTS @@ -77,6 +79,8 @@ 'OLDER_THAN', 'ORDER_BY', 'ORDER_DIRECTION', + 'OVERWRITE', + 'PATH', 'PAST_DAYS', 'PAUSED', 'PORT', @@ -743,3 +747,20 @@ def set_log_level(_ctx, _param, value): is_flag=True, help='Print the full traceback in case an exception is raised.', ) + +PATH = OverridableOption( + '-p', + '--path', + type=click.Path(path_type=pathlib.Path), + show_default=False, + help='Base path for operations that write to disk.', +) + +OVERWRITE = OverridableOption( + '--overwrite', + '-o', + is_flag=True, + default=False, + show_default=True, + help='Overwrite file/directory if writing to disk.', +) diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py index 305dd174b7..2eb6a5ff33 100644 --- a/src/aiida/engine/daemon/execmanager.py +++ b/src/aiida/engine/daemon/execmanager.py @@ -25,7 +25,7 @@ from aiida.common import AIIDA_LOGGER, exceptions from aiida.common.datastructures import CalcInfo, FileCopyOperation -from aiida.common.folders import SandboxFolder +from aiida.common.folders import Folder, SandboxFolder from aiida.common.links import LinkType from aiida.engine.processes.exit_code import ExitCode from aiida.manage.configuration import get_config_option @@ -66,7 +66,7 @@ def upload_calculation( node: CalcJobNode, transport: Transport, calc_info: CalcInfo, - folder: SandboxFolder, + folder: Folder, inputs: Optional[MappingType[str, Any]] = None, dry_run: bool = False, ) -> RemoteData | None: diff --git a/src/aiida/tools/__init__.py b/src/aiida/tools/__init__.py index a9ab4e5762..9c238fd858 100644 --- a/src/aiida/tools/__init__.py +++ b/src/aiida/tools/__init__.py @@ -24,6 +24,7 @@ from .calculations import * from .data import * +from .dumping import * from .graph import * from .groups import * from .visualization import * diff --git a/src/aiida/tools/dumping/__init__.py b/src/aiida/tools/dumping/__init__.py new file mode 100644 index 0000000000..a746fa171e --- /dev/null +++ b/src/aiida/tools/dumping/__init__.py @@ -0,0 +1,11 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. 
# +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Modules related to the dumping of AiiDA data.""" + +__all__ = ('processes',) diff --git a/src/aiida/tools/dumping/processes.py b/src/aiida/tools/dumping/processes.py new file mode 100644 index 0000000000..3d970c421c --- /dev/null +++ b/src/aiida/tools/dumping/processes.py @@ -0,0 +1,455 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Functionality for dumping of ProcessNodes.""" + +from __future__ import annotations + +import logging +from pathlib import Path +from types import SimpleNamespace +from typing import List + +import yaml + +from aiida.common import LinkType +from aiida.common.exceptions import NotExistentAttributeError +from aiida.orm import ( + CalcFunctionNode, + CalcJobNode, + CalculationNode, + LinkManager, + ProcessNode, + WorkChainNode, + WorkflowNode, + WorkFunctionNode, +) +from aiida.orm.utils import LinkTriple + +LOGGER = logging.getLogger(__name__) + + +class ProcessDumper: + def __init__( + self, + include_inputs: bool = True, + include_outputs: bool = False, + include_attributes: bool = True, + include_extras: bool = True, + overwrite: bool = False, + flat: bool = False, + ) -> None: + self.include_inputs = include_inputs + self.include_outputs = include_outputs + self.include_attributes = include_attributes + self.include_extras = include_extras + self.overwrite = overwrite + self.flat = flat + + @staticmethod + def _generate_default_dump_path(process_node: ProcessNode) -> Path: + """Simple helper function to generate the default parent-dumping directory if none given. + + This function is not called for the recursive sub-calls of `_dump_calculation` as it just creates the default + parent folder for the dumping, if no name is given. + + :param process_node: The `ProcessNode` for which the directory is created. + :return: The absolute default parent dump path. + """ + + pk = process_node.pk + try: + return Path(f'dump-{process_node.process_label}-{pk}') + except AttributeError: + # This case came up during testing, not sure how relevant it actually is + return Path(f'dump-{process_node.process_type}-{pk}') + + @staticmethod + def _generate_readme(process_node: ProcessNode, output_path: Path) -> None: + """Generate README.md file in main dumping directory. + + :param process_node: `CalculationNode` or `WorkflowNode`. + :param output_path: Output path for dumping. + + """ + + import textwrap + + from aiida.cmdline.utils.ascii_vis import format_call_graph + from aiida.cmdline.utils.common import ( + get_calcjob_report, + get_node_info, + get_process_function_report, + get_workchain_report, + ) + + pk = process_node.pk + + _readme_string = textwrap.dedent( + f"""\ + This directory contains the files involved in the calculation/workflow + `{process_node.process_label} <{pk}>` run with AiiDA. 
+
+        Child calculations/workflows (also called `CalcJob`s/`CalcFunction`s and `WorkChain`s/`WorkFunction`s in AiiDA
+        jargon) run by the parent workflow are contained in the directory tree as sub-folders and are sorted by their
+        creation time. The directory tree thus mirrors the logical execution of the workflow, which can also be queried
+        by running `verdi process status {pk}` on the command line.
+
+        By default, input and output files of each calculation can be found in the corresponding "inputs" and "outputs"
+        directories (the former also contains the hidden ".aiida" folder with machine-readable job execution settings).
+        Additional input and output files (depending on the type of calculation) are placed in the "node_inputs" and
+        "node_outputs", respectively.
+
+        Lastly, every folder also contains a hidden, human-readable `.aiida_node_metadata.yaml` file with the relevant
+        AiiDA node data for further inspection."""
+        )
+
+        # `verdi process status`
+        process_status = format_call_graph(calc_node=process_node, max_depth=None, call_link_label=True)
+        _readme_string += f'\n\n\nOutput of `verdi process status {pk}`:\n\n```shell\n{process_status}\n```'
+
+        # `verdi process report`
+        # Copied over from `cmd_process`
+        if isinstance(process_node, CalcJobNode):
+            process_report = get_calcjob_report(process_node)
+        elif isinstance(process_node, WorkChainNode):
+            process_report = get_workchain_report(process_node, levelname='REPORT', indent_size=2, max_depth=None)
+        elif isinstance(process_node, (CalcFunctionNode, WorkFunctionNode)):
+            process_report = get_process_function_report(process_node)
+        else:
+            process_report = f'Nothing to show for node type {process_node.__class__}'
+
+        _readme_string += f'\n\n\nOutput of `verdi process report {pk}`:\n\n```shell\n{process_report}\n```'
+
+        # `verdi process show`
+        process_show = get_node_info(node=process_node)
+        _readme_string += f'\n\n\nOutput of `verdi process show {pk}`:\n\n```shell\n{process_show}\n```'
+
+        (output_path / 'README.md').write_text(_readme_string)
+
+    @staticmethod
+    def _generate_child_node_label(index: int, link_triple: LinkTriple) -> str:
+        """Small helper function to generate and clean the directory label for child nodes during recursion.
+
+        :param index: Index assigned to step at current level of recursion.
+        :param link_triple: `LinkTriple` of `ProcessNode` explored during recursion.
+        :return: Child node label during recursion.
+        """
+        node = link_triple.node
+        link_label = link_triple.link_label
+
+        # Generate directories with naming scheme akin to `verdi process status`
+        label_list = [f'{index:02d}', link_label]
+
+        try:
+            process_label = node.process_label
+            if process_label is not None and process_label != link_label:
+                label_list += [process_label]
+
+        except AttributeError:
+            process_type = node.process_type
+            if process_type is not None and process_type != link_label:
+                label_list += [process_type]
+
+        node_label = '-'.join(label_list)
+        # `CALL-` appears in the link labels (e.g. also for the MultiplyAddWorkChain) -> general enough, so strip it
+        node_label = node_label.replace('CALL-', '')
+        node_label = node_label.replace('None-', '')
+
+        return node_label
+
+    def dump(
+        self,
+        process_node: ProcessNode,
+        output_path: Path | None,
+        io_dump_paths: List[str | Path] | None = None,
+    ) -> Path:
+        """Dump all data involved in a `ProcessNode`, including its outgoing links.
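+
+        Example (assuming a loaded profile; `<pk>` is a placeholder for the identifier of a terminated process node)::
+
+            ProcessDumper().dump(process_node=orm.load_node(<pk>), output_path=Path('dump-dir'))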
+
+        Note that if a child node is a `WorkflowNode`, the function recursively calls itself, while files are
+        only actually created when a `CalculationNode` is reached.
+
+        :param process_node: The parent `ProcessNode` to be dumped.
+        :param output_path: The output path where the directory tree will be created.
+        :param io_dump_paths: Subdirectories created for each `CalculationNode`.
+            Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
+        """
+
+        if output_path is None:
+            output_path = self._generate_default_dump_path(process_node=process_node)
+
+        self._validate_make_dump_path(validate_path=output_path)
+
+        if isinstance(process_node, CalculationNode):
+            self._dump_calculation(
+                calculation_node=process_node,
+                output_path=output_path,
+                io_dump_paths=io_dump_paths,
+            )
+
+        elif isinstance(process_node, WorkflowNode):
+            self._dump_workflow(
+                workflow_node=process_node,
+                output_path=output_path,
+                io_dump_paths=io_dump_paths,
+            )
+
+        self._generate_readme(process_node=process_node, output_path=output_path)
+
+        return output_path
+
+    def _dump_workflow(
+        self, workflow_node: WorkflowNode, output_path: Path, io_dump_paths: List[str | Path] | None = None
+    ) -> None:
+        """Recursive function to traverse a `WorkflowNode` and dump its `CalculationNode` s.
+
+        :param workflow_node: `WorkflowNode` to be traversed. Will be updated during recursion.
+        :param output_path: Dumping parent directory. Will be updated during recursion.
+        :param io_dump_paths: Custom subdirectories for `CalculationNode` s, defaults to None
+        """
+
+        self._validate_make_dump_path(validate_path=output_path)
+        self._dump_node_yaml(process_node=workflow_node, output_path=output_path)
+
+        called_links = workflow_node.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
+        called_links = sorted(called_links, key=lambda link_triple: link_triple.node.ctime)
+
+        for index, link_triple in enumerate(called_links, start=1):
+            child_node = link_triple.node
+            child_label = self._generate_child_node_label(index=index, link_triple=link_triple)
+            child_output_path = output_path.resolve() / child_label
+
+            # Recursive function call for `WorkflowNode`
+            if isinstance(child_node, WorkflowNode):
+                self._dump_workflow(
+                    workflow_node=child_node,
+                    output_path=child_output_path,
+                    io_dump_paths=io_dump_paths,
+                )
+
+            # Once a `CalculationNode` is reached as a child, dump it
+            elif isinstance(child_node, CalculationNode):
+                self._dump_calculation(
+                    calculation_node=child_node,
+                    output_path=child_output_path,
+                    io_dump_paths=io_dump_paths,
+                )
+
+    def _dump_calculation(
+        self,
+        calculation_node: CalculationNode,
+        output_path: Path,
+        io_dump_paths: List[str | Path] | None = None,
+    ) -> None:
+        """Dump the contents of a `CalculationNode` to a specified output path.
+
+        :param calculation_node: The `CalculationNode` to be dumped.
+        :param output_path: The path where the files will be dumped.
+        :param io_dump_paths: Subdirectories created for the `CalculationNode`.
+            Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
+        """
+
+        self._validate_make_dump_path(validate_path=output_path)
+        self._dump_node_yaml(process_node=calculation_node, output_path=output_path)
+
+        io_dump_mapping = self._generate_calculation_io_mapping(io_dump_paths=io_dump_paths)
+
+        # Dump the repository contents of the node
+        calculation_node.base.repository.copy_tree(output_path.resolve() / io_dump_mapping.repository)
+
+        # Dump the repository contents of `outputs.retrieved`
+        try:
+            calculation_node.outputs.retrieved.base.repository.copy_tree(
+                output_path.resolve() / io_dump_mapping.retrieved
+            )
+        except NotExistentAttributeError:
+            pass
+
+        # Dump the node_inputs
+        if self.include_inputs:
+            input_links = calculation_node.base.links.get_incoming(link_type=LinkType.INPUT_CALC)
+            self._dump_calculation_io(parent_path=output_path / io_dump_mapping.inputs, link_triples=input_links)
+
+        # Dump the node_outputs apart from `retrieved`
+        if self.include_outputs:
+            output_links = list(calculation_node.base.links.get_outgoing(link_type=LinkType.CREATE))
+            output_links = [output_link for output_link in output_links if output_link.link_label != 'retrieved']
+
+            self._dump_calculation_io(
+                parent_path=output_path / io_dump_mapping.outputs,
+                link_triples=output_links,
+            )
+
+    def _dump_calculation_io(self, parent_path: Path, link_triples: LinkManager | List[LinkTriple]):
+        """Small helper function to dump the linked input/output nodes of a `CalculationNode`.
+
+        :param parent_path: Parent directory for dumping the linked node contents.
+        :param link_triples: List of link triples.
+        """
+
+        for link_triple in link_triples:
+            link_label = link_triple.link_label
+
+            if not self.flat:
+                linked_node_path = parent_path / Path(*link_label.split('__'))
+            else:
+                # Don't use the link_label at all -> but the relative path inside a FolderData is retained
+                linked_node_path = parent_path
+
+            link_triple.node.base.repository.copy_tree(linked_node_path.resolve())
+
+    def _validate_make_dump_path(self, validate_path: Path, safeguard_file: str = '.aiida_node_metadata.yaml') -> Path:
+        """Validate and create the dump path for a given process node, returning it as an absolute path.
+
+        :param validate_path: Path to validate for dumping.
+        :param safeguard_file: Dumping-specific file to avoid deleting the wrong directory.
+            Default: `.aiida_node_metadata.yaml`
+        :return: The absolute created dump path.
+        """
+        import shutil
+
+        if validate_path.is_dir():
+            # Existing, empty directory -> OK
+            if not any(validate_path.iterdir()):
+                pass
+
+            # Existing, non-empty directory and overwrite False -> FileExistsError
+            elif not self.overwrite:
+                raise FileExistsError(f'Path `{validate_path}` already exists and overwrite set to False.')
+
+            # Existing, non-empty directory and overwrite True
+            # Check for the safeguard file ('.aiida_node_metadata.yaml') for safety
+            # If present -> Remove the directory
+            elif (validate_path / safeguard_file).is_file():
+                LOGGER.info(f'Overwrite set to True, will overwrite directory `{validate_path}`.')
+                shutil.rmtree(validate_path)
+
+            # Existing, non-empty directory and overwrite True
+            # Check for the safeguard file ('.aiida_node_metadata.yaml') for safety
+            # If absent -> Don't remove the directory, so as not to accidentally remove a wrong one
+            else:
+                raise Exception(
+                    f"Path `{validate_path}` already exists and doesn't contain safeguard file {safeguard_file}."
+                    f' Not removing for safety reasons.'
+                )
+
+        # Not included in the if-else above so as to avoid repeating the `mkdir` call.
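+        # `parents=True` also creates the intermediate directories needed for nested sub-workflow paths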
+        # `exist_ok=True` since the checks above have already been performed
+        validate_path.mkdir(exist_ok=True, parents=True)
+
+        return validate_path.resolve()
+
+    def _generate_calculation_io_mapping(self, io_dump_paths: List[str | Path] | None = None) -> SimpleNamespace:
+        """Helper function to generate the mapping of entities dumped for each `CalculationNode`.
+
+        This is to avoid exposing AiiDA terminology, like `repository`, to the user, while keeping track of which
+        entities should be dumped into which directory, and allowing for alternative directory names.
+
+        :param io_dump_paths: Subdirectories created for the `CalculationNode`.
+            Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
+        :return: SimpleNamespace mapping.
+        """
+
+        aiida_entities_to_dump = ['repository', 'retrieved', 'inputs', 'outputs']
+        default_calculation_io_dump_paths = ['inputs', 'outputs', 'node_inputs', 'node_outputs']
+        empty_calculation_io_dump_paths = [''] * 4
+
+        if self.flat and io_dump_paths is None:
+            LOGGER.info(
+                'Flat set to True and no `io_dump_paths`. Dumping in a flat directory, files might be overwritten.'
+            )
+            return SimpleNamespace(**dict(zip(aiida_entities_to_dump, empty_calculation_io_dump_paths)))
+
+        elif not self.flat and io_dump_paths is None:
+            LOGGER.info(
+                'Flat set to False and no `io_dump_paths` provided. '
+                + f'Will use the defaults {default_calculation_io_dump_paths}.'
+            )
+            return SimpleNamespace(**dict(zip(aiida_entities_to_dump, default_calculation_io_dump_paths)))
+
+        elif self.flat and io_dump_paths is not None:
+            LOGGER.info('Flat set to True and `io_dump_paths` provided. These will be used, but not nested.')
+            return SimpleNamespace(**dict(zip(aiida_entities_to_dump, io_dump_paths)))
+        else:
+            LOGGER.info('Flat set to False and `io_dump_paths` provided. These will be used as subdirectories.')
+            return SimpleNamespace(**dict(zip(aiida_entities_to_dump, io_dump_paths)))  # type: ignore[arg-type]
+
+    def _dump_node_yaml(
+        self,
+        process_node: ProcessNode,
+        output_path: Path,
+        output_filename: str = '.aiida_node_metadata.yaml',
+    ) -> None:
+        """Dump the selected `ProcessNode` properties, attributes, and extras to a YAML file.
+
+        :param process_node: The `ProcessNode` to dump.
+        :param output_path: The path to the directory where the YAML file will be saved.
+        :param output_filename: The name of the output YAML file. Defaults to `.aiida_node_metadata.yaml`.
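+
+        The written file contains a `Node data` section and, where available, `User data` and `Computer data`
+        sections, plus `Node attributes` and `Node extras` sections depending on the instance settings.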
+ """ + + node_properties = [ + 'label', + 'description', + 'pk', + 'uuid', + 'ctime', + 'mtime', + 'node_type', + 'process_type', + 'is_finished_ok', + ] + + user_properties = ('first_name', 'last_name', 'email', 'institution') + + computer_properties = ('label', 'hostname', 'scheduler_type', 'transport_type') + + node_dict = {} + metadata_dict = {} + + # Add actual node `@property`s to dictionary + for metadata_property in node_properties: + metadata_dict[metadata_property] = getattr(process_node, metadata_property) + + node_dict['Node data'] = metadata_dict + + # Add user data + try: + node_dbuser = process_node.user + user_dict = {} + for user_property in user_properties: + user_dict[user_property] = getattr(node_dbuser, user_property) + node_dict['User data'] = user_dict + except AttributeError: + pass + + # Add computer data + try: + node_dbcomputer = process_node.computer + computer_dict = {} + for computer_property in computer_properties: + computer_dict[computer_property] = getattr(node_dbcomputer, computer_property) + node_dict['Computer data'] = computer_dict + except AttributeError: + pass + + # Add node attributes + if self.include_attributes: + node_attributes = process_node.base.attributes.all + node_dict['Node attributes'] = node_attributes + + # Add node extras + if self.include_extras: + node_extras = process_node.base.extras.all + if node_extras: + node_dict['Node extras'] = node_extras + + output_file = output_path.resolve() / output_filename + with open(output_file, 'w') as handle: + yaml.dump(node_dict, handle, sort_keys=False) diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py index 2b37750c67..73c9ac7084 100644 --- a/tests/cmdline/commands/test_process.py +++ b/tests/cmdline/commands/test_process.py @@ -17,6 +17,7 @@ import pytest from aiida import get_profile from aiida.cmdline.commands import cmd_process +from aiida.cmdline.utils.echo import ExitCode from aiida.common.links import LinkType from aiida.common.log import LOG_LEVEL_REPORT from aiida.engine import Process, ProcessState @@ -336,6 +337,37 @@ def test_report(self, run_cli_command): assert len(result.output_lines) == 1, result.output_lines assert result.output_lines[0] == 'No log messages recorded for this entry' + def test_process_dump(self, run_cli_command, tmp_path, generate_workchain_multiply_add): + """Test verdi process dump""" + + # Only test CLI interface here, the actual functionalities of the Python API are tested in `test_processes.py` + test_path = tmp_path / 'cli-dump' + node = generate_workchain_multiply_add() + + # Giving a single identifier should print a non empty string message + options = [str(node.pk), '-p', str(test_path)] + result = run_cli_command(cmd_process.process_dump, options) + assert result.exception is None, result.output + assert 'Success:' in result.output + + # Trying to run the dumping again in the same path but without overwrite=True should raise exception + options = [str(node.pk), '-p', str(test_path)] + result = run_cli_command(cmd_process.process_dump, options, raises=True) + assert result.exit_code is ExitCode.CRITICAL + + # Works fine when using overwrite=True + options = [str(node.pk), '-p', str(test_path), '-o'] + result = run_cli_command(cmd_process.process_dump, options) + assert result.exception is None, result.output + assert 'Success:' in result.output + + # Set overwrite=True but provide bad directory, i.e. 
missing metadata file + (test_path / '.aiida_node_metadata.yaml').unlink() + + options = [str(node.pk), '-p', str(test_path), '-o'] + result = run_cli_command(cmd_process.process_dump, options, raises=True) + assert result.exit_code is ExitCode.CRITICAL + @pytest.mark.usefixtures('aiida_profile_clean') @pytest.mark.parametrize('numprocesses, percentage', ((0, 100), (1, 90))) diff --git a/tests/conftest.py b/tests/conftest.py index 936794b5e2..55bf01a185 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,7 @@ import click import pytest from aiida import get_profile +from aiida.common.links import LinkType from aiida.manage.configuration import Profile, get_config, load_profile if t.TYPE_CHECKING: @@ -158,7 +159,14 @@ def generate_calculation_node(): """Generate an instance of a `CalculationNode`.""" from aiida.engine import ProcessState - def _generate_calculation_node(process_state=ProcessState.FINISHED, exit_status=None, entry_point=None): + def _generate_calculation_node( + process_state: ProcessState = ProcessState.FINISHED, + exit_status: int | None = None, + entry_point: str | None = None, + inputs: dict | None = None, + outputs: dict | None = None, + repository: pathlib.Path | None = None, + ): """Generate an instance of a `CalculationNode`.. :param process_state: state to set @@ -170,13 +178,38 @@ def _generate_calculation_node(process_state=ProcessState.FINISHED, exit_status= if process_state is ProcessState.FINISHED and exit_status is None: exit_status = 0 - node = CalculationNode(process_type=entry_point) - node.set_process_state(process_state) + calculation_node = CalculationNode(process_type=entry_point) + calculation_node.set_process_state(process_state) if exit_status is not None: - node.set_exit_status(exit_status) + calculation_node.set_exit_status(exit_status) + + if repository is not None: + calculation_node.base.repository.put_object_from_tree(repository) + + # For storing, need to first store the input nodes, then the CalculationNode, then the output nodes + if inputs is not None: + for input_label, input_node in inputs.items(): + calculation_node.base.links.add_incoming( + input_node, + link_type=LinkType.INPUT_CALC, + link_label=input_label, + ) + + input_node.store() + + if outputs is not None: + # Need to first store CalculationNode before I can attach `created` outputs + calculation_node.store() + for output_label, output_node in outputs.items(): + output_node.base.links.add_incoming( + calculation_node, link_type=LinkType.CREATE, link_label=output_label + ) - return node + output_node.store() + + # Return unstored by default + return calculation_node return _generate_calculation_node @@ -671,3 +704,48 @@ def reset_log_level(): log.CLI_ACTIVE = None log.CLI_LOG_LEVEL = None log.configure_logging(with_orm=True) + + +@pytest.fixture +def generate_calculation_node_add(aiida_localhost): + def _generate_calculation_node_add(): + from aiida.engine import run_get_node + from aiida.orm import InstalledCode, Int + from aiida.plugins import CalculationFactory + + arithmetic_add = CalculationFactory('core.arithmetic.add') + + add_inputs = { + 'x': Int(1), + 'y': Int(2), + 'code': InstalledCode(computer=aiida_localhost, filepath_executable='/bin/bash'), + } + + _, add_node = run_get_node(arithmetic_add, **add_inputs) + + return add_node + + return _generate_calculation_node_add + + +@pytest.fixture +def generate_workchain_multiply_add(aiida_localhost): + def _generate_workchain_multiply_add(): + from aiida.engine import run_get_node + from aiida.orm import 
InstalledCode, Int + from aiida.plugins import WorkflowFactory + + multiplyaddworkchain = WorkflowFactory('core.arithmetic.multiply_add') + + multiply_add_inputs = { + 'x': Int(1), + 'y': Int(2), + 'z': Int(3), + 'code': InstalledCode(computer=aiida_localhost, filepath_executable='/bin/bash'), + } + + _, multiply_add_node = run_get_node(multiplyaddworkchain, **multiply_add_inputs) + + return multiply_add_node + + return _generate_workchain_multiply_add diff --git a/tests/tools/dumping/test_processes.py b/tests/tools/dumping/test_processes.py new file mode 100644 index 0000000000..371dcb80a9 --- /dev/null +++ b/tests/tools/dumping/test_processes.py @@ -0,0 +1,468 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Tests for the dumping of ProcessNode data to disk.""" + +from __future__ import annotations + +import io +from pathlib import Path + +import pytest +from aiida.common.links import LinkType +from aiida.tools.dumping.processes import ProcessDumper + +# Non-AiiDA variables +filename = 'file.txt' +filecontent = 'a' +inputs_relpath = Path('inputs') +outputs_relpath = Path('outputs') +node_inputs_relpath = Path('node_inputs') +node_outputs_relpath = Path('node_outputs') +default_dump_paths = [inputs_relpath, outputs_relpath, node_inputs_relpath, node_outputs_relpath] +custom_dump_paths = [f'{path}_' for path in default_dump_paths] + +# Define variables used for constructing the nodes used to test the dumping +singlefiledata_linklabel = 'singlefile' +folderdata_linklabel = 'folderdata' +folderdata_relpath = Path('relative_path') +folderdata_test_path = folderdata_linklabel / folderdata_relpath +arraydata_linklabel = 'arraydata' +node_metadata_file = '.aiida_node_metadata.yaml' + + +# Helper functions to generate the actual `WorkflowNode`s and `CalculationNode`s used for testing +@pytest.fixture +def generate_calculation_node_io(generate_calculation_node, tmp_path): + def _generate_calculation_node_io(entry_point: str | None = None, attach_outputs: bool = True): + import numpy as np + from aiida.orm import ArrayData, FolderData, SinglefileData + + singlefiledata_input = SinglefileData.from_string(content=filecontent, filename=filename) + # ? 
Use instance for folderdata + folderdata = FolderData() + folderdata.put_object_from_filelike(handle=io.StringIO(filecontent), path=str(folderdata_relpath / filename)) # type: ignore[arg-type] + arraydata_input = ArrayData(arrays=np.ones(3)) + + # Create calculation inputs, outputs + calculation_node_inputs = { + singlefiledata_linklabel: singlefiledata_input, + folderdata_linklabel: folderdata, + arraydata_linklabel: arraydata_input, + } + + singlefiledata_output = singlefiledata_input.clone() + folderdata_output = folderdata.clone() + + if attach_outputs: + calculation_outputs = { + folderdata_linklabel: folderdata_output, + singlefiledata_linklabel: singlefiledata_output, + } + else: + calculation_outputs = None + + # Actually write repository file and then read it in when generating calculation_node + (tmp_path / filename).write_text(filecontent) + + calculation_node = generate_calculation_node( + repository=tmp_path, + inputs=calculation_node_inputs, + outputs=calculation_outputs, + entry_point=entry_point, + ) + return calculation_node + + return _generate_calculation_node_io + + +@pytest.fixture +def generate_workchain_node_io(): + def _generate_workchain_node_io(cj_nodes, store_all: bool = True): + """Generate an instance of a `WorkChain` that contains a sub-`WorkChain` and a `Calculation` with file io.""" + from aiida.orm import WorkflowNode + + wc_node = WorkflowNode() + wc_node_sub = WorkflowNode() + + # Add sub-workchain that calls a calculation + wc_node_sub.base.links.add_incoming(wc_node, link_type=LinkType.CALL_WORK, link_label='sub_workflow') + for cj_node in cj_nodes: + cj_node.base.links.add_incoming(wc_node_sub, link_type=LinkType.CALL_CALC, link_label='calculation') + + # Set process_state so that tests don't throw exception for build_call_graph of README generation + [cj_node.set_process_state('finished') for cj_node in cj_nodes] + wc_node.set_process_state('finished') + wc_node_sub.set_process_state('finished') + + # Need to store so that outputs are being dumped + if store_all: + wc_node.store() + wc_node_sub.store() + [cj_node.store() for cj_node in cj_nodes] + + return wc_node + + return _generate_workchain_node_io + + +# Only test top-level actions, like path and README creation +# Other things tested via `_dump_workflow` and `_dump_calculation` +def test_dump(generate_calculation_node_io, generate_workchain_node_io, tmp_path): + dump_parent_path = tmp_path / 'wc-dump-test-io' + process_dumper = ProcessDumper() + # Don't attach outputs, as it would require storing the calculation_node and then it cannot be used in the workchain + cj_nodes = [generate_calculation_node_io(attach_outputs=False), generate_calculation_node_io(attach_outputs=False)] + wc_node = generate_workchain_node_io(cj_nodes=cj_nodes) + return_path = process_dumper.dump(process_node=wc_node, output_path=dump_parent_path) + + assert dump_parent_path.is_dir() + assert (dump_parent_path / 'README.md').is_file() + assert return_path == dump_parent_path + + +def test_dump_workflow(generate_calculation_node_io, generate_workchain_node_io, tmp_path): + # Need to generate parent path for dumping, as I don't want the sub-workchains to be dumped directly into `tmp_path` + dump_parent_path = tmp_path / 'wc-workflow_dump-test-io' + process_dumper = ProcessDumper() + # Don't attach outputs, as it would require storing the calculation_node and then it cannot be used in the workchain + cj_nodes = [generate_calculation_node_io(attach_outputs=False), generate_calculation_node_io(attach_outputs=False)] + wc_node = 
generate_workchain_node_io(cj_nodes=cj_nodes) + process_dumper._dump_workflow(workflow_node=wc_node, output_path=dump_parent_path) + + input_path = '01-sub_workflow/01-calculation/inputs/file.txt' + singlefiledata_path = '01-sub_workflow/01-calculation/node_inputs/singlefile/file.txt' + folderdata_path = '01-sub_workflow/01-calculation/node_inputs/folderdata/relative_path/file.txt' + arraydata_path = '01-sub_workflow/01-calculation/node_inputs/arraydata/default.npy' + node_metadata_paths = [ + node_metadata_file, + f'01-sub_workflow/{node_metadata_file}', + f'01-sub_workflow/01-calculation/{node_metadata_file}', + f'01-sub_workflow/02-calculation/{node_metadata_file}', + ] + + expected_files = [input_path, singlefiledata_path, folderdata_path, arraydata_path, *node_metadata_paths] + expected_files = [dump_parent_path / expected_file for expected_file in expected_files] + + assert all([expected_file.is_file() for expected_file in expected_files]) + + # Flat dumping + dump_parent_path = tmp_path / 'wc-dump-test-io-flat' + process_dumper = ProcessDumper(flat=True) + process_dumper._dump_workflow(workflow_node=wc_node, output_path=dump_parent_path) + + input_path = '01-sub_workflow/01-calculation/file.txt' + arraydata_path = '01-sub_workflow/01-calculation/default.npy' + folderdata_path = '01-sub_workflow/01-calculation/relative_path/file.txt' + node_metadata_paths = [ + node_metadata_file, + f'01-sub_workflow/{node_metadata_file}', + f'01-sub_workflow/01-calculation/{node_metadata_file}', + f'01-sub_workflow/02-calculation/{node_metadata_file}', + ] + + expected_files = [input_path, folderdata_path, arraydata_path, *node_metadata_paths] + expected_files = [dump_parent_path / expected_file for expected_file in expected_files] + + assert all([expected_file.is_file() for expected_file in expected_files]) + + +def test_dump_multiply_add(tmp_path, generate_workchain_multiply_add): + dump_parent_path = tmp_path / 'wc-dump-test-multiply-add' + process_dumper = ProcessDumper() + wc_node = generate_workchain_multiply_add() + process_dumper.dump(process_node=wc_node, output_path=dump_parent_path) + + input_files = ['_aiidasubmit.sh', 'aiida.in', '.aiida/job_tmpl.json', '.aiida/calcinfo.json'] + output_files = ['_scheduler-stderr.txt', '_scheduler-stdout.txt', 'aiida.out'] + input_files = [ + dump_parent_path / '02-ArithmeticAddCalculation' / inputs_relpath / input_file for input_file in input_files + ] + input_files += [dump_parent_path / '01-multiply' / inputs_relpath / 'source_file'] + output_files = [ + dump_parent_path / '02-ArithmeticAddCalculation' / outputs_relpath / output_file for output_file in output_files + ] + + # No node_inputs contained in MultiplyAddWorkChain + assert all([input_file.is_file() for input_file in input_files]) + assert all([output_file.is_file() for output_file in output_files]) + + # Flat dumping + dump_parent_path = tmp_path / 'wc-dump-test-multiply-add-flat' + process_dumper = ProcessDumper(flat=True) + process_dumper.dump(process_node=wc_node, output_path=dump_parent_path) + + multiply_file = dump_parent_path / '01-multiply' / 'source_file' + arithmetic_add_files = [ + '_aiidasubmit.sh', + 'aiida.in', + '.aiida/job_tmpl.json', + '.aiida/calcinfo.json', + '_scheduler-stderr.txt', + '_scheduler-stdout.txt', + 'aiida.out', + ] + arithmetic_add_files = [ + dump_parent_path / '02-ArithmeticAddCalculation' / arithmetic_add_file + for arithmetic_add_file in arithmetic_add_files + ] + + assert multiply_file.is_file() + assert all([expected_file.is_file() for expected_file in 
arithmetic_add_files]) + + +# Tests for dump_calculation method +def test_dump_calculation_node(tmp_path, generate_calculation_node_io): + # Checking the actual content should be handled by `test_copy_tree` + + # Normal dumping -> node_inputs and not flat; no paths provided + dump_parent_path = tmp_path / 'cj-dump-test-io' + process_dumper = ProcessDumper(include_outputs=True) + calculation_node = generate_calculation_node_io() + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + + assert (dump_parent_path / inputs_relpath / filename).is_file() + assert (dump_parent_path / node_inputs_relpath / singlefiledata_linklabel / filename).is_file() + assert (dump_parent_path / node_inputs_relpath / folderdata_test_path / filename).is_file() + assert (dump_parent_path / node_inputs_relpath / arraydata_linklabel / 'default.npy').is_file() + + assert (dump_parent_path / node_outputs_relpath / singlefiledata_linklabel / filename).is_file() + assert (dump_parent_path / node_outputs_relpath / folderdata_test_path / filename).is_file() + + # Check contents once + with open(dump_parent_path / inputs_relpath / filename, 'r') as handle: + assert handle.read() == filecontent + with open(dump_parent_path / node_inputs_relpath / singlefiledata_linklabel / filename) as handle: + assert handle.read() == filecontent + with open(dump_parent_path / node_inputs_relpath / folderdata_test_path / filename) as handle: + assert handle.read() == filecontent + with open(dump_parent_path / node_outputs_relpath / singlefiledata_linklabel / filename) as handle: + assert handle.read() == filecontent + with open(dump_parent_path / node_outputs_relpath / folderdata_test_path / filename) as handle: + assert handle.read() == filecontent + + +def test_dump_calculation_flat(tmp_path, generate_calculation_node_io): + # Flat dumping -> no paths provided -> Default paths should not be existent. + # Internal FolderData structure retained. 
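+    # With flat=True and no io_dump_paths, the io mapping collapses to empty strings, so repository,
+    # retrieved, and linked node files are all written directly into the parent dump directory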
+ dump_parent_path = tmp_path / 'cj-dump-test-custom' + process_dumper = ProcessDumper(flat=True) + calculation_node = generate_calculation_node_io() + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + + # Here, the same file will be written by inputs and node_outputs and node_inputs + # So it should only be present once in the parent dump directory + assert not (dump_parent_path / inputs_relpath).is_dir() + assert not (dump_parent_path / node_inputs_relpath).is_dir() + assert not (dump_parent_path / outputs_relpath).is_dir() + assert (dump_parent_path / filename).is_file() + assert (dump_parent_path / 'default.npy').is_file() + assert (dump_parent_path / folderdata_relpath / filename).is_file() + + +# Here, in principle, test only non-default arguments, as defaults tested above +# @pytest.mark.parametrize('overwrite', (True, False)) +def test_dump_calculation_overwrite(tmp_path, generate_calculation_node_io): + dump_parent_path = tmp_path / 'cj-dump-test-overwrite' + process_dumper = ProcessDumper(overwrite=False) + calculation_node = generate_calculation_node_io() + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + with pytest.raises(FileExistsError): + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + + +# With both inputs and outputs being dumped is the standard test case above, so only test without inputs here +def test_dump_calculation_no_inputs(tmp_path, generate_calculation_node_io): + dump_parent_path = tmp_path / 'cj-dump-test-noinputs' + process_dumper = ProcessDumper(include_inputs=False) + calculation_node = generate_calculation_node_io() + process_dumper._dump_calculation(calculation_node=calculation_node, output_path=dump_parent_path) + assert not (dump_parent_path / node_inputs_relpath).is_dir() + + +def test_dump_calculation_add(tmp_path, generate_calculation_node_add): + dump_parent_path = tmp_path / 'cj-dump-test-add' + + process_dumper = ProcessDumper() + calculation_node_add = generate_calculation_node_add() + process_dumper._dump_calculation(calculation_node=calculation_node_add, output_path=dump_parent_path) + + input_files = ['_aiidasubmit.sh', 'aiida.in', '.aiida/job_tmpl.json', '.aiida/calcinfo.json'] + output_files = ['_scheduler-stderr.txt', '_scheduler-stdout.txt', 'aiida.out'] + input_files = [dump_parent_path / inputs_relpath / input_file for input_file in input_files] + output_files = [dump_parent_path / outputs_relpath / output_file for output_file in output_files] + + assert all([input_file.is_file() for input_file in input_files]) + assert all([output_file.is_file() for output_file in output_files]) + + +# Tests for helper methods +def test_validate_make_dump_path(chdir_tmp_path, tmp_path): + chdir_tmp_path + + safeguard_file = node_metadata_file + + # Path must be provided + process_dumper = ProcessDumper(overwrite=False) + with pytest.raises(TypeError): + process_dumper._validate_make_dump_path() + + # Check if path created if non-existent + test_dir = tmp_path / Path('test-dir') + test_dir.mkdir() + output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) + assert output_path == test_dir + + # Empty path is fine -> No error and full path returned + output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) + assert output_path == test_dir + + # Fails if directory not empty, safeguard file existent, and overwrite set to False + (test_dir / filename).touch() + 
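+    # Also create the safeguard file; with overwrite=False, validation must still refuse the non-empty directory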
(test_dir / safeguard_file).touch() + with pytest.raises(FileExistsError): + output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) + assert (test_dir / filename).is_file() + + # Works if directory not empty, but overwrite=True and safeguard_file (e.g. `.aiida_node_metadata.yaml`) contained + process_dumper = ProcessDumper(overwrite=True) + output_path = process_dumper._validate_make_dump_path(validate_path=test_dir, safeguard_file=safeguard_file) + assert output_path == test_dir + assert not (test_dir / safeguard_file).is_file() + + # Fails if directory not empty and overwrite set to True, but safeguard_file not found (for safety reasons) + # Could define new Exception for this... + (test_dir / filename).touch() + with pytest.raises(Exception): + output_path = process_dumper._validate_make_dump_path(validate_path=test_dir) + assert (test_dir / filename).is_file() + + +def test_generate_default_dump_path( + generate_calculation_node_add, + generate_workchain_multiply_add, +): + process_dumper = ProcessDumper() + add_node = generate_calculation_node_add() + multiply_add_node = generate_workchain_multiply_add() + add_path = process_dumper._generate_default_dump_path(process_node=add_node) + multiply_add_path = process_dumper._generate_default_dump_path(process_node=multiply_add_node) + + assert str(add_path) == f'dump-ArithmeticAddCalculation-{add_node.pk}' + assert str(multiply_add_path) == f'dump-MultiplyAddWorkChain-{multiply_add_node.pk}' + + +def test_generate_calculation_io_mapping(): + process_dumper = ProcessDumper() + calculation_io_mapping = process_dumper._generate_calculation_io_mapping() + assert calculation_io_mapping.repository == 'inputs' + assert calculation_io_mapping.retrieved == 'outputs' + assert calculation_io_mapping.inputs == 'node_inputs' + assert calculation_io_mapping.outputs == 'node_outputs' + + calculation_io_mapping = process_dumper._generate_calculation_io_mapping(io_dump_paths=custom_dump_paths) + assert calculation_io_mapping.repository == 'inputs_' + assert calculation_io_mapping.retrieved == 'outputs_' + assert calculation_io_mapping.inputs == 'node_inputs_' + assert calculation_io_mapping.outputs == 'node_outputs_' + + +def test_generate_child_node_label( + generate_workchain_multiply_add, generate_calculation_node_io, generate_workchain_node_io +): + # Check with manually constructed, more complex workchain + cj_node = generate_calculation_node_io(attach_outputs=False) + wc_node = generate_workchain_node_io(cj_nodes=[cj_node]) + wc_output_triples = wc_node.base.links.get_outgoing().all() + sub_wc_node = wc_output_triples[0].node + + output_triples = wc_output_triples + sub_wc_node.base.links.get_outgoing().all() + # Sort by mtime here, not ctime, as I'm actually creating the CalculationNode first. 
+ output_triples = sorted(output_triples, key=lambda link_triple: link_triple.node.mtime) + + process_dumper = ProcessDumper() + + output_paths = sorted( + [ + process_dumper._generate_child_node_label(index, output_node) + for index, output_node in enumerate(output_triples) + ] + ) + assert output_paths == ['00-sub_workflow', '01-calculation'] + + # Check with multiply_add workchain node + multiply_add_node = generate_workchain_multiply_add() + output_triples = multiply_add_node.base.links.get_outgoing().all() + # Sort by ctime here, not mtime, as I'm generating the WorkChain normally + output_triples = sorted(output_triples, key=lambda link_triple: link_triple.node.ctime) + output_paths = sorted( + [process_dumper._generate_child_node_label(_, output_node) for _, output_node in enumerate(output_triples)] + ) + assert output_paths == ['00-multiply', '01-ArithmeticAddCalculation', '02-result'] + + +def test_dump_node_yaml(generate_calculation_node_io, tmp_path, generate_workchain_multiply_add): + process_dumper = ProcessDumper() + cj_node = generate_calculation_node_io(attach_outputs=False) + process_dumper._dump_node_yaml(process_node=cj_node, output_path=tmp_path) + + assert (tmp_path / node_metadata_file).is_file() + + # Test with multiply_add + wc_node = generate_workchain_multiply_add() + process_dumper._dump_node_yaml(process_node=wc_node, output_path=tmp_path) + + assert (tmp_path / node_metadata_file).is_file() + + # Open the dumped YAML file and read its contents + with open(tmp_path / node_metadata_file, 'r') as dumped_file: + contents = dumped_file.read() + + # Check if contents as expected + assert 'Node data:' in contents + assert 'User data:' in contents + # Computer is None for the locally run MultiplyAdd + assert 'Computer data:' not in contents + assert 'Node attributes:' in contents + assert 'Node extras:' in contents + + process_dumper = ProcessDumper(include_attributes=False, include_extras=False) + + process_dumper._dump_node_yaml(process_node=wc_node, output_path=tmp_path) + + # Open the dumped YAML file and read its contents + with open(tmp_path / node_metadata_file, 'r') as dumped_file: + contents = dumped_file.read() + + # Check if contents as expected -> No attributes and extras + assert 'Node data:' in contents + assert 'User data:' in contents + # Computer is None for the locally run MultiplyAdd + assert 'Computer data:' not in contents + assert 'Node attributes:' not in contents + assert 'Node extras:' not in contents + + +def test_generate_parent_readme(tmp_path, generate_workchain_multiply_add): + wc_node = generate_workchain_multiply_add() + process_dumper = ProcessDumper() + + process_dumper._generate_readme(process_node=wc_node, output_path=tmp_path) + + assert (tmp_path / 'README.md').is_file() + + with open(tmp_path / 'README.md', 'r') as dumped_file: + contents = dumped_file.read() + + assert 'This directory contains' in contents + assert '`MultiplyAddWorkChain' in contents + assert 'ArithmeticAddCalculation' in contents + # Check for outputs of `verdi process status/report/show` + assert 'Finished [0] [3:result]' in contents + assert 'Property Value' in contents + assert 'No log messages' in contents
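
For reference, the dumping can also be driven directly through the Python API introduced by this patch. Below is a minimal sketch, assuming a configured AiiDA profile and an existing, terminated process node; the pk `1234` and the target directory name `dump-dir` are placeholders:

```python
from pathlib import Path

from aiida import load_profile, orm
from aiida.tools.dumping.processes import ProcessDumper

load_profile()  # load the default AiiDA profile

# Mirror the CLI defaults: dump linked inputs, skip extra linked outputs, don't overwrite
process_dumper = ProcessDumper(include_inputs=True, include_outputs=False, overwrite=False, flat=False)

process_node = orm.load_node(1234)  # placeholder pk of the process to dump
dump_path = process_dumper.dump(process_node=process_node, output_path=Path('dump-dir'))
print(f'Raw files dumped into {dump_path}')
```

As with the CLI wrapper, `dump` raises a `FileExistsError` if the target directory is non-empty and `overwrite` is `False`; passing `output_path=None` generates the default `dump-<process_label>-<pk>` directory name.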