 from azure.ai.ml._azure_environments import ENDPOINT_URLS, _get_cloud_details

 from azure.storage.blob import generate_blob_sas, BlobSasPermissions
+from azure.storage.filedatalake import generate_file_sas, FileSasPermissions
 from azure.ai.ml._ml_exceptions import ValidationException
-from azure.ai.ml._operations import DatastoreOperations
-from azure.ai.ml._utils._storage_utils import get_storage_client, STORAGE_ACCOUNT_URLS
+from azure.ai.ml.operations import DatastoreOperations
+from azure.ai.ml._utils._storage_utils import get_storage_client
 from azure.ai.ml.entities import Environment
 from azure.ai.ml.entities._assets._artifacts.artifact import Artifact, ArtifactStorageInfo
 from azure.ai.ml.entities._datastore.credentials import AccountKeyCredentials
     IgnoreFile,
     _build_metadata_dict,
 )
-from azure.ai.ml._utils._storage_utils import get_artifact_path_from_blob_url, AzureMLDatastorePathUri
+from azure.ai.ml._utils._storage_utils import get_artifact_path_from_storage_url, AzureMLDatastorePathUri
 from azure.ai.ml._scope_dependent_operations import OperationScope
 from azure.ai.ml._restclient.v2021_10_01.models import (
     DatastoreType,
 )
 from azure.ai.ml._utils.utils import is_url, is_mlflow_uri
 from azure.ai.ml._utils._arm_id_utils import AMLNamedArmId
-from azure.ai.ml.constants import SHORT_URI_FORMAT
+from azure.ai.ml.constants import SHORT_URI_FORMAT, STORAGE_ACCOUNT_URLS
 from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE
-
+from azure.ai.ml._artifacts._blob_storage_helper import BlobStorageClient
+from azure.ai.ml._artifacts._gen2_storage_helper import Gen2StorageClient

 module_logger = logging.getLogger(__name__)

@@ -98,40 +100,51 @@ def get_datastore_info(operations: DatastoreOperations, name: str) -> Dict[str,
     return datastore_info


-def list_logs_in_datastore(ds_info: Dict[str, str], blob_prefix: str, legacy_log_folder_name: str) -> Dict[str, str]:
+def list_logs_in_datastore(ds_info: Dict[str, str], prefix: str, legacy_log_folder_name: str) -> Dict[str, str]:
     """
-    Returns a dictionary of file name to blob uri with SAS token, matching the structure of RunDetials.logFiles
+    Returns a dictionary of file name to blob or data lake uri with SAS token, matching the structure of RunDetails.logFiles

     legacy_log_folder_name: the name of the folder in the datastore that contains the logs
         /azureml-logs/*.txt is the legacy log structure for commandJob and sweepJob
         /logs/azureml/*.txt is the legacy log structure for pipeline parent Job
     """
-    # Only support blob storage for azureml log outputs
-    if ds_info["storage_type"] != DatastoreType.AZURE_BLOB:
-        raise Exception("Can only list logs in blob storage")
+    if ds_info["storage_type"] not in [DatastoreType.AZURE_BLOB, DatastoreType.AZURE_DATA_LAKE_GEN2]:
+        raise Exception("Only Blob and Azure DataLake Storage Gen2 datastores are supported.")
+
     storage_client = get_storage_client(
         credential=ds_info["credential"],
         container_name=ds_info["container_name"],
         storage_account=ds_info["storage_account"],
         storage_type=ds_info["storage_type"],
     )
-    blobs = storage_client.list(starts_with=blob_prefix + "/user_logs/")
+
+    items = storage_client.list(starts_with=prefix + "/user_logs/")
     # Append legacy log files if present
-    blobs.extend(storage_client.list(starts_with=blob_prefix + legacy_log_folder_name))
+    items.extend(storage_client.list(starts_with=prefix + legacy_log_folder_name))

     log_dict = {}
-    for blob_name in blobs:
-        sub_name = blob_name.split(blob_prefix + "/")[1]
-        token = generate_blob_sas(
-            account_name=ds_info["storage_account"],
-            container_name=ds_info["container_name"],
-            blob_name=blob_name,
-            account_key=ds_info["credential"],
-            permission=BlobSasPermissions(read=True),
-            expiry=datetime.utcnow() + timedelta(minutes=30),
-        )
-
-        log_dict[sub_name] = "{}/{}/{}?{}".format(ds_info["account_url"], ds_info["container_name"], blob_name, token)
+    for item_name in items:
+        sub_name = item_name.split(prefix + "/")[1]
+        if isinstance(storage_client, BlobStorageClient):
+            token = generate_blob_sas(
+                account_name=ds_info["storage_account"],
+                container_name=ds_info["container_name"],
+                blob_name=item_name,
+                account_key=ds_info["credential"],
+                permission=BlobSasPermissions(read=True),
+                expiry=datetime.utcnow() + timedelta(minutes=30),
+            )
+        elif isinstance(storage_client, Gen2StorageClient):
+            token = generate_file_sas(
+                account_name=ds_info["storage_account"],
+                file_system_name=ds_info["container_name"],
+                file_name=item_name,
+                credential=ds_info["credential"],
+                permission=FileSasPermissions(read=True),
+                expiry=datetime.utcnow() + timedelta(minutes=30),
+            )
+
+        log_dict[sub_name] = "{}/{}/{}?{}".format(ds_info["account_url"], ds_info["container_name"], item_name, token)
     return log_dict

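For orientation, a minimal usage sketch of the updated helper. The DatastoreOperations instance, the run prefix, the datastore name, and the use of `requests` are illustrative assumptions, not part of this change; the helper itself only returns the name-to-SAS-URL mapping.

# Illustrative only: fetch a run's logs through the SAS URLs produced above.
# Assumes `datastore_operations` is an existing DatastoreOperations instance and
# that `requests` is available; the run prefix and datastore name are placeholders.
import requests

ds_info = get_datastore_info(datastore_operations, "workspaceblobstore")
logs = list_logs_in_datastore(
    ds_info,
    prefix="ExperimentRun/dcid.my-run-id",
    legacy_log_folder_name="/azureml-logs",
)
for file_name, sas_url in logs.items():
    # Each value has the shape "<account_url>/<container>/<item>?<sas-token>".
    print(file_name, len(requests.get(sas_url).content), "bytes")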
@@ -175,7 +188,9 @@ def upload_artifact(
         version=artifact_info["version"],
         relative_path=artifact_info["remote path"],
         datastore_arm_id=get_datastore_arm_id(datastore_name, operation_scope) if not sas_uri else None,
-        container_name=storage_client.container,
+        container_name=(
+            storage_client.container if isinstance(storage_client, BlobStorageClient) else storage_client.file_system
+        ),
         storage_account_url=datastore_info.get("account_url") if not sas_uri else sas_uri,
         indicator_file=artifact_info["indicator file"],
         is_file=Path(local_path).is_file(),
@@ -209,7 +224,7 @@ def download_artifact(
     return destination


-def download_artifact_from_blob_url(
+def download_artifact_from_storage_url(
     blob_url: str,
     destination: str,
     datastore_operation: DatastoreOperations,
@@ -220,7 +235,7 @@ def download_artifact_from_blob_url(
     """
     datastore_name = _get_datastore_name(datastore_name=datastore_name)
     datastore_info = get_datastore_info(datastore_operation, datastore_name)
-    starts_with = get_artifact_path_from_blob_url(
+    starts_with = get_artifact_path_from_storage_url(
         blob_url=str(blob_url), container_name=datastore_info.get("container_name")
     )
     return download_artifact(
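A hedged sketch of how a caller picks up the rename; the import path is assumed from this file's location, and the workspace identifiers, source URL, and destination are placeholders.

# Illustrative only: download an artifact via the renamed helper.
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
from azure.ai.ml._artifacts._artifact_utilities import download_artifact_from_storage_url  # assumed module path

ml_client = MLClient(DefaultAzureCredential(), "<subscription-id>", "<resource-group>", "<workspace-name>")
download_artifact_from_storage_url(
    blob_url="https://myaccount.blob.core.windows.net/azureml/LocalUpload/abc123/model.pkl",  # placeholder
    destination="./downloaded",
    datastore_operation=ml_client.datastores,
    datastore_name="workspaceblobstore",
)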
@@ -317,13 +332,26 @@ def _upload_and_generate_remote_uri(

 def _update_metadata(name, version, indicator_file, datastore_info) -> None:
     storage_client = get_storage_client(**datastore_info)
+
+    if isinstance(storage_client, BlobStorageClient):
+        _update_blob_metadata(name, version, indicator_file, storage_client)
+    elif isinstance(storage_client, Gen2StorageClient):
+        _update_gen2_metadata(name, version, indicator_file, storage_client)
+
+
+def _update_blob_metadata(name, version, indicator_file, storage_client) -> None:
     container_client = storage_client.container_client
     if indicator_file.startswith(storage_client.container):
         indicator_file = indicator_file.split(storage_client.container)[1]
     blob = container_client.get_blob_client(blob=indicator_file)
     blob.set_blob_metadata(_build_metadata_dict(name=name, version=version))


+def _update_gen2_metadata(name, version, indicator_file, storage_client) -> None:
+    artifact_directory_client = storage_client.file_system_client.get_directory_client(indicator_file)
+    artifact_directory_client.set_metadata(_build_metadata_dict(name=name, version=version))
+
+
 T = TypeVar("T", bound=Artifact)

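For context on the new Gen2 branch, a standalone sketch of the equivalent SDK calls behind `_update_gen2_metadata`; the account, file system, directory path, and the flat metadata dict are placeholder assumptions (the real dict comes from `_build_metadata_dict`).

# Sketch only: set user-defined metadata on a Gen2 directory, mirroring _update_gen2_metadata.
from azure.storage.filedatalake import DataLakeServiceClient

service = DataLakeServiceClient(
    account_url="https://myaccount.dfs.core.windows.net",  # placeholder account
    credential="<account-key>",                            # placeholder credential
)
directory_client = service.get_file_system_client("my-filesystem").get_directory_client(
    "LocalUpload/abc123/my-asset"  # placeholder artifact/indicator path
)
# Assumption: _build_metadata_dict yields a flat str -> str mapping along these lines.
directory_client.set_metadata({"name": "my-asset", "version": "1"})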
@@ -332,15 +360,13 @@ def _check_and_upload_path(
     asset_operations: Union["DatasetOperations", "DataOperations", "ModelOperations", "CodeOperations"],
     datastore_name: str = None,
     sas_uri: str = None,
-    check_only: bool = False,
 ) -> Tuple[T, str]:
     """Checks whether `artifact` is a path or a uri and uploads it to the datastore if necessary.
     param T artifact: artifact to check and upload
     param Union["DatasetOperations", "DataOperations", "ModelOperations", "CodeOperations"] asset_operations:
         the asset operations to use for uploading
     param str datastore_name: the name of the datastore to upload to
     param str sas_uri: the sas uri to use for uploading
-    param bool check_only: if True, only checks whether the artifact is a valid path & won't upload it
     """

     indicator_file = None
@@ -358,15 +384,8 @@ def _check_and_upload_path(
             if hasattr(artifact, "path") and artifact.path is not None
             else Path(artifact.local_path)
         )
-
         if not path.is_absolute():
             path = Path(artifact.base_path, path).resolve()
-
-        if check_only:
-            if not path.is_dir():
-                raise ValueError(f"{path} is not a directory")
-            return artifact, ""
-
         uploaded_artifact = _upload_to_datastore(
             asset_operations._operation_scope,
             asset_operations._datastore_operation,