diff --git a/src/nomad_simulations/schema_packages/workflow/general.py b/src/nomad_simulations/schema_packages/workflow/general.py index b19ce7b7..d731a715 100644 --- a/src/nomad_simulations/schema_packages/workflow/general.py +++ b/src/nomad_simulations/schema_packages/workflow/general.py @@ -1,17 +1,34 @@ from nomad.datamodel import ArchiveSection, EntryArchive from nomad.datamodel.metainfo.workflow import Link, Task, Workflow -from nomad.metainfo import SubSection +from nomad.metainfo import Quantity, SchemaPackage, SubSection from structlog.stdlib import BoundLogger +from nomad_simulations.schema_packages.model_method import ModelMethod +from nomad_simulations.schema_packages.model_system import ModelSystem + INCORRECT_N_TASKS = 'Incorrect number of tasks found.' +m_package = SchemaPackage() + -class SimulationWorkflowMethod(ArchiveSection): +class SimulationWorkflowModel(ArchiveSection): """ - Base class for simulation workflow method sub-section definition. + Base class for simulation workflow model sub-section definition. """ - pass + initial_system = Quantity( + type=ModelSystem, + description=""" + Reference to the input model_system. + """, + ) + + initial_method = Quantity( + type=ModelMethod, + description=""" + Reference to the input model_method. + """, + ) class SimulationWorkflowResults(ArchiveSection): @@ -19,7 +36,12 @@ class SimulationWorkflowResults(ArchiveSection): Base class for simulation workflow results sub-section definition. """ - pass + final_system = Quantity( + type=ModelSystem, + description=""" + Reference to the final model_system. + """, + ) class SimulationWorkflow(Workflow): @@ -27,77 +49,100 @@ class SimulationWorkflow(Workflow): Base class for simulation workflows. 
""" - method = SubSection(sub_section=SimulationWorkflowMethod.m_def) + model_label = 'Input model' + + results_label = 'Output results' + + model = SubSection(sub_section=SimulationWorkflowModel.m_def) results = SubSection(sub_section=SimulationWorkflowResults.m_def) - def normalize(self, archive: EntryArchive, logger: BoundLogger): + def generate_inputs(self, archive: EntryArchive, logger: BoundLogger) -> None: + if not self.model: + self.model = SimulationWorkflowModel() + self.model.initial_method = archive.data.outputs[0].model_method_ref + self.model.initial_system = archive.data.outputs[0].model_system_ref + + # set method as inputs + self.inputs.append(Link(name=self.model_label, section=self.model)) + + def generate_outputs(self, archive: EntryArchive, logger: BoundLogger) -> None: + if not self.results: + self.results = SimulationWorkflowResults() + self.results.final_system = archive.data.outputs[-1].model_system_ref + + # set results as outputs + self.outputs.append(Link(name=self.results_label, section=self.results)) + + def generate_tasks(self, archive: EntryArchive, logger: BoundLogger) -> None: """ - Generate tasks from the archive data outputs. + Generate tasks from archive data outputs. Tasks are ordered and linked based + on the execution time of the calculation corresponding to the output. + By default, the tasks follow the order of the outputs and are linked sequentially. 
+        # default should be serial execution
assign outputs of parent as input to next task - task.inputs.extend( - [ - Link(name='Input', section=output.section) - for output in parent_outputs or task.outputs - ] - ) - # assign first parent outputs as workflow inputs - if not self.inputs: - self.inputs.extend(task.inputs) - # assign as new parent - parent_n = n - # reset outputs - parent_outputs = list(task.outputs) - else: - parent_outputs.extend(task.outputs) - # if overlap, assign parent outputs to task inputs - task.inputs.extend( - [ - Link(name='Input', section=output.section) - for output in self.tasks[parent_n or n].outputs - ] - ) - if not self.outputs: - # assign parent outputs as workflow outputs - self.outputs.extend(parent_outputs) + if not self.inputs: + self.generate_inputs(archive, logger) - if not self.method: - self.method = SimulationWorkflowMethod() + if not self.outputs: + self.generate_outputs(archive, logger) - if not self.results: - self.results = SimulationWorkflowResults() + if not self.tasks: + self.generate_tasks(archive, logger) - # set method as inputs - self.inputs.append(Link(name='Input method', section=self.method)) - # set results as outputs - self.outputs.append(Link(name='Ouput results', section=self.results)) +m_package.__init_metainfo__() diff --git a/src/nomad_simulations/schema_packages/workflow/geometry_optimization.py b/src/nomad_simulations/schema_packages/workflow/geometry_optimization.py index c3b9875f..b54c5139 100644 --- a/src/nomad_simulations/schema_packages/workflow/geometry_optimization.py +++ b/src/nomad_simulations/schema_packages/workflow/geometry_optimization.py @@ -1,19 +1,18 @@ +import numpy as np from nomad.datamodel import EntryArchive -from nomad.datamodel.metainfo.workflow import Link, Task -from nomad.metainfo import MEnum, Quantity -from nomad.metainfo.util import MSubSectionList +from nomad.metainfo import MEnum, Quantity, SchemaPackage from structlog.stdlib import BoundLogger -from nomad_simulations.schema_packages.outputs import Outputs - 
from .general import ( SimulationWorkflow, - SimulationWorkflowMethod, + SimulationWorkflowModel, SimulationWorkflowResults, ) +m_package = SchemaPackage() + -class GeometryOptimizationMethod(SimulationWorkflowMethod): +class GeometryOptimizationModel(SimulationWorkflowModel): optimization_type = Quantity( type=MEnum('static', 'atomic', 'cell_shape', 'cell_volume'), shape=[], @@ -94,13 +93,73 @@ class GeometryOptimizationMethod(SimulationWorkflowMethod): type=int, shape=[], description=""" - The number of optimization steps between sucessive outputs. + The number of optimization steps between saved outputs. """, ) class GeometryOptimizationResults(SimulationWorkflowResults): - pass + n_steps = Quantity( + type=int, + shape=[], + description=""" + Number of saved optimization steps. + """, + ) + + energies = Quantity( + type=np.float64, + unit='joule', + shape=['optimization_steps'], + description=""" + List of energy_total values gathered from the single configuration + calculations that are a part of the optimization trajectory. + """, + ) + + steps = Quantity( + type=np.int32, + shape=['optimization_steps'], + description=""" + The step index corresponding to each saved configuration. + """, + ) + + final_energy_difference = Quantity( + type=np.float64, + shape=[], + unit='joule', + description=""" + The difference in the energy_total between the last two steps during + optimization. + """, + ) + + final_force_maximum = Quantity( + type=np.float64, + shape=[], + unit='newton', + description=""" + The maximum net force in the last optimization step. + """, + ) + + final_displacement_maximum = Quantity( + type=np.float64, + shape=[], + unit='meter', + description=""" + The maximum displacement in the last optimization step with respect to previous. + """, + ) + + is_converged_geometry = Quantity( + type=bool, + shape=[], + description=""" + Indicates if the geometry convergence criteria were fulfilled. 
+ """, + ) class GeometryOptimization(SimulationWorkflow): @@ -108,37 +167,28 @@ class GeometryOptimization(SimulationWorkflow): Definitions for geometry optimization workflow. """ - def normalize(self, archive: EntryArchive, logger: BoundLogger) -> None: - """ - Specify the inputs and outputs of the tasks as the model system. - """ + task_label = 'Step' - # set up first method and results before we call base normalizer - if not self.method: - self.method = GeometryOptimizationMethod() + def generate_inputs(self, archive: EntryArchive, logger: BoundLogger) -> None: + if not self.model: + self.model = GeometryOptimizationModel() + super().generate_inputs(archive, logger) + def generate_outputs(self, archive: EntryArchive, logger: BoundLogger): if not self.results: self.results = GeometryOptimizationResults() + super().generate_outputs(archive, logger) - super().normalize(archive, logger) - - def extend_links(task: Task) -> None: - def get_system_links(links: MSubSectionList, name: str) -> list[Link]: - return [ - Link(name=name, section=link.section.model_system_ref) - for link in links - if isinstance(link.section, Outputs) - and link.section.model_system_ref - ] + def generate_tasks(self, archive: EntryArchive, logger: BoundLogger) -> None: + super().generate_tasks(archive, logger) + for n, task in enumerate(self.tasks): + if not task.name: + task.name = f'{self.task_label} {n}' - task.inputs.extend(get_system_links(self.inputs, 'Input system')) - task.outputs.extend(get_system_links(self.outputs, 'Output system')) + # link inputs to first task + self.tasks[0].inputs.extend(self.inputs) + # add outputs of last task to outputs + self.outputs.extend(self.tasks[-1].outputs) - if not self.name: - self.name = 'Geometry Optimization' - extend_links(self) - for n, task in enumerate(self.tasks): - if not task.name: - task.name = f'Step {n}' - extend_links(task) +m_package.__init_metainfo__() diff --git a/src/nomad_simulations/schema_packages/workflow/gw.py 
b/src/nomad_simulations/schema_packages/workflow/gw.py index a5bede53..20350f6d 100644 --- a/src/nomad_simulations/schema_packages/workflow/gw.py +++ b/src/nomad_simulations/schema_packages/workflow/gw.py @@ -1,8 +1,11 @@ from nomad.datamodel import EntryArchive +from nomad.metainfo import SchemaPackage from structlog.stdlib import BoundLogger from .general import INCORRECT_N_TASKS, SimulationWorkflow +m_package = SchemaPackage() + class DFTGWWorkflow(SimulationWorkflow): """ @@ -35,3 +38,6 @@ def normalize(self, archive: EntryArchive, logger: BoundLogger) -> None: self.tasks[0].outputs = self.tasks[0].task.outputs self.tasks[1].inputs = self.tasks[0].outputs self.tasks[1].outputs = self.outputs + + +m_package.__init_metainfo__() diff --git a/src/nomad_simulations/schema_packages/workflow/single_point.py b/src/nomad_simulations/schema_packages/workflow/single_point.py index c377095c..4b50e560 100644 --- a/src/nomad_simulations/schema_packages/workflow/single_point.py +++ b/src/nomad_simulations/schema_packages/workflow/single_point.py @@ -1,8 +1,32 @@ from nomad.datamodel import EntryArchive -from nomad.datamodel.metainfo.workflow import Link +from nomad.datamodel.metainfo.workflow import Task +from nomad.metainfo import SchemaPackage from structlog.stdlib import BoundLogger -from .general import INCORRECT_N_TASKS, SimulationWorkflow +from .general import ( + INCORRECT_N_TASKS, + SimulationWorkflow, + SimulationWorkflowModel, + SimulationWorkflowResults, +) + +m_package = SchemaPackage() + + +class SinglePointModel(SimulationWorkflowModel): + """ + Contains definitions for the input model of a single point workflow. + """ + + pass + + +class SinglePointResults(SimulationWorkflowResults): + """ + Contains defintions for the results of a single point workflow. + """ + + pass class SinglePoint(SimulationWorkflow): @@ -10,31 +34,27 @@ class SinglePoint(SimulationWorkflow): Definitions for single point workflow. 
""" - def normalize(self, archive: EntryArchive, logger: BoundLogger) -> None: - """ - Specify the method and system as inputs. - """ - super().normalize(archive, logger) - if len(self.tasks) != 1: + task_label = 'Calculation' + + def generate_inputs(self, archive: EntryArchive, logger: BoundLogger) -> None: + if not self.model: + self.model = SinglePointModel() + super().generate_inputs(archive, logger) + + def generate_outputs(self, archive: EntryArchive, logger: BoundLogger) -> None: + if not self.results: + self.results = SinglePointResults() + super().generate_outputs(archive, logger) + + def generate_tasks(self, archive: EntryArchive, logger: BoundLogger) -> None: + if len(archive.data.outputs) != 1: logger.error(INCORRECT_N_TASKS) - return - - if not self.inputs: - self.inputs.extend(self.tasks[0].inputs) - - inps: list[Link] = [] - for inp in self.inputs: - if inp.section and inp.section.model_system_ref: - inps.append( - Link(name='Input system', section=inp.section.model_system_ref) - ) - if inp.section and inp.section.model_method_ref: - inps.append( - Link(name='Input method', section=inp.section.model_method_ref) - ) - self.inputs.clear() - self.inputs.extend(inps) - - # reconnect inputs to link as these are redefined - self.tasks[0].inputs.clear() - self.tasks[0].inputs.extend(inps) + + task = Task(name=self.task_label) + task.inputs.extend(self.inputs) + task.outputs.extend(self.outputs) + + self.tasks.append(task) + + +m_package.__init_metainfo__()