Commit 1f7ea0d

Fixed concurrency issue in buffered upload that caused large files to… (Azure#20964)

1 parent 9af2f3f commit 1f7ea0d

5 files changed: +25 -110 lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
Lines changed: 7 additions & 10 deletions

@@ -33,7 +33,7 @@
 import com.azure.storage.common.Utility;
 import com.azure.storage.common.implementation.Constants;
 import com.azure.storage.common.implementation.StorageImplUtils;
-import com.azure.storage.common.implementation.UploadBufferPool;
+import com.azure.storage.common.implementation.BufferStagingArea;
 import com.azure.storage.common.implementation.UploadUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

@@ -573,21 +573,19 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
         Lock progressLock = new ReentrantLock();

         // Validation done in the constructor.
-        /*
-        We use maxConcurrency + 1 for the number of buffers because one buffer will typically be being filled while the
-        others are being sent.
-        */
-        UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1,
-            parallelTransferOptions.getBlockSizeLong(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES_LONG);
+        BufferStagingArea pool = new BufferStagingArea(parallelTransferOptions.getBlockSizeLong(),
+            BlockBlobClient.MAX_STAGE_BLOCK_BYTES_LONG);

         Flux<ByteBuffer> chunkedSource = UploadUtils.chunkSource(data,
             ModelHelper.wrapBlobOptions(parallelTransferOptions));

         /*
         Write to the pool and upload the output.
+        maxConcurrency = 1 when writing means only 1 BufferAggregator will be accumulating at a time.
+        parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
+        only concurrency + 1 chunks at a time.
         */
-        return chunkedSource.concatMap(pool::write)
-            .limitRate(parallelTransferOptions.getMaxConcurrency()) // This guarantees that concatMap will only buffer maxConcurrency * chunkSize data
+        return chunkedSource.flatMapSequential(pool::write, 1)
             .concatWith(Flux.defer(pool::flush))
             .flatMapSequential(bufferAggregator -> {
                 // Report progress as necessary.

@@ -605,7 +603,6 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
                     // We only care about the stageBlock insofar as it was successful,
                     // but we need to collect the ids.
                     .map(x -> blockId)
-                    .doFinally(x -> pool.returnBuffer(bufferAggregator))
                     .flux();
             }, parallelTransferOptions.getMaxConcurrency())
             .collect(Collectors.toList())
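Note: the new comment block carries the reasoning behind the fix. Staging writes with a concurrency of 1 means only one BufferAggregator is being filled at a time, while the downstream flatMapSequential runs up to getMaxConcurrency() stageBlock calls, so only about concurrency + 1 chunks are buffered at once. Below is a minimal Reactor sketch of that operator shape (the class name, chunk count, and delay are illustrative stand-ins, not the SDK code):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: shows how an upstream flatMapSequential with concurrency 1 plus a
// downstream flatMapSequential with maxConcurrency bounds the amount of in-flight
// work, mirroring the shape of the new upload chain.
public class BoundedBufferingSketch {
    public static void main(String[] args) {
        int maxConcurrency = 4;                               // stand-in for parallelTransferOptions.getMaxConcurrency()
        AtomicInteger inFlight = new AtomicInteger();

        Flux.range(0, 32)                                     // stand-in for the chunked source
            .flatMapSequential(chunk -> Flux.just(chunk), 1)  // "stage" one chunk at a time
            .flatMapSequential(chunk ->                       // "upload" staged chunks with bounded parallelism
                Mono.delay(Duration.ofMillis(50))             // stand-in for the stageBlock call
                    .doOnSubscribe(s -> System.out.println("in flight: " + inFlight.incrementAndGet()))
                    .doOnTerminate(inFlight::decrementAndGet)
                    .thenReturn(chunk),
                maxConcurrency)
            .blockLast();                                     // printed in-flight count stays at or below maxConcurrency
    }
}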

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
Lines changed: 0 additions & 2 deletions

@@ -1230,7 +1230,6 @@ class BlockBlobAPITest extends APISpec {
     // Only run these tests in live mode as they use variables that can't be captured.
     @Unroll
     @Requires({ liveMode() })
-    @Ignore("Timeouts")
     def "Async buffered upload"() {
         setup:
         def blobAsyncClient = getPrimaryServiceClientForWrites(bufferSize)

@@ -1379,7 +1378,6 @@ class BlockBlobAPITest extends APISpec {
     // Only run these tests in live mode as they use variables that can't be captured.
     @Unroll
     @Requires({ liveMode() })
-    @Ignore("Timeouts")
     def "Buffered upload chunked source"() {
         /*
         This test should validate that the upload should work regardless of what format the passed data is in because

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadBufferPool.java renamed to sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/BufferStagingArea.java
Lines changed: 6 additions & 78 deletions

@@ -3,12 +3,9 @@

 package com.azure.storage.common.implementation;

-import com.azure.core.util.logging.ClientLogger;
 import reactor.core.publisher.Flux;

 import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;

 /**
  * This type is to support the implementation of buffered upload only. It is mandatory that the caller has broken the

@@ -32,49 +29,21 @@
  *
  * RESERVED FOR INTERNAL USE ONLY
  */
-public final class UploadBufferPool {
-    private final ClientLogger logger = new ClientLogger(UploadBufferPool.class);
-
-    /*
-    Note that a blocking on a synchronized object is not the same as blocking on a reactive operation; blocking on this
-    queue will not compromise the async nature of this workflow. Fluxes themselves are internally synchronized to ensure
-    only one call to onNext happens at a time.
-    */
-    private final BlockingQueue<BufferAggregator> buffers;
-
-    private final int maxBuffs;
-
-    // The number of buffs we have allocated. We can query the queue for how many are available.
-    private int numBuffs;
+public final class BufferStagingArea {

     private final long buffSize;

     private BufferAggregator currentBuf;

     /**
      * Creates a new instance of UploadBufferPool
-     * @param numBuffs The number of buffers in the buffer pool.
      * @param buffSize The size of the buffers
+     * @param maxBuffSize The max size of the buffers
      */
-    public UploadBufferPool(final int numBuffs, final long buffSize, long maxBuffSize) {
-        /*
-        We require at least two buffers because it is possible that a given write will spill over into a second buffer.
-        We only need one overflow buffer because the max size of a ByteBuffer is assumed to be the size as a buffer in
-        the pool.
-        */
-        StorageImplUtils.assertInBounds("numBuffs", numBuffs, 2, Integer.MAX_VALUE);
-        this.maxBuffs = numBuffs;
-        buffers = new LinkedBlockingQueue<>(numBuffs);
-
-
+    public BufferStagingArea(final long buffSize, long maxBuffSize) {
         // These buffers will be used in calls to stageBlock, so they must be no greater than block size.
         StorageImplUtils.assertInBounds("buffSize", buffSize, 1, maxBuffSize);
         this.buffSize = buffSize;
-
-        // We prep the queue with two buffers in case there is overflow.
-        buffers.add(new BufferAggregator(this.buffSize));
-        buffers.add(new BufferAggregator(this.buffSize));
-        this.numBuffs = 2;
     }

     /*

@@ -88,9 +57,10 @@ public UploadBufferPool(final int numBuffs, final long buffSize, long maxBuffSiz
      * @return The {@code Flux<BufferAggregator>}
      */
     public Flux<BufferAggregator> write(ByteBuffer buf) {
+
         // Check if there's a buffer holding any data from a previous call to write. If not, get a new one.
         if (this.currentBuf == null) {
-            this.currentBuf = this.getBuffer();
+            this.currentBuf = new BufferAggregator(this.buffSize);
         }

         Flux<BufferAggregator> result;

@@ -126,37 +96,10 @@ public Flux<BufferAggregator> write(ByteBuffer buf) {
             means we'll only have to over flow once, and the buffer we overflow into will not be filled. This is the
             buffer we will write to on the next call to write().
             */
-            this.currentBuf = this.getBuffer();
+            this.currentBuf = new BufferAggregator(this.buffSize);
             this.currentBuf.append(buf);
         }
-        return result;
-    }
-
-    /*
-    Note that the upload method will be calling write sequentially as there is only one worker reading from the source
-    and calling write. Hence there is only one worker calling getBuffer at any time.
-    */
-    private BufferAggregator getBuffer() {
-        BufferAggregator result;
-        /*
-        There are no buffers in the queue and we have space to allocate one. We do not add the new buffer to the queue
-        because we want to make immediate use of it. This is effectively equivalent to a buffers.add(newBuffer) and
-        then result = buffers.pop()--because we only get here when the queue is empty, the buffer returned is the one
-        we just created. The new buffer will be added to buffers when it is returned to the pool.
-        */
-        if (this.buffers.isEmpty() && this.numBuffs < this.maxBuffs) {
-            result = new BufferAggregator(this.buffSize);
-            this.numBuffs++;
-        } else {
-            try {
-                // If empty, this will wait for an upload to finish and return a buffer.
-                result = this.buffers.take();

-            } catch (InterruptedException e) {
-                throw logger.logExceptionAsError(new IllegalStateException("BufferedUpload thread interrupted. Thread:"
-                    + Thread.currentThread().getId()));
-            }
-        }
         return result;
     }

@@ -177,19 +120,4 @@ public Flux<BufferAggregator> flush() {
         }
         return Flux.empty();
     }
-
-    /**
-     * Returns the ByteBuffer
-     * @param b The ByteBuffer to reset and return
-     */
-    public void returnBuffer(BufferAggregator b) {
-        // Reset the buffer aggregator.
-        b.reset();
-
-        try {
-            this.buffers.put(new BufferAggregator(this.buffSize));
-        } catch (InterruptedException e) {
-            throw logger.logExceptionAsError(new IllegalStateException("UploadFromStream thread interrupted."));
-        }
-    }
 }
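For illustration, a much-simplified, self-contained staging area with the same write/flush contract (SimpleStagingArea and its List<ByteBuffer> aggregates are hypothetical; the SDK's BufferStagingArea delegates to BufferAggregator and validates the chunk size against the service's block limit):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Sketch only: ByteBuffers are appended to the current aggregate; when it reaches
// chunkSize it is emitted and a fresh aggregate is started, so no pooling or
// returnBuffer bookkeeping is needed. flush() emits the trailing partial aggregate.
final class SimpleStagingArea {
    private final long chunkSize;
    private List<ByteBuffer> current = new ArrayList<>();
    private long currentLength = 0;

    SimpleStagingArea(long chunkSize) {
        this.chunkSize = chunkSize;
    }

    // Append one buffer; emit zero or more full aggregates. Called by a single writer,
    // as guaranteed by flatMapSequential(pool::write, 1) upstream.
    Flux<List<ByteBuffer>> write(ByteBuffer buf) {
        List<List<ByteBuffer>> full = new ArrayList<>();
        while (buf.hasRemaining()) {
            int take = (int) Math.min(chunkSize - currentLength, buf.remaining());
            ByteBuffer slice = buf.duplicate();
            slice.limit(slice.position() + take);
            buf.position(buf.position() + take);
            current.add(slice);
            currentLength += take;
            if (currentLength == chunkSize) {   // aggregate is full: emit it, start a new one
                full.add(current);
                current = new ArrayList<>();
                currentLength = 0;
            }
        }
        return Flux.fromIterable(full);
    }

    // Emit whatever is left after the source completes.
    Flux<List<ByteBuffer>> flush() {
        return currentLength == 0 ? Flux.empty() : Flux.just(current);
    }
}

Because every emitted aggregate is a fresh object owned solely by its downstream upload, there is nothing to reset or return, which appears to be what lets the diff delete returnBuffer and the doFinally hooks that called it.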

sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java
Lines changed: 10 additions & 13 deletions

@@ -23,7 +23,7 @@
 import com.azure.storage.common.implementation.BufferAggregator;
 import com.azure.storage.common.implementation.Constants;
 import com.azure.storage.common.implementation.StorageImplUtils;
-import com.azure.storage.common.implementation.UploadBufferPool;
+import com.azure.storage.common.implementation.BufferStagingArea;
 import com.azure.storage.common.implementation.UploadUtils;
 import com.azure.storage.file.datalake.implementation.models.LeaseAccessConditions;
 import com.azure.storage.file.datalake.implementation.models.ModifiedAccessConditions;

@@ -35,13 +35,13 @@
 import com.azure.storage.file.datalake.models.DownloadRetryOptions;
 import com.azure.storage.file.datalake.models.FileExpirationOffset;
 import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
-import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
-import com.azure.storage.file.datalake.options.FileQueryOptions;
 import com.azure.storage.file.datalake.models.FileRange;
 import com.azure.storage.file.datalake.models.FileReadAsyncResponse;
 import com.azure.storage.file.datalake.models.PathHttpHeaders;
 import com.azure.storage.file.datalake.models.PathInfo;
 import com.azure.storage.file.datalake.models.PathProperties;
+import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
+import com.azure.storage.file.datalake.options.FileQueryOptions;
 import com.azure.storage.file.datalake.options.FileScheduleDeletionOptions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

@@ -349,20 +349,17 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
         Lock progressLock = new ReentrantLock();

         // Validation done in the constructor.
-        /*
-        We use maxConcurrency + 1 for the number of buffers because one buffer will typically be being filled while the
-        others are being sent.
-        */
-        UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1,
-            parallelTransferOptions.getBlockSizeLong(), MAX_APPEND_FILE_BYTES);
+        BufferStagingArea pool = new BufferStagingArea(parallelTransferOptions.getBlockSizeLong(), MAX_APPEND_FILE_BYTES);

         Flux<ByteBuffer> chunkedSource = UploadUtils.chunkSource(data, parallelTransferOptions);

         /*
         Write to the pool and upload the output.
+        maxConcurrency = 1 when writing means only 1 BufferAggregator will be accumulating at a time.
+        parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
+        only concurrency + 1 chunks at a time.
         */
-        return chunkedSource.concatMap(pool::write)
-            .limitRate(parallelTransferOptions.getMaxConcurrency()) // This guarantees that concatMap will only buffer maxConcurrency * chunkSize data
+        return chunkedSource.flatMapSequential(pool::write, 1)
             .concatWith(Flux.defer(pool::flush))
             /* Map the data to a tuple 3, of buffer, buffer length, buffer offset */
             .map(bufferAggregator -> Tuples.of(bufferAggregator, bufferAggregator.length(), 0L))

@@ -389,10 +386,10 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
                 Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                     bufferAggregator.asFlux(), parallelTransferOptions.getProgressReceiver(),
                     progressLock, totalProgress);
+                final long offset = currentBufferLength + currentOffset;
                 return appendWithResponse(progressData, currentOffset, currentBufferLength, null,
                     requestConditions.getLeaseId())
-                    .doFinally(x -> pool.returnBuffer(bufferAggregator))
-                    .map(resp -> currentBufferLength + currentOffset) /* End of file after append to pass to flush. */
+                    .map(resp -> offset) /* End of file after append to pass to flush. */
                     .flux();
             }, parallelTransferOptions.getMaxConcurrency())
             .last()
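The last hunk hoists the end-of-file computation into a final local before the asynchronous call and maps each append to that value, so .last() hands the final file length to flush without re-reading any upload-time state. A rough sketch of that append-then-flush shape (appendWithResponse and flush are hypothetical stand-ins here, and the real chain's parallel flatMapSequential is replaced by concatMap for brevity):

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: each chunk is appended at its offset, every append is mapped to the
// end-of-file position it produced, and the last such position is passed to flush.
public class AppendThenFlushSketch {
    public static void main(String[] args) {
        List<Long> chunkLengths = Arrays.asList(4L * 1024 * 1024, 4L * 1024 * 1024, 1024L * 1024);

        long[] runningOffset = {0L};               // sequential here, so a simple counter suffices
        Flux.fromIterable(chunkLengths)
            .concatMap(length -> {
                long currentOffset = runningOffset[0];
                runningOffset[0] += length;
                // Capture the end-of-file position eagerly, before the async call,
                // mirroring "final long offset = currentBufferLength + currentOffset".
                final long offset = currentOffset + length;
                return appendWithResponse(currentOffset, length)
                    .map(resp -> offset);          // end of file after this append
            })
            .last()
            .flatMap(AppendThenFlushSketch::flush) // flush at the final file length
            .block();
    }

    static Mono<String> appendWithResponse(long offset, long length) {
        return Mono.just("appended [" + offset + ", " + (offset + length) + ")");
    }

    static Mono<Void> flush(long fileLength) {
        System.out.println("flush at length " + fileLength);
        return Mono.empty();
    }
}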

sdk/storage/azure-storage-file-datalake/src/test/java/com/azure/storage/file/datalake/FileAPITest.groovy
Lines changed: 2 additions & 7 deletions

@@ -7,14 +7,13 @@ import com.azure.identity.DefaultAzureCredentialBuilder
 import com.azure.storage.blob.BlobUrlParts
 import com.azure.storage.blob.models.BlobErrorCode
 import com.azure.storage.blob.models.BlobStorageException
-import com.azure.storage.blob.models.BlockListType
-import com.azure.storage.file.datalake.models.DownloadRetryOptions
 import com.azure.storage.common.ParallelTransferOptions
 import com.azure.storage.common.ProgressReceiver
 import com.azure.storage.common.implementation.Constants
 import com.azure.storage.file.datalake.models.AccessTier
 import com.azure.storage.file.datalake.models.DataLakeRequestConditions
 import com.azure.storage.file.datalake.models.DataLakeStorageException
+import com.azure.storage.file.datalake.models.DownloadRetryOptions
 import com.azure.storage.file.datalake.models.FileExpirationOffset
 import com.azure.storage.file.datalake.models.FileQueryArrowField
 import com.azure.storage.file.datalake.models.FileQueryArrowFieldType

@@ -36,11 +35,11 @@ import com.azure.storage.file.datalake.models.RolePermissions
 import com.azure.storage.file.datalake.options.FileParallelUploadOptions
 import com.azure.storage.file.datalake.options.FileQueryOptions
 import com.azure.storage.file.datalake.options.FileScheduleDeletionOptions
-import spock.lang.Ignore
 import reactor.core.Exceptions
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Hooks
 import reactor.test.StepVerifier
+import spock.lang.Ignore
 import spock.lang.Requires
 import spock.lang.Unroll

@@ -2296,7 +2295,6 @@ class FileAPITest extends APISpec {

     @Unroll
     @Requires({ liveMode() }) // Test uploads large amount of data
-    @Ignore("Timeouts")
     def "Async buffered upload"() {
         setup:
         DataLakeFileAsyncClient facWrite = getPrimaryServiceClientForWrites(bufferSize)

@@ -2368,7 +2366,6 @@ class FileAPITest extends APISpec {

     @Unroll
     @Requires({ liveMode() })
-    @Ignore // Hanging in pipeline
     def "Buffered upload with reporter"() {
         setup:
         DataLakeFileAsyncClient fac = fscAsync.getFileAsyncClient(generatePathName())

@@ -2402,7 +2399,6 @@ class FileAPITest extends APISpec {

     @Unroll
     @Requires({liveMode()}) // Test uploads large amount of data
-    @Ignore("Timeouts")
     def "Buffered upload chunked source"() {
         setup:
         DataLakeFileAsyncClient facWrite = getPrimaryServiceClientForWrites(bufferSize)

@@ -2624,7 +2620,6 @@ class FileAPITest extends APISpec {

     @Unroll
     @Requires({ liveMode() })
-    // @Ignore("failing in ci")
     def "Buffered upload options"() {
         setup:
         DataLakeFileAsyncClient fac = fscAsync.getFileAsyncClient(generatePathName())
