Skip to content

Commit d2092ee

Browse files
authored
[ML][Pipelines] Test: migrate do-while tests (Azure#29593)
* update enable_private_preview_schema_features for pipeline job schema/entity reload
* fix: support input list in do-while mapping
* test: migrate some skipped tests in mldesigner
* test: add primitive test and recordings
* test: fix automl-in-pipeline serverless test
* test: fix wrong registry component
* pylint: remove unused import
* test: re-org tests & add missing recordings
* reformat with black
* fix: consider internal flag for reload schema
1 parent c5954d4 commit d2092ee

18 files changed: +5,206 −14 lines

sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_do_while.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,24 @@ def _infer_and_update_body_input_from_mapping():
6868
# if loop body output type is not specified, skip as we have no place to infer
6969
if body.outputs[output_name].type is None:
7070
continue
71-
# if input type is specified, no need to infer and skip
72-
if body_input.type is not None:
73-
continue
74-
inferred_type = body.outputs[output_name].type
75-
# update node input
76-
body_input._meta._is_inferred_optional = True
77-
body_input.type = inferred_type
78-
# update node corresponding component input
79-
input_name = body_input._meta.name
80-
body.component.inputs[input_name]._is_inferred_optional = True
81-
body.component.inputs[input_name].type = inferred_type
71+
# body input can be a list of inputs, normalize as a list to process
72+
if not isinstance(body_input, list):
73+
body_input = [body_input]
74+
for single_input in body_input:
75+
# if input type is specified, no need to infer and skip
76+
if single_input.type is not None:
77+
continue
78+
inferred_type = body.outputs[output_name].type
79+
# update node input
80+
single_input._meta._is_inferred_optional = True
81+
single_input.type = inferred_type
82+
# update node corresponding component input
83+
input_name = single_input._meta.name
84+
body.component.inputs[input_name]._is_inferred_optional = True
85+
body.component.inputs[input_name].type = inferred_type
8286

83-
# infer and update for dynamic input
84-
_infer_and_update_body_input_from_mapping()
87+
# when mapping is a dictionary, infer and update for dynamic input
88+
if isinstance(mapping, dict):
89+
_infer_and_update_body_input_from_mapping()
8590

8691
return do_while_node

sdk/ml/azure-ai-ml/tests/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,23 +780,33 @@ def enable_pipeline_private_preview_features(mocker: MockFixture):
780780
@pytest.fixture()
781781
def enable_private_preview_schema_features():
782782
"""Schemas will be imported at the very beginning, so need to reload related classes."""
783+
from azure.ai.ml._internal._setup import _registered, enable_internal_components_in_pipeline
783784
from azure.ai.ml._schema.component import command_component as command_component_schema
784785
from azure.ai.ml._schema.component import component as component_schema
785786
from azure.ai.ml._schema.component import input_output
786787
from azure.ai.ml._schema.pipeline import pipeline_component as pipeline_component_schema
788+
from azure.ai.ml._schema.pipeline import pipeline_job as pipeline_job_schema
787789
from azure.ai.ml.entities._component import command_component as command_component_entity
788790
from azure.ai.ml.entities._component import pipeline_component as pipeline_component_entity
791+
from azure.ai.ml.entities._job.pipeline import pipeline_job as pipeline_job_entity
789792

790793
def _reload_related_classes():
791794
reload(component_schema)
792795
reload(input_output)
793796
reload(command_component_schema)
794797
reload(pipeline_component_schema)
798+
reload(pipeline_job_schema)
795799

796800
command_component_entity.CommandComponentSchema = command_component_schema.CommandComponentSchema
797801
pipeline_component_entity.PipelineComponentSchema = pipeline_component_schema.PipelineComponentSchema
802+
pipeline_job_entity.PipelineJobSchema = pipeline_job_schema.PipelineJobSchema
803+
804+
# check internal flag after reload, force register if it is set as True
805+
if _registered:
806+
enable_internal_components_in_pipeline(force=True)
798807

799808
with patch.dict(os.environ, {AZUREML_PRIVATE_FEATURES_ENV_VAR: "True"}):
809+
800810
_reload_related_classes()
801811
yield
802812
_reload_related_classes()

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_controlflow_pipeline.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from pathlib import Path
2+
13
import pytest
24
from azure.ai.ml.dsl._group_decorator import group
35
from devtools_testutils import AzureRecordedTestCase, is_live
@@ -395,6 +397,149 @@ def condition_pipeline():
395397
}
396398

397399

400+
class TestDoWhilePipeline(TestControlFlowPipeline):
401+
@property
402+
def _basic_component_func(self):
403+
return load_component("./tests/test_configs/dsl_pipeline/do_while/basic_component/component.yml")
404+
405+
def test_do_while_pipeline(self, client: MLClient):
406+
@pipeline
407+
def do_while_body_pipeline_component(
408+
component_in_number: Input(type="integer", optional=True),
409+
component_in_path: Input(type="uri_folder"),
410+
):
411+
# Call component obj as function: apply given inputs & parameters to create a node in pipeline
412+
basic_component = self._basic_component_func(
413+
component_in_number=component_in_number,
414+
component_in_path=component_in_path,
415+
)
416+
return basic_component.outputs
417+
418+
@pipeline
419+
def pipeline_with_do_while(
420+
component_in_number: Input(type="integer"),
421+
component_in_path: Input(type="uri_folder"),
422+
):
423+
# Do while node with pipeline component
424+
do_while_body_pipeline = do_while_body_pipeline_component(
425+
component_in_number=component_in_number,
426+
component_in_path=component_in_path,
427+
)
428+
do_while_with_pipeline = do_while( # noqa: F841
429+
body=do_while_body_pipeline,
430+
condition=do_while_body_pipeline.outputs.is_number_larger_than_zero,
431+
mapping={
432+
do_while_body_pipeline.outputs.output_in_path: do_while_body_pipeline.inputs.component_in_path,
433+
},
434+
max_iteration_count=5,
435+
)
436+
437+
command_component = self._basic_component_func(
438+
component_in_number=component_in_number,
439+
component_in_path=component_in_path,
440+
)
441+
do_while_with_command_component = do_while( # noqa: F841
442+
body=command_component,
443+
condition=command_component.outputs.is_number_larger_than_zero,
444+
mapping={
445+
"output_in_number": command_component.inputs.component_in_number,
446+
command_component.outputs.output_in_path: command_component.inputs.component_in_path,
447+
},
448+
max_iteration_count=5,
449+
)
450+
451+
# Use the outputs of do_while node
452+
basic_component = do_while_body_pipeline_component(
453+
component_in_number=None,
454+
component_in_path=command_component.outputs.output_in_path,
455+
)
456+
return {"output_in_path": basic_component.outputs.output_in_path}
457+
458+
pipeline_job = pipeline_with_do_while(
459+
component_in_number=2,
460+
component_in_path=Input(type="uri_folder", path=str(Path(__file__).parent)),
461+
)
462+
# set pipeline level compute
463+
pipeline_job.settings.default_compute = "cpu-cluster"
464+
465+
assert_job_cancel(pipeline_job, client)
466+
467+
def test_do_while_pipeline_with_primitive_inputs(self, client: MLClient):
468+
@pipeline
469+
def do_while_body_pipeline_component(
470+
component_in_number: Input(type="integer"),
471+
component_in_number_1: Input(type="integer"),
472+
component_in_path: Input(type="uri_folder"),
473+
):
474+
"""E2E dummy train-score-eval pipeline with components defined via yaml."""
475+
# Call component obj as function: apply given inputs & parameters to create a node in pipeline
476+
train_with_sample_data = self._basic_component_func(
477+
component_in_number=component_in_number,
478+
component_in_number_1=component_in_number_1,
479+
component_in_path=component_in_path,
480+
)
481+
return train_with_sample_data.outputs
482+
483+
@pipeline
484+
def pipeline_with_do_while(
485+
component_in_number: Input(type="integer"),
486+
component_in_path: Input(type="uri_folder"),
487+
):
488+
# Do while node with pipeline component
489+
do_while_body_pipeline = do_while_body_pipeline_component(
490+
component_in_number=component_in_number,
491+
component_in_number_1=component_in_number,
492+
component_in_path=component_in_path,
493+
)
494+
do_while_with_pipeline = do_while( # noqa: F841
495+
body=do_while_body_pipeline,
496+
condition=do_while_body_pipeline.outputs.is_number_larger_than_zero,
497+
mapping={
498+
do_while_body_pipeline.outputs.output_in_number: [
499+
do_while_body_pipeline.inputs.component_in_number,
500+
do_while_body_pipeline.inputs.component_in_number_1,
501+
],
502+
"output_in_path": do_while_body_pipeline.inputs.component_in_path,
503+
},
504+
max_iteration_count=5,
505+
)
506+
507+
command_component = self._basic_component_func(
508+
component_in_number=component_in_number,
509+
component_in_number_1=component_in_number,
510+
component_in_path=component_in_path,
511+
)
512+
do_while_with_command_component = do_while( # noqa: F841
513+
body=command_component,
514+
condition=command_component.outputs.is_number_larger_than_zero,
515+
mapping={
516+
"output_in_number": [
517+
command_component.inputs.component_in_number,
518+
command_component.inputs.component_in_number_1,
519+
],
520+
"output_in_path": command_component.inputs.component_in_path,
521+
},
522+
max_iteration_count=5,
523+
)
524+
525+
# Use the outputs of do_while node
526+
basic_component = do_while_body_pipeline_component(
527+
component_in_number=do_while_body_pipeline.outputs.output_in_number,
528+
component_in_number_1=command_component.outputs.output_in_number,
529+
component_in_path=do_while_body_pipeline.outputs.output_in_path,
530+
)
531+
return {"output_in_path": basic_component.outputs.output_in_path}
532+
533+
pipeline_job = pipeline_with_do_while(
534+
component_in_number=2,
535+
component_in_path=Input(type="uri_folder", path=str(Path(__file__).parent)),
536+
)
537+
# set pipeline level compute
538+
pipeline_job.settings.default_compute = "cpu-cluster"
539+
540+
assert_job_cancel(pipeline_job, client)
541+
542+
398543
@pytest.mark.skipif(
399544
condition=is_live(),
400545
# TODO: reopen live test when parallel_for deployed to canary

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3027,3 +3027,41 @@ def my_pipeline(timeout) -> PipelineJob:
30273027
pipeline_job = assert_job_cancel(pipeline, client)
30283028
assert pipeline_job.jobs["node_0"].limits.timeout == "${{parent.inputs.timeout}}"
30293029
assert pipeline_job.jobs["node_1"].limits.timeout == 1
3030+
3031+
def test_pipeline_component_primitive_type_consumption(self, client: MLClient):
3032+
baisc_component_func = load_component(
3033+
"./tests/test_configs/dsl_pipeline/primitive_type_components/basic_component.yml"
3034+
)
3035+
boolean_func = load_component("./tests/test_configs/dsl_pipeline/primitive_type_components/boolean.yml")
3036+
integer_func = load_component("./tests/test_configs/dsl_pipeline/primitive_type_components/integer.yml")
3037+
number_func = load_component("./tests/test_configs/dsl_pipeline/primitive_type_components/number.yml")
3038+
string_func = load_component("./tests/test_configs/dsl_pipeline/primitive_type_components/string.yml")
3039+
3040+
@dsl.pipeline
3041+
def pipeline_component_func(bool_param: bool, int_param: int, float_param: float, str_param: str):
3042+
baisc_component_func(
3043+
bool_param=bool_param,
3044+
int_param=int_param,
3045+
float_param=float_param,
3046+
str_param=str_param,
3047+
)
3048+
3049+
@dsl.pipeline
3050+
def pipeline_func():
3051+
# components return primitive type outputs
3052+
bool_node = boolean_func()
3053+
int_node = integer_func()
3054+
float_node = number_func()
3055+
str_node = string_func()
3056+
# pipeline component consume above primitive type outputs
3057+
pipeline_node = pipeline_component_func( # noqa: F841
3058+
bool_param=bool_node.outputs.output,
3059+
int_param=int_node.outputs.output,
3060+
float_param=float_node.outputs.output,
3061+
str_param=str_node.outputs.output,
3062+
)
3063+
3064+
pipeline_job = pipeline_func()
3065+
pipeline_job.settings.default_compute = "cpu-cluster"
3066+
3067+
assert_job_cancel(pipeline_job, client)

0 commit comments

Comments (0)