Commit 7a794cd

[Storage] Make uploads less memory greedy. (Azure#23061)
* don't prefetch too much data.
* more scenarios.
* test core converter too.
* checkstyle.
* more.
* can do better.
1 parent a76deff commit 7a794cd
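
At a glance: the chunked upload pipelines in azure-storage-blob, azure-storage-common, azure-storage-file-datalake, and azure-storage-file-share were letting Reactor prefetch far more chunks than the documented "concurrency + 1" buffering guarantee implies. The fix passes an explicit prefetch of 1 to the flatMapSequential operators (and 0 to the upload gate's concatMap), tightens the memory-stress perf scenarios, and adds a perf test that exercises azure-core's stream-to-Flux converter.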

10 files changed: +64 −11 lines

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java

Lines changed: 2 additions & 2 deletions
@@ -640,7 +640,7 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
         parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
         only concurrency + 1 chunks at a time.
         */
-        return chunkedSource.flatMapSequential(stagingArea::write, 1)
+        return chunkedSource.flatMapSequential(stagingArea::write, 1, 1)
             .concatWith(Flux.defer(stagingArea::flush))
             .flatMapSequential(bufferAggregator -> {
                 // Report progress as necessary.
@@ -659,7 +659,7 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
                 // but we need to collect the ids.
                 .map(x -> blockId)
                 .flux();
-            }, parallelTransferOptions.getMaxConcurrency())
+            }, parallelTransferOptions.getMaxConcurrency(), 1)
             .collect(Collectors.toList())
             .flatMap(ids ->
                 blockBlobAsyncClient.commitBlockListWithResponse(new BlockBlobCommitBlockListOptions(ids)
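
The heart of the change is the third argument to Reactor's flatMapSequential. The two-argument overload used before takes only a mapper and maxConcurrency; prefetch falls back to the default (Queues.XS_BUFFER_SIZE, 32 in current Reactor releases), so the operator could queue dozens of multi-megabyte chunks regardless of concurrency. A minimal sketch of the difference, assuming Reactor Core 3.4.x on the classpath; PrefetchDemo is illustrative and not part of the commit:

import reactor.core.publisher.Flux;

public class PrefetchDemo {
    public static void main(String[] args) {
        // Two-arg overload: prefetch defaults to 32, so upstream sees one
        // large initial request and buffers many "chunks" ahead of use.
        Flux.range(1, 10)
            .doOnRequest(n -> System.out.println("default prefetch requested: " + n))
            .flatMapSequential(i -> Flux.just(i), 1)
            .blockLast();

        // Three-arg overload with prefetch = 1: upstream is drained one
        // chunk at a time, bounding buffering to roughly concurrency + 1.
        Flux.range(1, 10)
            .doOnRequest(n -> System.out.println("prefetch=1 requested: " + n))
            .flatMapSequential(i -> Flux.just(i), 1, 1)
            .blockLast();
    }
}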

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ public static <T> Mono<Response<T>> uploadFullOrChunked(final Flux<ByteBuffer> d
         return data
             .filter(ByteBuffer::hasRemaining)
             // The gate buffers data until threshold is breached.
-            .concatMap(gate::write)
+            .concatMap(gate::write, 0)
             // First buffer is emitted after threshold is breached or there's no more data.
             // Therefore we can make a decision how to upload data on first element.
             .switchOnFirst((signal, flux) -> {
@@ -103,7 +103,7 @@ public static Flux<ByteBuffer> chunkSource(Flux<ByteBuffer> data, ParallelTransf
                     duplicate.limit(Math.min(duplicate.limit(), (i + 1) * chunkSize));
                     return duplicate;
                 });
-        });
+        }, 1, 1);
     } else {
         return data;
     }
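
The gate in uploadFullOrChunked sits in front of the full-vs-chunked decision, so concatMap's default prefetch (32 buffers) could likewise pin a large batch of network buffers before any decision is made. A prefetch of 0 selects Reactor's no-prefetch concatMap variant (available in Reactor 3.4+, an assumption here), which requests the next buffer only after the previous inner publisher completes. A hedged sketch; ConcatMapPrefetchDemo is illustrative, not SDK code:

import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;

public class ConcatMapPrefetchDemo {
    public static void main(String[] args) {
        Flux.range(1, 5)
            .map(i -> ByteBuffer.allocate(4 * 1024 * 1024)) // stand-in for 4 MiB network buffers
            .doOnRequest(n -> System.out.println("requested: " + n))
            // prefetch = 0: each buffer is requested only once the previous
            // one has been processed, so at most one waits in the queue.
            .concatMap(buf -> Flux.just(buf.capacity()), 0)
            .blockLast();
    }
}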

sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java

Lines changed: 2 additions & 2 deletions
@@ -366,7 +366,7 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
         parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
         only concurrency + 1 chunks at a time.
         */
-        return chunkedSource.flatMapSequential(stagingArea::write, 1)
+        return chunkedSource.flatMapSequential(stagingArea::write, 1, 1)
             .concatWith(Flux.defer(stagingArea::flush))
             /* Map the data to a tuple 3, of buffer, buffer length, buffer offset */
             .map(bufferAggregator -> Tuples.of(bufferAggregator, bufferAggregator.length(), 0L))
@@ -398,7 +398,7 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
                 requestConditions.getLeaseId())
                 .map(resp -> offset) /* End of file after append to pass to flush. */
                 .flux();
-            }, parallelTransferOptions.getMaxConcurrency())
+            }, parallelTransferOptions.getMaxConcurrency(), 1)
             .last()
             .flatMap(length -> flushWithResponse(length, false, false, httpHeaders, requestConditions));
     }

sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java

Lines changed: 2 additions & 2 deletions
@@ -1546,7 +1546,7 @@ Mono<Response<ShareFileUploadInfo>> uploadInChunks(Flux<ByteBuffer> data, long o
         parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
         only concurrency + 1 chunks at a time.
         */
-        return chunkedSource.flatMapSequential(stagingArea::write, 1)
+        return chunkedSource.flatMapSequential(stagingArea::write, 1, 1)
             .concatWith(Flux.defer(stagingArea::flush))
             .map(bufferAggregator -> Tuples.of(bufferAggregator, bufferAggregator.length(), 0L))
             /* Scan reduces a flux with an accumulator while emitting the intermediate results. */
@@ -1575,7 +1575,7 @@ Mono<Response<ShareFileUploadInfo>> uploadInChunks(Flux<ByteBuffer> data, long o
                 return uploadRangeWithResponse(new ShareFileUploadRangeOptions(progressData, currentBufferLength)
                     .setOffset(currentOffset).setRequestConditions(requestConditions), context)
                     .flux();
-            }, parallelTransferOptions.getMaxConcurrency())
+            }, parallelTransferOptions.getMaxConcurrency(), 1)
             .last();
     }
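
DataLakeFileAsyncClient and ShareFileAsyncClient receive the same pair of changes as BlobAsyncClient: prefetch = 1 on the staging-area write and on the per-chunk upload flatMapSequential, so the "concurrency + 1 chunks" comment now holds for all three services.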

sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1

Lines changed: 9 additions & 2 deletions
@@ -19,5 +19,12 @@ function Run-Scenario {
     }
 }

-Run-Scenario "700m" "uploadoutputstream --warmup 0 --duration 1 --size 1048576000 --sync"
-Run-Scenario "700m" "uploadblob --warmup 0 --duration 1 --size 1048576000"
+$env:STORAGE_CONNECTION_STRING=$env:PRIMARY_STORAGE_CONNECTION_STRING
+Run-Scenario "600m" "uploadoutputstream --warmup 0 --duration 1 --size 1048576000 --sync"
+Run-Scenario "600m" "uploadblob --warmup 0 --duration 1 --size 1048576000"
+Run-Scenario "400m" "uploadblob --warmup 0 --duration 1 --size 1048576000 --sync"
+Run-Scenario "400m" "uploadblobnolength --warmup 0 --duration 1 --size 1048576000 --sync"
+
+$env:STORAGE_CONNECTION_STRING=$env:STORAGE_DATA_LAKE_CONNECTION_STRING
+Run-Scenario "200m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000 --sync"
+Run-Scenario "300m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000"

sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/App.java

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,7 @@
 import com.azure.perf.test.core.PerfStressProgram;
 import com.azure.storage.blob.perf.DownloadBlobTest;
 import com.azure.storage.blob.perf.ListBlobsTest;
+import com.azure.storage.blob.perf.UploadBlobNoLengthTest;
 import com.azure.storage.blob.perf.UploadBlobTest;
 import com.azure.storage.blob.perf.UploadBlockBlobTest;
 import com.azure.storage.blob.perf.UploadFromFileTest;
@@ -35,6 +36,7 @@ public static void main(String[] args) {
             DownloadBlobTest.class,
             ListBlobsTest.class,
             UploadBlobTest.class,
+            UploadBlobNoLengthTest.class,
             UploadBlockBlobTest.class,
             UploadFromFileTest.class,
             UploadOutputStreamTest.class,
sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobNoLengthTest.java

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.perf;
+
+import com.azure.perf.test.core.PerfStressOptions;
+import com.azure.perf.test.core.RepeatingInputStream;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import com.azure.storage.blob.perf.core.BlobTestBase;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+
+import static com.azure.perf.test.core.TestDataCreationHelper.createRandomByteBufferFlux;
+import static com.azure.perf.test.core.TestDataCreationHelper.createRandomInputStream;
+
+public class UploadBlobNoLengthTest extends BlobTestBase<PerfStressOptions> {
+    protected final RepeatingInputStream inputStream;
+    protected final Flux<ByteBuffer> byteBufferFlux;
+
+    public UploadBlobNoLengthTest(PerfStressOptions options) {
+        super(options);
+        inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
+        inputStream.mark(Integer.MAX_VALUE);
+        byteBufferFlux = createRandomByteBufferFlux(options.getSize());
+    }
+
+    @Override
+    public void run() {
+        inputStream.reset();
+        // This one uses Core's stream->flux converter
+        blobClient.uploadWithResponse(new BlobParallelUploadOptions(inputStream), null, null);
+    }
+
+    @Override
+    public Mono<Void> runAsync() {
+        throw new UnsupportedOperationException();
+    }
+}
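
Since no length is supplied, uploadWithResponse must buffer and chunk the stream itself, which is exactly the memory-sensitive path this commit tunes. Paired with UploadBlobTest below, which passes options.getSize() and goes through Storage's own stream-to-Flux converter, the perf suite now covers one scenario per converter. mark(Integer.MAX_VALUE) plus reset() lets the same RepeatingInputStream be replayed on every iteration, and runAsync() is unsupported because the test targets the synchronous entry point.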

sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobTest.java

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ public UploadBlobTest(PerfStressOptions options) {
     @Override
     public void run() {
         inputStream.reset();
+        // This one uses Storage's stream->flux converter
         blobClient.upload(inputStream, options.getSize(), true);
     }

sdk/storage/test-resources.json

Lines changed: 4 additions & 0 deletions
@@ -601,6 +601,10 @@
       "type": "string",
       "value": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('dataLakeAccountName')), variables('storageApiVersion')).keys[0].value]"
     },
+    "STORAGE_DATA_LAKE_CONNECTION_STRING": {
+      "type": "string",
+      "value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('dataLakeAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('dataLakeAccountName')), variables('storageApiVersion')).keys[0].value, ';EndpointSuffix=', parameters('endpointSuffix'))]"
+    },
     "STORAGE_DATA_LAKE_SOFT_DELETE_ACCOUNT_NAME": {
       "type": "string",
       "value": "[variables('dataLakeSoftDeleteAccountName')]"

sdk/storage/tests.yml

Lines changed: 0 additions & 1 deletion
@@ -72,7 +72,6 @@ stages:
       JAVA_HOME: $(JAVA_HOME_11_X64)
     ${{ if eq(variables['JavaTestVersion'], '1.8') }}:
       JAVA_HOME: $(JAVA_HOME_8_X64)
-    STORAGE_CONNECTION_STRING: $(PRIMARY_STORAGE_CONNECTION_STRING)
 - pwsh: |
     New-Item $(Build.ArtifactStagingDirectory)/test-logs -ItemType directory
     Copy-Item sdk/storage/azure-storage-blob/target/test.log $(Build.ArtifactStagingDirectory)/test-logs/azure-storage-blob-test.log -ErrorAction SilentlyContinue
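
Dropping the globally exported STORAGE_CONNECTION_STRING lines up with the memory-stress script above, which now sets the variable itself, switching between the blob and Data Lake connection strings per scenario.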
