From 9408071cea3041311314e48f7e2c82f49fadf4e9 Mon Sep 17 00:00:00 2001 From: zhenyu <76582286+wangzyphysics@users.noreply.github.com> Date: Wed, 29 May 2024 15:56:08 +0800 Subject: [PATCH] feat: merged mode for caly evo step (#224) one can choose mode from `default` and `merge` in run_explore_config/mode ## Summary by CodeRabbit - **New Features** - Introduced `CalyEvoStepMerge` class to handle merging results from evolutionary steps. - Added support for different execution modes ("merge" and "default") in the concurrent learning operations. - **Improvements** - Updated configuration settings in `input.test.json` to include new image versions and refined atom configurations. - **Bug Fixes** - Adjusted temporary directory path in `prep_caly_dp_optim.py` to correct directory structure. - Modified parameter types and added new parameters in various functions to enhance flexibility and accuracy. - **Tests** - Added new test cases for `CalyEvoStepMerge` and other workflow operations. - Updated existing tests to reflect changes in parameters and paths. --------- Signed-off-by: zjgemi Co-authored-by: zjgemi --- dpgen2/entrypoint/submit.py | 39 +++- dpgen2/op/caly_evo_step_merge.py | 129 ++++++++++ dpgen2/op/collect_run_caly.py | 4 +- dpgen2/op/prep_caly_dp_optim.py | 4 +- dpgen2/op/prep_caly_input.py | 4 +- dpgen2/superop/__init__.py | 3 - dpgen2/superop/caly_evo_step.py | 21 +- dpgen2/superop/prep_run_calypso.py | 33 ++- examples/calypso/input.test.json | 59 +++-- tests/op/test_prep_caly_dp_optim.py | 2 +- tests/op/test_prep_caly_input.py | 1 + tests/test_caly_evo_step.py | 1 - tests/test_merge_caly_evo_step.py | 350 ++++++++++++++++++++++++++++ tests/test_prep_run_caly.py | 95 ++++++-- 14 files changed, 658 insertions(+), 87 deletions(-) create mode 100644 dpgen2/op/caly_evo_step_merge.py create mode 100644 tests/test_merge_caly_evo_step.py diff --git a/dpgen2/entrypoint/submit.py b/dpgen2/entrypoint/submit.py index 4bcc4f21..70f7d492 100644 --- a/dpgen2/entrypoint/submit.py +++ b/dpgen2/entrypoint/submit.py @@ -103,14 +103,19 @@ RunLmp, SelectConfs, ) +from dpgen2.op.caly_evo_step_merge import ( + CalyEvoStepMerge, +) from dpgen2.superop import ( - CalyEvoStep, ConcurrentLearningBlock, PrepRunCaly, PrepRunDPTrain, PrepRunFp, PrepRunLmp, ) +from dpgen2.superop.caly_evo_step import ( + CalyEvoStep, +) from dpgen2.utils import ( BinaryFileInput, bohrium_config_from_dict, @@ -149,6 +154,7 @@ def make_concurrent_learning_op( upload_python_packages: Optional[List[os.PathLike]] = None, valid_data: Optional[S3Artifact] = None, ): + expl_mode = run_explore_config.get("mode", "default") if train_style in ("dp", "dp-dist"): prep_run_train_op = PrepRunDPTrain( "prep-run-dp-train", @@ -171,15 +177,28 @@ def make_concurrent_learning_op( upload_python_packages=upload_python_packages, ) elif explore_style == "calypso": - caly_evo_step_op = CalyEvoStep( - "caly-evo-step", - collect_run_caly=CollRunCaly, - prep_dp_optim=PrepCalyDPOptim, - run_dp_optim=RunCalyDPOptim, - prep_config=prep_explore_config, - run_config=run_explore_config, - upload_python_packages=upload_python_packages, - ) + if expl_mode == "merge": + caly_evo_step_op = CalyEvoStepMerge( + name="caly-evo-step", + collect_run_caly=CollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=RunCalyDPOptim, + prep_config=prep_explore_config, + run_config=run_explore_config, + upload_python_packages=None, + ) + elif expl_mode == "default": + caly_evo_step_op = CalyEvoStep( + name="caly-evo-step", + collect_run_caly=CollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=RunCalyDPOptim, + prep_config=prep_explore_config, + run_config=run_explore_config, + upload_python_packages=upload_python_packages, + ) + else: + raise KeyError(f"Unknown key: {expl_mode}, support `default` and `merge`.") prep_run_explore_op = PrepRunCaly( "prep-run-calypso", prep_caly_input_op=PrepCalyInput, diff --git a/dpgen2/op/caly_evo_step_merge.py b/dpgen2/op/caly_evo_step_merge.py new file mode 100644 index 00000000..8abda6dd --- /dev/null +++ b/dpgen2/op/caly_evo_step_merge.py @@ -0,0 +1,129 @@ +import json +import logging +import pickle +import shutil +from pathlib import ( + Path, +) +from typing import ( + List, + Tuple, +) + +from dflow import ( + Step, + Workflow, + download_artifact, + upload_artifact, +) +from dflow.python import ( + OP, + OPIO, + Artifact, + BigParameter, + OPIOSign, + Parameter, + Slices, + TransientError, +) +from dflow.utils import ( + flatten, +) + +from dpgen2.constants import ( + calypso_check_opt_file, + calypso_run_opt_file, +) +from dpgen2.exploration.task import ( + ExplorationTaskGroup, +) +from dpgen2.superop.caly_evo_step import ( + CalyEvoStep, +) +from dpgen2.utils import ( + BinaryFileInput, + set_directory, +) +from dpgen2.utils.run_command import ( + run_command, +) + + +class CalyEvoStepMerge(OP): + def __init__(self, mode="debug", *args, **kwargs): + self.mode = mode + self.args = args + self.kwargs = kwargs + + @classmethod + def get_input_sign(cls): + return OPIOSign( + { + "iter_num": int, + "cnt_num": Parameter(int, default=0), + "block_id": Parameter(str, default=""), + "task_name": BigParameter(str), + "expl_config": BigParameter(dict), + "models": Artifact(Path), + "input_file": Artifact(Path), + "caly_run_opt_file": Artifact(Path), + "caly_check_opt_file": Artifact(Path), + "results": Artifact(Path, optional=True), + "step": Artifact(Path, optional=True), + "opt_results_dir": Artifact(List[Path], optional=True), + "qhull_input": Artifact(Path, optional=True), + } + ) + + @classmethod + def get_output_sign(cls): + return OPIOSign( + { + "traj_results": Artifact(List[Path]), + } + ) + + @OP.exec_sign_check + def execute( + self, + ip: OPIO, + ) -> OPIO: + from dflow import ( + config, + ) + + config["mode"] = self.mode + wf = Workflow("caly-evo-workflow") + steps = CalyEvoStep(*self.args, **self.kwargs) + step = Step( + "caly-evo-step", + template=steps, + slices=Slices(output_artifact=["traj_results"]), + parameters={k: ip[k] for k in steps.inputs.parameters}, + artifacts={ + k: upload_artifact(ip[k]) if ip[k] is not None else None + for k in steps.inputs.artifacts + }, + with_param=[0], + ) + wf.add(step) + wf.submit() + wf.wait() + assert wf.query_status() == "Succeeded" + out = OPIO() + step = wf.query_step("caly-evo-step")[0] + for k in step.outputs.parameters: + out[k] = step.outputs.parameters[k].value + output_sign = self.get_output_sign() + for k in step.outputs.artifacts: + path_list = download_artifact(step.outputs.artifacts[k]) + if output_sign[k].type == List[Path]: + if not isinstance(path_list, list) or any( + [p is not None and not isinstance(p, str) for p in path_list] + ): + path_list = list(flatten(path_list).values()) + out[k] = [Path(p) for p in path_list] + elif output_sign[k].type == Path: + assert len(path_list) == 1 + out[k] = Path(path_list[0]) + return out diff --git a/dpgen2/op/collect_run_caly.py b/dpgen2/op/collect_run_caly.py index d5738310..4b6148f6 100644 --- a/dpgen2/op/collect_run_caly.py +++ b/dpgen2/op/collect_run_caly.py @@ -59,7 +59,7 @@ def get_input_sign(cls): return OPIOSign( { "config": BigParameter(dict), # for command - "task_name": Parameter(str), # calypso_task.idx + "task_name": BigParameter(str), # calypso_task.idx "cnt_num": Parameter(int), "input_file": Artifact(Path), # input.dat, !!! must be provided "step": Artifact(type=Path, optional=True), # step file @@ -77,7 +77,7 @@ def get_input_sign(cls): def get_output_sign(cls): return OPIOSign( { - "task_name": Parameter(str), # calypso_task.idx + "task_name": BigParameter(str), # calypso_task.idx "finished": Parameter(str), # True if cnt_num == maxstep "poscar_dir": Artifact(Path), # dir contains POSCAR* of next step "input_file": Artifact(Path), # input.dat diff --git a/dpgen2/op/prep_caly_dp_optim.py b/dpgen2/op/prep_caly_dp_optim.py index 81e69255..d2e4d8b0 100644 --- a/dpgen2/op/prep_caly_dp_optim.py +++ b/dpgen2/op/prep_caly_dp_optim.py @@ -53,7 +53,7 @@ class PrepCalyDPOptim(OP): def get_input_sign(cls): return OPIOSign( { - "task_name": Parameter(str), # calypso_task.idx + "task_name": BigParameter(str), # calypso_task.idx "finished": Parameter(str), "template_slice_config": Parameter(dict), "poscar_dir": Artifact( @@ -149,7 +149,7 @@ def execute( Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) task_names = [str(task_dir) for task_dir in task_dirs] else: - temp_dir = work_dir / "opt_path" + temp_dir = work_dir / "opt_path_0" temp_dir.mkdir(parents=True, exist_ok=True) task_dirs = [temp_dir] task_names = [str(task_dir) for task_dir in task_dirs] diff --git a/dpgen2/op/prep_caly_input.py b/dpgen2/op/prep_caly_input.py index b0b2a919..e3da359a 100644 --- a/dpgen2/op/prep_caly_input.py +++ b/dpgen2/op/prep_caly_input.py @@ -318,7 +318,8 @@ def get_input_sign(cls): def get_output_sign(cls): return OPIOSign( { - "task_names": Parameter(List[str]), # task dir names + "ntasks": Parameter(int), + "task_names": BigParameter(List[str]), # task dir names "input_dat_files": Artifact(List[Path]), # `input.dat`s "caly_run_opt_files": Artifact(List[Path]), "caly_check_opt_files": Artifact(List[Path]), @@ -367,6 +368,7 @@ def execute( return OPIO( { + "ntasks": len(task_names), "task_names": task_names, "input_dat_files": input_dat_files, "caly_run_opt_files": caly_run_opt_files, diff --git a/dpgen2/superop/__init__.py b/dpgen2/superop/__init__.py index 376bda97..f573f7d1 100644 --- a/dpgen2/superop/__init__.py +++ b/dpgen2/superop/__init__.py @@ -1,9 +1,6 @@ from .block import ( ConcurrentLearningBlock, ) -from .caly_evo_step import ( - CalyEvoStep, -) from .prep_run_calypso import ( PrepRunCaly, ) diff --git a/dpgen2/superop/caly_evo_step.py b/dpgen2/superop/caly_evo_step.py index 547cd1ea..320313e5 100644 --- a/dpgen2/superop/caly_evo_step.py +++ b/dpgen2/superop/caly_evo_step.py @@ -141,9 +141,20 @@ def _caly_evo_step( run_config = deepcopy(run_config) prep_template_config = prep_config.pop("template_config") run_template_config = run_config.pop("template_config") - prep_executor = init_executor(prep_config.pop("executor")) - run_executor = init_executor(run_config.pop("executor")) + prep_executor_config = prep_config.pop("executor") + run_executor_config = run_config.pop("executor") template_slice_config = run_config.pop("template_slice_config", {}) + expl_mode = run_config.pop("mode", "default") + + def wise_executor(expl_mode, origin_executor_config): + if expl_mode == "default": + return init_executor(deepcopy(origin_executor_config)) + elif expl_mode == "merge": + return None + else: + raise NotImplementedError( + f"Unknown expl_mode {expl_mode}, only support `default` and `merge`." + ) # collect the last step files and run calypso.x to generate structures collect_run_calypso = Step( @@ -171,7 +182,7 @@ def _caly_evo_step( caly_evo_step_steps.inputs.parameters["iter_num"], caly_evo_step_steps.inputs.parameters["cnt_num"], ), - executor=prep_executor, + executor=wise_executor(expl_mode, prep_executor_config), **run_config, ) caly_evo_step_steps.add(collect_run_calypso) @@ -205,7 +216,7 @@ def _caly_evo_step( caly_evo_step_steps.inputs.parameters["iter_num"], caly_evo_step_steps.inputs.parameters["cnt_num"], ), - executor=prep_executor, # cpu is enough to run calypso.x, default step config is c2m4 + executor=wise_executor(expl_mode, prep_executor_config), **run_config, ) caly_evo_step_steps.add(prep_dp_optim) @@ -238,7 +249,7 @@ def _caly_evo_step( caly_evo_step_steps.inputs.parameters["iter_num"], caly_evo_step_steps.inputs.parameters["cnt_num"], ), - executor=run_executor, + executor=wise_executor(expl_mode, run_executor_config), **run_config, ) caly_evo_step_steps.add(run_dp_optim) diff --git a/dpgen2/superop/prep_run_calypso.py b/dpgen2/superop/prep_run_calypso.py index 647e5bb2..0924d909 100644 --- a/dpgen2/superop/prep_run_calypso.py +++ b/dpgen2/superop/prep_run_calypso.py @@ -11,6 +11,7 @@ List, Optional, Type, + Union, ) from dflow import ( @@ -47,17 +48,13 @@ ) from dpgen2.utils.step_config import normalize as normalize_step_dict -from .caly_evo_step import ( - CalyEvoStep, -) - class PrepRunCaly(Steps): def __init__( self, name: str, prep_caly_input_op: Type[OP], - caly_evo_step_op: OPTemplate, + caly_evo_step_op: Union[OPTemplate, OP], prep_caly_model_devi_op: Type[OP], run_caly_model_devi_op: Type[OP], prep_config: dict = normalize_step_dict({}), @@ -147,7 +144,7 @@ def _prep_run_caly( prep_run_caly_steps: Steps, step_keys: Dict[str, Any], prep_caly_input_op: Type[OP], - caly_evo_step_op: OPTemplate, + caly_evo_step_op: Union[OPTemplate, OP], prep_caly_model_devi_op: Type[OP], run_caly_model_devi_op: Type[OP], prep_config: dict = normalize_step_dict({}), @@ -161,6 +158,7 @@ def _prep_run_caly( prep_executor = init_executor(prep_config.pop("executor")) run_executor = init_executor(run_config.pop("executor")) template_slice_config = run_config.pop("template_slice_config", {}) + expl_mode = run_config.pop("mode", "default") # prep caly input files prep_caly_input = Step( @@ -181,10 +179,24 @@ def _prep_run_caly( prep_run_caly_steps.add(prep_caly_input) temp_value = None + if expl_mode == "default": + caly_evo_step_config = prep_config + caly_evo_step_executor = None + template = caly_evo_step_op + elif expl_mode == "merge": + caly_evo_step_config = run_config + caly_evo_step_executor = run_executor + template = PythonOPTemplate( + caly_evo_step_op, # type: ignore + python_packages=upload_python_packages, + **run_template_config, + ) # type: ignore + else: + raise KeyError(f"Unknown expl mode `{expl_mode}`") caly_evo_step = Step( - name="caly-evo-step", - template=caly_evo_step_op, + "caly-evo-step", + template=template, # type: ignore slices=Slices( input_parameter=[ "task_name", @@ -220,8 +232,9 @@ def _prep_run_caly( "qhull_input": temp_value, }, key=step_keys["caly-evo-step-{{item}}"], - executor=prep_executor, - **prep_config, + with_param=argo_range(prep_caly_input.outputs.parameters["ntasks"]), # type: ignore + executor=caly_evo_step_executor, + **caly_evo_step_config, ) prep_run_caly_steps.add(caly_evo_step) diff --git a/examples/calypso/input.test.json b/examples/calypso/input.test.json index 6c745f02..67b3b86d 100644 --- a/examples/calypso/input.test.json +++ b/examples/calypso/input.test.json @@ -1,8 +1,8 @@ { "bohrium_config": { - "username": "x@x.cn", + "username": "wzy@calypso.cn", "password": "xxx", - "project_id": 111111, + "project_id": 1, "_host": "https://workflow.dp.tech/", "_k8s_api_server": "https://workflows.deepmodeling.com", "_repo_key": "oss-bohrium", @@ -10,7 +10,7 @@ }, "default_step_config": { "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dgpen2-calypso:v0.4", "_comment": "all" }, "executor": { @@ -33,7 +33,7 @@ "step_configs": { "run_train_config": { "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dgpen2-calypso:v0.4", "_comment": "all" }, "executor": { @@ -47,7 +47,8 @@ "input_data": { "job_type": "container", "platform": "ali", - "scass_type": "1 * NVIDIA V100_32g" + "_scass_type": "1 * NVIDIA V100_16g", + "scass_type": "c16_m64_cpu" } } } @@ -59,8 +60,10 @@ "_comment": "all" }, "run_explore_config": { + "mode": "default", + "_mode": "merge", "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dgpen2-calypso:v0.4", "_comment": "all" }, "_continue_on_success_ratio": 0.8, @@ -75,14 +78,15 @@ "input_data": { "job_type": "container", "platform": "ali", - "scass_type": "1 * NVIDIA T4_16g" + "scass_type": "c16_m64_cpu" } } } }, "template_slice_config": { - "group_size": 2, - "pool_size": 1 + "group_size": 1, + "pool_size": 1, + "model_devi_group_size": 1 }, "_comment": "all" }, @@ -116,10 +120,8 @@ }, "_comment": "all" }, - "_upload_python_packages": [ - "/root/dpgen2/dpgen2", - "/opt/re_e_bias_new/deepmd-kit/deepmd", - "/root/dflow/src/dflow" + "upload_python_packages": [ + "/root/dpgen2/dpgen2" ], "inputs": { "mixed_type": true, @@ -442,23 +444,16 @@ "stages": [ [ { - "numb_of_species": 1, + "numb_of_species": 3, "numb_of_atoms": [ - 1 + 3, 2, 1 ], "name_of_atoms": [ - "Mg" + ["Mg", "Si", "O"], ["La", "Li"], ["H"] ], - "atomic_number": [ - 3 - ], - "pop_size": 50, - "max_step": 3, - "distance_of_ions": [ - [ - 1.0 - ] - ] + "pop_size": 5, + "max_step": 2, + "distance_of_ions": {"Mg": 0.7, "Si": 0.7, "O": 0.7, "La": 0.7, "Li": 0.7, "H": 0.7} }, { "numb_of_species": 2, @@ -474,8 +469,8 @@ 3, 37 ], - "pop_size": 50, - "max_step": 3, + "pop_size": 5, + "max_step": 2, "distance_of_ions": [ [ 1.0, @@ -500,8 +495,8 @@ "atomic_number": [ 3 ], - "pop_size": 50, - "max_step": 3, + "pop_size": 5, + "max_step": 2, "distance_of_ions": [ [ 1.0 @@ -522,8 +517,8 @@ 3, 37 ], - "pop_size": 50, - "max_step": 3, + "pop_size": 5, + "max_step": 2, "distance_of_ions": [ [ 1.0, diff --git a/tests/op/test_prep_caly_dp_optim.py b/tests/op/test_prep_caly_dp_optim.py index 8905e229..d1ffd724 100644 --- a/tests/op/test_prep_caly_dp_optim.py +++ b/tests/op/test_prep_caly_dp_optim.py @@ -149,5 +149,5 @@ def test_01_success(self): self.assertEqual(len(out["task_names"]), 1) self.assertEqual(len(out["task_dirs"]), 1) self.assertEqual( - out["task_names"], [str(Path(self.task_name) / Path("opt_path"))] + out["task_names"], [str(Path(self.task_name) / Path("opt_path_0"))] ) diff --git a/tests/op/test_prep_caly_input.py b/tests/op/test_prep_caly_input.py index 5f2f505e..7eb77929 100644 --- a/tests/op/test_prep_caly_input.py +++ b/tests/op/test_prep_caly_input.py @@ -91,6 +91,7 @@ def test_success(self): self.assertEqual(out["input_dat_files"], self.input_dat_list) self.assertEqual(out["caly_run_opt_files"], self.caly_run_opt_list) self.assertEqual(out["caly_check_opt_files"], self.caly_check_opt_list) + self.assertEqual(out["ntasks"], 2) # check files details self.assertEqual(self.input_dat_list[0].read_text().strip("\n"), "input.dat_0") # self.assertEqual(self.caly_run_opt_list[1].read_text().strip("\n"), "run_1") diff --git a/tests/test_caly_evo_step.py b/tests/test_caly_evo_step.py index 25759973..2aff3614 100644 --- a/tests/test_caly_evo_step.py +++ b/tests/test_caly_evo_step.py @@ -272,7 +272,6 @@ def tearDown(self): shutil.rmtree(i, ignore_errors=True) for i in Path().glob("caly_task*"): shutil.rmtree(i, ignore_errors=True) - # shutil.rmtree("upload", ignore_errors=True) @unittest.skip("only need to run test_01") def test_00(self): diff --git a/tests/test_merge_caly_evo_step.py b/tests/test_merge_caly_evo_step.py new file mode 100644 index 00000000..3f22ba56 --- /dev/null +++ b/tests/test_merge_caly_evo_step.py @@ -0,0 +1,350 @@ +import json +import os +import pickle +import shutil +import time +import unittest +from pathlib import ( + Path, +) +from typing import ( + List, + Set, +) + +import jsonpickle +import numpy as np +from dflow import ( + InputArtifact, + InputParameter, + Inputs, + OutputArtifact, + OutputParameter, + Outputs, + S3Artifact, + Step, + Steps, + Workflow, + argo_range, + download_artifact, + upload_artifact, +) +from dflow.python import ( + OP, + OPIO, + Artifact, + OPIOSign, + PythonOPTemplate, + Slices, +) + +from dpgen2.constants import ( + calypso_check_opt_file, + calypso_index_pattern, + calypso_run_opt_file, +) + +try: + from context import ( + dpgen2, + ) +except ModuleNotFoundError: + # case of upload everything to argo, no context needed + pass +from context import ( + default_host, + default_image, + skip_ut_with_dflow, + skip_ut_with_dflow_reason, + upload_python_packages, +) +from mocked_ops import ( + MockedCollRunCaly, + MockedPrepCalyDPOptim, + MockedRunCalyDPOptim, + mocked_numb_models, +) + +from dpgen2.constants import ( + lmp_conf_name, + lmp_input_name, + lmp_log_name, + lmp_model_devi_name, + lmp_task_pattern, + lmp_traj_name, + model_name_pattern, + train_log_name, + train_script_name, + train_task_pattern, +) +from dpgen2.exploration.task import ( + ExplorationTask, + ExplorationTaskGroup, +) +from dpgen2.op import ( + PrepCalyDPOptim, + RunCalyDPOptim, +) +from dpgen2.op.caly_evo_step_merge import ( + CalyEvoStepMerge, +) +from dpgen2.op.collect_run_caly import ( + CollRunCaly, +) +from dpgen2.op.prep_caly_input import ( + PrepCalyInput, +) +from dpgen2.superop.caly_evo_step import ( + CalyEvoStep, +) +from dpgen2.utils.step_config import normalize as normalize_step_dict + +default_config = normalize_step_dict( + { + "template_config": { + "image": default_image, + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + }, + } +) + + +class TestMockedCollRunCaly(unittest.TestCase): + def setUp(self) -> None: + self.config = {} + self.task_name = "task_name" + self.file_storage = Path("storge_files") + self.file_storage.mkdir(parents=True, exist_ok=True) + self.input_file = self.file_storage.joinpath("input.dat") + self.input_file.write_text("5") + self.step_file = None + self.results_dir = None + self.opt_results_dir = None + self.cnt_num = 0 + self.finished = str(False) + + def tearDown(self) -> None: + shutil.rmtree(self.file_storage, ignore_errors=True) + shutil.rmtree(Path(self.task_name), ignore_errors=True) + + def test_mocked_coll_run_caly_00(self): + op = MockedCollRunCaly() + out = op.execute( + OPIO( + { + "config": self.config, + "task_name": self.task_name, + "cnt_num": self.cnt_num, + "input_file": self.input_file, + "step": self.step_file, + "results": self.results_dir, + "opt_results_dir": self.opt_results_dir, + } + ) + ) + + self.assertTrue(out["task_name"] == self.task_name) + self.assertTrue(out["finished"] == "false") + self.assertTrue( + Path("task_name/poscar_dir").joinpath("POSCAR_1") + in list(out["poscar_dir"].glob("POSCAR_*")) + ) + self.assertTrue(len(list(out["poscar_dir"].rglob("POSCAR_*"))) == 5) + self.assertTrue( + out["input_file"] == Path(self.task_name).joinpath(self.input_file.name) + ) + self.assertTrue(out["input_file"].read_text() == str(5)) + self.assertTrue(out["step"] == Path(self.task_name).joinpath("step")) + self.assertTrue(out["step"].read_text() == str(2)) + self.assertTrue(out["results"] == Path(self.task_name).joinpath("results")) + + +class TestMockedRunDPOptim(unittest.TestCase): + def setUp(self) -> None: + self.config = {} + self.task_name = "task_name" + self.file_storage = Path("storge_files") + self.file_storage.mkdir(parents=True, exist_ok=True) + for i in range(5): + self.file_storage.joinpath(f"POSCAR_{i}").write_text(f"POSCAR_{i}") + self.file_storage.joinpath(f"frozen_model.pb").write_text(f"model.{i}.pb") + self.caly_run_opt_file = self.file_storage.joinpath(calypso_run_opt_file) + self.caly_run_opt_file.write_text("caly_run_opt_script") + self.caly_check_opt_file = self.file_storage.joinpath(calypso_check_opt_file) + self.caly_check_opt_file.write_text("caly_check_opt_script") + + def tearDown(self) -> None: + shutil.rmtree(self.file_storage, ignore_errors=True) + shutil.rmtree(Path(self.task_name), ignore_errors=True) + + def test_mocked_run_dp_optim(self): + op = MockedRunCalyDPOptim() + out = op.execute( + OPIO( + { + "config": self.config, + "finished": "false", + "cnt_num": 0, + "task_name": self.task_name, + "task_dir": self.file_storage, + } + ) + ) + + # check output + self.assertEqual(out["task_name"], self.task_name) + + optim_results_dir = out["optim_results_dir"] + list_optim_results_dir = list(optim_results_dir.iterdir()) + counts_optim_results_dir = len(list_optim_results_dir) + counts_outcar_in_optim_results_dir = len( + list(optim_results_dir.rglob("OUTCAR_*")) + ) + + self.assertTrue(optim_results_dir, Path(self.task_name) / "optim_results_dir") + self.assertEqual(counts_optim_results_dir, 15) + self.assertEqual(counts_outcar_in_optim_results_dir, 5) + self.assertTrue( + Path(self.task_name) / "optim_results_dir" / "CONTCAR_4" + in list_optim_results_dir + ) + + traj_results_dir = out["traj_results"] + list_traj_results_dir = list(traj_results_dir.glob("*.traj")) + counts_traj = len(list_traj_results_dir) + self.assertEqual(traj_results_dir, Path(self.task_name) / "traj_results") + self.assertEqual(counts_traj, 5) + self.assertTrue( + Path(self.task_name) / "traj_results" / "3.traj", list_traj_results_dir + ) + + +# @unittest.skip("temporary pass") +@unittest.skipIf(skip_ut_with_dflow, skip_ut_with_dflow_reason) +class TestCalyEvoStepMerge(unittest.TestCase): + def setUp(self): + self.expl_config = {} + self.work_dir = Path("storge_files") + self.work_dir.mkdir(parents=True, exist_ok=True) + + self.max_step = 2 + self.nmodels = mocked_numb_models + self.model_list = [] + for ii in range(self.nmodels): + model_path = self.work_dir.joinpath(f"task.{ii}") + model_path.mkdir(exist_ok=True, parents=True) + model = model_path.joinpath(f"model.ckpt.pt") + model.write_text(f"model {ii}") + self.model_list.append(model) + self.models = upload_artifact(self.model_list) + + self.block_id = "id123id" + temp_name_pattern = "caly_task." + calypso_index_pattern + self.task_name = temp_name_pattern % 1 + self.task_name_list = [self.task_name, temp_name_pattern % 2] + + input_file = self.work_dir.joinpath("input.dat") + input_file.write_text(str(self.max_step)) + self.input_file = upload_artifact(input_file) + self.input_file_list = upload_artifact([input_file, input_file]) + + self.step = None + self.results = None + self.opt_results_dir = None + + caly_run_opt_file = self.work_dir.joinpath("caly_run_opt.py") + caly_run_opt_file.write_text("caly_run_opt") + self.caly_run_opt_file = upload_artifact(caly_run_opt_file) + self.caly_run_opt_files = upload_artifact( + [caly_run_opt_file, caly_run_opt_file] + ) + + caly_check_opt_file = self.work_dir.joinpath("caly_check_opt.py") + caly_check_opt_file.write_text("caly_check_opt") + self.caly_check_opt_file = upload_artifact(caly_check_opt_file) + self.caly_check_opt_files = upload_artifact( + [caly_check_opt_file, caly_check_opt_file] + ) + + def tearDown(self): + shutil.rmtree(self.work_dir, ignore_errors=True) + for i in Path().glob("caly-evo-step-*"): + shutil.rmtree(i, ignore_errors=True) + for i in Path().glob("caly_task*"): + shutil.rmtree(i, ignore_errors=True) + + def test_caly_evo_step(self): + steps = CalyEvoStepMerge( + name="caly-evo-step", + collect_run_caly=MockedCollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=MockedRunCalyDPOptim, + prep_config=default_config, + run_config=default_config, + upload_python_packages=None, + ) + caly_evo_step = Step( + "caly-evo-step", + template=PythonOPTemplate( + steps, + python_packages=upload_python_packages, + **default_config["template_config"], + ), + slices=Slices( + input_parameter=[ + "task_name", + ], + input_artifact=[ + "input_file", + "results", + "step", + "opt_results_dir", + "caly_run_opt_file", + "caly_check_opt_file", + ], + output_artifact=["traj_results"], + ), + parameters={ + "block_id": self.block_id, + "task_name": self.task_name_list, + "iter_num": "{{item}}", + "expl_config": self.expl_config, + }, + artifacts={ + "models": self.models, + "input_file": self.input_file_list, + "caly_run_opt_file": self.caly_run_opt_files, + "caly_check_opt_file": self.caly_check_opt_files, + "results": None, + "step": None, + "opt_results_dir": None, + "qhull_input": None, + }, + ) + + wf = Workflow(name="caly-evo-step", host=default_host) + wf.add(caly_evo_step) + wf.submit() + + while wf.query_status() in ["Pending", "Running"]: + time.sleep(4) + + self.assertEqual(wf.query_status(), "Succeeded") + step = wf.query_step(name="caly-evo-step")[0] + self.assertEqual(step.phase, "Succeeded") + + download_artifact(step.outputs.artifacts["traj_results"]) + + cwd = Path().cwd() + for idx, name in enumerate(self.task_name_list): + cwd = Path().cwd() + os.chdir(Path(name)) + traj_list = list(Path().rglob("*.traj")) + self.assertEqual(len(traj_list), 5 * self.max_step) + self.assertTrue( + Path("opt_path_0/traj_results").joinpath(f"{idx}.0.traj") in traj_list + ) + os.chdir(cwd) diff --git a/tests/test_prep_run_caly.py b/tests/test_prep_run_caly.py index d555d418..beba2ca4 100644 --- a/tests/test_prep_run_caly.py +++ b/tests/test_prep_run_caly.py @@ -69,6 +69,9 @@ BaseExplorationTaskGroup, ExplorationTask, ) +from dpgen2.op.caly_evo_step_merge import ( + CalyEvoStepMerge, +) from dpgen2.op.prep_caly_dp_optim import ( PrepCalyDPOptim, ) @@ -96,18 +99,6 @@ }, } ) -run_default_config = normalize_step_dict( - { - "template_config": { - "image": default_image, - }, - "template_slice_config": { - "group_size": 2, - "pool_size": 1, - "model_devi_group_size": 30, - }, - } -) def make_task_group_list(njobs): @@ -148,7 +139,78 @@ def tearDown(self): for i in Path().glob("prep-run-caly-step*"): shutil.rmtree(i, ignore_errors=True) - def test(self): + def test_caly_evo_step_merge_merge_mode(self): + run_default_config = normalize_step_dict( + { + "mode": "merge", + "template_config": { + "image": default_image, + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + "model_devi_group_size": 30, + }, + } + ) + caly_evo_step_op = CalyEvoStepMerge( + name="caly-evo-step", + collect_run_caly=MockedCollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=MockedRunCalyDPOptim, + prep_config=prep_default_config, + run_config=run_default_config, + upload_python_packages=None, + ) + prep_run_caly_op = PrepRunCaly( + "prep-run-calypso", + PrepCalyInput, + caly_evo_step_op, + PrepCalyModelDevi, + MockedRunCalyModelDevi, + prep_config=prep_default_config, + run_config=run_default_config, + upload_python_packages=upload_python_packages, + ) + prep_run_caly_step = Step( + "prep-run-caly-step", + template=prep_run_caly_op, + parameters={ + "block_id": self.block_id, + "expl_task_grp": self.expl_task_grp, + "explore_config": self.expl_config, + "type_map": self.type_map, + }, + artifacts={ + "models": self.models, + }, + ) + + wf = Workflow(name="prep-run-caly-step", host=default_host) + wf.add(prep_run_caly_step) + wf.submit() + + while wf.query_status() in ["Pending", "Running"]: + time.sleep(4) + + self.assertEqual(wf.query_status(), "Succeeded") + step = wf.query_step(name="prep-run-caly-step")[0] + self.assertEqual(step.phase, "Succeeded") + + def test_caly_evo_step_merge_default_mode(self): + run_default_config = normalize_step_dict( + { + "mode": "default", + "template_config": { + "image": default_image, + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + "model_devi_group_size": 30, + }, + } + ) caly_evo_step_op = CalyEvoStep( "caly-evo-run", MockedCollRunCaly, @@ -192,10 +254,3 @@ def test(self): self.assertEqual(wf.query_status(), "Succeeded") step = wf.query_step(name="prep-run-caly-step")[0] self.assertEqual(step.phase, "Succeeded") - - # download_artifact(step.outputs.artifacts["model_devis"]) - # download_artifact(step.outputs.artifacts["trajs"]) - # download_artifact(step.outputs.artifacts["logs"]) - - # for ii in step.outputs.parameters["task_names"].value: - # self.check_run_lmp_output(ii, self.model_list)