Skip to content

Commit ba3ba79

Browse files
authored
[ML][Pipelines] Test: enable pipelines registry consumption tests (Azure#28177)
* test: try to enable test_pipeline_job_create_with_registries * update recording for test_pipeline_job_create_with_registries * update test to avoid the registry name getting sanitized * update format * test: enable test_pipeline_job_create_with_registry_model_as_input * remove unused import * test: enable test_pipeline_job_create_with_registered_component_on_registry * update skipif location * update tests * add yield in mock_component_hash
1 parent 361036e commit ba3ba79

13 files changed

+70
-4740
lines changed

sdk/ml/azure-ai-ml/tests/component/e2etests/test_component.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import re
2-
import tempfile
32
import uuid
43
from itertools import tee
54
from pathlib import Path

sdk/ml/azure-ai-ml/tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ def mock_component_hash(mocker: MockFixture, request: FixtureRequest):
570570
"""
571571
# do nothing if in live mode and not recording
572572
if is_live_and_not_recording():
573+
yield
573574
return
574575

575576
if is_live():
Lines changed: 51 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,97 @@
11
from pathlib import Path
22

33
import pytest
4-
from devtools_testutils import AzureRecordedTestCase
5-
from test_utilities.utils import _PYTEST_TIMEOUT_METHOD, assert_job_cancel
4+
from azure.core.exceptions import ResourceNotFoundError
65

76
from azure.ai.ml import Input, MLClient, load_component, load_model
87
from azure.ai.ml.constants import AssetTypes
98
from azure.ai.ml.dsl import pipeline
10-
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
11-
9+
from devtools_testutils import AzureRecordedTestCase, is_live
10+
from test_utilities.utils import _PYTEST_TIMEOUT_METHOD, assert_job_cancel
1211
from .._util import _DSL_TIMEOUT_SECOND
1312

1413

14+
def assert_pipeline_job_cancel(client: MLClient, score_func, pipeline_model_input, pipeline_test_data):
15+
@pipeline
16+
def score_pipeline(model_input, test_data):
17+
score = score_func(model_input=model_input, test_data=test_data) # noqa: F841
18+
score_duplicate = score_func(model_input=model_input, test_data=test_data) # noqa: F841
19+
20+
pipeline_job = score_pipeline(model_input=pipeline_model_input, test_data=pipeline_test_data)
21+
pipeline_job.settings.default_compute = "cpu-cluster"
22+
assert_job_cancel(pipeline_job, client)
23+
24+
25+
@pytest.mark.skipif(condition=not is_live(), reason="registry test, may fail in playback mode")
1526
@pytest.mark.usefixtures("enable_pipeline_private_preview_features", "recorded_test")
1627
@pytest.mark.timeout(timeout=_DSL_TIMEOUT_SECOND, method=_PYTEST_TIMEOUT_METHOD)
1728
@pytest.mark.e2etest
1829
@pytest.mark.pipeline_test
1930
class TestDSLPipelineOnRegistry(AzureRecordedTestCase):
20-
@pytest.mark.skip(reason="not able to re-record")
21-
def test_pipeline_job_create_with_registered_component_on_registry(
22-
self,
23-
registry_client: MLClient,
24-
) -> None:
31+
test_data = Input(
32+
type=AssetTypes.URI_FILE,
33+
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
34+
)
35+
36+
def test_pipeline_job_create_with_registered_component_on_registry(self, pipelines_registry_client: MLClient):
2537
local_component = load_component("./tests/test_configs/components/basic_component_code_local_path.yml")
2638
try:
27-
created_component = registry_client.components.get(local_component.name, version=local_component.version)
28-
except HttpResponseError:
29-
created_component = registry_client.components.create_or_update(local_component)
39+
created_component = pipelines_registry_client.components.get(
40+
local_component.name, version=local_component.version
41+
)
42+
except ResourceNotFoundError:
43+
created_component = pipelines_registry_client.components.create_or_update(local_component)
3044

31-
@pipeline()
45+
@pipeline
3246
def sample_pipeline():
33-
node = created_component()
34-
node.compute = "cpu-cluster"
47+
created_component()
3548

3649
pipeline_job = sample_pipeline()
37-
assert registry_client.jobs.validate(pipeline_job).passed
38-
# TODO: add test for pipeline job create with registered component on registry after support is ready on canary
39-
40-
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
41-
"'Unable to find a record for the request' in playback mode")
42-
def test_pipeline_with_local_component_and_registry_model_as_input(self, registry_client: MLClient, client: MLClient):
43-
# get dataset
44-
test_data = Input(
45-
type=AssetTypes.URI_FILE,
46-
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
47-
)
50+
pipeline_job.settings.default_compute = "cpu-cluster"
51+
assert pipelines_registry_client.jobs.validate(pipeline_job).passed
4852

53+
# this test will break in playback mode, so include it in live tests only
54+
def test_pipeline_with_local_component_and_registry_model_as_input(self, client: MLClient):
4955
# load_component
5056
score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
5157

5258
pipeline_score_model = Input(
53-
type='mlflow_model',
54-
path='azureml://registries/testFeed/models/iris_model/versions/1'
59+
type="custom_model", path="azureml://registries/sdk-test/models/iris_model/versions/1"
5560
)
5661

57-
@pipeline()
58-
def score_pipeline_with_registry_model(model_input, test_data):
59-
score = score_func(model_input=model_input, test_data=test_data)
60-
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
61-
62-
pipeline_job = score_pipeline_with_registry_model(
63-
model_input=pipeline_score_model,
64-
test_data=test_data
65-
)
66-
pipeline_job.settings.default_compute = "cpu-cluster"
67-
assert_job_cancel(pipeline_job, client)
62+
assert_pipeline_job_cancel(client, score_func, pipeline_score_model, self.test_data)
6863

69-
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
70-
"'Unable to find a record for the request' in playback mode")
7164
def test_pipeline_with_local_component_and_registry_model_as_input_with_model_input(
72-
self,
73-
registry_client: MLClient,
74-
client: MLClient):
75-
# get dataset
76-
test_data = Input(
77-
type=AssetTypes.URI_FILE,
78-
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
79-
)
80-
65+
self, client: MLClient, pipelines_registry_client: MLClient
66+
):
8167
# load_component
8268
score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
8369

8470
model_path = Path("./tests/test_configs/model/model_iris.yml")
8571
model_entity = load_model(model_path)
8672
try:
87-
pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
73+
pipeline_score_model = pipelines_registry_client.models.get(
74+
name=model_entity.name, version=model_entity.version
75+
)
8876
except ResourceNotFoundError:
89-
model_entity = registry_client.models.create_or_update(model_entity)
90-
pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
77+
model_entity = pipelines_registry_client.models.create_or_update(model_entity)
78+
pipeline_score_model = pipelines_registry_client.models.get(
79+
name=model_entity.name, version=model_entity.version
80+
)
9181

92-
@pipeline()
93-
def score_pipeline_with_registry_model(model_input, test_data):
94-
score = score_func(model_input=model_input, test_data=test_data)
95-
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
96-
97-
pipeline_job = score_pipeline_with_registry_model(
98-
model_input=pipeline_score_model, test_data=test_data
99-
)
100-
pipeline_job.settings.default_compute = "cpu-cluster"
101-
assert_job_cancel(pipeline_job, client)
102-
103-
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
104-
"'Unable to find a record for the request' in playback mode")
105-
def test_pipeline_with_registry_component_and_model_as_input(self, registry_client: MLClient, client: MLClient):
106-
# get dataset
107-
test_data = Input(
108-
type=AssetTypes.URI_FILE,
109-
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
110-
)
82+
assert_pipeline_job_cancel(client, score_func, pipeline_score_model, self.test_data)
11183

84+
def test_pipeline_with_registry_component_and_model_as_input(
85+
self, client: MLClient, pipelines_registry_client: MLClient
86+
):
11287
# load_component
113-
score_component_name = "v2_dsl_score_component"
114-
component_version = "0.0.8"
115-
score_func = registry_client.components.get(
88+
score_component_name, component_version = "score_component", "2"
89+
score_func = pipelines_registry_client.components.get(
11690
name=score_component_name, version=component_version
11791
)
11892

11993
pipeline_score_model = Input(
120-
type='mlflow_model',
121-
path='azureml://registries/testFeed/models/iris_model/versions/1'
94+
type="mlflow_model", path="azureml://registries/sdk-test/models/iris_model/versions/1"
12295
)
12396

124-
@pipeline()
125-
def score_pipeline_with_registry_model(model_input, test_data):
126-
score = score_func(model_input=model_input, test_data=test_data)
127-
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
128-
129-
pipeline_job = score_pipeline_with_registry_model(
130-
model_input=pipeline_score_model,
131-
test_data=test_data
132-
)
133-
pipeline_job.settings.default_compute = "cpu-cluster"
134-
assert_job_cancel(pipeline_job, client)
97+
assert_pipeline_job_cancel(client, score_func, pipeline_score_model, self.test_data)

sdk/ml/azure-ai-ml/tests/pipeline_job/e2etests/test_pipeline_job.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,26 +76,22 @@ def test_pipeline_job_create(
7676
assert new_tag_name in updated_job.tags
7777
assert updated_job.tags[new_tag_name] == new_tag_value
7878

79-
@pytest.mark.skip("skip as registries not work in canary region for now")
79+
@pytest.mark.skipif(condition=not is_live(), reason="registry test, may fail in playback mode")
8080
def test_pipeline_job_create_with_registries(
81-
self,
82-
client: MLClient,
83-
randstr: Callable[[str], str],
81+
self, client: MLClient, randstr: Callable[[str], str]
8482
) -> None:
8583
params_override = [{"name": randstr("name")}]
8684
pipeline_job = load_job(
8785
source="./tests/test_configs/pipeline_jobs/hello_pipeline_job_with_registries.yml",
8886
params_override=params_override,
8987
)
90-
assert (
91-
pipeline_job.jobs.get("a").environment
92-
== "azureml://registries/testFeed/environments/sklearn-10-ubuntu2004-py38-cpu/versions/19.dev6"
93-
)
94-
job = client.jobs.create_or_update(pipeline_job)
88+
# the registry name sdk-test may be sanitized to another name, so assert the prefix and suffix separately instead of the full string
89+
assert str(pipeline_job.jobs["a"].environment).startswith("azureml://registries/")
90+
assert str(pipeline_job.jobs["a"].environment).endswith("/environments/openMPIUbuntu/versions/1")
91+
job = assert_job_cancel(pipeline_job, client)
9592
assert job.name == params_override[0]["name"]
96-
assert (
97-
job.jobs.get("a").component == "azureml://registries/testFeed/components/my_hello_world_asset_2/versions/1"
98-
)
93+
assert str(job.jobs["a"].component).startswith("azureml://registries/")
94+
assert str(job.jobs["a"].component).endswith("/components/hello_world_asset/versions/1")
9995

10096
@pytest.mark.parametrize(
10197
"pipeline_job_path",
@@ -980,6 +976,7 @@ def test_pipeline_job_with_automl_image_multiclass_classification(
980976
"sweep": {
981977
"sampling_algorithm": "random",
982978
"early_termination": {
979+
"evaluation_interval": 10,
983980
"evaluation_interval": 10,
984981
"delay_evaluation": 0,
985982
"type": "bandit",
@@ -1324,22 +1321,14 @@ def test_remote_pipeline_component_job(self, client: MLClient, randstr: Callable
13241321
# assert pipeline_dict["outputs"] == {"output_path": {"mode": "ReadWriteMount", "job_output_type": "uri_folder"}}
13251322
assert pipeline_dict["settings"] == {"default_compute": "cpu-cluster", "_source": "REMOTE.WORKSPACE.COMPONENT"}
13261323

1327-
@pytest.mark.skip(
1328-
reason="request body still exits when re-record and will raise error "
1329-
"'Unable to find a record for the request' in playback mode"
1330-
)
1331-
def test_pipeline_job_create_with_registry_model_as_input(
1332-
self,
1333-
client: MLClient,
1334-
registry_client: MLClient,
1335-
randstr: Callable[[str], str],
1336-
) -> None:
1324+
@pytest.mark.skipif(condition=not is_live(), reason="registry test, may fail in playback mode")
1325+
def test_pipeline_job_create_with_registry_model_as_input(self, client: MLClient, randstr: Callable[[str], str]):
13371326
params_override = [{"name": randstr("name")}]
13381327
pipeline_job = load_job(
13391328
source="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/pipeline.yml",
13401329
params_override=params_override,
13411330
)
1342-
job = client.jobs.create_or_update(pipeline_job)
1331+
job = assert_job_cancel(pipeline_job, client)
13431332
assert job.name == params_override[0]["name"]
13441333

13451334
def test_pipeline_node_with_default_component(self, client: MLClient, randstr: Callable[[str], str]):

0 commit comments

Comments
 (0)