Skip to content

Commit

Permalink
feat: merged mode for caly evo step (deepmodeling#224)
Browse files Browse the repository at this point in the history
one can choose mode from `default` and `merge` in
run_explore_config/mode

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced `CalyEvoStepMerge` class to handle merging results from
evolutionary steps.
- Added support for different execution modes ("merge" and "default") in
the concurrent learning operations.

- **Improvements**
- Updated configuration settings in `input.test.json` to include new
image versions and refined atom configurations.

- **Bug Fixes**
- Adjusted temporary directory path in `prep_caly_dp_optim.py` to
correct directory structure.
- Modified parameter types and added new parameters in various functions
to enhance flexibility and accuracy.

- **Tests**
- Added new test cases for `CalyEvoStepMerge` and other workflow
operations.
  - Updated existing tests to reflect changes in parameters and paths.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: zjgemi <[email protected]>
Co-authored-by: zjgemi <[email protected]>
  • Loading branch information
wangzyphysics and zjgemi authored May 29, 2024
1 parent aa806e1 commit 9408071
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 87 deletions.
39 changes: 29 additions & 10 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,19 @@
RunLmp,
SelectConfs,
)
from dpgen2.op.caly_evo_step_merge import (
CalyEvoStepMerge,
)
from dpgen2.superop import (
CalyEvoStep,
ConcurrentLearningBlock,
PrepRunCaly,
PrepRunDPTrain,
PrepRunFp,
PrepRunLmp,
)
from dpgen2.superop.caly_evo_step import (
CalyEvoStep,
)
from dpgen2.utils import (
BinaryFileInput,
bohrium_config_from_dict,
Expand Down Expand Up @@ -149,6 +154,7 @@ def make_concurrent_learning_op(
upload_python_packages: Optional[List[os.PathLike]] = None,
valid_data: Optional[S3Artifact] = None,
):
expl_mode = run_explore_config.get("mode", "default")
if train_style in ("dp", "dp-dist"):
prep_run_train_op = PrepRunDPTrain(
"prep-run-dp-train",
Expand All @@ -171,15 +177,28 @@ def make_concurrent_learning_op(
upload_python_packages=upload_python_packages,
)
elif explore_style == "calypso":
caly_evo_step_op = CalyEvoStep(
"caly-evo-step",
collect_run_caly=CollRunCaly,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
)
if expl_mode == "merge":
caly_evo_step_op = CalyEvoStepMerge(
name="caly-evo-step",
collect_run_caly=CollRunCaly,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=None,
)
elif expl_mode == "default":
caly_evo_step_op = CalyEvoStep(
name="caly-evo-step",
collect_run_caly=CollRunCaly,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
)
else:
raise KeyError(f"Unknown key: {expl_mode}, support `default` and `merge`.")
prep_run_explore_op = PrepRunCaly(
"prep-run-calypso",
prep_caly_input_op=PrepCalyInput,
Expand Down
129 changes: 129 additions & 0 deletions dpgen2/op/caly_evo_step_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import json
import logging
import pickle
import shutil
from pathlib import (
Path,
)
from typing import (
List,
Tuple,
)

from dflow import (
Step,
Workflow,
download_artifact,
upload_artifact,
)
from dflow.python import (
OP,
OPIO,
Artifact,
BigParameter,
OPIOSign,
Parameter,
Slices,
TransientError,
)
from dflow.utils import (
flatten,
)

from dpgen2.constants import (
calypso_check_opt_file,
calypso_run_opt_file,
)
from dpgen2.exploration.task import (
ExplorationTaskGroup,
)
from dpgen2.superop.caly_evo_step import (
CalyEvoStep,
)
from dpgen2.utils import (
BinaryFileInput,
set_directory,
)
from dpgen2.utils.run_command import (
run_command,
)


class CalyEvoStepMerge(OP):
def __init__(self, mode="debug", *args, **kwargs):
self.mode = mode
self.args = args
self.kwargs = kwargs

@classmethod
def get_input_sign(cls):
return OPIOSign(
{
"iter_num": int,
"cnt_num": Parameter(int, default=0),
"block_id": Parameter(str, default=""),
"task_name": BigParameter(str),
"expl_config": BigParameter(dict),
"models": Artifact(Path),
"input_file": Artifact(Path),
"caly_run_opt_file": Artifact(Path),
"caly_check_opt_file": Artifact(Path),
"results": Artifact(Path, optional=True),
"step": Artifact(Path, optional=True),
"opt_results_dir": Artifact(List[Path], optional=True),
"qhull_input": Artifact(Path, optional=True),
}
)

@classmethod
def get_output_sign(cls):
return OPIOSign(
{
"traj_results": Artifact(List[Path]),
}
)

@OP.exec_sign_check
def execute(
self,
ip: OPIO,
) -> OPIO:
from dflow import (
config,
)

config["mode"] = self.mode
wf = Workflow("caly-evo-workflow")
steps = CalyEvoStep(*self.args, **self.kwargs)
step = Step(
"caly-evo-step",
template=steps,
slices=Slices(output_artifact=["traj_results"]),
parameters={k: ip[k] for k in steps.inputs.parameters},
artifacts={
k: upload_artifact(ip[k]) if ip[k] is not None else None
for k in steps.inputs.artifacts
},
with_param=[0],
)
wf.add(step)
wf.submit()
wf.wait()
assert wf.query_status() == "Succeeded"
out = OPIO()
step = wf.query_step("caly-evo-step")[0]
for k in step.outputs.parameters:
out[k] = step.outputs.parameters[k].value
output_sign = self.get_output_sign()
for k in step.outputs.artifacts:
path_list = download_artifact(step.outputs.artifacts[k])
if output_sign[k].type == List[Path]:
if not isinstance(path_list, list) or any(
[p is not None and not isinstance(p, str) for p in path_list]
):
path_list = list(flatten(path_list).values())
out[k] = [Path(p) for p in path_list]
elif output_sign[k].type == Path:
assert len(path_list) == 1
out[k] = Path(path_list[0])
return out
4 changes: 2 additions & 2 deletions dpgen2/op/collect_run_caly.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_input_sign(cls):
return OPIOSign(
{
"config": BigParameter(dict), # for command
"task_name": Parameter(str), # calypso_task.idx
"task_name": BigParameter(str), # calypso_task.idx
"cnt_num": Parameter(int),
"input_file": Artifact(Path), # input.dat, !!! must be provided
"step": Artifact(type=Path, optional=True), # step file
Expand All @@ -77,7 +77,7 @@ def get_input_sign(cls):
def get_output_sign(cls):
return OPIOSign(
{
"task_name": Parameter(str), # calypso_task.idx
"task_name": BigParameter(str), # calypso_task.idx
"finished": Parameter(str), # True if cnt_num == maxstep
"poscar_dir": Artifact(Path), # dir contains POSCAR* of next step
"input_file": Artifact(Path), # input.dat
Expand Down
4 changes: 2 additions & 2 deletions dpgen2/op/prep_caly_dp_optim.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PrepCalyDPOptim(OP):
def get_input_sign(cls):
return OPIOSign(
{
"task_name": Parameter(str), # calypso_task.idx
"task_name": BigParameter(str), # calypso_task.idx
"finished": Parameter(str),
"template_slice_config": Parameter(dict),
"poscar_dir": Artifact(
Expand Down Expand Up @@ -149,7 +149,7 @@ def execute(
Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file)
task_names = [str(task_dir) for task_dir in task_dirs]
else:
temp_dir = work_dir / "opt_path"
temp_dir = work_dir / "opt_path_0"
temp_dir.mkdir(parents=True, exist_ok=True)
task_dirs = [temp_dir]
task_names = [str(task_dir) for task_dir in task_dirs]
Expand Down
4 changes: 3 additions & 1 deletion dpgen2/op/prep_caly_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ def get_input_sign(cls):
def get_output_sign(cls):
return OPIOSign(
{
"task_names": Parameter(List[str]), # task dir names
"ntasks": Parameter(int),
"task_names": BigParameter(List[str]), # task dir names
"input_dat_files": Artifact(List[Path]), # `input.dat`s
"caly_run_opt_files": Artifact(List[Path]),
"caly_check_opt_files": Artifact(List[Path]),
Expand Down Expand Up @@ -367,6 +368,7 @@ def execute(

return OPIO(
{
"ntasks": len(task_names),
"task_names": task_names,
"input_dat_files": input_dat_files,
"caly_run_opt_files": caly_run_opt_files,
Expand Down
3 changes: 0 additions & 3 deletions dpgen2/superop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from .block import (
ConcurrentLearningBlock,
)
from .caly_evo_step import (
CalyEvoStep,
)
from .prep_run_calypso import (
PrepRunCaly,
)
Expand Down
21 changes: 16 additions & 5 deletions dpgen2/superop/caly_evo_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,20 @@ def _caly_evo_step(
run_config = deepcopy(run_config)
prep_template_config = prep_config.pop("template_config")
run_template_config = run_config.pop("template_config")
prep_executor = init_executor(prep_config.pop("executor"))
run_executor = init_executor(run_config.pop("executor"))
prep_executor_config = prep_config.pop("executor")
run_executor_config = run_config.pop("executor")
template_slice_config = run_config.pop("template_slice_config", {})
expl_mode = run_config.pop("mode", "default")

def wise_executor(expl_mode, origin_executor_config):
if expl_mode == "default":
return init_executor(deepcopy(origin_executor_config))
elif expl_mode == "merge":
return None
else:
raise NotImplementedError(
f"Unknown expl_mode {expl_mode}, only support `default` and `merge`."
)

# collect the last step files and run calypso.x to generate structures
collect_run_calypso = Step(
Expand Down Expand Up @@ -171,7 +182,7 @@ def _caly_evo_step(
caly_evo_step_steps.inputs.parameters["iter_num"],
caly_evo_step_steps.inputs.parameters["cnt_num"],
),
executor=prep_executor,
executor=wise_executor(expl_mode, prep_executor_config),
**run_config,
)
caly_evo_step_steps.add(collect_run_calypso)
Expand Down Expand Up @@ -205,7 +216,7 @@ def _caly_evo_step(
caly_evo_step_steps.inputs.parameters["iter_num"],
caly_evo_step_steps.inputs.parameters["cnt_num"],
),
executor=prep_executor, # cpu is enough to run calypso.x, default step config is c2m4
executor=wise_executor(expl_mode, prep_executor_config),
**run_config,
)
caly_evo_step_steps.add(prep_dp_optim)
Expand Down Expand Up @@ -238,7 +249,7 @@ def _caly_evo_step(
caly_evo_step_steps.inputs.parameters["iter_num"],
caly_evo_step_steps.inputs.parameters["cnt_num"],
),
executor=run_executor,
executor=wise_executor(expl_mode, run_executor_config),
**run_config,
)
caly_evo_step_steps.add(run_dp_optim)
Expand Down
33 changes: 23 additions & 10 deletions dpgen2/superop/prep_run_calypso.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
List,
Optional,
Type,
Union,
)

from dflow import (
Expand Down Expand Up @@ -47,17 +48,13 @@
)
from dpgen2.utils.step_config import normalize as normalize_step_dict

from .caly_evo_step import (
CalyEvoStep,
)


class PrepRunCaly(Steps):
def __init__(
self,
name: str,
prep_caly_input_op: Type[OP],
caly_evo_step_op: OPTemplate,
caly_evo_step_op: Union[OPTemplate, OP],
prep_caly_model_devi_op: Type[OP],
run_caly_model_devi_op: Type[OP],
prep_config: dict = normalize_step_dict({}),
Expand Down Expand Up @@ -147,7 +144,7 @@ def _prep_run_caly(
prep_run_caly_steps: Steps,
step_keys: Dict[str, Any],
prep_caly_input_op: Type[OP],
caly_evo_step_op: OPTemplate,
caly_evo_step_op: Union[OPTemplate, OP],
prep_caly_model_devi_op: Type[OP],
run_caly_model_devi_op: Type[OP],
prep_config: dict = normalize_step_dict({}),
Expand All @@ -161,6 +158,7 @@ def _prep_run_caly(
prep_executor = init_executor(prep_config.pop("executor"))
run_executor = init_executor(run_config.pop("executor"))
template_slice_config = run_config.pop("template_slice_config", {})
expl_mode = run_config.pop("mode", "default")

# prep caly input files
prep_caly_input = Step(
Expand All @@ -181,10 +179,24 @@ def _prep_run_caly(
prep_run_caly_steps.add(prep_caly_input)

temp_value = None
if expl_mode == "default":
caly_evo_step_config = prep_config
caly_evo_step_executor = None
template = caly_evo_step_op
elif expl_mode == "merge":
caly_evo_step_config = run_config
caly_evo_step_executor = run_executor
template = PythonOPTemplate(
caly_evo_step_op, # type: ignore
python_packages=upload_python_packages,
**run_template_config,
) # type: ignore
else:
raise KeyError(f"Unknown expl mode `{expl_mode}`")

caly_evo_step = Step(
name="caly-evo-step",
template=caly_evo_step_op,
"caly-evo-step",
template=template, # type: ignore
slices=Slices(
input_parameter=[
"task_name",
Expand Down Expand Up @@ -220,8 +232,9 @@ def _prep_run_caly(
"qhull_input": temp_value,
},
key=step_keys["caly-evo-step-{{item}}"],
executor=prep_executor,
**prep_config,
with_param=argo_range(prep_caly_input.outputs.parameters["ntasks"]), # type: ignore
executor=caly_evo_step_executor,
**caly_evo_step_config,
)
prep_run_caly_steps.add(caly_evo_step)

Expand Down
Loading

0 comments on commit 9408071

Please sign in to comment.