Skip to content

Commit 3fa3e1c

Browse files
YingChen1996Ying Chen
andauthored
[ML] [Pipelines] Make optional input has consistent behavior in @pipeline (Azure#27904)
* raise error when unprovided pipeline required input without default value binding to optional/required input * update test * update * revert * update test * update recording * update 1.5 scope component code to avoid recording error * revert test code * update test json * update test json * update test json * update test json * update test json Co-authored-by: Ying Chen <2601502859@qq.com>
1 parent 8e0101d commit 3fa3e1c

File tree

94 files changed

+14509
-11922
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+14509
-11922
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,8 @@ def _normalize_self_properties(self):
446446
origin_value = getattr(self, key)
447447
new_value = self._simple_parse(origin_value)
448448
setattr(self, key, new_value)
449-
self.optional = self._simple_parse(getattr(self, "optional", "false"), _type="boolean")
450-
self.optional = True if self.optional is True else None
449+
if self.optional:
450+
self.optional = self._simple_parse(getattr(self, "optional", "false"), _type="boolean")
451451

452452
@classmethod
453453
def _get_input_by_type(cls, t: type, optional=None):

sdk/ml/azure-ai-ml/tests/component/e2etests/test_component.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
2424
from azure.core.paging import ItemPaged
2525

26-
from test_utilities.utils import assert_job_cancel, sleep_if_live
26+
from test_utilities.utils import assert_job_cancel, sleep_if_live, omit_with_wildcard
2727
from .._util import _COMPONENT_TIMEOUT_SECOND
2828
from ..unittests.test_component_schema import load_component_entity_from_rest_json
2929

@@ -146,7 +146,7 @@ def test_command_component(self, client: MLClient, randstr: Callable[[str], str]
146146
"optional": True,
147147
"type": "number",
148148
},
149-
"component_in_path": {"description": "A path", "type": "uri_folder"},
149+
"component_in_path": {"description": "A path", "type": "uri_folder", 'optional': False},
150150
},
151151
"is_deterministic": True,
152152
"outputs": {"component_out_path": {"type": "uri_folder"}},
@@ -172,9 +172,11 @@ def test_parallel_component(self, client: MLClient, randstr: Callable[[str], str
172172
"error_threshold": 10,
173173
"input_data": "${{inputs.score_input}}",
174174
"inputs": {
175-
"label": {"description": "Other reference data for batch scoring, " "e.g. labels.", "type": "uri_file"},
176-
"score_input": {"description": "The data to be split and scored in " "parallel.", "type": "mltable"},
177-
"score_model": {"description": "The model for batch score.", "type": "custom_model"},
175+
"label": {"description": "Other reference data for batch scoring, " "e.g. labels.",
176+
"type": "uri_file", 'optional': False},
177+
"score_input": {"description": "The data to be split and scored in " "parallel.", "type": "mltable",
178+
'optional': False},
179+
"score_model": {"description": "The model for batch score.", "type": "custom_model", 'optional': False},
178180
},
179181
"is_deterministic": True,
180182
"max_concurrency_per_instance": 12,
@@ -243,7 +245,7 @@ def test_spark_component(self, client: MLClient, randstr: Callable[[], str]) ->
243245
'description': 'Aml Spark dataset test module',
244246
'display_name': 'Aml Spark dataset test module',
245247
'entry': {'file': 'kmeans_example.py'},
246-
'inputs': {'file_input': {'type': 'uri_file'}},
248+
'inputs': {'file_input': {'type': 'uri_file', 'optional': False}},
247249
'is_deterministic': True,
248250
'outputs': {'output': {'type': 'uri_folder'}},
249251
'type': 'spark',
@@ -283,10 +285,10 @@ def test_command_component_create_input_output_types(
283285
component_entity._creation_context = None
284286
assert target_entity.id
285287
# server side will remove \n from the code now. Skip them given it's not targeted to check in this test
286-
omit_fields = ["id", "command", "environment"]
287-
assert pydash.omit(component_entity._to_dict(), *omit_fields) == pydash.omit(
288-
target_entity._to_dict(), *omit_fields
289-
)
288+
# server side will return optional False for optional None input
289+
omit_fields = ["id", "command", "environment", "inputs.*.optional"]
290+
assert omit_with_wildcard(component_entity._to_dict(), *omit_fields) == \
291+
omit_with_wildcard(target_entity._to_dict(), *omit_fields)
290292

291293
def test_command_component_with_code(self, client: MLClient, randstr: Callable[[str], str]) -> None:
292294
component_name = randstr("component_name")
@@ -340,6 +342,7 @@ def test_component_update(self, client: MLClient, randstr: Callable[[str], str])
340342
"creation_context",
341343
"resources",
342344
"id",
345+
"inputs.component_in_path.optional" # backend will return component inputs as optional:False
343346
)
344347
expected_dict = pydash.omit(
345348
dict(target_entity._to_dict()),
@@ -746,15 +749,15 @@ def test_simple_pipeline_component_create(self, client: MLClient, randstr: Calla
746749
"display_name": "Hello World Pipeline Component",
747750
"is_deterministic": False,
748751
"inputs": {
749-
"component_in_path": {"type": "uri_folder", "description": "A path"},
752+
"component_in_path": {"type": "uri_folder", "description": "A path", "optional": False},
750753
"component_in_number": {
751754
"type": "number",
752755
"optional": True,
753756
"default": "10.99",
754757
"description": "A number",
755758
},
756759
# The azureml: prefix has been resolve and removed by service
757-
"node_compute": {"type": "string", "default": "cpu-cluster"},
760+
"node_compute": {"type": "string", "default": "cpu-cluster", "optional": False},
758761
},
759762
"type": "pipeline",
760763
}
@@ -784,7 +787,8 @@ def test_helloworld_nested_pipeline_component(self, client: MLClient, randstr: C
784787
"display_name": "Hello World Pipeline Component",
785788
"is_deterministic": False,
786789
"inputs": {
787-
"component_in_path": {"type": "uri_folder", "description": "A path for pipeline component"},
790+
"component_in_path": {"type": "uri_folder", "description": "A path for pipeline component",
791+
"optional": False},
788792
"component_in_number": {
789793
"type": "number",
790794
"optional": True,

sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2346,3 +2346,214 @@ def outer_pipeline():
23462346

23472347
validate_result = pipeline_job._validate()
23482348
assert validate_result.passed
2349+
2350+
def test_dsl_pipeline_with_unprovided_pipeline_optional_input(self, client: MLClient) -> None:
2351+
component_func = load_component(source=str(components_dir / "default_optional_component.yml"))
2352+
2353+
# optional pipeline input binding to optional node input
2354+
@dsl.pipeline()
2355+
def pipeline_func(optional_input: Input(optional=True, type="uri_file")):
2356+
component_func(
2357+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2358+
required_param="def",
2359+
optional_input=optional_input,
2360+
)
2361+
2362+
pipeline_job = pipeline_func()
2363+
pipeline_job.settings.default_compute = "cpu-cluster"
2364+
validate_result = pipeline_job._validate()
2365+
assert validate_result.error_messages == {}
2366+
2367+
# optional pipeline parameter binding to optional node parameter
2368+
@dsl.pipeline()
2369+
def pipeline_func(optional_param: Input(optional=True, type="string"),
2370+
optional_param_duplicate: Input(optional=True, type="string")):
2371+
component_func(
2372+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2373+
required_param="def",
2374+
optional_param=optional_param,
2375+
optional_param_with_default=optional_param_duplicate,
2376+
)
2377+
2378+
pipeline_job = pipeline_func()
2379+
pipeline_job.settings.default_compute = "cpu-cluster"
2380+
validate_result = pipeline_job._validate()
2381+
assert validate_result.error_messages == {}
2382+
2383+
def test_dsl_pipeline_with_unprovided_pipeline_required_input(self, client: MLClient) -> None:
2384+
component_func = load_component(source=str(components_dir / "default_optional_component.yml"))
2385+
2386+
# required pipeline input binding to optional node input
2387+
@dsl.pipeline()
2388+
def pipeline_func(required_input: Input(optional=False, type="uri_file")):
2389+
component_func(
2390+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2391+
required_param="def",
2392+
optional_input=required_input,
2393+
)
2394+
2395+
pipeline_job = pipeline_func()
2396+
pipeline_job.settings.default_compute = "cpu-cluster"
2397+
validate_result = pipeline_job._validate()
2398+
assert validate_result.error_messages == {
2399+
'inputs.required_input': "Required input 'required_input' for pipeline "
2400+
"'pipeline_func' not provided."
2401+
}
2402+
2403+
# required pipeline parameter binding to optional node parameter
2404+
@dsl.pipeline()
2405+
def pipeline_func(required_param: Input(optional=False, type="string"),
2406+
required_param_duplicate: Input(optional=False, type="string")):
2407+
component_func(
2408+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2409+
required_param="def",
2410+
optional_param=required_param,
2411+
optional_param_with_default=required_param_duplicate,
2412+
)
2413+
2414+
pipeline_job = pipeline_func()
2415+
pipeline_job.settings.default_compute = "cpu-cluster"
2416+
validate_result = pipeline_job._validate()
2417+
assert validate_result.error_messages == {
2418+
'inputs.required_param': "Required input 'required_param' for pipeline "
2419+
"'pipeline_func' not provided.",
2420+
'inputs.required_param_duplicate': "Required input 'required_param_duplicate' for pipeline "
2421+
"'pipeline_func' not provided."
2422+
}
2423+
2424+
# required pipeline parameter with default value binding to optional node parameter
2425+
@dsl.pipeline()
2426+
def pipeline_func(required_param: Input(optional=False, type="string", default="pipeline_required_param")):
2427+
component_func(
2428+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2429+
required_param=required_param,
2430+
optional_param=required_param,
2431+
optional_param_with_default=required_param,
2432+
)
2433+
2434+
pipeline_job = pipeline_func()
2435+
pipeline_job.settings.default_compute = "cpu-cluster"
2436+
validate_result = pipeline_job._validate()
2437+
assert validate_result.error_messages == {}
2438+
2439+
def test_dsl_pipeline_with_pipeline_component_unprovided_pipeline_optional_input(self, client: MLClient) -> None:
2440+
component_func = load_component(source=str(components_dir / "default_optional_component.yml"))
2441+
2442+
# optional pipeline input binding to optional node input
2443+
@dsl.pipeline()
2444+
def subgraph_pipeline(optional_input: Input(optional=True, type="uri_file")):
2445+
component_func(
2446+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2447+
required_param="def",
2448+
optional_input=optional_input,
2449+
)
2450+
2451+
@dsl.pipeline()
2452+
def root_pipeline():
2453+
subgraph_node = subgraph_pipeline(
2454+
)
2455+
2456+
pipeline_job = root_pipeline()
2457+
pipeline_job.settings.default_compute = "cpu-cluster"
2458+
validate_result = pipeline_job._validate()
2459+
assert validate_result.error_messages == {}
2460+
2461+
# optional pipeline parameter binding to optional node parameter
2462+
@dsl.pipeline()
2463+
def subgraph_pipeline(optional_parameter: Input(optional=True, type="string"),
2464+
optional_parameter_duplicate: Input(optional=True, type="string")):
2465+
component_func(
2466+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2467+
required_param="def",
2468+
optional_param=optional_parameter,
2469+
optional_param_with_default=optional_parameter_duplicate,
2470+
)
2471+
2472+
@dsl.pipeline()
2473+
def root_pipeline():
2474+
subgraph_node = subgraph_pipeline(
2475+
)
2476+
2477+
pipeline_job = root_pipeline()
2478+
pipeline_job.settings.default_compute = "cpu-cluster"
2479+
validate_result = pipeline_job._validate()
2480+
assert validate_result.error_messages == {}
2481+
2482+
def test_dsl_pipeline_with_pipeline_component_unprovided_pipeline_required_input(self, client: MLClient) -> None:
2483+
component_func = load_component(source=str(components_dir / "default_optional_component.yml"))
2484+
2485+
# required pipeline input binding to optional node input
2486+
@dsl.pipeline()
2487+
def subgraph_pipeline(required_input: Input(optional=False, type="uri_file")):
2488+
component_func(
2489+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2490+
required_param="def",
2491+
optional_input=required_input
2492+
)
2493+
2494+
@dsl.pipeline()
2495+
def root_pipeline():
2496+
subgraph_node = subgraph_pipeline(
2497+
)
2498+
2499+
pipeline_job = root_pipeline()
2500+
pipeline_job.settings.default_compute = "cpu-cluster"
2501+
validate_result = pipeline_job._validate()
2502+
assert validate_result.error_messages == {
2503+
'jobs.subgraph_node.inputs.required_input': "Required input 'required_input' for component 'subgraph_node'"
2504+
" not provided."
2505+
}
2506+
2507+
@dsl.pipeline()
2508+
def root_pipeline(required_input: Input(optional=False, type="uri_file")):
2509+
subgraph_node = subgraph_pipeline(
2510+
required_input=required_input
2511+
)
2512+
2513+
pipeline_job = root_pipeline()
2514+
pipeline_job.settings.default_compute = "cpu-cluster"
2515+
validate_result = pipeline_job._validate()
2516+
assert validate_result.error_messages == {
2517+
'inputs.required_input': "Required input 'required_input' for pipeline 'root_pipeline' not provided."
2518+
}
2519+
2520+
# required pipeline parameter binding to optional node parameter
2521+
@dsl.pipeline()
2522+
def subgraph_pipeline(required_parameter: Input(optional=False, type="string")):
2523+
component_func(
2524+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2525+
required_param="def",
2526+
optional_param=required_parameter
2527+
)
2528+
2529+
@dsl.pipeline()
2530+
def root_pipeline():
2531+
subgraph_node = subgraph_pipeline(
2532+
)
2533+
2534+
pipeline_job = root_pipeline()
2535+
pipeline_job.settings.default_compute = "cpu-cluster"
2536+
validate_result = pipeline_job._validate()
2537+
assert validate_result.error_messages == {
2538+
'jobs.subgraph_node.inputs.required_parameter': "Required input 'required_parameter' for component "
2539+
"'subgraph_node' not provided."
2540+
}
2541+
2542+
# required pipeline parameter with default value binding to optional node parameter
2543+
@dsl.pipeline()
2544+
def subgraph_pipeline(required_parameter: Input(optional=False, type="string", default="subgraph_pipeline")):
2545+
component_func(
2546+
required_input=Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv"),
2547+
required_param="def",
2548+
optional_param=required_parameter
2549+
)
2550+
2551+
@dsl.pipeline()
2552+
def root_pipeline():
2553+
subgraph_node = subgraph_pipeline(
2554+
)
2555+
2556+
pipeline_job = root_pipeline()
2557+
pipeline_job.settings.default_compute = "cpu-cluster"
2558+
validate_result = pipeline_job._validate()
2559+
assert validate_result.error_messages == {}

sdk/ml/azure-ai-ml/tests/internal/unittests/test_component.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def test_load_from_registered_internal_command_component_rest_obj(self):
7676
"display_name": "0.0.1",
7777
"is_deterministic": True,
7878
"successful_return_code": "Zero",
79-
"inputs": {"train_data": {"type": "path"}},
79+
"inputs": {"train_data": {"type": "path", "optional": False}},
8080
"outputs": {"output_dir": {"type": "path", "datastore_mode": "Upload"}},
8181
"command": "sh ls.sh {inputs.input_dir} {inputs.file_name} {outputs.output_dir}",
8282
"environment": {"name": "AzureML-Minimal", "version": "45", "os": "Linux"},
@@ -91,7 +91,7 @@ def test_load_from_registered_internal_command_component_rest_obj(self):
9191
"display_name": "0.0.1",
9292
"is_deterministic": True,
9393
"successful_return_code": "Zero",
94-
"inputs": {"train_data": {"type": "path"}}, # optional will be drop if False
94+
"inputs": {"train_data": {"type": "path", "optional": False}}, # optional will be drop if False
9595
"outputs": {"output_dir": {"type": "path", "datastore_mode": "Upload"}},
9696
"command": "sh ls.sh {inputs.input_dir} {inputs.file_name} {outputs.output_dir}",
9797
"environment": {"name": "AzureML-Minimal", "version": "45", "os": "Linux"},
@@ -136,10 +136,12 @@ def test_load_from_registered_internal_scope_component_rest_obj(self):
136136
"inputs": {
137137
"TextData": {
138138
"type": "AnyFile",
139+
'optional': False,
139140
"description": "relative path on ADLS storage",
140141
},
141142
"ExtractionClause": {
142143
"type": "string",
144+
'optional': False,
143145
"description": 'the extraction clause,something like "column1:string, column2:int"',
144146
},
145147
},
@@ -161,12 +163,12 @@ def test_load_from_registered_internal_scope_component_rest_obj(self):
161163
"inputs": {
162164
"TextData": {
163165
"type": "AnyFile",
164-
# "optional": False, # expected. optional will be dropped if it's False
166+
"optional": False,
165167
"description": "relative path on ADLS storage",
166168
},
167169
"ExtractionClause": {
168170
"type": "string",
169-
# "optional": False, # expected. optional will be dropped if it's False
171+
"optional": False,
170172
"description": 'the extraction clause,something like "column1:string, column2:int"',
171173
},
172174
},
@@ -201,9 +203,6 @@ def test_component_serialization(self, yaml_path):
201203
input_port["enum"] = list(map(lambda x: str(x), input_port["enum"]))
202204
if "default" in input_port:
203205
input_port["default"] = str(input_port["default"])
204-
# optional will be dropped if it's False
205-
if "optional" in input_port and input_port["optional"] is False:
206-
del input_port["optional"]
207206

208207
# code will be dumped as absolute path
209208
if "code" in expected_dict:

sdk/ml/azure-ai-ml/tests/internal/unittests/test_pipeline_job.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,6 @@ def test_load_pipeline_job_with_internal_components_as_node(self):
408408
with open(yaml_path, encoding="utf-8") as yaml_file:
409409
yaml_dict = yaml.safe_load(yaml_file)
410410

411-
# handle some known difference in yaml_dict
412-
for _input in yaml_dict["inputs"].values():
413-
if "optional" in _input and _input["optional"] is False:
414-
del _input["optional"]
415411
yaml_dict["code"] = parse_local_path(yaml_dict["code"], scope_internal_func.base_path)
416412

417413
command_func = load_component("./tests/test_configs/components/helloworld_component.yml")

0 commit comments

Comments
 (0)