Skip to content

Commit 3bc5595

Browse files
author
Yalin Li
authored
[ACR] Support upload large blob in chunks (Azure#29382)
1 parent 8ece366 commit 3bc5595

File tree

10 files changed

+249
-135
lines changed

10 files changed

+249
-135
lines changed

sdk/containerregistry/azure-containerregistry/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,8 @@
55
### Features Added
66

77
### Breaking Changes
8-
- Suppress the response error with status code `404` in `delete_blob()`.
8+
- Suppress the response error with status code `404` in `delete_blob()`.
9+
- `upload_blob()` now returns a tuple of the digest and the blob size in bytes, instead of only the digest.
910

1011
### Bugs Fixed
1112

sdk/containerregistry/azure-containerregistry/assets.json

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "python",
44
"TagPrefix": "python/containerregistry/azure-containerregistry",
5-
"Tag": "python/containerregistry/azure-containerregistry_b29296432a"
5+
"Tag": "python/containerregistry/azure-containerregistry_49dd591b0e"
66
}

sdk/containerregistry/azure-containerregistry/azure/containerregistry/_container_registry_client.py

Lines changed: 56 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
# Licensed under the MIT License.
55
# ------------------------------------
66
# pylint: disable=too-many-lines
7+
import hashlib
78
from io import BytesIO
89
from typing import Any, Dict, IO, Optional, overload, Union, cast, Tuple
910
from azure.core.credentials import TokenCredential
@@ -29,6 +30,7 @@
2930
OCI_MANIFEST_MEDIA_TYPE,
3031
SUPPORTED_API_VERSIONS,
3132
AZURE_RESOURCE_MANAGER_PUBLIC_CLOUD,
33+
DEFAULT_CHUNK_SIZE,
3234
)
3335
from ._models import (
3436
RepositoryProperties,
@@ -857,16 +859,15 @@ def upload_manifest(
857859
) -> str:
858860
"""Upload a manifest for an OCI artifact.
859861
860-
:param str repository: Name of the repository
862+
:param str repository: Name of the repository.
861863
:param manifest: The manifest to upload. Note: This must be a seekable stream.
862864
:type manifest: ~azure.containerregistry.models.OCIManifest or IO
863865
:keyword tag: Tag of the manifest.
864866
:paramtype tag: str or None
865867
:returns: The digest of the uploaded manifest, calculated by the registry.
866868
:rtype: str
867-
:raises ValueError: If the parameter repository or manifest is None.
868-
:raises ~azure.core.exceptions.HttpResponseError:
869-
If the digest in the response does not match the digest of the uploaded manifest.
869+
:raises ValueError: If the parameter repository or manifest is None,
870+
or the digest in the response does not match the digest of the uploaded manifest.
870871
"""
871872
try:
872873
if isinstance(manifest, OCIManifest):
@@ -889,86 +890,93 @@ def upload_manifest(
889890
digest = response_headers['Docker-Content-Digest']
890891
if not _validate_digest(data, digest):
891892
raise ValueError("The digest in the response does not match the digest of the uploaded manifest.")
892-
except ValueError:
893+
except Exception as e:
893894
if repository is None or manifest is None:
894-
raise ValueError("The parameter repository and manifest cannot be None.")
895+
raise ValueError("The parameter repository and manifest cannot be None.") from e
895896
raise
896897
return digest
897898

898899
@distributed_trace
899-
def upload_blob(self, repository: str, data: IO, **kwargs) -> str:
900+
def upload_blob(self, repository: str, data: IO[bytes], **kwargs) -> Tuple[str, int]:
900901
"""Upload an artifact blob.
901902
902-
:param str repository: Name of the repository
903+
:param str repository: Name of the repository.
903904
:param data: The blob to upload. Note: This must be a seekable stream.
904905
:type data: IO
905-
:returns: The digest of the uploaded blob, calculated by the registry.
906-
:rtype: str
906+
:returns: The digest and size in bytes of the uploaded blob.
907+
:rtype: Tuple[str, int]
907908
:raises ValueError: If the parameter repository or data is None.
908909
"""
909910
try:
910911
start_upload_response_headers = cast(Dict[str, str], self._client.container_registry_blob.start_upload(
911912
repository, cls=_return_response_headers, **kwargs
912913
))
913-
upload_chunk_response_headers = cast(Dict[str, str], self._client.container_registry_blob.upload_chunk(
914-
start_upload_response_headers['Location'],
915-
data,
916-
cls=_return_response_headers,
917-
**kwargs
918-
))
919-
digest = _compute_digest(data)
914+
digest, location, blob_size = self._upload_blob_chunk(
915+
start_upload_response_headers['Location'], data, **kwargs
916+
)
920917
complete_upload_response_headers = cast(
921918
Dict[str, str],
922919
self._client.container_registry_blob.complete_upload(
923920
digest=digest,
924-
next_link=upload_chunk_response_headers['Location'],
921+
next_link=location,
925922
cls=_return_response_headers,
926923
**kwargs
927924
)
928925
)
929-
except ValueError:
926+
except Exception as e:
930927
if repository is None or data is None:
931-
raise ValueError("The parameter repository and data cannot be None.")
928+
raise ValueError("The parameter repository and data cannot be None.") from e
932929
raise
933-
return complete_upload_response_headers['Docker-Content-Digest']
930+
return complete_upload_response_headers['Docker-Content-Digest'], blob_size
931+
932+
def _upload_blob_chunk(self, location: str, data: IO[bytes], **kwargs) -> Tuple[str, str, int]:
933+
hasher = hashlib.sha256()
934+
buffer = data.read(DEFAULT_CHUNK_SIZE)
935+
blob_size = len(buffer)
936+
while len(buffer) > 0:
937+
response_headers = cast(Dict[str, str], self._client.container_registry_blob.upload_chunk(
938+
location,
939+
BytesIO(buffer),
940+
cls=_return_response_headers,
941+
**kwargs
942+
))
943+
location = response_headers['Location']
944+
hasher.update(buffer)
945+
buffer = data.read(DEFAULT_CHUNK_SIZE)
946+
blob_size += len(buffer)
947+
return "sha256:" + hasher.hexdigest(), location, blob_size
934948

935949
@distributed_trace
936950
def download_manifest(self, repository: str, tag_or_digest: str, **kwargs) -> DownloadManifestResult:
937951
"""Download the manifest for an OCI artifact.
938952
939-
:param str repository: Name of the repository
953+
:param str repository: Name of the repository.
940954
:param str tag_or_digest: The tag or digest of the manifest to download.
941955
When digest is provided, will use this digest to compare with the one calculated by the response payload.
942956
When tag is provided, will use the digest in response headers to compare.
943957
:returns: DownloadManifestResult
944958
:rtype: ~azure.containerregistry.models.DownloadManifestResult
945-
:raises ValueError: If the parameter repository or tag_or_digest is None.
946-
:raises ~azure.core.exceptions.HttpResponseError:
947-
If the requested digest does not match the digest of the received manifest.
959+
:raises ValueError: If the requested digest does not match the digest of the received manifest.
948960
"""
949-
try:
950-
response, manifest_wrapper = cast(
951-
Tuple[PipelineResponse, ManifestWrapper],
952-
self._client.container_registry.get_manifest(
953-
name=repository,
954-
reference=tag_or_digest,
955-
headers={"Accept": OCI_MANIFEST_MEDIA_TYPE},
956-
cls=_return_response_and_deserialized,
957-
**kwargs
958-
)
961+
response, manifest_wrapper = cast(
962+
Tuple[PipelineResponse, ManifestWrapper],
963+
self._client.container_registry.get_manifest(
964+
name=repository,
965+
reference=tag_or_digest,
966+
headers={"Accept": OCI_MANIFEST_MEDIA_TYPE},
967+
cls=_return_response_and_deserialized,
968+
**kwargs
959969
)
960-
manifest = OCIManifest.deserialize(cast(ManifestWrapper, manifest_wrapper).serialize())
961-
manifest_stream = _serialize_manifest(manifest)
962-
if tag_or_digest.startswith("sha256:"):
963-
digest = tag_or_digest
964-
else:
965-
digest = response.http_response.headers['Docker-Content-Digest']
966-
if not _validate_digest(manifest_stream, digest):
967-
raise ValueError("The requested digest does not match the digest of the received manifest.")
968-
except ValueError:
969-
if repository is None or tag_or_digest is None:
970-
raise ValueError("The parameter repository and tag_or_digest cannot be None.")
971-
raise
970+
)
971+
manifest = OCIManifest.deserialize(cast(ManifestWrapper, manifest_wrapper).serialize())
972+
manifest_stream = _serialize_manifest(manifest)
973+
if tag_or_digest.startswith("sha256:"):
974+
digest = tag_or_digest
975+
else:
976+
digest = response.http_response.headers['Docker-Content-Digest']
977+
if not _validate_digest(manifest_stream, digest):
978+
raise ValueError("The requested digest does not match the digest of the received manifest.")
979+
972980
return DownloadManifestResult(digest=digest, data=manifest_stream, manifest=manifest)
973981

974982
@distributed_trace
@@ -979,14 +987,8 @@ def download_blob(self, repository: str, digest: str, **kwargs) -> DownloadBlobR
979987
:param str digest: The digest of the blob to download.
980988
:returns: DownloadBlobResult
981989
:rtype: ~azure.containerregistry.DownloadBlobResult
982-
:raises ValueError: If the parameter repository or digest is None.
983990
"""
984-
try:
985-
deserialized = self._client.container_registry_blob.get_blob(repository, digest, **kwargs)
986-
except ValueError:
987-
if repository is None or digest is None:
988-
raise ValueError("The parameter repository and digest cannot be None.")
989-
raise
991+
deserialized = self._client.container_registry_blob.get_blob(repository, digest, **kwargs)
990992

991993
blob_content = b''
992994
for chunk in deserialized: # type: ignore

sdk/containerregistry/azure-containerregistry/azure/containerregistry/_helpers.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
"2021-07-01"
2424
]
2525
OCI_MANIFEST_MEDIA_TYPE = "application/vnd.oci.image.manifest.v1+json"
26+
DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024 # 4MB
2627

2728
# Public cloud audience
2829
AZURE_RESOURCE_MANAGER_PUBLIC_CLOUD = "https://management.azure.com"

sdk/containerregistry/azure-containerregistry/azure/containerregistry/aio/_async_authentication_policy.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
# Licensed under the MIT License.
44
# ------------------------------------
55
from typing import Union, Optional
6+
from io import SEEK_SET, UnsupportedOperation
67

78
from azure.core.credentials_async import AsyncTokenCredential
89
from azure.core.pipeline import PipelineRequest, PipelineResponse
@@ -49,6 +50,14 @@ async def send(self, request: PipelineRequest) -> PipelineResponse:
4950
if response.http_response.status_code == 401:
5051
challenge = response.http_response.headers.get("WWW-Authenticate")
5152
if challenge and await self.on_challenge(request, response, challenge):
53+
if request.http_request.body and hasattr(request.http_request.body, 'read'):
54+
try:
55+
# attempt to rewind the body to the initial position
56+
request.http_request.body.seek(0, SEEK_SET)
57+
except (UnsupportedOperation, ValueError, AttributeError):
58+
# if body is not seekable, then retry would not work
59+
return response
60+
5261
response = await self.next.send(request)
5362

5463
return response

sdk/containerregistry/azure-containerregistry/azure/containerregistry/aio/_async_container_registry_client.py

Lines changed: 81 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,9 @@
33
# Copyright (c) Microsoft Corporation.
44
# Licensed under the MIT License.
55
# ------------------------------------
6-
from typing import Any, Dict, Optional, overload, Union, cast
7-
6+
import hashlib
7+
from io import BytesIO
8+
from typing import Any, Dict, IO, Optional, overload, Union, cast, Tuple
89
from azure.core.async_paging import AsyncItemPaged, AsyncList
910
from azure.core.credentials_async import AsyncTokenCredential
1011
from azure.core.exceptions import (
@@ -18,11 +19,26 @@
1819
from azure.core.tracing.decorator_async import distributed_trace_async
1920

2021
from ._async_base_client import ContainerRegistryBaseClient
22+
from .._container_registry_client import _return_response_headers
2123
from .._generated.models import AcrErrors
22-
from .._helpers import _is_tag, _parse_next_link, SUPPORTED_API_VERSIONS, AZURE_RESOURCE_MANAGER_PUBLIC_CLOUD
24+
from .._helpers import (
25+
_is_tag,
26+
_parse_next_link,
27+
SUPPORTED_API_VERSIONS,
28+
AZURE_RESOURCE_MANAGER_PUBLIC_CLOUD,
29+
DEFAULT_CHUNK_SIZE,
30+
)
2331
from .._models import RepositoryProperties, ArtifactManifestProperties, ArtifactTagProperties
2432

2533

34+
class _UnclosableBytesIO(BytesIO):
35+
def close(self):
36+
pass
37+
38+
def manual_close(self):
39+
super().close()
40+
41+
2642
class ContainerRegistryClient(ContainerRegistryBaseClient):
2743
def __init__(
2844
self,
@@ -839,6 +855,68 @@ async def update_tag_properties(
839855
repository=repository
840856
)
841857

858+
@distributed_trace_async
859+
async def upload_blob(self, repository: str, data: IO[bytes], **kwargs) -> Tuple[str, int]:
860+
"""Upload an artifact blob.
861+
862+
:param str repository: Name of the repository.
863+
:param data: The blob to upload. Note: This must be a seekable stream.
864+
:type data: IO
865+
:returns: The digest and size in bytes of the uploaded blob.
866+
:rtype: Tuple[str, int]
867+
:raises ValueError: If the parameter repository or data is None.
868+
"""
869+
try:
870+
start_upload_response_headers = cast(
871+
Dict[str, str],
872+
await self._client.container_registry_blob.start_upload(
873+
repository, cls=_return_response_headers, **kwargs
874+
)
875+
)
876+
digest, location, blob_size = await self._upload_blob_chunk(
877+
start_upload_response_headers['Location'], data, **kwargs
878+
)
879+
complete_upload_response_headers = cast(
880+
Dict[str, str],
881+
await self._client.container_registry_blob.complete_upload(
882+
digest=digest,
883+
next_link=location,
884+
cls=_return_response_headers,
885+
**kwargs
886+
)
887+
)
888+
except Exception as e:
889+
if repository is None or data is None:
890+
raise ValueError("The parameter repository and data cannot be None.") from e
891+
raise
892+
return complete_upload_response_headers['Docker-Content-Digest'], blob_size
893+
894+
async def _upload_blob_chunk(self, location: str, data: IO[bytes], **kwargs) -> Tuple[str, str, int]:
895+
hasher = hashlib.sha256()
896+
blob_size = 0
897+
buffer = data.read(DEFAULT_CHUNK_SIZE)
898+
899+
while len(buffer) > 0:
900+
try:
901+
buffer_stream = _UnclosableBytesIO(buffer)
902+
response_headers = cast(
903+
Dict[str, str],
904+
await self._client.container_registry_blob.upload_chunk(
905+
location,
906+
buffer_stream,
907+
cls=_return_response_headers,
908+
**kwargs
909+
)
910+
)
911+
blob_size += len(buffer)
912+
hasher.update(buffer)
913+
location = response_headers['Location']
914+
buffer = data.read(DEFAULT_CHUNK_SIZE)
915+
finally:
916+
buffer_stream.manual_close()
917+
918+
return "sha256:" + hasher.hexdigest(), location, blob_size
919+
842920
@distributed_trace_async
843921
async def delete_manifest(self, repository: str, tag_or_digest: str, **kwargs) -> None:
844922
"""Delete a manifest. If the manifest cannot be found or a response status code of

sdk/containerregistry/azure-containerregistry/samples/sample_upload_download_blob.py

Lines changed: 0 additions & 52 deletions
This file was deleted.

0 commit comments

Comments
 (0)