Commit b7d4785

Add read method to StorageStreamDownloader (Azure#24275)

Parent: 11edc4a

46 files changed, +22852 −240 lines

sdk/storage/azure-storage-blob/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -6,6 +6,8 @@ This version and all future versions will require Python 3.7+. Python 3.6 is no

 ### Features Added
 - Added support for `AzureNamedKeyCredential` as a valid `credential` type.
+- Added standard `read` method to `StorageStreamDownloader`.
+- Added support for async streams (classes with an async `read` method) to async `upload_blob`.

 ### Bugs Fixed
 - Removed dead retry mechanism from async `azure.storage.blob.aio.StorageStreamDownloader`.
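The new `read` method follows the usual file-object convention, and `read(-1)`/`readall()` pick up from wherever earlier reads stopped. A minimal usage sketch, assuming placeholder connection details and blob names:

    from azure.storage.blob import BlobClient

    # Placeholder connection details, for illustration only
    blob = BlobClient.from_connection_string(
        "<connection-string>", container_name="data", blob_name="large.bin")

    downloader = blob.download_blob()   # returns a StorageStreamDownloader
    header = downloader.read(512)       # fetch just the first 512 bytes
    rest = downloader.readall()         # continues from the current offset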

sdk/storage/azure-storage-blob/azure/storage/blob/_download.py

Lines changed: 113 additions & 14 deletions

@@ -9,7 +9,7 @@
 import time
 import warnings
 from io import BytesIO
-from typing import Generic, Iterator, TypeVar
+from typing import Generic, Iterator, Optional, TypeVar

 from azure.core.exceptions import DecodeError, HttpResponseError, IncompleteReadError
 from azure.core.tracing.common import with_current_context

@@ -334,6 +334,7 @@ def __init__(
         self._non_empty_ranges = None
         self._response = None
         self._encryption_data = None
+        self._offset = 0

         # The cls is passed in via download_cls to avoid conflicting arg name with Generic.__new__
         # but needs to be changed to cls in the request options.

@@ -504,6 +505,17 @@ def _initial_request(self):

         return response

+    def _get_downloader_start_with_offset(self):
+        # Start where the initial request download ended
+        start = self._initial_range[1] + 1
+        # For encryption V2 only, adjust start to the end of the fetched data rather than download size
+        if self._encryption_options.get("key") is not None or self._encryption_options.get("resolver") is not None:
+            start = (self._start_range or 0) + len(self._current_content)
+
+        # Adjust the start based on any data read past the current content
+        start += (self._offset - len(self._current_content))
+        return start
+
     def chunks(self):
         # type: () -> Iterator[bytes]
         """Iterate over chunks in the download stream.
@@ -554,11 +566,91 @@ def chunks(self):
             downloader=iter_downloader,
             chunk_size=self._config.max_chunk_get_size)

+    def read(self, size: Optional[int] = -1) -> T:
+        """
+        Read up to size bytes from the stream and return them. If size
+        is unspecified or is -1, all bytes will be read.
+
+        :param size:
+            The number of bytes to download from the stream. Leave unspecified
+            or set to -1 to download all bytes.
+        :returns:
+            The requested data as bytes or a string if encoding was specified. If
+            the return value is empty, there is no more data to read.
+        :rtype: T
+        """
+        if size == -1:
+            return self.readall()
+        # Empty blob or already read to the end
+        if size == 0 or self._offset >= self.size:
+            return b'' if not self._encoding else ''
+
+        stream = BytesIO()
+        remaining_size = size
+
+        # Start by reading from current_content if there is data left
+        if self._offset < len(self._current_content):
+            start = self._offset
+            length = min(remaining_size, len(self._current_content) - self._offset)
+            read = stream.write(self._current_content[start:start + length])
+
+            remaining_size -= read
+            self._offset += read
+            if self._progress_hook:
+                self._progress_hook(self._offset, self.size)
+
+        if remaining_size > 0:
+            start_range = self._get_downloader_start_with_offset()
+
+            # End is the min between the remaining size, the file size, and the end of the specified range
+            end_range = min(start_range + remaining_size, self._file_size)
+            if self._end_range is not None:
+                end_range = min(end_range, self._end_range + 1)
+
+            parallel = self._max_concurrency > 1
+            downloader = _ChunkDownloader(
+                client=self._clients.blob,
+                non_empty_ranges=self._non_empty_ranges,
+                total_size=self.size,
+                chunk_size=self._config.max_chunk_get_size,
+                current_progress=self._offset,
+                start_range=start_range,
+                end_range=end_range,
+                stream=stream,
+                parallel=parallel,
+                validate_content=self._validate_content,
+                encryption_options=self._encryption_options,
+                encryption_data=self._encryption_data,
+                use_location=self._location_mode,
+                progress_hook=self._progress_hook,
+                **self._request_options
+            )
+
+            if parallel and remaining_size > self._config.max_chunk_get_size:
+                import concurrent.futures
+                with concurrent.futures.ThreadPoolExecutor(self._max_concurrency) as executor:
+                    list(executor.map(
+                        with_current_context(downloader.process_chunk),
+                        downloader.get_chunk_offsets()
+                    ))
+            else:
+                for chunk in downloader.get_chunk_offsets():
+                    downloader.process_chunk(chunk)
+
+            self._offset += remaining_size
+
+        data = stream.getvalue()
+        if self._encoding:
+            return data.decode(self._encoding)
+        return data
+
     def readall(self):
         # type: () -> T
-        """Download the contents of this blob.
-
+        """
+        Read the entire contents of this blob.
         This operation is blocking until all data is downloaded.
+
+        :returns: The requested data as bytes or a string if encoding was specified.
         :rtype: T
         """
         stream = BytesIO()
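Per the docstring in the hunk above, an empty return value signals end of stream, so a consumer can simply loop on `read(size)`. A sketch, with `blob` and `handle` as hypothetical names:

    downloader = blob.download_blob(max_concurrency=4)

    while True:
        chunk = downloader.read(4 * 1024 * 1024)  # up to 4 MiB per call
        if not chunk:                             # empty bytes/str: nothing left
            break
        handle(chunk)                             # hypothetical consumer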
@@ -625,30 +717,36 @@ def readinto(self, stream):
         except (NotImplementedError, AttributeError):
             raise ValueError(error_message)

-        # Write the content to the user stream
-        stream.write(self._current_content)
-        if self._progress_hook:
-            self._progress_hook(len(self._current_content), self.size)
+        # If some data has been streamed using `read`, only stream the remaining data
+        remaining_size = self.size - self._offset
+        # Already read to the end
+        if remaining_size <= 0:
+            return 0
+
+        # Write the content to the user stream if there is data left
+        if self._offset < len(self._current_content):
+            content = self._current_content[self._offset:]
+            stream.write(content)
+            self._offset += len(content)
+            if self._progress_hook:
+                self._progress_hook(len(content), self.size)

         if self._download_complete:
-            return self.size
+            return remaining_size

         data_end = self._file_size
         if self._end_range is not None:
             # Use the length unless it is over the end of the file
             data_end = min(self._file_size, self._end_range + 1)

-        data_start = self._initial_range[1] + 1  # Start where the first download ended
-        # For encryption, adjust start to the end of the fetched data rather than download size
-        if self._encryption_options.get("key") is not None or self._encryption_options.get("resolver") is not None:
-            data_start = (self._start_range or 0) + len(self._current_content)
+        data_start = self._get_downloader_start_with_offset()

         downloader = _ChunkDownloader(
             client=self._clients.blob,
             non_empty_ranges=self._non_empty_ranges,
             total_size=self.size,
             chunk_size=self._config.max_chunk_get_size,
-            current_progress=self._first_get_size,
+            current_progress=self._offset,
             start_range=data_start,
             end_range=data_end,
             stream=stream,

@@ -670,7 +768,8 @@ def readinto(self, stream):
         else:
             for chunk in downloader.get_chunk_offsets():
                 downloader.process_chunk(chunk)
-        return self.size
+
+        return remaining_size

     def download_to_stream(self, stream, max_concurrency=1):
         """Download the contents of this blob to a stream.

sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads_async.py

Lines changed: 35 additions & 8 deletions

@@ -19,6 +19,26 @@
 from .uploads import SubStream, IterStreamer  # pylint: disable=unused-import


+async def _async_parallel_uploads(uploader, pending, running):
+    range_ids = []
+    while True:
+        # Wait for some upload to finish before adding a new one
+        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
+        range_ids.extend([chunk.result() for chunk in done])
+        try:
+            for _ in range(0, len(done)):
+                next_chunk = await pending.__anext__()
+                running.add(asyncio.ensure_future(uploader(next_chunk)))
+        except StopAsyncIteration:
+            break
+
+    # Wait for the remaining uploads to finish
+    if running:
+        done, _running = await asyncio.wait(running)
+        range_ids.extend([chunk.result() for chunk in done])
+    return range_ids
+
+
 async def _parallel_uploads(uploader, pending, running):
     range_ids = []
     while True:

@@ -65,14 +85,18 @@ async def upload_data_chunks(

     if parallel:
         upload_tasks = uploader.get_chunk_streams()
-        running_futures = [
-            asyncio.ensure_future(uploader.process_chunk(u))
-            for u in islice(upload_tasks, 0, max_concurrency)
-        ]
-        range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
+        running_futures = []
+        for _ in range(max_concurrency):
+            try:
+                chunk = await upload_tasks.__anext__()
+                running_futures.append(asyncio.ensure_future(uploader.process_chunk(chunk)))
+            except StopAsyncIteration:
+                break
+
+        range_ids = await _async_parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
     else:
         range_ids = []
-        for chunk in uploader.get_chunk_streams():
+        async for chunk in uploader.get_chunk_streams():
             range_ids.append(await uploader.process_chunk(chunk))

     if any(range_ids):

@@ -152,7 +176,7 @@ def __init__(
         self.last_modified = None
         self.request_options = kwargs

-    def get_chunk_streams(self):
+    async def get_chunk_streams(self):
         index = 0
         while True:
             data = b''

@@ -162,7 +186,10 @@ def get_chunk_streams(self):
             while True:
                 if self.total_size:
                     read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
-                temp = self.stream.read(read_size)
+                if asyncio.iscoroutinefunction(self.stream.read):
+                    temp = await self.stream.read(read_size)
+                else:
+                    temp = self.stream.read(read_size)
                 if not isinstance(temp, six.binary_type):
                     raise TypeError('Blob data should be of type bytes.')
                 data += temp or b""
