Skip to content

Commit

Permalink
Fix dp optim parallel (#208)
Browse files Browse the repository at this point in the history
Signed-off-by: zjgemi <[email protected]>
Co-authored-by: zjgemi <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 1, 2024
1 parent f2e1d59 commit 2e17cf9
Show file tree
Hide file tree
Showing 17 changed files with 1,114 additions and 428 deletions.
15 changes: 10 additions & 5 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@
from dpgen2.op import (
CollectData,
CollRunCaly,
PrepCalyDPOptim,
PrepCalyInput,
PrepDPTrain,
PrepLmp,
PrepRunDPOptim,
RunCalyDPOptim,
RunCalyModelDevi,
RunDPTrain,
RunLmp,
Expand Down Expand Up @@ -172,7 +173,8 @@ def make_concurrent_learning_op(
caly_evo_step_op = CalyEvoStep(
"caly-evo-step",
collect_run_caly=CollRunCaly,
prep_run_dp_optim=PrepRunDPOptim,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
Expand Down Expand Up @@ -797,8 +799,10 @@ def get_superop(key):
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-run-dp-optim-" in key:
return re.sub("prep-run-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-dp-optim-" in key:
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-dp-optim-" in key:
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-caly-model-devi" in key:
return key.replace("run-caly-model-devi", "prep-run-explore")
return None
Expand Down Expand Up @@ -843,7 +847,8 @@ def get_resubmit_keys(
"modify-train-script",
"prep-caly-input",
"collect-run-calypso",
"prep-run-dp-optim",
"prep-dp-optim",
"run-dp-optim",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
Expand Down
7 changes: 5 additions & 2 deletions dpgen2/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from .collect_run_caly import (
CollRunCaly,
)
from .prep_caly_dp_optim import (
PrepCalyDPOptim,
)
from .prep_caly_input import (
PrepCalyInput,
)
Expand All @@ -13,8 +16,8 @@
from .prep_lmp import (
PrepLmp,
)
from .prep_run_dp_optim import (
PrepRunDPOptim,
from .run_caly_dp_optim import (
RunCalyDPOptim,
)
from .run_caly_model_devi import (
RunCalyModelDevi,
Expand Down
25 changes: 16 additions & 9 deletions dpgen2/op/collect_run_caly.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_input_sign(cls):
type=Path, optional=True
), # dir named results for evo
"opt_results_dir": Artifact(
type=Path, optional=True
type=List[Path], optional=True
), # dir contains POSCAR* CONTCAR* OUTCAR*
"qhull_input": Artifact(type=Path, optional=True), # for vsc
}
Expand Down Expand Up @@ -141,11 +141,16 @@ def execute(
results = (
ip["results"].resolve() if ip["results"] is not None else ip["results"]
)
opt_results_dir = (
ip["opt_results_dir"].resolve()
if ip["opt_results_dir"] is not None
else ip["opt_results_dir"]
)
# opt_results_dir = (
# ip["opt_results_dir"].resolve()
# if ip["opt_results_dir"] is not None
# else ip["opt_results_dir"]
# )
opt_results_dir = []
if ip["opt_results_dir"] is not None:
for temp in ip["opt_results_dir"]:
opt_results_dir.append(Path(temp).resolve())

qhull_input = (
ip["qhull_input"].resolve()
if ip["qhull_input"] is not None
Expand Down Expand Up @@ -227,11 +232,13 @@ def normalize_config(data={}):


def prep_last_calypso_file(step, results, opt_results_dir, qhull_input, vsc):
if step is not None and results is not None or opt_results_dir is not None:
if step is not None and results is not None and opt_results_dir is not None:
Path(step.name).symlink_to(step)
Path(results.name).symlink_to(results)
for file_name in opt_results_dir.iterdir():
Path(file_name.name).symlink_to(file_name)
assert isinstance(opt_results_dir, list), "opt_results_dir should be a list."
for opt_results_name in opt_results_dir:
for file_name in opt_results_name.iterdir():
Path(file_name.name).symlink_to(file_name)

if vsc and qhull_input is not None:
Path(qhull_input.name).symlink_to(qhull_input)
Expand Down
163 changes: 163 additions & 0 deletions dpgen2/op/prep_caly_dp_optim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import json
import logging
import pickle
import shutil
from pathlib import (
Path,
)
from typing import (
List,
Tuple,
)

from dflow.python import (
OP,
OPIO,
Artifact,
BigParameter,
OPIOSign,
Parameter,
TransientError,
)

from dpgen2.constants import (
calypso_check_opt_file,
calypso_opt_dir_name,
calypso_run_opt_file,
model_name_pattern,
)
from dpgen2.exploration.task import (
ExplorationTaskGroup,
)
from dpgen2.utils import (
BinaryFileInput,
set_directory,
)
from dpgen2.utils.run_command import (
run_command,
)


class PrepCalyDPOptim(OP):
r"""Prepare the working directories and input file according to slices information
for structure optimization with DP.
`POSCAR_*`, `frozen_model.pb` or `model.ckpt.pt`, `calypso_run_opt.py`
and `calypso_check_opt.py` will be copied or symlink to each optimization directory
from `ip["work_path"]`, according to the group size of ip["template_slice_config"].
The POSCAR_* will be splited into group_size parts and the name of each path will be returned
in a `task_names` list and `task_dirs` list.
"""

@classmethod
def get_input_sign(cls):
return OPIOSign(
{
"task_name": Parameter(str), # calypso_task.idx
"finished": Parameter(str),
"template_slice_config": Parameter(dict),
"poscar_dir": Artifact(
Path
), # from run_calypso first, then from collect_run_caly
"models_dir": Artifact(Path), #
"caly_run_opt_file": Artifact(Path), # from prep_caly_input
"caly_check_opt_file": Artifact(Path), # from prep_caly_input
}
)

@classmethod
def get_output_sign(cls):
return OPIOSign(
{
"task_names": Parameter(List[str]),
"task_dirs": Artifact(List[Path]),
"caly_run_opt_file": Artifact(Path), # from prep_caly_input
"caly_check_opt_file": Artifact(Path), # from prep_caly_input
}
)

@OP.exec_sign_check
def execute(
self,
ip: OPIO,
) -> OPIO:
r"""Execute the OP.
Parameters
----------
ip : dict
Input dict with components:
- `task_name` : (`str`)
- `finished` : (`str`)
- `template_slice_config` : (`dict`)
- `poscar_dir` : (`Path`)
- `models_dir` : (`Path`)
- `caly_run_opt_file` : (`Path`)
- `caly_check_opt_file` : (`Path`)
Returns
-------
op : dict
Output dict with components:
- `task_names`: (`List[str]`)
- `task_dirs`: (`Artifact(List[Path])`)
- `caly_run_opt_file` : (`Path`)
- `caly_check_opt_file` : (`Path`)
"""
group_size = ip["template_slice_config"]["group_size"]

finished = ip["finished"]

work_dir = Path(ip["task_name"])
poscar_dir = ip["poscar_dir"]
models_dir = ip["models_dir"]
_caly_run_opt_file = ip["caly_run_opt_file"]
_caly_check_opt_file = ip["caly_check_opt_file"]
caly_run_opt_file = _caly_run_opt_file.resolve()
caly_check_opt_file = _caly_check_opt_file.resolve()
poscar_list = [poscar.resolve() for poscar in poscar_dir.rglob("POSCAR_*")]
poscar_list = sorted(poscar_list, key=lambda x: int(x.name.strip("POSCAR_")))
model_name = "frozen_model.pb"
model_list = [model.resolve() for model in models_dir.rglob(model_name)]
if len(model_list) == 0:
model_name = "model.ckpt.pt"
model_list = [model.resolve() for model in models_dir.rglob(model_name)]
model_list = sorted(model_list, key=lambda x: str(x).split(".")[1])
model_file = model_list[0]

with set_directory(work_dir):
Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file)
Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file)
if finished == "false":
grouped_poscar_list = [
poscar_list[i : i + group_size]
for i in range(0, len(poscar_list), group_size)
]

task_dirs = []
for idx, _poscar_list in enumerate(grouped_poscar_list):
opt_path = Path(f"opt_path_{idx}")
task_dirs.append(work_dir / opt_path)
with set_directory(opt_path):
for poscar in _poscar_list:
Path(poscar.name).symlink_to(poscar)
Path(model_name).symlink_to(model_file)
Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file)
Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file)
task_names = [str(task_dir) for task_dir in task_dirs]
else:
temp_dir = work_dir / "opt_path"
temp_dir.mkdir(parents=True, exist_ok=True)
task_dirs = [temp_dir]
task_names = [str(task_dir) for task_dir in task_dirs]

return OPIO(
{
"task_names": task_names,
"task_dirs": task_dirs,
"caly_run_opt_file": work_dir / caly_run_opt_file.name,
"caly_check_opt_file": work_dir / caly_check_opt_file.name,
}
)
Loading

0 comments on commit 2e17cf9

Please sign in to comment.