Skip to content

Commit 7e74cb3

Browse files
authored
[Fix] Add retry for streaming download (Azure#18164)
* [Fix] Add retry for streaming download
* [Fix] Add retry for streaming download
* Update _download.py
* Update _download_async.py
1 parent f746a2a commit 7e74cb3

File tree

2 files changed

+169
-126
lines changed

2 files changed

+169
-126
lines changed

sdk/storage/azure-storage-blob/azure/storage/blob/_download.py

Lines changed: 92 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66

77
import sys
88
import threading
9+
import time
10+
911
import warnings
1012
from io import BytesIO
11-
1213
from typing import Iterator
13-
from azure.core.exceptions import HttpResponseError
14+
15+
import requests
16+
from azure.core.exceptions import HttpResponseError, ServiceResponseError
17+
1418
from azure.core.tracing.common import with_current_context
1519
from ._shared.encryption import decrypt_blob
1620
from ._shared.request_handlers import validate_and_format_range_headers
@@ -44,10 +48,9 @@ def process_range_and_offset(start_range, end_range, length, encryption):
4448
def process_content(data, start_offset, end_offset, encryption):
4549
if data is None:
4650
raise ValueError("Response cannot be None.")
47-
try:
48-
content = b"".join(list(data))
49-
except Exception as error:
50-
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
51+
52+
content = b"".join(list(data))
53+
5154
if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
5255
try:
5356
return decrypt_blob(
@@ -189,19 +192,29 @@ def _download_chunk(self, chunk_start, chunk_end):
189192
check_content_md5=self.validate_content
190193
)
191194

192-
try:
193-
_, response = self.client.download(
194-
range=range_header,
195-
range_get_content_md5=range_validation,
196-
validate_content=self.validate_content,
197-
data_stream_total=self.total_size,
198-
download_stream_current=self.progress_total,
199-
**self.request_options
200-
)
201-
except HttpResponseError as error:
202-
process_storage_error(error)
195+
retry_active = True
196+
retry_total = 3
197+
while retry_active:
198+
try:
199+
_, response = self.client.download(
200+
range=range_header,
201+
range_get_content_md5=range_validation,
202+
validate_content=self.validate_content,
203+
data_stream_total=self.total_size,
204+
download_stream_current=self.progress_total,
205+
**self.request_options
206+
)
207+
except HttpResponseError as error:
208+
process_storage_error(error)
203209

204-
chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
210+
try:
211+
chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
212+
retry_active = False
213+
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
214+
retry_total -= 1
215+
if retry_total <= 0:
216+
raise ServiceResponseError(error, error=error)
217+
time.sleep(1)
205218

206219
# This makes sure that if_match is set so that we can validate
207220
# that subsequent downloads are to an unmodified blob
@@ -354,16 +367,6 @@ def __init__(
354367
# TODO: Set to the stored MD5 when the service returns this
355368
self.properties.content_md5 = None
356369

357-
if self.size == 0:
358-
self._current_content = b""
359-
else:
360-
self._current_content = process_content(
361-
self._response,
362-
self._initial_offset[0],
363-
self._initial_offset[1],
364-
self._encryption_options
365-
)
366-
367370
def __len__(self):
368371
return self.size
369372

@@ -376,51 +379,71 @@ def _initial_request(self):
376379
check_content_md5=self._validate_content
377380
)
378381

379-
try:
380-
location_mode, response = self._clients.blob.download(
381-
range=range_header,
382-
range_get_content_md5=range_validation,
383-
validate_content=self._validate_content,
384-
data_stream_total=None,
385-
download_stream_current=0,
386-
**self._request_options
387-
)
382+
retry_active = True
383+
retry_total = 3
384+
while retry_active:
385+
try:
386+
location_mode, response = self._clients.blob.download(
387+
range=range_header,
388+
range_get_content_md5=range_validation,
389+
validate_content=self._validate_content,
390+
data_stream_total=None,
391+
download_stream_current=0,
392+
**self._request_options
393+
)
388394

389-
# Check the location we read from to ensure we use the same one
390-
# for subsequent requests.
391-
self._location_mode = location_mode
395+
# Check the location we read from to ensure we use the same one
396+
# for subsequent requests.
397+
self._location_mode = location_mode
398+
399+
# Parse the total file size and adjust the download size if ranges
400+
# were specified
401+
self._file_size = parse_length_from_content_range(response.properties.content_range)
402+
if self._end_range is not None:
403+
# Use the end range index unless it is over the end of the file
404+
self.size = min(self._file_size, self._end_range - self._start_range + 1)
405+
elif self._start_range is not None:
406+
self.size = self._file_size - self._start_range
407+
else:
408+
self.size = self._file_size
392409

393-
# Parse the total file size and adjust the download size if ranges
394-
# were specified
395-
self._file_size = parse_length_from_content_range(response.properties.content_range)
396-
if self._end_range is not None:
397-
# Use the end range index unless it is over the end of the file
398-
self.size = min(self._file_size, self._end_range - self._start_range + 1)
399-
elif self._start_range is not None:
400-
self.size = self._file_size - self._start_range
401-
else:
402-
self.size = self._file_size
403-
404-
except HttpResponseError as error:
405-
if self._start_range is None and error.response.status_code == 416:
406-
# Get range will fail on an empty file. If the user did not
407-
# request a range, do a regular get request in order to get
408-
# any properties.
409-
try:
410-
_, response = self._clients.blob.download(
411-
validate_content=self._validate_content,
412-
data_stream_total=0,
413-
download_stream_current=0,
414-
**self._request_options
415-
)
416-
except HttpResponseError as error:
410+
except HttpResponseError as error:
411+
if self._start_range is None and error.response.status_code == 416:
412+
# Get range will fail on an empty file. If the user did not
413+
# request a range, do a regular get request in order to get
414+
# any properties.
415+
try:
416+
_, response = self._clients.blob.download(
417+
validate_content=self._validate_content,
418+
data_stream_total=0,
419+
download_stream_current=0,
420+
**self._request_options
421+
)
422+
except HttpResponseError as error:
423+
process_storage_error(error)
424+
425+
# Set the download size to empty
426+
self.size = 0
427+
self._file_size = 0
428+
else:
417429
process_storage_error(error)
418430

419-
# Set the download size to empty
420-
self.size = 0
421-
self._file_size = 0
422-
else:
423-
process_storage_error(error)
431+
try:
432+
if self.size == 0:
433+
self._current_content = b""
434+
else:
435+
self._current_content = process_content(
436+
response,
437+
self._initial_offset[0],
438+
self._initial_offset[1],
439+
self._encryption_options
440+
)
441+
retry_active = False
442+
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
443+
retry_total -= 1
444+
if retry_total <= 0:
445+
raise ServiceResponseError(error, error=error)
446+
time.sleep(1)
424447

425448
# get page ranges to optimize downloading sparse page blob
426449
if response.properties.blob_type == 'PageBlob':

sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py

Lines changed: 77 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
from io import BytesIO
1111
from itertools import islice
1212
import warnings
13-
1413
from typing import AsyncIterator
15-
from azure.core.exceptions import HttpResponseError
14+
15+
from aiohttp import ClientPayloadError
16+
from azure.core.exceptions import HttpResponseError, ServiceResponseError
1617
from .._shared.encryption import decrypt_blob
1718
from .._shared.request_handlers import validate_and_format_range_headers
1819
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
@@ -22,10 +23,7 @@
2223
async def process_content(data, start_offset, end_offset, encryption):
2324
if data is None:
2425
raise ValueError("Response cannot be None.")
25-
try:
26-
content = data.response.body()
27-
except Exception as error:
28-
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
26+
content = data.response.body()
2927
if encryption.get('key') is not None or encryption.get('resolver') is not None:
3028
try:
3129
return decrypt_blob(
@@ -91,20 +89,31 @@ async def _download_chunk(self, chunk_start, chunk_end):
9189
download_range[1],
9290
check_content_md5=self.validate_content
9391
)
94-
try:
95-
_, response = await self.client.download(
96-
range=range_header,
97-
range_get_content_md5=range_validation,
98-
validate_content=self.validate_content,
99-
data_stream_total=self.total_size,
100-
download_stream_current=self.progress_total,
101-
**self.request_options
102-
)
103-
except HttpResponseError as error:
104-
process_storage_error(error)
92+
retry_active = True
93+
retry_total = 3
94+
while retry_active:
95+
try:
96+
_, response = await self.client.download(
97+
range=range_header,
98+
range_get_content_md5=range_validation,
99+
validate_content=self.validate_content,
100+
data_stream_total=self.total_size,
101+
download_stream_current=self.progress_total,
102+
**self.request_options
103+
)
104+
retry_active = False
105+
106+
except HttpResponseError as error:
107+
process_storage_error(error)
108+
except ClientPayloadError as error:
109+
retry_total -= 1
110+
if retry_total <= 0:
111+
raise ServiceResponseError(error, error=error)
112+
await asyncio.sleep(1)
105113

106114
chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)
107115

116+
108117
# This makes sure that if_match is set so that we can validate
109118
# that subsequent downloads are to an unmodified blob
110119
if self.request_options.get('modified_access_conditions'):
@@ -277,49 +286,60 @@ async def _initial_request(self):
277286
end_range_required=False,
278287
check_content_md5=self._validate_content)
279288

280-
try:
281-
location_mode, response = await self._clients.blob.download(
282-
range=range_header,
283-
range_get_content_md5=range_validation,
284-
validate_content=self._validate_content,
285-
data_stream_total=None,
286-
download_stream_current=0,
287-
**self._request_options)
288-
289-
# Check the location we read from to ensure we use the same one
290-
# for subsequent requests.
291-
self._location_mode = location_mode
292-
293-
# Parse the total file size and adjust the download size if ranges
294-
# were specified
295-
self._file_size = parse_length_from_content_range(response.properties.content_range)
296-
if self._end_range is not None:
297-
# Use the length unless it is over the end of the file
298-
self.size = min(self._file_size, self._end_range - self._start_range + 1)
299-
elif self._start_range is not None:
300-
self.size = self._file_size - self._start_range
301-
else:
302-
self.size = self._file_size
289+
retry_active = True
290+
retry_total = 3
291+
while retry_active:
292+
try:
293+
location_mode, response = await self._clients.blob.download(
294+
range=range_header,
295+
range_get_content_md5=range_validation,
296+
validate_content=self._validate_content,
297+
data_stream_total=None,
298+
download_stream_current=0,
299+
**self._request_options)
300+
301+
# Check the location we read from to ensure we use the same one
302+
# for subsequent requests.
303+
self._location_mode = location_mode
304+
305+
# Parse the total file size and adjust the download size if ranges
306+
# were specified
307+
self._file_size = parse_length_from_content_range(response.properties.content_range)
308+
if self._end_range is not None:
309+
# Use the length unless it is over the end of the file
310+
self.size = min(self._file_size, self._end_range - self._start_range + 1)
311+
elif self._start_range is not None:
312+
self.size = self._file_size - self._start_range
313+
else:
314+
self.size = self._file_size
315+
retry_active = False
303316

304-
except HttpResponseError as error:
305-
if self._start_range is None and error.response.status_code == 416:
306-
# Get range will fail on an empty file. If the user did not
307-
# request a range, do a regular get request in order to get
308-
# any properties.
309-
try:
310-
_, response = await self._clients.blob.download(
311-
validate_content=self._validate_content,
312-
data_stream_total=0,
313-
download_stream_current=0,
314-
**self._request_options)
315-
except HttpResponseError as error:
317+
except HttpResponseError as error:
318+
if self._start_range is None and error.response.status_code == 416:
319+
# Get range will fail on an empty file. If the user did not
320+
# request a range, do a regular get request in order to get
321+
# any properties.
322+
try:
323+
_, response = await self._clients.blob.download(
324+
validate_content=self._validate_content,
325+
data_stream_total=0,
326+
download_stream_current=0,
327+
**self._request_options)
328+
retry_active = False
329+
except HttpResponseError as error:
330+
process_storage_error(error)
331+
332+
# Set the download size to empty
333+
self.size = 0
334+
self._file_size = 0
335+
else:
316336
process_storage_error(error)
317337

318-
# Set the download size to empty
319-
self.size = 0
320-
self._file_size = 0
321-
else:
322-
process_storage_error(error)
338+
except ClientPayloadError as error:
339+
retry_total -= 1
340+
if retry_total <= 0:
341+
raise ServiceResponseError(error, error=error)
342+
await asyncio.sleep(1)
323343

324344
# get page ranges to optimize downloading sparse page blob
325345
if response.properties.blob_type == 'PageBlob':

0 commit comments

Comments
 (0)