Commit a739058

[Parallel Run Step] Support run settings binding to pipeline literal input. (Azure#29355)
* allow prs run settings binding to literal input
* add test case
* e2e test cases
* fix lint
* fix black

Co-authored-by: Xiaole Wen <xiwe@microsoft.com>
1 parent 683849a commit a739058

10 files changed: +3096 -186 lines
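What the change enables: parallel run settings (max_concurrency_per_instance, error_threshold, mini_batch_error_threshold, and resources.instance_count) can now be bound to a pipeline-level literal input instead of only accepting hard-coded values. A minimal sketch of the intended usage, distilled from the e2e tests added in this commit; the component path, data path, and compute name are placeholders:

from azure.ai.ml import Input, dsl, load_component
from azure.ai.ml.constants import AssetTypes

# Placeholder parallel component; any parallel component is bound the same way.
batch_inference = load_component(source="./components/batch_inference.yml")

@dsl.pipeline(default_compute="cpu-cluster")
def parallel_in_pipeline(job_data_path, literal_input: int):
    node = batch_inference(job_data_path=job_data_path)
    # After this change these run settings accept a pipeline input binding,
    # serialized as "${{parent.inputs.literal_input}}", in addition to plain ints.
    node.max_concurrency_per_instance = literal_input
    node.error_threshold = literal_input
    node.mini_batch_error_threshold = literal_input
    # resources.instance_count accepts the same binding when the node defines
    # resources, as exercised by the parallel_run_function e2e test below.

pipeline_job = parallel_in_pipeline(
    job_data_path=Input(type=AssetTypes.MLTABLE, path="./data/my-mltable"),
    literal_input=2,
)
# Submit with an MLClient, e.g. ml_client.jobs.create_or_update(pipeline_job)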

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel.py

Lines changed: 6 additions & 9 deletions
@@ -12,8 +12,6 @@
 from typing import Dict, List, Optional, Union
 
 from marshmallow import Schema
-
-from azure.ai.ml._restclient.v2023_02_01_preview.models import JobResourceConfiguration as RestJobResourceConfiguration
 from azure.ai.ml.constants._common import ARM_ID_PREFIX
 from azure.ai.ml.constants._component import NodeType
 from azure.ai.ml.entities._component.component import Component
@@ -166,7 +164,7 @@ def __init__(
         self.environment_variables = {} if environment_variables is None else environment_variables
 
         if isinstance(self.component, ParallelComponent):
-            self.resources = self.resources or self.component.resources
+            self.resources = self.resources or copy.deepcopy(self.component.resources)
             self.input_data = self.input_data or self.component.input_data
             self.max_concurrency_per_instance = (
                 self.max_concurrency_per_instance or self.component.max_concurrency_per_instance
@@ -175,7 +173,7 @@ def __init__(
                 self.mini_batch_error_threshold or self.component.mini_batch_error_threshold
             )
             self.mini_batch_size = self.mini_batch_size or self.component.mini_batch_size
-            self.partition_keys = self.partition_keys or self.component.partition_keys
+            self.partition_keys = self.partition_keys or copy.deepcopy(self.component.partition_keys)
 
         if not self.task:
             self.task = self.component.task
@@ -266,9 +264,9 @@ def _attr_type_map(cls) -> dict:
             "resources": (dict, JobResourceConfiguration),
             "task": (dict, ParallelTask),
             "logging_level": str,
-            "max_concurrency_per_instance": int,
-            "error_threshold": int,
-            "mini_batch_error_threshold": int,
+            "max_concurrency_per_instance": (str, int),
+            "error_threshold": (str, int),
+            "mini_batch_error_threshold": (str, int),
             "environment_variables": dict,
         }
 
@@ -357,8 +355,7 @@ def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
             obj["task"].environment = task_env[len(ARM_ID_PREFIX) :]
 
         if "resources" in obj and obj["resources"]:
-            resources = RestJobResourceConfiguration.from_dict(obj["resources"])
-            obj["resources"] = JobResourceConfiguration._from_rest_object(resources)
+            obj["resources"] = JobResourceConfiguration._from_dict(obj["resources"])
 
         if "partition_keys" in obj and obj["partition_keys"]:
             obj["partition_keys"] = json.dumps(obj["partition_keys"])

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job_resource_configuration.py

Lines changed: 6 additions & 0 deletions
@@ -146,6 +146,12 @@ def _to_rest_object(self) -> RestJobResourceConfiguration:
             shm_size=self.shm_size,
         )
 
+    @classmethod
+    def _from_dict(cls, dct: dict):
+        """Convert a dict to an Input object."""
+        obj = cls(**dict(dct.items()))
+        return obj
+
     @classmethod
     def _from_rest_object(cls, obj: Optional[RestJobResourceConfiguration]) -> Optional["JobResourceConfiguration"]:
         if obj is None:
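This helper is what parallel.py now calls for the "resources" payload (see the change above): building the configuration directly from the dict keeps non-integer values such as a "${{parent.inputs.literal_input}}" binding intact, whereas the previous round trip through RestJobResourceConfiguration presumably assumed typed fields like an integer instance_count. A rough sketch of the new call-site behavior; the dict keys are illustrative:

# Roughly what Parallel._from_rest_object_to_init_params does now for "resources";
# any JobResourceConfiguration field can appear as a key.
rest_resources = {"instance_count": "${{parent.inputs.literal_input}}", "shm_size": "2g"}

resources = JobResourceConfiguration._from_dict(rest_resources)
assert resources.instance_count == "${{parent.inputs.literal_input}}"  # binding string preserved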

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py

Lines changed: 173 additions & 56 deletions
@@ -64,6 +64,81 @@ def check_name_and_version(output, output_name, output_version):
     assert output.version == output_version
 
 
+def build_pipeline_with_parallel_run_function(data, literal_input=None):
+    # command job with dict distribution
+    environment = "AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:33"
+    inputs = {
+        "job_data_path": Input(
+            type=AssetTypes.MLTABLE,
+            path="./tests/test_configs/dataset/mnist-data",
+            mode=InputOutputModes.EVAL_MOUNT,
+        ),
+        "job_data_path_optional": Input(
+            type=AssetTypes.MLTABLE,
+            mode=InputOutputModes.EVAL_MOUNT,
+            optional=True,
+        ),
+    }
+    input_data = "${{inputs.job_data_path}}"
+    outputs = {"job_output_path": Output(type=AssetTypes.URI_FOLDER, mode="rw_mount")}
+    expected_resources = {"instance_count": 2}
+
+    task = RunFunction(
+        code="./tests/test_configs/dsl_pipeline/parallel_component_with_file_input/src/",
+        entry_script="score.py",
+        program_arguments="--job_output_path ${{outputs.job_output_path}}",
+        environment=environment,
+    )
+    logging_level = "DEBUG"
+    max_concurrency_per_instance = 1
+    error_threshold = 1
+    mini_batch_error_threshold = 1
+    mini_batch_size = "5"
+
+    # Parallel from parallel_run_function()
+    parallel_function = parallel_run_function(
+        display_name="my-evaluate-job",
+        inputs=inputs,
+        outputs=outputs,
+        mini_batch_size=mini_batch_size,
+        task=task,
+        logging_level=logging_level,
+        max_concurrency_per_instance=max_concurrency_per_instance,
+        error_threshold=error_threshold,
+        mini_batch_error_threshold=mini_batch_error_threshold,
+        resources=expected_resources,
+        input_data=input_data,
+    )
+    if literal_input is None:
+
+        @dsl.pipeline(experiment_name="test_pipeline_with_parallel_function", default_compute="cpu-cluster")
+        def parallel_in_pipeline(job_data_path):
+            node1 = parallel_function(job_data_path=job_data_path)
+            # TODO 2104247: node1.task will be kept as a local path when submitting the pipeline job.
+            node1.task = None
+            return {
+                "pipeline_output": node1.outputs.job_output_path,
+            }
+
+        return parallel_in_pipeline(data)
+    else:
+
+        @dsl.pipeline(experiment_name="test_pipeline_with_parallel_function", default_compute="cpu-cluster")
+        def parallel_in_pipeline(job_data_path, literal_input):
+            node1 = parallel_function(job_data_path=job_data_path)
+            # TODO 2104247: node1.task will be kept as a local path when submitting the pipeline job.
+            node1.task = None
+            node1.resources.instance_count = literal_input
+            node1.max_concurrency_per_instance = literal_input
+            node1.error_threshold = literal_input
+            node1.mini_batch_error_threshold = literal_input
+            return {
+                "pipeline_output": node1.outputs.job_output_path,
+            }
+
+        return parallel_in_pipeline(data, literal_input)
+
+
 @pytest.mark.usefixtures(
     "enable_environment_id_arm_expansion",
     "enable_pipeline_private_preview_features",
@@ -1568,6 +1643,44 @@ def parallel_in_pipeline(job_data_path, score_model):
         assert_job_input_output_types(pipeline_job)
         assert pipeline_job.settings.default_compute == "cpu-cluster"
 
+    def test_parallel_components_with_tabular_input_bind_to_literal_input(self, client: MLClient) -> None:
+        components_dir = tests_root_dir / "test_configs/dsl_pipeline/parallel_component_with_tabular_input"
+
+        batch_inference = load_component(source=str(components_dir / "tabular_input_e2e.yml"))
+
+        # Construct pipeline
+        @dsl.pipeline(default_compute="cpu-cluster")
+        def parallel_in_pipeline(job_data_path, score_model, literal_input):
+            batch_inference_node = batch_inference(job_data_path=job_data_path, score_model=score_model)
+            batch_inference_node.mini_batch_size = 5
+            batch_inference_node.max_concurrency_per_instance = literal_input
+            batch_inference_node.error_threshold = literal_input
+            batch_inference_node.mini_batch_error_threshold = literal_input
+
+        pipeline = parallel_in_pipeline(
+            job_data_path=Input(
+                type=AssetTypes.MLTABLE,
+                path="./tests/test_configs/dataset/neural-iris-mltable",
+                mode=InputOutputModes.DIRECT,
+            ),
+            score_model=Input(
+                path="./tests/test_configs/model", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
+            ),
+            literal_input=2,
+        )
+        # submit pipeline job
+        pipeline_job = assert_job_cancel(pipeline, client, experiment_name="parallel_in_pipeline")
+
+        # check required fields in job dict
+        job_dict = pipeline_job._to_dict()
+        expected_keys = ["status", "properties", "creation_context"]
+        for k in expected_keys:
+            assert k in job_dict.keys(), f"failed to get {k} in {job_dict}"
+
+        # original job did not change
+        assert_job_input_output_types(pipeline_job)
+        assert pipeline_job.settings.default_compute == "cpu-cluster"
+
     def test_parallel_components_with_file_input(self, client: MLClient) -> None:
         components_dir = tests_root_dir / "test_configs/dsl_pipeline/parallel_component_with_file_input"
 
@@ -1599,67 +1712,12 @@ def parallel_in_pipeline(job_data_path):
         assert pipeline_job.settings.default_compute == "cpu-cluster"
 
     def test_parallel_run_function(self, client: MLClient):
-        # command job with dict distribution
-        environment = "AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:33"
-        inputs = {
-            "job_data_path": Input(
-                type=AssetTypes.MLTABLE,
-                path="./tests/test_configs/dataset/mnist-data",
-                mode=InputOutputModes.EVAL_MOUNT,
-            ),
-            "job_data_path_optional": Input(
-                type=AssetTypes.MLTABLE,
-                mode=InputOutputModes.EVAL_MOUNT,
-                optional=True,
-            ),
-        }
-        input_data = "${{inputs.job_data_path}}"
-        outputs = {"job_output_path": Output(type=AssetTypes.URI_FOLDER, mode="rw_mount")}
-        expected_resources = {"instance_count": 2}
-
-        task = RunFunction(
-            code="./tests/test_configs/dsl_pipeline/parallel_component_with_file_input/src/",
-            entry_script="score.py",
-            program_arguments="--job_output_path ${{outputs.job_output_path}}",
-            environment=environment,
-        )
-        logging_level = "DEBUG"
-        max_concurrency_per_instance = 1
-        error_threshold = 1
-        mini_batch_error_threshold = 1
-        mini_batch_size = "5"
-
-        # Parallel from parallel_run_function()
-        parallel_function = parallel_run_function(
-            display_name="my-evaluate-job",
-            inputs=inputs,
-            outputs=outputs,
-            mini_batch_size=mini_batch_size,
-            task=task,
-            logging_level=logging_level,
-            max_concurrency_per_instance=max_concurrency_per_instance,
-            error_threshold=error_threshold,
-            mini_batch_error_threshold=mini_batch_error_threshold,
-            resources=expected_resources,
-            input_data=input_data,
-        )
-
         data = Input(
             type=AssetTypes.MLTABLE,
             path="./tests/test_configs/dataset/mnist-data",
            mode=InputOutputModes.EVAL_MOUNT,
         )
-
-        @dsl.pipeline(experiment_name="test_pipeline_with_parallel_function", default_compute="cpu-cluster")
-        def parallel_in_pipeline(job_data_path):
-            node1 = parallel_function(job_data_path=job_data_path)
-            # TODO 2104247: node1.task will be kept as a local path when submitting the pipeline job.
-            node1.task = None
-            return {
-                "pipeline_output": node1.outputs.job_output_path,
-            }
-
-        pipeline = parallel_in_pipeline(data)
+        pipeline = build_pipeline_with_parallel_run_function(data)
 
         pipeline_job = client.create_or_update(pipeline)  # submit pipeline job
 
@@ -1712,6 +1770,65 @@ def parallel_in_pipeline(job_data_path):
         assert_job_input_output_types(pipeline_job)
         assert pipeline_job.settings.default_compute == "cpu-cluster"
 
+    def test_parallel_run_function_run_settings_bind_to_literal_input(self, client: MLClient):
+        data = Input(
+            type=AssetTypes.MLTABLE,
+            path="./tests/test_configs/dataset/mnist-data",
+            mode=InputOutputModes.EVAL_MOUNT,
+        )
+        pipeline = build_pipeline_with_parallel_run_function(data, 2)
+
+        pipeline_job = client.create_or_update(pipeline)  # submit pipeline job
+
+        actual_job = omit_with_wildcard(pipeline_job._to_rest_object().properties.as_dict(), *common_omit_fields)
+        expected_job = {
+            "tags": {},
+            "is_archived": False,
+            "job_type": "Pipeline",
+            "inputs": {
+                "job_data_path": {"job_input_type": "mltable", "mode": "EvalMount"},
+                "literal_input": {"job_input_type": "literal", "value": "2"},
+            },
+            "jobs": {
+                "node1": {
+                    "input_data": "${{inputs.job_data_path}}",
+                    "display_name": "my-evaluate-job",
+                    "inputs": {
+                        "job_data_path": {
+                            "job_input_type": "literal",
+                            "value": "${{parent.inputs.job_data_path}}",
+                        }
+                    },
+                    "name": "node1",
+                    "mini_batch_size": 5,
+                    "logging_level": "DEBUG",
+                    "max_concurrency_per_instance": "${{parent.inputs.literal_input}}",
+                    "error_threshold": "${{parent.inputs.literal_input}}",
+                    "mini_batch_error_threshold": "${{parent.inputs.literal_input}}",
+                    "outputs": {"job_output_path": {"type": "literal", "value": "${{parent.outputs.pipeline_output}}"}},
+                    "resources": {"instance_count": "${{parent.inputs.literal_input}}"},
+                    "type": "parallel",
+                },
+            },
+            "outputs": {
+                "pipeline_output": {
+                    "mode": "ReadWriteMount",
+                    "job_output_type": "uri_folder",
+                }
+            },
+            "settings": {"default_compute": "cpu-cluster"},
+        }
+        assert expected_job == actual_job
+        # check required fields in job dict
+        job_dict = pipeline_job._to_dict()
+        expected_keys = ["status", "properties", "creation_context"]
+        for k in expected_keys:
+            assert k in job_dict.keys(), f"failed to get {k} in {job_dict}"
+
+        # original job did not change
+        assert_job_input_output_types(pipeline_job)
+        assert pipeline_job.settings.default_compute == "cpu-cluster"
+
     def test_parallel_job(self, randstr: Callable[[str], str], client: MLClient):
         environment = "AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:33"
         inputs = {

sdk/ml/azure-ai-ml/tests/pipeline_job/e2etests/test_pipeline_job.py

Lines changed: 52 additions & 0 deletions
@@ -541,6 +541,58 @@ def test_pipeline_job_with_parallel_job(
         # assert on the number of converted jobs to make sure we didn't drop the parallel job
         assert len(created_job.jobs.items()) == 1
 
+    @pytest.mark.parametrize(
+        "pipeline_job_path",
+        [
+            "file_component_literal_input_e2e.yml",
+        ],
+    )
+    def test_pipeline_job_with_parallel_component_job_bind_to_literal_input(
+        self, client: MLClient, randstr: Callable[[str], str], pipeline_job_path: str
+    ) -> None:
+        base_file_name = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_"
+        params_override = [{"name": randstr("name")}]
+        pipeline_job = load_job(
+            source=base_file_name + pipeline_job_path,
+            params_override=params_override,
+        )
+        created_job = client.jobs.create_or_update(pipeline_job)
+
+        for job in created_job.jobs.values():
+            # The parallel job must be translated to component job in the pipeline job.
+            assert isinstance(job, Parallel)
+
+        # assert on the number of converted jobs to make sure we didn't drop the parallel job
+        assert len(created_job.jobs.items()) == 1
+
+    @pytest.mark.skip(
+        reason="The task for fixing this is tracked by "
+        "https://msdata.visualstudio.com/Vienna/_workitems/edit/2298433"
+    )
+    @pytest.mark.parametrize(
+        "pipeline_job_path",
+        [
+            "file_literal_input_e2e.yml",
+        ],
+    )
+    def test_pipeline_job_with_inline_parallel_job_bind_to_literal_input(
+        self, client: MLClient, randstr: Callable[[str], str], pipeline_job_path: str
+    ) -> None:
+        base_file_name = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_"
+        params_override = [{"name": randstr("name")}]
+        pipeline_job = load_job(
+            source=base_file_name + pipeline_job_path,
+            params_override=params_override,
+        )
+        created_job = client.jobs.create_or_update(pipeline_job)
+
+        for job in created_job.jobs.values():
+            # The parallel job must be translated to component job in the pipeline job.
+            assert isinstance(job, Parallel)
+
+        # assert on the number of converted jobs to make sure we didn't drop the parallel job
+        assert len(created_job.jobs.items()) == 1
+
     def test_pipeline_job_with_multiple_parallel_job(self, client: MLClient, randstr: Callable[[str], str]) -> None:
         params_override = [{"name": randstr("name")}]
         pipeline_job = load_job(
