Skip to content

Commit a094170

Browse files
authored
Adding new overload with length for BlobAsyncClient.upload() (Azure#30672)
1 parent d41e9d4 commit a094170

File tree

3 files changed

+139
-1
lines changed

3 files changed

+139
-1
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public BlobParallelUploadOptions(BinaryData data) {
105105
StorageImplUtils.assertNotNull("data", data);
106106
this.dataFlux = Flux.just(data.toByteBuffer());
107107
this.dataStream = null;
108-
this.length = null;
108+
this.length = data.getLength();
109109
}
110110

111111
/**
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.blob;
5+
6+
import com.azure.core.util.BinaryData;
7+
import com.azure.storage.blob.models.ParallelTransferOptions;
8+
import com.azure.storage.blob.options.BlobParallelUploadOptions;
9+
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
10+
import com.azure.storage.common.StorageSharedKeyCredential;
11+
import reactor.core.publisher.Flux;
12+
13+
import java.io.IOException;
14+
import java.nio.ByteBuffer;
15+
import java.nio.charset.Charset;
16+
import java.util.Locale;
17+
18+
/**
19+
* This example shows how to use the buffered upload method on BlockBlobAsyncClient with a known length.
20+
*
21+
* Note that the use of .block() in the method is only used to enable the sample to run effectively in isolation. It is
22+
* not recommended for use in async environments.
23+
*/
24+
public class BufferedUploadWithKnownLengthExample {
25+
/**
26+
* Entry point into the basic examples for Storage blobs.
27+
* @param args Unused. Arguments to the program.
28+
* @throws IOException If an I/O error occurs
29+
* @throws RuntimeException If the downloaded data doesn't match the uploaded data
30+
*/
31+
public static void main(String[] args) throws IOException {
32+
33+
/*
34+
* For more information on this setup, please refer to the BasicExample.
35+
*/
36+
String accountName = SampleHelper.getAccountName();
37+
String accountKey = SampleHelper.getAccountKey();
38+
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
39+
String endpoint = String.format(Locale.ROOT, "https://%s.blob.core.windows.net", accountName);
40+
String containerName = "myjavacontainerbuffereduploadlength" + System.currentTimeMillis();
41+
BlobServiceAsyncClient storageClient = new BlobServiceClientBuilder().endpoint(endpoint).credential(credential)
42+
.buildAsyncClient();
43+
44+
BlobContainerAsyncClient containerClient = storageClient.getBlobContainerAsyncClient(containerName);
45+
containerClient.create().block();
46+
47+
uploadSourceBlob(endpoint, credential, containerName);
48+
BlobAsyncClient blobClient = containerClient.getBlobAsyncClient("HelloWorld.txt");
49+
50+
51+
/*
52+
sourceData has a network stream as its source and therefore likely does not support multiple subscribers. Even
53+
if it did support multiple subscribers, it would not produce the same data each time it was subscribed to. While
54+
we could inspect the http headers for the content-length, let us suppose that this information is unavailable
55+
at this time. All three of these factors would individually make the use of the standard upload method
56+
impossible--the first two because retries would not work and the third one because we could not satisfy the
57+
argument list.
58+
*/
59+
Flux<ByteBuffer> sourceData = getSourceBlobClient(endpoint, credential, containerName).downloadStream()
60+
// Perform transformation with length of 1 GB.
61+
.map(BufferedUploadWithKnownLengthExample::bufferTransformation);
62+
63+
/*
64+
Although this upload overload permits the use of such unreliable data sources, with known length we can speed
65+
up the upload process. A buffer size and maximum concurrency can still be passed in to achieve optimized upload.
66+
*/
67+
long length = 10;
68+
long blockSize = 10 * 1024;
69+
int maxConcurrency = 5;
70+
ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
71+
.setBlockSizeLong(blockSize)
72+
.setMaxConcurrency(maxConcurrency);
73+
74+
// Since we already know the size of our buffered bytes, we can pass the ByteBuffer and length to the BinaryData.
75+
// This will internally convert the BinaryData to a Flux<ByteBuffer>, but with known length we can optimize the
76+
// upload speed.
77+
// Need to use BinaryData.fromFlux(Flux<ByteBuffer>, Long, Boolean) with bufferContent set to false, this allows
78+
// us to configure the BinaryData to have a specified length set without the BinaryData being infinitely
79+
// subscribed to the Flux<ByteBuffer>.
80+
BinaryData.fromFlux(sourceData, length, false).flatMap(binaryData ->
81+
blobClient.uploadWithResponse(new BlobParallelUploadOptions(binaryData)
82+
.setParallelTransferOptions(parallelTransferOptions)));
83+
}
84+
85+
@SuppressWarnings("cast")
86+
private static ByteBuffer bufferTransformation(ByteBuffer buffer) {
87+
// The JDK changed the return type of ByteBuffer#limit between 8 and 9. In 8 and below it returns Buffer, whereas
88+
// in JDK 9 and later, it returns ByteBuffer. To compile on both, we explicitly cast the returned value to
89+
// ByteBuffer.
90+
// See https://bugs-stage.openjdk.java.net/browse/JDK-8062376
91+
int length = 10;
92+
return (ByteBuffer) buffer.limit(length);
93+
}
94+
95+
private static void uploadSourceBlob(String endpoint, StorageSharedKeyCredential credential, String containerName) {
96+
getSourceBlobClient(endpoint, credential, containerName)
97+
.upload(Flux.just(ByteBuffer.wrap("Hello world".getBytes(Charset.defaultCharset()))), "Hello world".length()).block();
98+
}
99+
100+
private static BlockBlobAsyncClient getSourceBlobClient(String endpoint, StorageSharedKeyCredential credential,
101+
String containerName) {
102+
return new BlobServiceClientBuilder().endpoint(endpoint).credential(credential).buildAsyncClient()
103+
.getBlobContainerAsyncClient(containerName).getBlobAsyncClient("sourceBlob").getBlockBlobAsyncClient();
104+
}
105+
}

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,6 +2042,28 @@ class BlockBlobAPITest extends APISpec {
20422042
100 | 50 | 20 || 5 // Test that blockSize is respected
20432043
}
20442044

2045+
@Unroll
@LiveOnly
def "Buffered upload with length"() {
    setup:
    // Wrap random bytes in a BinaryData carrying an explicit length, then upload with the
    // given block/single-upload sizing and verify the resulting committed-block count.
    def byteFlux = Flux.just(getRandomData(dataSize))
    def binaryData = BinaryData.fromFlux(byteFlux, dataSize).block()
    def transferOptions = new ParallelTransferOptions()
        .setBlockSizeLong(blockSize)
        .setMaxSingleUploadSizeLong(singleUploadSize)
    def uploadOptions = new BlobParallelUploadOptions(binaryData)
        .setParallelTransferOptions(transferOptions)

    when:
    blobAsyncClient.uploadWithResponse(uploadOptions).block()

    then:
    blobAsyncClient.getBlockBlobAsyncClient()
        .listBlocks(BlockListType.COMMITTED).block().getCommittedBlocks().size() == expectedBlockCount

    where:
    dataSize | singleUploadSize | blockSize || expectedBlockCount
    100      | 100              | null      || 0 // Test that singleUploadSize is respected
    100      | 50               | 20        || 5 // Test that blockSize is respected
}
2066+
20452067
// Only run these tests in live mode as they use variables that can't be captured.
20462068
@Unroll
20472069
@LiveOnly
@@ -2250,6 +2272,17 @@ class BlockBlobAPITest extends APISpec {
22502272
smallFile.delete()
22512273
}
22522274

2275+
@LiveOnly
2276+
def "Buffered upload with specified length"() {
2277+
setup:
2278+
def fluxData = Flux.just(getRandomData(data.getDefaultDataSizeLong() as int))
2279+
def binaryData = BinaryData.fromFlux(fluxData, data.getDefaultDataSizeLong()).block()
2280+
def parallelUploadOptions = new BlobParallelUploadOptions(binaryData)
2281+
expect:
2282+
StepVerifier.create(blobAsyncClient.uploadWithResponse(parallelUploadOptions))
2283+
.assertNext({ assert it.getValue().getETag() != null }).verifyComplete()
2284+
}
2285+
22532286
@LiveOnly
22542287
def "Buffered upload overwrite"() {
22552288
when:

0 commit comments

Comments
 (0)