Skip to content

Commit 1daae94

Browse files
Buffered upload no longer requires length in sync client (Azure#22218)
* BlobParallelUploadOptions takes -1 length * javadoc syntax * ported converter changes to datalake * ported converter changes to file share * deprecated old constructors * test retry on new inputstream converter * poke ci * redid lengthless buffered upload API * fixed bad arg check Co-authored-by: jschrepp-MSFT <41338290+jschrepp-MSFT@users.noreply.github.com>
1 parent d7d096d commit 1daae94

File tree

31 files changed

+2024
-27
lines changed

31 files changed

+2024
-27
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -568,11 +568,20 @@ buffers is not a common scenario for async like it is in sync (and we already bu
568568
(stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions,
569569
headers, metadata, tags, tier, requestConditions, computeMd5, immutabilityPolicy, legalHold);
570570

571-
Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
572-
options.getDataStream(), options.getLength(),
571+
Flux<ByteBuffer> data = options.getDataFlux();
572+
// no specified length: use azure.core's converter
573+
if (data == null && options.getOptionalLength() == null) {
573574
// We can only buffer up to max int due to restrictions in ByteBuffer.
574-
(int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()), false)
575-
: options.getDataFlux();
575+
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
576+
data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
577+
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
578+
} else if (data == null) {
579+
// We can only buffer up to max int due to restrictions in ByteBuffer.
580+
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
581+
data = Utility.convertStreamToByteBuffer(
582+
options.getDataStream(), options.getOptionalLength(), chunkSize, false);
583+
}
584+
576585
return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(parallelTransferOptions),
577586
uploadInChunksFunction, uploadFullBlobFunction);
578587
} catch (RuntimeException ex) {

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public class BlobParallelUploadOptions {
2828
private final Flux<ByteBuffer> dataFlux;
2929
private final InputStream dataStream;
30-
private final long length;
30+
private final Long length;
3131
private ParallelTransferOptions parallelTransferOptions;
3232
private BlobHttpHeaders headers;
3333
private Map<String, String> metadata;
@@ -40,7 +40,7 @@ public class BlobParallelUploadOptions {
4040
private Boolean legalHold;
4141

4242
/**
43-
* Constructs a new {@code BlobParallelUploadOptions}.
43+
* Constructs a new {@link BlobParallelUploadOptions}.
4444
*
4545
* @param dataFlux The data to write to the blob. Unlike other upload methods, this method does not require that
4646
* the {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not
@@ -50,25 +50,48 @@ public BlobParallelUploadOptions(Flux<ByteBuffer> dataFlux) {
5050
StorageImplUtils.assertNotNull("dataFlux", dataFlux);
5151
this.dataFlux = dataFlux;
5252
this.dataStream = null;
53-
this.length = -1;
53+
this.length = null;
5454
}
5555

5656
/**
57-
* Constructs a new {@code BlobParalleUploadOptions}.
57+
* Constructs a new {@link BlobParallelUploadOptions}.
58+
*
59+
* Use {@link #BlobParallelUploadOptions(InputStream)} instead to supply an InputStream without knowing the exact
60+
* length beforehand.
5861
*
59-
* @param dataStream The data to write to the blob. The data must be markable. This is in order to support retries.
60-
* If the data is not markable, consider opening a {@link com.azure.storage.blob.specialized.BlobOutputStream} and
61-
* writing to the returned stream. Alternatively, consider wrapping your data source in a
62-
* {@link java.io.BufferedInputStream} to add mark support.
62+
* @param dataStream The data to write to the blob.
6363
* @param length The exact length of the data. It is important that this value match precisely the length of the
6464
* data provided in the {@link InputStream}.
65+
* @deprecated length is no longer necessary; use {@link #BlobParallelUploadOptions(InputStream)} instead.
6566
*/
67+
@Deprecated
6668
public BlobParallelUploadOptions(InputStream dataStream, long length) {
69+
this(dataStream, Long.valueOf(length));
70+
}
71+
72+
/**
73+
* Constructs a new {@link BlobParallelUploadOptions}.
74+
*
75+
* @param dataStream The data to write to the blob.
76+
*/
77+
public BlobParallelUploadOptions(InputStream dataStream) {
78+
this(dataStream, null);
79+
}
80+
81+
/**
82+
* Common constructor for building options from InputStream.
83+
*
84+
* @param dataStream The data to write to the blob.
85+
* @param length Optional known length of the data, affects reactive behavior for backwards compatibility.
86+
*/
87+
private BlobParallelUploadOptions(InputStream dataStream, Long length) {
6788
StorageImplUtils.assertNotNull("dataStream", dataStream);
68-
StorageImplUtils.assertInBounds("length", length, 0, Long.MAX_VALUE);
89+
if (length != null) {
90+
StorageImplUtils.assertInBounds("length", length, 0, Long.MAX_VALUE);
91+
}
6992
this.dataStream = dataStream;
70-
this.length = length;
7193
this.dataFlux = null;
94+
this.length = length;
7295
}
7396

7497
/**
@@ -80,7 +103,7 @@ public BlobParallelUploadOptions(BinaryData data) {
80103
StorageImplUtils.assertNotNull("data", data);
81104
this.dataFlux = Flux.just(data.toByteBuffer());
82105
this.dataStream = null;
83-
this.length = -1;
106+
this.length = null;
84107
}
85108

86109
/**
@@ -106,11 +129,23 @@ public InputStream getDataStream() {
106129
*
107130
* @return The exact length of the data. It is important that this value match precisely the length of the
108131
* data provided in the {@link InputStream}.
132+
* @deprecated use {@link #getOptionalLength()} to have safe access to a length that will not always exist.
109133
*/
134+
@Deprecated
110135
public long getLength() {
111136
return length;
112137
}
113138

139+
/**
140+
* Gets the length of the data.
141+
*
142+
* @return The exact length of the data. It is important that this value match precisely the length of the
143+
* data provided in the {@link InputStream}.
144+
*/
145+
public Long getOptionalLength() {
146+
return length;
147+
}
148+
114149
/**
115150
* Gets the {@link ParallelTransferOptions}.
116151
*

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,45 @@ class BlobAPITest extends APISpec {
216216
.getValue().getETag() != null
217217
}
218218

219+
def "Upload InputStream no length"() {
220+
when:
221+
bc.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream), null, null)
222+
223+
then:
224+
notThrown(Exception)
225+
bc.downloadContent().toBytes() == data.defaultBytes
226+
}
227+
228+
def "Upload InputStream bad length"() {
229+
when:
230+
bc.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream, length), null, null)
231+
232+
then:
233+
thrown(Exception)
234+
235+
where:
236+
_ | length
237+
_ | 0
238+
_ | -100
239+
_ | data.defaultDataSize - 1
240+
_ | data.defaultDataSize + 1
241+
}
242+
243+
def "Upload successful retry"() {
244+
given:
245+
def clientWithFailure = getBlobClient(
246+
env.primaryAccount.credential,
247+
bc.getBlobUrl(),
248+
new TransientFailureInjectingHttpPipelinePolicy())
249+
250+
when:
251+
clientWithFailure.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream), null, null)
252+
253+
then:
254+
notThrown(Exception)
255+
bc.downloadContent().toBytes() == data.defaultBytes
256+
}
257+
219258
@LiveOnly
220259
// Reading from recordings will not allow for the timing of the test to work correctly.
221260
def "Upload timeout"() {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"networkCallRecords" : [ {
3+
"Method" : "PUT",
4+
"Uri" : "https://REDACTED.blob.core.windows.net/7f5018f307f5018f3aec15131ec2a5fb9dde640e685e?restype=container",
5+
"Headers" : {
6+
"x-ms-version" : "2020-08-04",
7+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
8+
"x-ms-client-request-id" : "a3d9e138-90e1-4885-8cd7-544bce3e1fca"
9+
},
10+
"Response" : {
11+
"Transfer-Encoding" : "chunked",
12+
"x-ms-version" : "2020-08-04",
13+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
14+
"eTag" : "0x8D92C58EF0BCAB0",
15+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:32 GMT",
16+
"retry-after" : "0",
17+
"StatusCode" : "201",
18+
"x-ms-request-id" : "69ef6576-b01e-0019-0641-5e7a16000000",
19+
"x-ms-client-request-id" : "a3d9e138-90e1-4885-8cd7-544bce3e1fca",
20+
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT"
21+
},
22+
"Exception" : null
23+
}, {
24+
"Method" : "PUT",
25+
"Uri" : "https://REDACTED.blob.core.windows.net/7f5018f307f5018f3aec15131ec2a5fb9dde640e685e/7f5018f317f5018f3aec68648158188ad9aa3434bb9a",
26+
"Headers" : {
27+
"x-ms-version" : "2020-08-04",
28+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
29+
"x-ms-client-request-id" : "9fe09bbf-3b98-4f30-b335-6ae82cb45011",
30+
"Content-Type" : "application/octet-stream"
31+
},
32+
"Response" : {
33+
"Transfer-Encoding" : "chunked",
34+
"x-ms-version" : "2020-08-04",
35+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
36+
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
37+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
38+
"retry-after" : "0",
39+
"StatusCode" : "201",
40+
"x-ms-request-server-encrypted" : "true",
41+
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT",
42+
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
43+
"eTag" : "0x8D92C58EF38659F",
44+
"x-ms-request-id" : "69ef6580-b01e-0019-0b41-5e7a16000000",
45+
"x-ms-client-request-id" : "9fe09bbf-3b98-4f30-b335-6ae82cb45011"
46+
},
47+
"Exception" : null
48+
} ],
49+
"variables" : [ "7f5018f307f5018f3aec15131ec2a5fb9dde640e685e", "7f5018f317f5018f3aec68648158188ad9aa3434bb9a" ]
50+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"networkCallRecords" : [ {
3+
"Method" : "PUT",
4+
"Uri" : "https://REDACTED.blob.core.windows.net/664b29b20664b29b2b8a39735445a8518b952493b840?restype=container",
5+
"Headers" : {
6+
"x-ms-version" : "2020-08-04",
7+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
8+
"x-ms-client-request-id" : "0a78fd58-091c-48dd-8e06-460e6d3b7b2d"
9+
},
10+
"Response" : {
11+
"Transfer-Encoding" : "chunked",
12+
"x-ms-version" : "2020-08-04",
13+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
14+
"eTag" : "0x8D92C58EF7CCB72",
15+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
16+
"retry-after" : "0",
17+
"StatusCode" : "201",
18+
"x-ms-request-id" : "69ef6587-b01e-0019-1241-5e7a16000000",
19+
"x-ms-client-request-id" : "0a78fd58-091c-48dd-8e06-460e6d3b7b2d",
20+
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT"
21+
},
22+
"Exception" : null
23+
}, {
24+
"Method" : "PUT",
25+
"Uri" : "https://REDACTED.blob.core.windows.net/664b29b20664b29b2b8a39735445a8518b952493b840/664b29b21664b29b2b8a255541c44a05dfa854d5d864",
26+
"Headers" : {
27+
"x-ms-version" : "2020-08-04",
28+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
29+
"x-ms-client-request-id" : "8f4270ac-e7b8-4aba-8fc1-067461d7a2f2",
30+
"Content-Type" : "application/octet-stream"
31+
},
32+
"Response" : {
33+
"Transfer-Encoding" : "chunked",
34+
"x-ms-version" : "2020-08-04",
35+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
36+
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
37+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
38+
"retry-after" : "0",
39+
"StatusCode" : "201",
40+
"x-ms-request-server-encrypted" : "true",
41+
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
42+
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
43+
"eTag" : "0x8D92C58EF8BA9BB",
44+
"x-ms-request-id" : "69ef658a-b01e-0019-1441-5e7a16000000",
45+
"x-ms-client-request-id" : "8f4270ac-e7b8-4aba-8fc1-067461d7a2f2"
46+
},
47+
"Exception" : null
48+
} ],
49+
"variables" : [ "664b29b20664b29b2b8a39735445a8518b952493b840", "664b29b21664b29b2b8a255541c44a05dfa854d5d864" ]
50+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"networkCallRecords" : [ {
3+
"Method" : "PUT",
4+
"Uri" : "https://REDACTED.blob.core.windows.net/4d667a7104d667a71bea6591957c250d6618b480daf9?restype=container",
5+
"Headers" : {
6+
"x-ms-version" : "2020-08-04",
7+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
8+
"x-ms-client-request-id" : "c79f5d3b-8274-4c59-ada0-c8d13da599cf"
9+
},
10+
"Response" : {
11+
"Transfer-Encoding" : "chunked",
12+
"x-ms-version" : "2020-08-04",
13+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
14+
"eTag" : "0x8D92C58EFB6BEAF",
15+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
16+
"retry-after" : "0",
17+
"StatusCode" : "201",
18+
"x-ms-request-id" : "69ef6590-b01e-0019-1a41-5e7a16000000",
19+
"x-ms-client-request-id" : "c79f5d3b-8274-4c59-ada0-c8d13da599cf",
20+
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT"
21+
},
22+
"Exception" : null
23+
}, {
24+
"Method" : "PUT",
25+
"Uri" : "https://REDACTED.blob.core.windows.net/4d667a7104d667a71bea6591957c250d6618b480daf9/4d667a7114d667a71bea705084c8cea16df3a446680d",
26+
"Headers" : {
27+
"x-ms-version" : "2020-08-04",
28+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
29+
"x-ms-client-request-id" : "4054dcf0-fb43-40f1-8f39-d47e1a4e5500",
30+
"Content-Type" : "application/octet-stream"
31+
},
32+
"Response" : {
33+
"Transfer-Encoding" : "chunked",
34+
"x-ms-version" : "2020-08-04",
35+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
36+
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
37+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
38+
"retry-after" : "0",
39+
"StatusCode" : "201",
40+
"x-ms-request-server-encrypted" : "true",
41+
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
42+
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
43+
"eTag" : "0x8D92C58EFC500F4",
44+
"x-ms-request-id" : "69ef6593-b01e-0019-1c41-5e7a16000000",
45+
"x-ms-client-request-id" : "4054dcf0-fb43-40f1-8f39-d47e1a4e5500"
46+
},
47+
"Exception" : null
48+
} ],
49+
"variables" : [ "4d667a7104d667a71bea6591957c250d6618b480daf9", "4d667a7114d667a71bea705084c8cea16df3a446680d" ]
50+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"networkCallRecords" : [ {
3+
"Method" : "PUT",
4+
"Uri" : "https://REDACTED.blob.core.windows.net/547d4b300547d4b30f11300636363b37d2e8646149bf?restype=container",
5+
"Headers" : {
6+
"x-ms-version" : "2020-08-04",
7+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
8+
"x-ms-client-request-id" : "d7c66faa-0162-4603-98c4-6fa0af046d3d"
9+
},
10+
"Response" : {
11+
"Transfer-Encoding" : "chunked",
12+
"x-ms-version" : "2020-08-04",
13+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
14+
"eTag" : "0x8D92C58EFEF5292",
15+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
16+
"retry-after" : "0",
17+
"StatusCode" : "201",
18+
"x-ms-request-id" : "69ef659a-b01e-0019-2241-5e7a16000000",
19+
"x-ms-client-request-id" : "d7c66faa-0162-4603-98c4-6fa0af046d3d",
20+
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT"
21+
},
22+
"Exception" : null
23+
}, {
24+
"Method" : "PUT",
25+
"Uri" : "https://REDACTED.blob.core.windows.net/547d4b300547d4b30f11300636363b37d2e8646149bf/547d4b301547d4b30f118395439a3b957136742218ef",
26+
"Headers" : {
27+
"x-ms-version" : "2020-08-04",
28+
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
29+
"x-ms-client-request-id" : "96a79c9b-ca06-4499-9825-9a7d0ba61b79",
30+
"Content-Type" : "application/octet-stream"
31+
},
32+
"Response" : {
33+
"Transfer-Encoding" : "chunked",
34+
"x-ms-version" : "2020-08-04",
35+
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
36+
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
37+
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
38+
"retry-after" : "0",
39+
"StatusCode" : "201",
40+
"x-ms-request-server-encrypted" : "true",
41+
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
42+
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
43+
"eTag" : "0x8D92C58EFFD6DF1",
44+
"x-ms-request-id" : "69ef659c-b01e-0019-2341-5e7a16000000",
45+
"x-ms-client-request-id" : "96a79c9b-ca06-4499-9825-9a7d0ba61b79"
46+
},
47+
"Exception" : null
48+
} ],
49+
"variables" : [ "547d4b300547d4b30f11300636363b37d2e8646149bf", "547d4b301547d4b30f118395439a3b957136742218ef" ]
50+
}

0 commit comments

Comments
 (0)