Skip to content

Commit 57fa943

Browse files
authored
[ML][Pipelines] redo feat: flow in pipeline (Azure#31914)
* [ML][Pipelines] redo feat: load component from flow (Azure#31632) (Azure#31911) This reverts commit c6be14f.
* fix: remove extra-index-url from requirements.txt
* fix: resolve comments
1 parent 87c886d commit 57fa943

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2625
-159
lines changed

.vscode/cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
"sdk/ml/azure-ai-ml/tests/**",
8989
"sdk/ml/azure-ai-ml/swagger/**",
9090
"sdk/ml/azure-ai-ml/NOTICE.txt",
91+
"sdk/ml/azure-ai-ml/.pre-commit-config.yaml",
9192
"sdk/loadtestservice/azure-developer-loadtesting/**",
9293
"sdk/translation/azure-ai-translation-text/azure/ai/translation/text/_serialization.py",
9394
"sdk/translation/azure-ai-translation-text/tests/test_break_sentence.py",

sdk/ml/azure-ai-ml/.pre-commit-config.yaml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ repos:
2626
args: ['--settings-file', 'sdk/ml/azure-ai-ml/pyproject.toml']
2727
- repo: local
2828
hooks:
29-
- id: pylint
30-
name: pylint
29+
- id: pylint-dependencies-check
30+
name: pylint-dependencies-check
3131
entry: python
3232
language: system
3333
types: [python]
@@ -69,9 +69,9 @@ repos:
6969
print(f"Please run the following command to install the correct version of {packagename}")
7070
print(f"\tpython -m pip install {packagename}=={required_version} {' '.join(install_args)}")
7171
sys.exit(1)
72-
73-
# Run pylint
74-
os.execl(sys.executable, sys.executable, "-m", "pylint", *sys.argv[1:])
75-
# cspell:disable-next-line
76-
- "--rcfile=pylintrc"
77-
- "--output-format=parseable"
72+
- id: pylint
73+
name: pylint
74+
entry: python
75+
language: system
76+
args: [ -m, pylint, --rcfile=pylintrc, --output-format=parseable ]
77+
types: [python]

sdk/ml/azure-ai-ml/assets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "python",
44
"TagPrefix": "python/ml/azure-ai-ml",
5-
"Tag": "python/ml/azure-ai-ml_d05969c2b4"
5+
"Tag": "python/ml/azure-ai-ml_039ca40c3d"
66
}

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/component/__init__.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@
55
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore
66

77
from .command_component import AnonymousCommandComponentSchema, CommandComponentSchema, ComponentFileRefField
8-
from .component import ComponentSchema
9-
from .import_component import AnonymousImportComponentSchema, ImportComponentFileRefField, ImportComponentSchema
10-
from .parallel_component import AnonymousParallelComponentSchema, ParallelComponentFileRefField, ParallelComponentSchema
11-
from .spark_component import AnonymousSparkComponentSchema, SparkComponentFileRefField, SparkComponentSchema
8+
from .component import ComponentSchema, ComponentYamlRefField
129
from .data_transfer_component import (
1310
AnonymousDataTransferCopyComponentSchema,
11+
AnonymousDataTransferExportComponentSchema,
12+
AnonymousDataTransferImportComponentSchema,
1413
DataTransferCopyComponentFileRefField,
1514
DataTransferCopyComponentSchema,
16-
AnonymousDataTransferImportComponentSchema,
17-
DataTransferImportComponentFileRefField,
18-
DataTransferImportComponentSchema,
19-
AnonymousDataTransferExportComponentSchema,
2015
DataTransferExportComponentFileRefField,
2116
DataTransferExportComponentSchema,
17+
DataTransferImportComponentFileRefField,
18+
DataTransferImportComponentSchema,
2219
)
20+
from .import_component import AnonymousImportComponentSchema, ImportComponentFileRefField, ImportComponentSchema
21+
from .parallel_component import AnonymousParallelComponentSchema, ParallelComponentFileRefField, ParallelComponentSchema
22+
from .spark_component import AnonymousSparkComponentSchema, SparkComponentFileRefField, SparkComponentSchema
2323

2424
__all__ = [
2525
"ComponentSchema",
@@ -44,4 +44,5 @@
4444
"AnonymousDataTransferExportComponentSchema",
4545
"DataTransferExportComponentFileRefField",
4646
"DataTransferExportComponentSchema",
47+
"ComponentYamlRefField",
4748
]

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/component/component.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# ---------------------------------------------------------
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# ---------------------------------------------------------
4+
from pathlib import Path
45

5-
from marshmallow import fields, post_dump, pre_dump, pre_load
6+
from marshmallow import ValidationError, fields, post_dump, pre_dump, pre_load
7+
from marshmallow.fields import Field
68

79
from azure.ai.ml._schema.component.input_output import InputPortSchema, OutputPortSchema, ParameterSchema
810
from azure.ai.ml._schema.core.fields import (
@@ -13,8 +15,8 @@
1315
UnionField,
1416
)
1517
from azure.ai.ml._schema.core.intellectual_property import IntellectualPropertySchema
16-
from azure.ai.ml._utils.utils import is_private_preview_enabled
17-
from azure.ai.ml.constants._common import AzureMLResourceType
18+
from azure.ai.ml._utils.utils import is_private_preview_enabled, load_yaml
19+
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType
1820

1921
from .._utils.utils import _resolve_group_inputs_for_component
2022
from ..assets.asset import AssetSchema
@@ -26,6 +28,52 @@ def _get_field_name(self):
2628
return "Component"
2729

2830

31+
class ComponentYamlRefField(Field):
32+
"""Allows you to nest a :class:`Schema <marshmallow.Schema>`
33+
inside a yaml ref field.
34+
"""
35+
36+
def _jsonschema_type_mapping(self):
37+
schema = {"type": "string"}
38+
if self.name is not None:
39+
schema["title"] = self.name
40+
if self.dump_only:
41+
schema["readonly"] = True
42+
return schema
43+
44+
def _deserialize(self, value, attr, data, **kwargs):
45+
if not isinstance(value, str):
46+
raise ValidationError(f"Nested yaml ref field expected a string but got {type(value)}.")
47+
48+
base_path = Path(self.context[BASE_PATH_CONTEXT_KEY])
49+
50+
source_path = Path(value)
51+
# raise if the string is not a valid path, like "azureml:xxx"
52+
try:
53+
source_path.resolve()
54+
except OSError as ex:
55+
raise ValidationError(f"Nested file ref field expected a local path but got {value}.") from ex
56+
57+
if not source_path.is_absolute():
58+
source_path = base_path / source_path
59+
60+
if not source_path.is_file():
61+
raise ValidationError(
62+
f"Nested yaml ref field expected a local path but can't find {value} based on {base_path.as_posix()}."
63+
)
64+
65+
loaded_value = load_yaml(source_path)
66+
67+
# local import to avoid circular import
68+
from azure.ai.ml.entities import Component
69+
70+
component = Component._load(data=loaded_value, yaml_path=source_path) # pylint: disable=protected-access
71+
return component
72+
73+
def _serialize(self, value, attr, obj, **kwargs):
74+
raise ValidationError("Serialize on RefField is not supported.")
75+
76+
2977
class ComponentSchema(AssetSchema):
3078
schema = fields.Str(data_key="$schema", attribute="_schema")
3179
name = ComponentNameStr(required=True)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# ---------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# ---------------------------------------------------------
4+
5+
# pylint: disable=unused-argument,protected-access,no-member
6+
7+
from marshmallow import fields
8+
9+
from azure.ai.ml._schema import YamlFileSchema
10+
from azure.ai.ml._schema.component import ComponentSchema
11+
from azure.ai.ml._schema.component.component import ComponentNameStr
12+
from azure.ai.ml._schema.core.fields import ArmVersionedStr, LocalPathField, StringTransformedEnum, UnionField
13+
from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta
14+
from azure.ai.ml.constants._common import AzureMLResourceType
15+
from azure.ai.ml.constants._component import NodeType
16+
17+
18+
class _ComponentMetadataSchema(metaclass=PatchedSchemaMeta):
19+
"""Schema to recognize metadata of a flow as a component."""
20+
21+
name = ComponentNameStr()
22+
version = fields.Str()
23+
display_name = fields.Str()
24+
description = fields.Str()
25+
tags = fields.Dict(keys=fields.Str(), values=fields.Str())
26+
is_deterministic = fields.Bool()
27+
28+
29+
class _FlowAttributesSchema(metaclass=PatchedSchemaMeta):
30+
"""Schema to recognize attributes of a flow."""
31+
32+
variant = fields.Str()
33+
column_mappings = fields.Dict(
34+
fields.Str(),
35+
fields.Str(),
36+
)
37+
connections = fields.Dict(
38+
keys=fields.Str(),
39+
values=fields.Dict(
40+
keys=fields.Str(),
41+
values=fields.Str(),
42+
),
43+
)
44+
environment_variables = fields.Dict(
45+
fields.Str(),
46+
fields.Str(),
47+
)
48+
49+
50+
class FlowSchema(YamlFileSchema, _ComponentMetadataSchema):
51+
"""Schema for flow.dag.yaml file."""
52+
53+
additional_includes = fields.List(LocalPathField())
54+
55+
56+
class RunSchema(YamlFileSchema, _ComponentMetadataSchema, _FlowAttributesSchema):
57+
"""Schema for run.yaml file."""
58+
59+
flow = LocalPathField(required=True)
60+
61+
62+
class FlowComponentSchema(ComponentSchema, _FlowAttributesSchema):
63+
"""FlowSchema and FlowRunSchema are used to load flow while FlowComponentSchema is used to dump flow."""
64+
65+
class Meta:
66+
"""Override this to exclude inputs & outputs as component doesn't have them."""
67+
68+
exclude = ["inputs", "outputs"] # component doesn't have inputs & outputs
69+
70+
# TODO: name should be required?
71+
name = ComponentNameStr()
72+
73+
type = StringTransformedEnum(allowed_values=[NodeType.FLOW_PARALLEL], required=True)
74+
75+
# name, version, tags, display_name and is_deterministic are inherited from ComponentSchema
76+
properties = fields.Dict(
77+
fields.Str(),
78+
fields.Str(),
79+
)
80+
81+
# this is different from regular CodeField
82+
code = UnionField(
83+
[
84+
LocalPathField(),
85+
ArmVersionedStr(azureml_type=AzureMLResourceType.CODE),
86+
],
87+
metadata={"description": "A local path or http:, https:, azureml: url pointing to a remote location."},
88+
)
89+
additional_includes = fields.List(LocalPathField(), load_only=True)

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/pipeline/component_job.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
AnonymousParallelComponentSchema,
1616
AnonymousSparkComponentSchema,
1717
ComponentFileRefField,
18+
ComponentYamlRefField,
1819
DataTransferCopyComponentFileRefField,
1920
ImportComponentFileRefField,
2021
ParallelComponentFileRefField,
@@ -28,6 +29,7 @@
2829
from ...exceptions import ValidationException
2930
from .._sweep.parameterized_sweep import ParameterizedSweepSchema
3031
from .._utils.data_binding_expression import support_data_binding_expression_for_fields
32+
from ..component.flow import FlowComponentSchema
3133
from ..core.fields import (
3234
ArmVersionedStr,
3335
ComputeField,
@@ -252,6 +254,10 @@ class ParallelSchema(BaseNodeSchema, ParameterizedParallelSchema):
252254
# component file reference
253255
ParallelComponentFileRefField(),
254256
],
257+
NodeType.FLOW_PARALLEL: [
258+
NestedField(FlowComponentSchema, unknown=INCLUDE, dump_only=True),
259+
ComponentYamlRefField(),
260+
],
255261
},
256262
plain_union_fields=[
257263
# for registry type assets

sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,18 @@ class CommonYamlFields:
381381
"""Schema."""
382382

383383

384+
class SchemaUrl:
385+
"""Schema urls.
386+
387+
Schema urls will be used in VSCode extension to validate yaml files. It will also be used to identify the
388+
corresponding entity type of a yaml file, especially for some internal yaml files.
389+
"""
390+
391+
PROMPTFLOW_PREFIX = "https://azuremlschemas.azureedge.net/promptflow/"
392+
PROMPTFLOW_FLOW = PROMPTFLOW_PREFIX + "latest/Flow.schema.json"
393+
PROMPTFLOW_RUN = PROMPTFLOW_PREFIX + "latest/Run.schema.json"
394+
395+
384396
class GitProperties:
385397
"""GitProperties is a class that defines the constants used by the SDK/CLI for Git operations.
386398

sdk/ml/azure-ai-ml/azure/ai/ml/constants/_component.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class NodeType(object):
2323
IMPORT = "import"
2424
SPARK = "spark"
2525
DATA_TRANSFER = "data_transfer"
26+
FLOW_PARALLEL = "promptflow_parallel"
2627
# Note: container is not a real component type,
2728
# only used to mark component from container data.
2829
_CONTAINER = "_container"

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from ...constants._common import ARM_ID_PREFIX
1818
from ...constants._component import NodeType
1919
from .._component.component import Component
20+
from .._component.flow import FlowComponent
2021
from .._component.parallel_component import ParallelComponent
2122
from .._inputs_outputs import Input, Output
2223
from .._job.job_resource_configuration import JobResourceConfiguration
@@ -122,15 +123,28 @@ def __init__(
122123
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
123124
kwargs.pop("type", None)
124125

125-
BaseNode.__init__(
126-
self,
127-
type=NodeType.PARALLEL,
128-
component=component,
129-
inputs=inputs,
130-
outputs=outputs,
131-
compute=compute,
132-
**kwargs,
133-
)
126+
if isinstance(component, FlowComponent):
127+
# make input definition fit actual inputs for flow component
128+
with component._inputs._fit_inputs(inputs): # pylint: disable=protected-access
129+
BaseNode.__init__(
130+
self,
131+
type=NodeType.PARALLEL,
132+
component=component,
133+
inputs=inputs,
134+
outputs=outputs,
135+
compute=compute,
136+
**kwargs,
137+
)
138+
else:
139+
BaseNode.__init__(
140+
self,
141+
type=NodeType.PARALLEL,
142+
component=component,
143+
inputs=inputs,
144+
outputs=outputs,
145+
compute=compute,
146+
**kwargs,
147+
)
134148
# init mark for _AttrDict
135149
self._init = True
136150

@@ -309,7 +323,7 @@ def set_resources(
309323
@classmethod
310324
def _attr_type_map(cls) -> dict:
311325
return {
312-
"component": (str, ParallelComponent),
326+
"component": (str, ParallelComponent, FlowComponent),
313327
"retry_settings": (dict, RetrySettings),
314328
"resources": (dict, JobResourceConfiguration),
315329
"task": (dict, ParallelTask),

0 commit comments

Comments (0)