Skip to content

Commit ec1ddc4

Browse files
authored
[ML][Pipelines] feat: support v2 style 1p only components (Azure#30443)
* refactor: use pytest.param to shorten test names
* feat: support v2 style 1p only component types
1 parent 68800ac commit ec1ddc4

File tree

91 files changed

+16323
-9214
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+16323
-9214
lines changed

sdk/ml/azure-ai-ml/.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ repos:
1010
hooks:
1111
- id: interrogate
1212
types_or: [python]
13-
exclude: ^(sdk/ml/azure-ai-ml/tests/|sdk/ml/azure-ai-ml/azure/ai/ml/_restclient|sdk/ml/azure-ai-ml/setup.py)
13+
exclude: ^(sdk/ml/azure-ai-ml/tests/|sdk/ml/azure-ai-ml/scripts/|sdk/ml/azure-ai-ml/azure/ai/ml/_restclient|sdk/ml/azure-ai-ml/setup.py)
1414
# exclude defined here because exclude in pyproject.toml is not being respected
1515
- repo: https://github.com/streetsidesoftware/cspell-cli
1616
rev: v6.31.0

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/_schema/component.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ class NodeType:
3030
DATA_TRANSFER = "DataTransferComponent"
3131
DISTRIBUTED = "DistributedComponent"
3232
HDI = "HDInsightComponent"
33+
SCOPE_V2 = "scope"
34+
HDI_V2 = "hdinsight"
35+
HEMERA_V2 = "hemera"
36+
STARLITE_V2 = "starlite"
37+
AE365EXEPOOL_V2 = "ae365exepool"
38+
AETHER_BRIDGE_V2 = "aetherbridge"
3339
PARALLEL = "ParallelComponent"
3440
SCOPE = "ScopeComponent"
3541
STARLITE = "StarliteComponent"

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/_setup.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,18 @@ def enable_internal_components_in_pipeline(*, force=False):
7070

7171
# redo the registration for those with specific runsettings
7272
_register_node(NodeType.DATA_TRANSFER, DataTransfer, InternalBaseNodeSchema)
73-
_register_node(NodeType.HEMERA, Hemera, InternalBaseNodeSchema)
74-
_register_node(NodeType.STARLITE, Starlite, InternalBaseNodeSchema)
7573
_register_node(NodeType.COMMAND, Command, CommandSchema)
7674
_register_node(NodeType.DISTRIBUTED, Distributed, DistributedSchema)
77-
_register_node(NodeType.SCOPE, Scope, ScopeSchema)
7875
_register_node(NodeType.PARALLEL, Parallel, ParallelSchema)
76+
_register_node(NodeType.HEMERA, Hemera, InternalBaseNodeSchema)
77+
_register_node(NodeType.STARLITE, Starlite, InternalBaseNodeSchema)
78+
_register_node(NodeType.SCOPE, Scope, ScopeSchema)
7979
_register_node(NodeType.HDI, HDInsight, HDInsightSchema)
80+
81+
# register v2 style 1p only components
82+
_register_node(NodeType.HEMERA_V2, Hemera, InternalBaseNodeSchema)
83+
_register_node(NodeType.STARLITE_V2, Starlite, InternalBaseNodeSchema)
84+
_register_node(NodeType.SCOPE_V2, Scope, ScopeSchema)
85+
_register_node(NodeType.HDI_V2, HDInsight, HDInsightSchema)
86+
# Ae365exepool and AetherBridge have been registered to InternalBaseNode
8087
_set_registered(True)

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/spark.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import copy
77
import logging
8+
import re
89
from enum import Enum
910
from os import PathLike, path
1011
from pathlib import Path
@@ -466,10 +467,7 @@ def _customized_validate(self):
466467
message=SPARK_ENVIRONMENT_WARNING_MESSAGE,
467468
)
468469
result.merge_with(self._validate_entry_exist(raise_error=False))
469-
try:
470-
self._validate_fields()
471-
except ValidationException as e:
472-
result.append_error(yaml_path="*", message=str(e))
470+
result.merge_with(self._validate_fields())
473471
return result
474472

475473
def _validate_entry_exist(self, raise_error=False) -> MutableValidationResult:
@@ -508,14 +506,42 @@ def _validate_entry_exist(self, raise_error=False) -> MutableValidationResult:
508506
)
509507
return validation_result.try_raise(error_target=self._get_validation_error_target(), raise_error=raise_error)
510508

511-
def _validate_fields(self) -> None:
512-
_validate_compute_or_resources(self.compute, self.resources)
513-
_validate_input_output_mode(self.inputs, self.outputs)
514-
_validate_spark_configurations(self)
515-
self._validate_entry()
509+
def _validate_fields(self) -> MutableValidationResult:
510+
validation_result = self._create_empty_validation_result()
511+
try:
512+
_validate_compute_or_resources(self.compute, self.resources)
513+
except ValidationException as e:
514+
validation_result.append_error(message=str(e), yaml_path="resources")
515+
validation_result.append_error(message=str(e), yaml_path="compute")
516+
517+
try:
518+
_validate_input_output_mode(self.inputs, self.outputs)
519+
except ValidationException as e:
520+
msg = str(e)
521+
m = re.match(r"(Input|Output) '(\w+)'", msg)
522+
if m:
523+
io_type, io_name = m.groups()
524+
if io_type == "Input":
525+
validation_result.append_error(message=msg, yaml_path=f"inputs.{io_name}")
526+
else:
527+
validation_result.append_error(message=msg, yaml_path=f"outputs.{io_name}")
528+
529+
try:
530+
_validate_spark_configurations(self)
531+
except ValidationException as e:
532+
validation_result.append_error(message=str(e), yaml_path="conf")
533+
534+
try:
535+
self._validate_entry()
536+
except ValidationException as e:
537+
validation_result.append_error(message=str(e), yaml_path="entry")
516538

517539
if self.args:
518-
validate_inputs_for_args(self.args, self.inputs)
540+
try:
541+
validate_inputs_for_args(self.args, self.inputs)
542+
except ValidationException as e:
543+
validation_result.append_error(message=str(e), yaml_path="args")
544+
return validation_result
519545

520546
def __call__(self, *args, **kwargs) -> "Spark":
521547
"""Call Spark as a function will return a new instance each time."""

sdk/ml/azure-ai-ml/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ verifytypes = false
88
fail-under = 35
99
verbose = 2
1010
ignore-module = true
11-
exclude = ["setup.py", "tests", "azure/ai/ml/_restclient"]
11+
exclude = ["setup.py", "tests", "azure/ai/ml/_restclient", "scripts"]
1212

1313
[tool.isort]
1414
profile = "black"

sdk/ml/azure-ai-ml/scripts/run_tests.py

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# ---------------------------------------------------------
44
import argparse
55
import contextlib
6+
import glob
67
import json
78
import os
89
import re
@@ -88,24 +89,34 @@ def run_simple(
8889
extra_params,
8990
*,
9091
is_live_and_recording,
91-
log_file_path=None,
92+
log_file_path,
93+
log_suffix=None,
9294
log_in_json=False,
9395
):
9496
print(f"Running {len(tests_to_run)} tests under {working_dir}: ")
9597
for test_name in tests_to_run:
9698
print(test_name)
9799

98-
if log_in_json or log_file_path is None:
100+
if log_file_path and log_suffix:
101+
log_file_path = log_file_path.with_suffix(log_file_path.suffix + log_suffix)
102+
103+
if log_in_json:
104+
if log_file_path is None:
105+
raise ValueError("log_file_path must be specified when log_in_json is True")
99106
stdout = None
107+
json_log_file_path = log_file_path.with_suffix(log_file_path.suffix + ".log")
100108
else:
101-
stdout = open(log_file_path, "wb")
109+
stdout = open(log_file_path.with_suffix(log_file_path.suffix + ".txt"), "wb")
110+
json_log_file_path = None
111+
102112
with update_dot_env_file(
103113
{"AZURE_TEST_RUN_LIVE": is_live_and_recording, "AZURE_SKIP_LIVE_RECORDING": not is_live_and_recording},
104114
):
105115
for test_class, keyword_param in reorganize_tests(tests_to_run):
106116
tmp_extra_params = extra_params + keyword_param
107117
if log_in_json:
108-
temp_log_file_path = log_file_path.with_stem("temp")
118+
# use a temp json file to avoid overwriting the final log file
119+
temp_log_file_path = json_log_file_path.with_stem("temp")
109120
tmp_extra_params += ["--report-log", temp_log_file_path.as_posix()]
110121

111122
subprocess.run(
@@ -120,10 +131,14 @@ def run_simple(
120131
stdout=stdout,
121132
)
122133
if log_in_json:
123-
with open(log_file_path, "a", encoding="utf-8") as f:
134+
# append temp json file to the final log file
135+
with open(json_log_file_path, "a", encoding="utf-8") as f:
124136
f.write(temp_log_file_path.read_text())
125137
if stdout is not None:
126138
stdout.close()
139+
print(log_file_path.with_suffix(log_file_path.suffix + ".txt").read_text())
140+
141+
return json_log_file_path
127142

128143

129144
def reorganize_tests(tests_to_run):
@@ -168,59 +183,80 @@ def reorganize_tests(tests_to_run):
168183
yield test_class, keyword_param
169184

170185

171-
def run_tests(tests_to_run, extras, *, skip_first_run=False, record_mismatch=False, is_live_and_recording=False):
186+
def get_base_log_path(working_dir, *, create_new=True):
187+
log_dir = working_dir / "scripts" / "tmp"
188+
if not create_new:
189+
logs = sorted(glob.glob(str(log_dir / "pytest.*.first.log")))
190+
if len(logs) == 0:
191+
raise RuntimeError("No previous run log file found")
192+
return Path(logs[-1][: -len(".first.log")])
193+
else:
194+
log_file_path = log_dir / "pytest.{}".format(datetime.now().strftime("%Y%m%d%H%M%S"))
195+
log_file_path.parent.mkdir(parents=True, exist_ok=True)
196+
return log_file_path
197+
198+
199+
def get_failed_tests(log_file_path):
200+
tests_failed_with_recording_mismatch = []
201+
failed_tests = []
202+
with open(log_file_path, "r") as f:
203+
for line in f:
204+
node = json.loads(line)
205+
if "outcome" not in node:
206+
continue
207+
if node["outcome"] != "failed":
208+
continue
209+
test_name = location_to_test_name(node["location"])
210+
failed_tests.append(test_name)
211+
msg = node["longrepr"]["reprcrash"]["message"]
212+
if "NotFound" in msg:
213+
tests_failed_with_recording_mismatch.append(test_name)
214+
return failed_tests, tests_failed_with_recording_mismatch
215+
216+
217+
def run_tests(tests_to_run, extras, *, skip_first_run=False, record_mismatch=False):
172218
working_dir = Path(__file__).parent.parent
173-
log_file_path = working_dir / "scripts" / "tmp" / "pytest_first_run.log"
174-
log_file_path.parent.mkdir(parents=True, exist_ok=True)
175-
if record_mismatch and not skip_first_run:
176-
# reset the log file
177-
log_file_path.unlink(missing_ok=True)
178-
179-
if not (record_mismatch and skip_first_run):
180-
# first run
181-
run_simple(
219+
log_file_path = get_base_log_path(working_dir, create_new=not skip_first_run)
220+
221+
if skip_first_run:
222+
json_log_file_path = log_file_path.with_suffix(log_file_path.suffix + ".first.log")
223+
else:
224+
json_log_file_path = run_simple(
182225
tests_to_run,
183226
working_dir,
184227
extras + ["--disable-warnings", "--disable-pytest-warnings"],
185-
# first run in record-mismatch mode is always in playback mode
186-
is_live_and_recording=is_live_and_recording and not record_mismatch,
187-
log_file_path=log_file_path if record_mismatch else None,
188-
log_in_json=record_mismatch,
228+
# first run is always in playback mode
229+
is_live_and_recording=False,
230+
log_file_path=log_file_path,
231+
log_in_json=True,
232+
log_suffix=".first",
189233
)
190234

191235
if record_mismatch:
192-
tests_failed_with_recording_mismatch = []
193-
with open(log_file_path, "r") as f:
194-
for line in f:
195-
node = json.loads(line)
196-
if "outcome" not in node:
197-
continue
198-
if node["outcome"] != "failed":
199-
continue
200-
msg = node["longrepr"]["reprcrash"]["message"]
201-
if "ResourceNotFoundError" in msg:
202-
tests_failed_with_recording_mismatch.append(location_to_test_name(node["location"]))
203-
236+
failed_tests, tests_failed_with_recording_mismatch = get_failed_tests(json_log_file_path)
204237
if tests_failed_with_recording_mismatch:
205238
print("Re-do live mode recording for tests: \n", json.dumps(tests_failed_with_recording_mismatch, indent=2))
206239
run_simple(
207240
tests_failed_with_recording_mismatch,
208241
working_dir,
209242
extra_params=["--tb=line"],
210243
is_live_and_recording=True,
244+
log_suffix=".record",
245+
log_file_path=log_file_path,
211246
)
212247

213-
# re-run the original tests to check if they are still failures and output the log
248+
print(
249+
"Rerun playback mode for failed tests: \n", json.dumps(tests_failed_with_recording_mismatch, indent=2)
250+
)
214251
run_simple(
215-
tests_to_run,
252+
failed_tests,
216253
working_dir,
217-
extras + ["--disable-warnings", "--disable-pytest-warnings"],
254+
extra_params=extras + ["--disable-warnings", "--disable-pytest-warnings"],
218255
is_live_and_recording=False,
219-
log_file_path=working_dir
220-
/ "scripts"
221-
/ "tmp"
222-
/ "pytest.{}.log".format(datetime.now().strftime("%Y%m%d%H%M%S")),
256+
log_file_path=log_file_path,
257+
log_suffix=".final",
223258
)
259+
print(log_file_path.with_suffix(log_file_path.suffix + ".final.log").read_text())
224260

225261

226262
if __name__ == "__main__":
@@ -263,6 +299,9 @@ def run_tests(tests_to_run, extras, *, skip_first_run=False, record_mismatch=Fal
263299
_tests = load_tests_from_file(_args.file)
264300
elif _args.name:
265301
_tests = [_args.name]
302+
elif _args.skip_first_run and _args.record_mismatch:
303+
# load failed tests from last run log
304+
_tests = []
266305
else:
267306
raise ValueError("Must specify either --file or --name")
268307
run_tests(

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline_samples.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,8 @@ def test_spark_job_with_builder_in_pipeline_with_dynamic_allocation_disabled(
380380
validation_result = client.jobs.validate(pipeline)
381381
assert validation_result.passed is False
382382
assert validation_result.error_messages == {
383-
"jobs.add_greeting_column": "Should not specify min or max executors when dynamic allocation is disabled.",
383+
"jobs.add_greeting_column.conf": "Should not specify min or max executors "
384+
"when dynamic allocation is disabled.",
384385
}
385386

386387
@pytest.mark.e2etest

sdk/ml/azure-ai-ml/tests/dsl/unittests/test_command_builder.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ def test_spark_job_with_dynamic_allocation_disabled(self):
780780
)
781781
result = node._validate()
782782
message = "Should not specify min or max executors when dynamic allocation is disabled."
783-
assert "*" in result.error_messages and message == result.error_messages["*"]
783+
assert "conf" in result.error_messages and message == result.error_messages["conf"]
784784

785785
def test_executor_instances_is_mandatory_when_dynamic_allocation_disabled(self):
786786
node = spark(
@@ -797,7 +797,7 @@ def test_executor_instances_is_mandatory_when_dynamic_allocation_disabled(self):
797797
"spark.driver.cores, spark.driver.memory, spark.executor.cores, spark.executor.memory and "
798798
"spark.executor.instances are mandatory fields."
799799
)
800-
assert "*" in result.error_messages and message == result.error_messages["*"]
800+
assert "conf" in result.error_messages and message == result.error_messages["conf"]
801801

802802
def test_executor_instances_is_specified_as_min_executor_if_unset(self):
803803
node = spark(
@@ -834,7 +834,7 @@ def test_excutor_instances_throw_error_when_out_of_range(self):
834834
"Executor instances must be a valid non-negative integer and must be between "
835835
"spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors"
836836
)
837-
assert "*" in result.error_messages and message == result.error_messages["*"]
837+
assert "conf" in result.error_messages and message == result.error_messages["conf"]
838838

839839
def test_spark_job_with_additional_conf(self):
840840
node = spark(

0 commit comments

Comments (0)