From ccbadd186ddbc6adaab2f125f7c890d93f2a38c8 Mon Sep 17 00:00:00 2001 From: zhenyu wang Date: Tue, 28 May 2024 16:17:47 +0800 Subject: [PATCH] fix: split merge and default op --- dpgen2/entrypoint/submit.py | 35 +++++-- dpgen2/op/caly_evo_step_merge.py | 9 +- dpgen2/superop/caly_evo_step.py | 5 +- dpgen2/superop/prep_run_calypso.py | 156 +++++++++++++++++++---------- tests/test_merge_caly_evo_step.py | 3 +- tests/test_prep_run_caly.py | 92 +++++++++++++---- 6 files changed, 206 insertions(+), 94 deletions(-) diff --git a/dpgen2/entrypoint/submit.py b/dpgen2/entrypoint/submit.py index 40e09d93..8cf872a1 100644 --- a/dpgen2/entrypoint/submit.py +++ b/dpgen2/entrypoint/submit.py @@ -113,6 +113,9 @@ PrepRunFp, PrepRunLmp, ) +from dpgen2.superop.caly_evo_step import ( + CalyEvoStep, +) from dpgen2.utils import ( BinaryFileInput, bohrium_config_from_dict, @@ -174,16 +177,28 @@ def make_concurrent_learning_op( upload_python_packages=upload_python_packages, ) elif explore_style == "calypso": - caly_evo_step_op = CalyEvoStepMerge( - mode=expl_mode, - name="caly-evo-step", - collect_run_caly=CollRunCaly, - prep_dp_optim=PrepCalyDPOptim, - run_dp_optim=RunCalyDPOptim, - prep_config=prep_explore_config, - run_config=run_explore_config, - upload_python_packages=upload_python_packages, - ) + if expl_mode == "merge": + caly_evo_step_op = CalyEvoStepMerge( + name="caly-evo-step", + collect_run_caly=CollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=RunCalyDPOptim, + prep_config=prep_explore_config, + run_config=run_explore_config, + upload_python_packages=None, + ) + elif expl_mode == "default": + caly_evo_step_op = CalyEvoStep( + name="caly-evo-step", + collect_run_caly=CollRunCaly, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=RunCalyDPOptim, + prep_config=prep_explore_config, + run_config=run_explore_config, + upload_python_packages=upload_python_packages, + ) + else: + raise KeyError(f"Unknown key: {expl_mode}, support `default` and `merge`.") prep_run_explore_op = PrepRunCaly( "prep-run-calypso", prep_caly_input_op=PrepCalyInput, diff --git a/dpgen2/op/caly_evo_step_merge.py b/dpgen2/op/caly_evo_step_merge.py index eab5411c..855de0b7 100644 --- a/dpgen2/op/caly_evo_step_merge.py +++ b/dpgen2/op/caly_evo_step_merge.py @@ -26,6 +26,9 @@ Slices, TransientError, ) +from dflow.utils import ( + flatten, +) from dpgen2.constants import ( calypso_check_opt_file, @@ -47,7 +50,7 @@ class CalyEvoStepMerge(OP): - def __init__(self, mode="default", *args, **kwargs): + def __init__(self, mode="debug", *args, **kwargs): self.mode = mode self.args = args self.kwargs = kwargs @@ -115,10 +118,6 @@ def execute( for k in step.outputs.artifacts: path_list = download_artifact(step.outputs.artifacts[k]) if output_sign[k].type == List[Path]: - from dflow.utils import ( - flatten, - ) - if not isinstance(path_list, list) or any( [p is not None and not isinstance(p, str) for p in path_list] ): diff --git a/dpgen2/superop/caly_evo_step.py b/dpgen2/superop/caly_evo_step.py index 1ce0a05b..44fe966d 100644 --- a/dpgen2/superop/caly_evo_step.py +++ b/dpgen2/superop/caly_evo_step.py @@ -137,7 +137,6 @@ def _caly_evo_step( run_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, ): - print("run_config. = == = = =", run_config) prep_config = deepcopy(prep_config) run_config = deepcopy(run_config) prep_template_config = prep_config.pop("template_config") @@ -150,11 +149,11 @@ def _caly_evo_step( def wise_executor(expl_mode, origin_executor): if expl_mode == "default": return init_executor(origin_executor) - elif expl_mode == "debug": + elif expl_mode == "merge": return None else: raise NotImplementedError( - f"Unknown expl_mode {expl_mode}, only support `default` and `debug`." + f"Unknown expl_mode {expl_mode}, only support `default` and `merge`." ) # collect the last step files and run calypso.x to generate structures diff --git a/dpgen2/superop/prep_run_calypso.py b/dpgen2/superop/prep_run_calypso.py index 3afb8b93..40ce47de 100644 --- a/dpgen2/superop/prep_run_calypso.py +++ b/dpgen2/superop/prep_run_calypso.py @@ -53,7 +53,7 @@ def __init__( self, name: str, prep_caly_input_op: Type[OP], - caly_evo_step_op: Type[OP], + caly_evo_step_op, prep_caly_model_devi_op: Type[OP], run_caly_model_devi_op: Type[OP], prep_config: dict = normalize_step_dict({}), @@ -143,7 +143,7 @@ def _prep_run_caly( prep_run_caly_steps: Steps, step_keys: Dict[str, Any], prep_caly_input_op: Type[OP], - caly_evo_step_op: Type[OP], + caly_evo_step_op, prep_caly_model_devi_op: Type[OP], run_caly_model_devi_op: Type[OP], prep_config: dict = normalize_step_dict({}), @@ -179,60 +179,106 @@ def _prep_run_caly( temp_value = None if expl_mode == "default": - caly_evo_step_merge_executor = prep_executor - caly_evo_step_merge_config = prep_config - elif expl_mode == "debug": - caly_evo_step_merge_executor = run_executor - caly_evo_step_merge_config = run_config + caly_evo_step_executor = prep_executor + caly_evo_step_config = prep_config + elif expl_mode == "merge": + caly_evo_step_executor = run_executor + caly_evo_step_config = run_config else: - raise NotImplementedError(f"Unknown expl mode {expl_mode}") + raise NotImplementedError(f"Unknown expl mode `{expl_mode}`") - caly_evo_step_merge = Step( - "caly-evo-step", - template=PythonOPTemplate( - caly_evo_step_op, - python_packages=upload_python_packages, - **run_template_config, - ), - slices=Slices( - input_parameter=[ - "task_name", - ], - input_artifact=[ - "input_file", - "results", - "step", - "opt_results_dir", - "caly_run_opt_file", - "caly_check_opt_file", - ], - output_artifact=["traj_results"], - ), - parameters={ - "block_id": prep_run_caly_steps.inputs.parameters["block_id"], - "expl_config": prep_run_caly_steps.inputs.parameters["explore_config"], - "task_name": prep_caly_input.outputs.parameters["task_names"], - "iter_num": "{{item}}", - }, - artifacts={ - "models": prep_run_caly_steps.inputs.artifacts["models"], - "input_file": prep_caly_input.outputs.artifacts["input_dat_files"], - "caly_run_opt_file": prep_caly_input.outputs.artifacts[ - "caly_run_opt_files" - ], - "caly_check_opt_file": prep_caly_input.outputs.artifacts[ - "caly_check_opt_files" - ], - "results": temp_value, - "step": temp_value, - "opt_results_dir": temp_value, - "qhull_input": temp_value, - }, - key=step_keys["caly-evo-step-{{item}}"], - executor=caly_evo_step_merge_executor, - **caly_evo_step_merge_config, - ) - prep_run_caly_steps.add(caly_evo_step_merge) + if expl_mode == "merge": + caly_evo_step = Step( + "caly-evo-step", + template=PythonOPTemplate( + caly_evo_step_op, + python_packages=upload_python_packages, + **run_template_config, + ), + slices=Slices( + input_parameter=[ + "task_name", + ], + input_artifact=[ + "input_file", + "results", + "step", + "opt_results_dir", + "caly_run_opt_file", + "caly_check_opt_file", + ], + output_artifact=["traj_results"], + ), + parameters={ + "block_id": prep_run_caly_steps.inputs.parameters["block_id"], + "expl_config": prep_run_caly_steps.inputs.parameters["explore_config"], + "task_name": prep_caly_input.outputs.parameters["task_names"], + "iter_num": "{{item}}", + }, + artifacts={ + "models": prep_run_caly_steps.inputs.artifacts["models"], + "input_file": prep_caly_input.outputs.artifacts["input_dat_files"], + "caly_run_opt_file": prep_caly_input.outputs.artifacts[ + "caly_run_opt_files" + ], + "caly_check_opt_file": prep_caly_input.outputs.artifacts[ + "caly_check_opt_files" + ], + "results": temp_value, + "step": temp_value, + "opt_results_dir": temp_value, + "qhull_input": temp_value, + }, + key=step_keys["caly-evo-step-{{item}}"], + executor=caly_evo_step_executor, + **caly_evo_step_config, + ) + prep_run_caly_steps.add(caly_evo_step) + elif expl_mode == "default": + caly_evo_step = Step( + name="caly-evo-step", + template=caly_evo_step_op, + slices=Slices( + input_parameter=[ + "task_name", + ], + input_artifact=[ + "input_file", + "results", + "step", + "opt_results_dir", + "caly_run_opt_file", + "caly_check_opt_file", + ], + output_artifact=["traj_results"], + ), + parameters={ + "block_id": prep_run_caly_steps.inputs.parameters["block_id"], + "expl_config": prep_run_caly_steps.inputs.parameters["explore_config"], + "task_name": prep_caly_input.outputs.parameters["task_names"], + "iter_num": "{{item}}", + }, + artifacts={ + "models": prep_run_caly_steps.inputs.artifacts["models"], + "input_file": prep_caly_input.outputs.artifacts["input_dat_files"], + "caly_run_opt_file": prep_caly_input.outputs.artifacts[ + "caly_run_opt_files" + ], + "caly_check_opt_file": prep_caly_input.outputs.artifacts[ + "caly_check_opt_files" + ], + "results": temp_value, + "step": temp_value, + "opt_results_dir": temp_value, + "qhull_input": temp_value, + }, + key=step_keys["caly-evo-step-{{item}}"], + executor=caly_evo_step_executor, + **caly_evo_step_config, + ) + prep_run_caly_steps.add(caly_evo_step) + else: + raise KeyError(f"Unknown key: `{expl_mode}`, support `default` and `merge`.") # prep_caly_model_devi prep_caly_model_devi = Step( @@ -247,7 +293,7 @@ def _prep_run_caly( "template_slice_config": template_slice_config, }, artifacts={ - "traj_results": caly_evo_step_merge.outputs.artifacts["traj_results"], + "traj_results": caly_evo_step.outputs.artifacts["traj_results"], }, key="%s--prep-caly-model-devi" % (prep_run_caly_steps.inputs.parameters["block_id"],), diff --git a/tests/test_merge_caly_evo_step.py b/tests/test_merge_caly_evo_step.py index 00f18160..3f22ba56 100644 --- a/tests/test_merge_caly_evo_step.py +++ b/tests/test_merge_caly_evo_step.py @@ -278,14 +278,13 @@ def tearDown(self): def test_caly_evo_step(self): steps = CalyEvoStepMerge( - mode="debug", name="caly-evo-step", collect_run_caly=MockedCollRunCaly, prep_dp_optim=PrepCalyDPOptim, run_dp_optim=MockedRunCalyDPOptim, prep_config=default_config, run_config=default_config, - upload_python_packages=upload_python_packages, + upload_python_packages=None, ) caly_evo_step = Step( "caly-evo-step", diff --git a/tests/test_prep_run_caly.py b/tests/test_prep_run_caly.py index 7da80ed8..ddcd657e 100644 --- a/tests/test_prep_run_caly.py +++ b/tests/test_prep_run_caly.py @@ -84,6 +84,9 @@ from dpgen2.op.run_caly_model_devi import ( RunCalyModelDevi, ) +from dpgen2.superop.caly_evo_step import ( + CalyEvoStep, +) from dpgen2.superop.prep_run_calypso import ( PrepRunCaly, ) @@ -96,18 +99,6 @@ }, } ) -run_default_config = normalize_step_dict( - { - "template_config": { - "image": default_image, - }, - "template_slice_config": { - "group_size": 2, - "pool_size": 1, - "model_devi_group_size": 30, - }, - } -) def make_task_group_list(njobs): @@ -148,9 +139,21 @@ def tearDown(self): for i in Path().glob("prep-run-caly-step*"): shutil.rmtree(i, ignore_errors=True) - def test_caly_evo_step_merge_debug_mode(self): + def test_caly_evo_step_merge_merge_mode(self): + run_default_config = normalize_step_dict( + { + "mode": "merge", + "template_config": { + "image": default_image, + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + "model_devi_group_size": 30, + }, + } + ) caly_evo_step_op = CalyEvoStepMerge( - mode="debug", name="caly-evo-step", collect_run_caly=MockedCollRunCaly, prep_dp_optim=PrepCalyDPOptim, @@ -194,9 +197,60 @@ def test_caly_evo_step_merge_debug_mode(self): step = wf.query_step(name="prep-run-caly-step")[0] self.assertEqual(step.phase, "Succeeded") - # download_artifact(step.outputs.artifacts["model_devis"]) - # download_artifact(step.outputs.artifacts["trajs"]) - # download_artifact(step.outputs.artifacts["logs"]) + def test_caly_evo_step_merge_default_mode(self): + run_default_config = normalize_step_dict( + { + "mode": "default", + "template_config": { + "image": default_image, + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + "model_devi_group_size": 30, + }, + } + ) + caly_evo_step_op = CalyEvoStep( + "caly-evo-run", + MockedCollRunCaly, + PrepCalyDPOptim, + MockedRunCalyDPOptim, + prep_config=prep_default_config, + run_config=run_default_config, + upload_python_packages=upload_python_packages, + ) + prep_run_caly_op = PrepRunCaly( + "prep-run-calypso", + PrepCalyInput, + caly_evo_step_op, + PrepCalyModelDevi, + MockedRunCalyModelDevi, + prep_config=prep_default_config, + run_config=run_default_config, + upload_python_packages=upload_python_packages, + ) + prep_run_caly_step = Step( + "prep-run-caly-step", + template=prep_run_caly_op, + parameters={ + "block_id": self.block_id, + "expl_task_grp": self.expl_task_grp, + "explore_config": self.expl_config, + "type_map": self.type_map, + }, + artifacts={ + "models": self.models, + }, + ) + + wf = Workflow(name="prep-run-caly-step", host=default_host) + wf.add(prep_run_caly_step) + wf.submit() - # for ii in step.outputs.parameters["task_names"].value: - # self.check_run_lmp_output(ii, self.model_list) + while wf.query_status() in ["Pending", "Running"]: + time.sleep(4) + + self.assertEqual(wf.query_status(), "Succeeded") + step = wf.query_step(name="prep-run-caly-step")[0] + self.assertEqual(step.phase, "Succeeded")