Skip to content

Commit ea98670

Browse files
lllilith and chenyangliao
authored
Support remaining issues in register node output (Azure#28859)
* add name and version for output * add asset_name, asset_version, merge 2022-12-01 pr * resolve comments * add test and modify related codes * refine code * fix failed test * fix comments * fix condition._port_name * fix test and some comments * revert the modification of compute * fix comments * fix pylint error, add tests for CommandJob, Spark, Parallel, Sweep * delete one test, will add in another pr * modify error message * support Output() for origin node * delete useless change * modify test, update recording * fix test_register_output_sdk * modify test_register_output_sdk_succeed, add content of another pr * run black with 22.3.0 --------- Co-authored-by: chenyangliao <chenyangliao@microsoft.com>
1 parent 27282e9 commit ea98670

9 files changed

+3363
-3063
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def __getitem__(self, item) -> NodeOutput:
160160
def __setattr__(self, key: str, value: Union[Data, Output]):
161161
if isinstance(value, Output):
162162
mode = value.mode
163-
value = Output(type=value.type, path=value.path, mode=mode)
163+
value = Output(type=value.type, path=value.path, mode=mode, name=value.name, version=value.version)
164164
original_output = self.__getattr__(key) # Note that an exception will be raised if the keyword is invalid.
165165
original_output._data = original_output._build_data(value)
166166

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,8 @@ def __init__(
384384
super().__init__(meta=meta, data=data, **kwargs)
385385
self._port_name = port_name
386386
self._owner = owner
387-
self._name = data.name if isinstance(data, Output) else None
388-
self._version = data.version if isinstance(data, Output) else None
387+
self._name = self._data.name if isinstance(self._data, Output) else None
388+
self._version = self._data.version if isinstance(self._data, Output) else None
389389

390390
self._assert_name_and_version()
391391

@@ -510,8 +510,8 @@ def _to_job_output(self):
510510
path=self._data._data_binding(),
511511
mode=self.mode,
512512
is_control=is_control,
513-
name=self.name,
514-
version=self.version,
513+
name=self._data.name,
514+
version=self._data.version,
515515
description=self.description,
516516
)
517517
else:

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/mixin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ def _to_rest_outputs(self) -> Dict[str, Dict]:
207207
rest_output_bindings[key].update({"version": binding["version"]})
208208

209209
def _rename_name_and_version(output_dict):
210+
# NodeOutput can only be registered with name and version, therefore we rename here
210211
if "asset_name" in output_dict.keys():
211212
output_dict["name"] = output_dict.pop("asset_name")
212213
if "asset_version" in output_dict.keys():

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
]
6060

6161

62+
def check_name_and_version(output, output_name, output_version):
63+
assert output.name == output_name
64+
assert output.version == output_version
65+
66+
6267
@pytest.mark.usefixtures(
6368
"enable_environment_id_arm_expansion",
6469
"enable_pipeline_private_preview_features",
@@ -2692,8 +2697,8 @@ def register_both_output():
26922697
assert pipeline_output.name == "b_output"
26932698
assert pipeline_output.version == "2"
26942699
node_output = pipeline_job.jobs["node"].outputs.component_out_path
2695-
assert node_output.name == "a_output"
2696-
assert node_output.version == "1"
2700+
assert node_output.name == None
2701+
assert node_output.version == None
26972702

26982703
def test_dsl_pipeline_with_data_transfer_copy_2urifolder(self, client: MLClient) -> None:
26992704
from test_configs.dsl_pipeline.data_transfer_job_in_pipeline.copy_data.pipeline import (
@@ -2833,6 +2838,10 @@ def register_both_output():
28332838
node_2.outputs.component_out_path.name = "n2_output"
28342839
node_2.outputs.component_out_path.version = "v1"
28352840

2841+
# register NodeOutput without version, in this case the run result can be reused
2842+
node_3 = component(component_in_path=component_input)
2843+
node_3.outputs.component_out_path.name = "n3_output"
2844+
28362845
# register NodeOutput of subgraph
28372846
sub_node = sub_pipeline()
28382847
sub_node.outputs.sub_pipeine_a_output.name = "sub_pipeline"
@@ -2844,18 +2853,90 @@ def register_both_output():
28442853
pipeline.outputs.pipeine_a_output.name = "p1_output"
28452854
pipeline.outputs.pipeine_a_output.version = "v1"
28462855
pipeline.settings.default_compute = "cpu-cluster"
2847-
pipeline_job = client.jobs.create_or_update(pipeline)
2848-
client.jobs.stream(pipeline_job.name)
2856+
pipeline_job = assert_job_cancel(pipeline, client)
28492857

2850-
def check_name_version_and_register_succeed(output, output_name, output_version):
2851-
assert output.name == output_name
2852-
assert output.version == output_version
2853-
assert client.data.get(name=output_name, version=output_version)
2858+
check_name_and_version(pipeline_job.outputs.pipeine_a_output, "p1_output", "v1")
2859+
check_name_and_version(pipeline_job.jobs["node_2"].outputs.component_out_path, "n2_output", "v1")
2860+
assert pipeline_job.jobs["node_3"].outputs.component_out_path.name == "n3_output"
2861+
check_name_and_version(pipeline_job.jobs["sub_node"].outputs.sub_pipeine_a_output, "sub_pipeline", "v1")
28542862

2855-
check_name_version_and_register_succeed(pipeline_job.outputs.pipeine_a_output, "p1_output", "v1")
2856-
check_name_version_and_register_succeed(
2857-
pipeline_job.jobs["node_2"].outputs.component_out_path, "n2_output", "v1"
2858-
)
2859-
check_name_version_and_register_succeed(
2860-
pipeline_job.jobs["sub_node"].outputs.sub_pipeine_a_output, "sub_pipeline", "v1"
2861-
)
2863+
@pytest.mark.disable_mock_code_hash
2864+
@pytest.mark.skip(reason="TODO (269646): Internal Server Error")
2865+
def test_register_output_for_pipeline_component(self, client: MLClient):
2866+
component = load_component(source="./tests/test_configs/components/helloworld_component.yml")
2867+
component_input = Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv")
2868+
2869+
@dsl.pipeline()
2870+
def sub_pipeline():
2871+
node_1 = component(component_in_path=component_input) # test use Output to initialize subgraph.jobs.output
2872+
node_1.outputs.component_out_path = Output(name="sub_pipeline_1_output", version="v1")
2873+
2874+
node_2 = component(component_in_path=component_input) # test we can pass NodeOutput in PipelineComponent
2875+
node_2.outputs.component_out_path.name = "sub_pipeline_2_output"
2876+
node_2.outputs.component_out_path.version = "v2"
2877+
2878+
return {
2879+
"sub_node_1": node_1.outputs.component_out_path,
2880+
}
2881+
2882+
@dsl.pipeline()
2883+
def register_both_output():
2884+
subgraph = sub_pipeline()
2885+
2886+
pipeline = register_both_output()
2887+
pipeline.settings.default_compute = "cpu-cluster"
2888+
pipeline_job = assert_job_cancel(pipeline, client)
2889+
2890+
check_name_and_version(pipeline_job.jobs["subgraph"].outputs["sub_node_1"], "sub_pipeline_1_output", "v1")
2891+
2892+
subgraph_id = pipeline_job.jobs["subgraph"].component
2893+
subgraph_id = subgraph_id.split(":")
2894+
subgraph = client.components.get(name=subgraph_id[0], version=subgraph_id[1])
2895+
check_name_and_version(subgraph.jobs["node_2"].outputs["component_out_path"], "sub_pipeline_2_output", "v2")
2896+
2897+
@pytest.mark.skipif(
2898+
condition=not is_live(),
2899+
reason="TODO (2235034) x-ms-meta-name header masking fixture isn't working, so playback fails",
2900+
)
2901+
@pytest.mark.disable_mock_code_hash
2902+
# without this mark, the code would be passed with different id even when we upload the same component,
2903+
# add this mark to reuse node and further reuse pipeline
2904+
def test_register_with_output_format(self, client: MLClient):
2905+
component = load_component(source="./tests/test_configs/components/helloworld_component.yml")
2906+
component_input = Input(type="uri_file", path="https://dprepdata.blob.core.windows.net/demo/Titanic.csv")
2907+
2908+
@dsl.pipeline()
2909+
def sub_pipeline():
2910+
node = component(component_in_path=component_input)
2911+
node.outputs.component_out_path = Output(name="sub_pipeline_o_output", version="v1")
2912+
return {"sub_pipeine_a_output": node.outputs.component_out_path}
2913+
2914+
@dsl.pipeline()
2915+
def register_both_output():
2916+
# register NodeOutput which is binding to PipelineOutput
2917+
node = component(component_in_path=component_input) # binding and re-define name and version
2918+
node.outputs.component_out_path = Output(name="n1_o_output", version="1")
2919+
2920+
node_2 = component(component_in_path=component_input) # binding
2921+
node_2.outputs.component_out_path = Output(name="n2_o_output", version="2")
2922+
2923+
node_3 = component(component_in_path=component_input) # isn't binding
2924+
node_3.outputs.component_out_path = Output(name="n3_o_output", version="4")
2925+
2926+
sub_node = sub_pipeline() # test set Output for PipelineComponent
2927+
sub_node.outputs.sub_pipeine_a_output = Output(name="subgraph_o_output", version="1")
2928+
return {
2929+
"pipeine_a_output": node.outputs.component_out_path,
2930+
"pipeine_b_output": node_2.outputs.component_out_path,
2931+
}
2932+
2933+
pipeline = register_both_output()
2934+
pipeline.settings.default_compute = "cpu-cluster"
2935+
pipeline.outputs.pipeine_a_output.name = "np_output"
2936+
pipeline.outputs.pipeine_a_output.version = "1"
2937+
pipeline_job = assert_job_cancel(pipeline, client)
2938+
2939+
check_name_and_version(pipeline_job.outputs.pipeine_a_output, "np_output", "1")
2940+
check_name_and_version(pipeline_job.outputs.pipeine_b_output, "n2_o_output", "2")
2941+
check_name_and_version(pipeline_job.jobs["node_3"].outputs["component_out_path"], "n3_o_output", "4")
2942+
check_name_and_version(pipeline_job.jobs["sub_node"].outputs["sub_pipeine_a_output"], "subgraph_o_output", "1")

0 commit comments

Comments
 (0)