Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dpgen2/entrypoint/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ def fp_args(inputs, run):
doc_inputs_config = "Configuration for preparing vasp inputs"
doc_run_config = "Configuration for running vasp tasks"
doc_task_max = "Maximum number of vasp tasks for each iteration"
doc_async_ratio = "Configuration ratio for async fp"
doc_extra_output_files = "Extra output file names, support wildcards"

return [
Expand All @@ -485,6 +486,13 @@ def fp_args(inputs, run):
doc=doc_run_config,
),
Argument("task_max", int, optional=True, default=10, doc=doc_task_max),
Argument(
"async_ratio",
float,
optional=True,
default=0.0,
doc=doc_async_ratio,
),
Argument(
"extra_output_files",
list,
Expand Down
9 changes: 9 additions & 0 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def make_concurrent_learning_op(
valid_data: Optional[S3Artifact] = None,
train_optional_files: Optional[List[str]] = None,
explore_config: Optional[dict] = None,
async_fp: bool = False,
):
if train_style in ("dp", "dp-dist"):
prep_run_train_op = PrepRunDPTrain(
Expand Down Expand Up @@ -268,6 +269,7 @@ def make_concurrent_learning_op(
select_confs_config=select_confs_config,
collect_data_config=collect_data_config,
upload_python_packages=upload_python_packages,
async_fp=async_fp,
)
# dpgen
dpgen_op = ConcurrentLearning(
Expand Down Expand Up @@ -308,6 +310,7 @@ def get_conf_filters(config):
def make_naive_exploration_scheduler_without_conf(config, explore_style):
model_devi_jobs = config["explore"]["stages"]
fp_task_max = config["fp"]["task_max"]
fp_async_ratio = config["fp"]["async_ratio"]
max_numb_iter = config["explore"]["max_numb_iter"]
fatal_at_max = config["explore"]["fatal_at_max"]
convergence = config["explore"]["convergence"]
Expand All @@ -325,6 +328,7 @@ def make_naive_exploration_scheduler_without_conf(config, explore_style):
report,
fp_task_max,
conf_filters,
fp_async_ratio,
)

for job_ in model_devi_jobs:
Expand Down Expand Up @@ -368,6 +372,7 @@ def make_lmp_naive_exploration_scheduler(config):
type_map = config["inputs"]["type_map"]
numb_models = config["train"]["numb_models"]
fp_task_max = config["fp"]["task_max"]
fp_async_ratio = config["fp"]["async_ratio"]
max_numb_iter = config["explore"]["max_numb_iter"]
fatal_at_max = config["explore"]["fatal_at_max"]
convergence = config["explore"]["convergence"]
Expand All @@ -385,6 +390,7 @@ def make_lmp_naive_exploration_scheduler(config):
report,
fp_task_max,
conf_filters,
fp_async_ratio,
)

sys_configs_lmp = []
Expand Down Expand Up @@ -490,6 +496,7 @@ def workflow_concurrent_learning(
cl_step_config = config["step_configs"]["cl_step_config"]
upload_python_packages = config.get("upload_python_packages", None)
train_optional_files = config["train"].get("optional_files", None)
fp_async_ratio = config["fp"]["async_ratio"]

if train_style == "dp":
init_models_paths = config["train"].get("init_models_paths", None)
Expand Down Expand Up @@ -556,6 +563,7 @@ def workflow_concurrent_learning(
valid_data=valid_data,
train_optional_files=train_optional_files,
explore_config=explore_config,
async_fp=(fp_async_ratio > 0),
)
scheduler = make_naive_exploration_scheduler(config)

Expand Down Expand Up @@ -664,6 +672,7 @@ def workflow_concurrent_learning(
"init_models": init_models,
"init_data": init_data,
"iter_data": iter_data,
"async_confs": upload_artifact([]),
},
)
return dpgen_step
Expand Down
2 changes: 1 addition & 1 deletion dpgen2/exploration/selector/conf_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ def select(
model_devis: Union[List[Path], List[HDF5Dataset]],
type_map: Optional[List[str]] = None,
optional_outputs: Optional[List[Path]] = None,
) -> Tuple[List[Path], ExplorationReport]:
) -> Tuple[List[Path], List[Path], ExplorationReport]:
pass
35 changes: 33 additions & 2 deletions dpgen2/exploration/selector/conf_selector_frame.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import copy
import math
import random
from collections import (
Counter,
)
Expand Down Expand Up @@ -48,19 +50,21 @@ def __init__(
report: ExplorationReport,
max_numb_sel: Optional[int] = None,
conf_filters: Optional[ConfFilters] = None,
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
self.async_ratio = async_ratio

Comment on lines +53 to 60
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate async_ratio in constructor to prevent out-of-bounds selection and runtime errors

Negative or >1 ratios can lead to ValueError in random.sample and undefined split behavior. Guard early.

Apply this diff:

     def __init__(
         self,
         traj_render: TrajRender,
         report: ExplorationReport,
         max_numb_sel: Optional[int] = None,
         conf_filters: Optional[ConfFilters] = None,
-        async_ratio: float = 0.0,
+        async_ratio: float = 0.0,
     ):
         self.max_numb_sel = max_numb_sel
         self.conf_filters = conf_filters
         self.traj_render = traj_render
         self.report = report
-        self.async_ratio = async_ratio
+        # Validate [0, 1] to keep downstream splitting safe
+        if not (0.0 <= async_ratio <= 1.0):
+            raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
+        self.async_ratio = float(async_ratio)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
self.async_ratio = async_ratio
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
# Validate [0, 1] to keep downstream splitting safe
if not (0.0 <= async_ratio <= 1.0):
raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
self.async_ratio = float(async_ratio)
🤖 Prompt for AI Agents
In dpgen2/exploration/selector/conf_selector_frame.py around lines 53 to 60,
validate the async_ratio constructor argument to ensure it is within [0.0, 1.0];
if async_ratio < 0.0 or async_ratio > 1.0 raise a ValueError with a clear
message indicating the invalid value and expected range so invalid ratios cannot
later cause random.sample or splitting errors.

def select(
self,
trajs: Union[List[Path], List[HDF5Dataset]],
model_devis: Union[List[Path], List[HDF5Dataset]],
type_map: Optional[List[str]] = None,
optional_outputs: Optional[List[Path]] = None,
) -> Tuple[List[Path], ExplorationReport]:
) -> Tuple[List[Path], List[Path], ExplorationReport]:
"""Select configurations

Parameters
Expand All @@ -81,6 +85,8 @@ def select(
-------
confs : List[Path]
The selected configurations, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems. The `list` only has one item.
async_confs : List[Path]
The selected configurations for async fp, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems. The `list` only has one item.
report : ExplorationReport
The exploration report recording the status of the exploration.

Expand All @@ -102,8 +108,33 @@ def select(
optional_outputs,
)

async_confs = []
if self.async_ratio > 0:
async_ms, ms = split_multisystems(ms, self.async_ratio)
if len(async_ms) > 0:
async_out_path = Path("async_confs")
async_out_path.mkdir(exist_ok=True)
async_ms.to_deepmd_npy(async_out_path) # type: ignore
async_confs = [async_out_path]

out_path = Path("confs")
out_path.mkdir(exist_ok=True)
ms.to_deepmd_npy(out_path) # type: ignore

return [out_path], copy.deepcopy(self.report)
return [out_path], async_confs, copy.deepcopy(self.report)


def split_multisystems(ms, ratio):
    """Randomly split a dpdata.MultiSystems into two disjoint parts.

    For each system in `ms`, floor(len(s) * ratio) frames are selected,
    plus one extra frame with probability equal to the fractional
    remainder (stochastic rounding), so the expected selected fraction
    over many calls equals `ratio`.

    Parameters
    ----------
    ms : dpdata.MultiSystems
        The systems to split.
    ratio : float
        Target fraction of frames to move into the selected part.
        Values above 1 are effectively clamped so that at most all
        frames of a system are selected.

    Returns
    -------
    selected_ms : dpdata.MultiSystems
        Systems built from the randomly selected frames.
    unselected_ms : dpdata.MultiSystems
        Systems built from the remaining frames.
    """
    selected_ms = dpdata.MultiSystems()
    unselected_ms = dpdata.MultiSystems()
    for s in ms:
        nframes = len(s)
        nsel = math.floor(nframes * ratio)
        # Stochastic rounding: promote by one frame with probability equal
        # to the fractional part, so E[nsel] == nframes * ratio.
        if random.random() < nframes * ratio - nsel:
            nsel += 1
        # Guard against ratio > 1: random.sample raises ValueError when the
        # sample size exceeds the population size.
        nsel = min(nsel, nframes)
        selected_indices = random.sample(range(nframes), nsel)
        unselected_indices = list(set(range(nframes)).difference(selected_indices))
        if len(selected_indices) > 0:
            selected_ms.append(s.sub_system(selected_indices))
        if len(unselected_indices) > 0:
            unselected_ms.append(s.sub_system(unselected_indices))
    return selected_ms, unselected_ms
5 changes: 5 additions & 0 deletions dpgen2/flow/dpgen_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_scheduler": OutputParameter(),
Expand Down Expand Up @@ -279,6 +280,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_scheduler": OutputParameter(),
Expand Down Expand Up @@ -376,6 +378,7 @@ def _loop(
"init_models": steps.inputs.artifacts["init_models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": steps.inputs.artifacts["iter_data"],
"async_confs": steps.inputs.artifacts["async_confs"],
},
key=step_keys["block"],
)
Expand Down Expand Up @@ -448,6 +451,7 @@ def _loop(
"init_models": block_step.outputs.artifacts["models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": block_step.outputs.artifacts["iter_data"],
"async_confs": block_step.outputs.artifacts["async_confs"],
},
when="%s == false" % (scheduler_step.outputs.parameters["converged"]),
)
Expand Down Expand Up @@ -552,6 +556,7 @@ def _dpgen(
"init_models": steps.inputs.artifacts["init_models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": steps.inputs.artifacts["iter_data"],
"async_confs": steps.inputs.artifacts["async_confs"],
},
key="--".join(["%s" % id_step.outputs.parameters["block_id"], loop_key]),
)
Expand Down
4 changes: 3 additions & 1 deletion dpgen2/op/collect_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_input_sign(cls):
default=CollectData.default_optional_parameter,
),
"labeled_data": Artifact(List[Path]),
"async_labeled_data": Artifact(List[Path], optional=True),
"iter_data": Artifact(List[Path]),
Comment on lines +53 to 54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard optional async_labeled_data to avoid KeyError; simplify concatenation

Accessing ip["async_labeled_data"] raises KeyError when the optional artifact is absent. Use .get() and default to an empty list. Then drop the inner “or []” in the concatenation.

Apply:

@@ class CollectData(OP):
-                "async_labeled_data": Artifact(List[Path], optional=True),
+                "async_labeled_data": Artifact(List[Path], optional=True),
@@ def execute(self, ip: OPIO) -> OPIO:
-        async_labeled_data = ip["async_labeled_data"]
+        async_labeled_data = ip.get("async_labeled_data") or []
@@ def execute(self, ip: OPIO) -> OPIO:
-        for ii in labeled_data + (async_labeled_data or []):
+        for ii in labeled_data + async_labeled_data:

Also applies to: 95-100

🤖 Prompt for AI Agents
In dpgen2/op/collect_data.py around lines 53-54 (and similarly lines 95-100),
the code indexes the optional artifact ip["async_labeled_data"] which raises
KeyError when absent; change those accesses to ip.get("async_labeled_data", [])
so missing keys yield an empty list, and simplify concatenations by removing the
inner "or []" (i.e., concatenate using ip.get("async_labeled_data", []) +
ip["iter_data"] or similar) to avoid redundant fallbacks.

}
)
Expand Down Expand Up @@ -91,10 +92,11 @@ def execute(
type_map = ip["type_map"]
mixed_type = ip["optional_parameter"]["mixed_type"]
labeled_data = ip["labeled_data"]
async_labeled_data = ip["async_labeled_data"]
iter_data = ip["iter_data"]

ms = dpdata.MultiSystems(type_map=type_map)
for ii in labeled_data:
for ii in labeled_data + (async_labeled_data or []):
if ii and len(list(ii.rglob("fparam.npy"))) > 0:
setup_ele_temp(False)
if ii and len(list(ii.rglob("aparam.npy"))) > 0:
Expand Down
4 changes: 3 additions & 1 deletion dpgen2/op/select_confs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def get_output_sign(cls):
{
"report": BigParameter(ExplorationReport),
"confs": Artifact(List[Path]),
"async_confs": Artifact(List[Path]),
}
)

Expand Down Expand Up @@ -88,7 +89,7 @@ def execute(
trajs, model_devis, optional_outputs
)

confs, report = conf_selector.select(
confs, async_confs, report = conf_selector.select(
trajs,
model_devis,
type_map=type_map,
Expand All @@ -99,6 +100,7 @@ def execute(
{
"report": report,
"confs": confs,
"async_confs": async_confs,
}
)

Expand Down
29 changes: 28 additions & 1 deletion dpgen2/superop/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(
select_confs_config: dict = normalize_step_dict({}),
collect_data_config: dict = normalize_step_dict({}),
upload_python_packages: Optional[List[os.PathLike]] = None,
async_fp: bool = False,
):
self._input_parameters = {
"block_id": InputParameter(),
Expand All @@ -115,6 +116,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_report": OutputParameter(),
Expand All @@ -123,6 +125,7 @@ def __init__(
"models": OutputArtifact(),
"iter_data": OutputArtifact(),
"trajs": OutputArtifact(),
"async_confs": OutputArtifact(),
}

super().__init__(
Expand Down Expand Up @@ -163,6 +166,7 @@ def __init__(
select_confs_config=select_confs_config,
collect_data_config=collect_data_config,
upload_python_packages=upload_python_packages,
async_fp=async_fp,
)

@property
Expand Down Expand Up @@ -198,6 +202,7 @@ def _block_cl(
select_confs_config: dict = normalize_step_dict({}),
collect_data_config: dict = normalize_step_dict({}),
upload_python_packages: Optional[List[os.PathLike]] = None,
async_fp: bool = False,
):
select_confs_config = deepcopy(select_confs_config)
collect_data_config = deepcopy(collect_data_config)
Expand Down Expand Up @@ -231,7 +236,25 @@ def _block_cl(
["%s" % block_steps.inputs.parameters["block_id"], "prep-run-train"]
),
)
block_steps.add(prep_run_dp_train)
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
Comment on lines +239 to +257
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Gate the async FP step when no async_confs are provided to avoid runtime failures

If async_fp is True but the block receives no async_confs (typical for the first iteration), PrepRunFp may receive a missing/empty artifact and fail. Use an Argo “when” guard leveraging argo_len so the step runs only when input exists.

Apply this diff:

-    if async_fp:
-        async_fp_step = Step(
+    if async_fp:
+        async_fp_step = Step(
             name=name + "-async-prep-run-fp",
             template=prep_run_fp_op,
             parameters={
                 "block_id": f"{block_steps.inputs.parameters['block_id']}-async",
                 "fp_config": block_steps.inputs.parameters["fp_config"],
                 "type_map": block_steps.inputs.parameters["type_map"],
             },
             artifacts={
                 "confs": block_steps.inputs.artifacts["async_confs"],
             },
+            # Run only when upstream provided async_confs
+            when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
             key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
         )
         block_steps.add([prep_run_dp_train, async_fp_step])
         async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
     else:

Follow-up: if you adopt the conf_selector change to return [] when confs are empty, consider adding a similar guard to the normal prep-run-fp step keyed by the length of select_confs.outputs.artifacts["confs"] to keep the DAG robust when the split is 100% async.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
# Run only when upstream provided async_confs
when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
🤖 Prompt for AI Agents
In dpgen2/superop/block.py around lines 239-257, the async FP Step is created
and added unconditionally when async_fp is True which can cause PrepRunFp to
fail if the artifact async_confs is missing/empty; guard the async_fp_step with
an Argo "when" condition that checks
argo_len(block_steps.inputs.artifacts['async_confs']) > 0 so the step only
executes when async_confs exists and has items (apply the same condition to its
key/name logic as needed), and as a follow-up consider adding a similar
argo_len-based "when" guard to the normal prep-run-fp step using
select_confs.outputs.artifacts['confs'] to protect against 100% async splits.


prep_run_explore = Step(
name=name + "-prep-run-explore",
Expand Down Expand Up @@ -309,6 +332,7 @@ def _block_cl(
artifacts={
"iter_data": block_steps.inputs.artifacts["iter_data"],
"labeled_data": prep_run_fp.outputs.artifacts["labeled_data"],
"async_labeled_data": async_labeled_data,
},
key=step_keys["collect-data"],
executor=collect_data_executor,
Expand All @@ -328,5 +352,8 @@ def _block_cl(
block_steps.outputs.artifacts["trajs"]._from = prep_run_explore.outputs.artifacts[
"trajs"
]
block_steps.outputs.artifacts["async_confs"]._from = select_confs.outputs.artifacts[
"async_confs"
]

return block_steps
1 change: 1 addition & 0 deletions dpgen2/utils/dflow_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def matched_step_key(
re.match(f"iter-[0-9]*--{jj}-[0-9]*", kk)
or re.match(f"iter-[0-9]*--{jj}", kk)
or re.match(f"init--{jj}", kk)
or re.match(f"iter-[0-9]*-async--{jj}", kk)
):
ret.append(kk)
continue
Expand Down
Loading
Loading