
Commit 3aaeb58

See if Redesign of InputStream Converter Helps Memory Usage (Azure#32444)
1 parent eccb2c2 commit 3aaeb58

2 files changed (+99 −75 lines)

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java

Lines changed: 36 additions & 32 deletions
@@ -650,27 +650,34 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
     @ServiceMethod(returns = ReturnType.SINGLE)
     public Mono<Response<BlockBlobItem>> uploadWithResponse(BlobParallelUploadOptions options) {
         /*
-        The following is catalogue of all the places we allocate memory/copy in any upload method a justification for
-        that case current as of 1/13/21.
-        - Async buffered upload chunked upload: We used an UploadBufferPool. This will allocate memory as needed up to
-        the configured maximum. This is necessary to support replayability on retires. Each flux to come out of the pool
-        is a Flux.just() of up to two deep copied buffers, so it is replayable. It also allows us to optimize the upload
-        by uploading the maximum amount per block. Finally, in the case of chunked uploading, it allows the customer to
-        pass data without knowing the size. Note that full upload does not need a deep copy because the Flux emitted by
-        the PayloadSizeGate in the full upload case is already replayable and the length is maintained by the gate.
-        - Sync buffered upload: converting the input stream to a flux involves creating a buffer for each stream read.
-        Using a new buffer per read ensures that the reads are safe and not overwriting data in buffers that were passed
-        to the async upload but have not yet been sent. This covers both full and chunked uploads in the sync case.
-        - BlobOutputStream: A deep copy is made of any buffer passed to write. While async copy does streamline our code
-        and allow for some potential parallelization, this extra copy is necessary to ensure that customers writing to
-        the stream in a tight loop are not overwriting data previously given to the stream before it has been sent.
-
-        Taken together, these should support retries and protect against data being overwritten in all upload scenarios.
-
-        One note is that there is no deep copy in the uploadFull method. This is unnecessary as explained in
-        uploadFullOrChunked because the Flux coming out of the size gate in that case is already replayable and reusing
-        buffers is not a common scenario for async like it is in sync (and we already buffer in sync to convert from a
-        stream).
+         * The following is catalogue of all the places we allocate memory/copy in any upload method a justification for
+         * that case current as of 1/13/21.
+         *
+         * - Async buffered upload chunked upload: We used an UploadBufferPool. This will allocate memory as needed up
+         * to the configured maximum. This is necessary to support replayability on retires. Each flux to come out of
+         * the pool is a Flux.just() of up to two deep copied buffers, so it is replayable. It also allows us to
+         * optimize the upload by uploading the maximum amount per block. Finally, in the case of chunked uploading,
+         * it allows the customer to pass data without knowing the size. Note that full upload does not need a deep
+         * copy because the Flux emitted by the PayloadSizeGate in the full upload case is already replayable and the
+         * length is maintained by the gate.
+         *
+         * - Sync buffered upload: converting the input stream to a flux involves creating a buffer for each stream
+         * read. Using a new buffer per read ensures that the reads are safe and not overwriting data in buffers that
+         * were passed to the async upload but have not yet been sent. This covers both full and chunked uploads in
+         * the sync case.
+         *
+         * - BlobOutputStream: A deep copy is made of any buffer passed to write. While async copy does streamline our
+         * code and allow for some potential parallelization, this extra copy is necessary to ensure that customers
+         * writing to the stream in a tight loop are not overwriting data previously given to the stream before it has
+         * been sent.
+         *
+         * Taken together, these should support retries and protect against data being overwritten in all upload
+         * scenarios.
+         *
+         * One note is that there is no deep copy in the uploadFull method. This is unnecessary as explained in
+         * uploadFullOrChunked because the Flux coming out of the size gate in that case is already replayable and
+         * reusing buffers is not a common scenario for async like it is in sync (and we already buffer in sync to
+         * convert from a stream).
          */
         try {
             StorageImplUtils.assertNotNull("options", options);
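The comment block above hinges on deep copies making each emitted Flux safely replayable across retries. As a rough illustration of that reasoning (a standalone demo, not SDK code; the class and variable names are invented), the sketch below shows that a Flux built from a deep-copied buffer keeps returning the same bytes on every re-subscription, even if the caller immediately reuses its own array:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the SDK.
public final class ReplayableBufferDemo {
    public static void main(String[] args) {
        byte[] callerOwned = "hello".getBytes(StandardCharsets.UTF_8);

        // Deep copy the caller's bytes so the Flux owns its own data.
        ByteBuffer copy = ByteBuffer.allocate(callerOwned.length);
        copy.put(callerOwned);
        copy.flip();
        Flux<ByteBuffer> replayable = Flux.just(copy.asReadOnlyBuffer());

        // The caller reuses its array, as a tight write loop might.
        callerOwned[0] = (byte) 'X';

        // Each subscription (first attempt, then a retry) still sees "hello".
        replayable.subscribe(b -> System.out.println(StandardCharsets.UTF_8.decode(b.duplicate())));
        replayable.subscribe(b -> System.out.println(StandardCharsets.UTF_8.decode(b.duplicate())));
    }
}

Without the copy, a retry would re-read whatever the caller had since written into the shared array.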
@@ -728,10 +735,9 @@ private Mono<Response<BlockBlobItem>> uploadFullBlob(BlockBlobAsyncClient blockB
         Boolean legalHold) {

         /*
-        Note that there is no need to buffer here as the flux returned by the size gate in this case is created
-        from an iterable and is therefore replayable.
+         * Note that there is no need to buffer here as the flux returned by the size gate in this case is created
+         * from an iterable and is therefore replayable.
          */
-
         return UploadUtils.computeMd5(data, computeMd5, LOGGER)
             .map(fluxMd5Wrapper -> new BlockBlobSimpleUploadOptions(fluxMd5Wrapper.getData(), length)
                 .setHeaders(headers)
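As a quick aside on the note above (again an invented demo, not SDK code): a Flux created from an iterable re-emits its elements on every subscription, which is why the full-upload path can be retried without an extra buffering step.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import reactor.core.publisher.Flux;

// Illustration: an iterable-backed Flux replays its elements for each subscriber.
public final class IterableReplayDemo {
    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
            "part-1".getBytes(StandardCharsets.UTF_8),
            "part-2".getBytes(StandardCharsets.UTF_8));

        Flux<ByteBuffer> fromIterable = Flux.fromIterable(chunks).map(ByteBuffer::wrap);

        // First attempt and a simulated retry both see part-1, part-2.
        fromIterable.subscribe(b -> System.out.println(StandardCharsets.UTF_8.decode(b)));
        fromIterable.subscribe(b -> System.out.println(StandardCharsets.UTF_8.decode(b)));
    }
}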
@@ -775,10 +781,10 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
             ModelHelper.wrapBlobOptions(parallelTransferOptions));

         /*
-        Write to the pool and upload the output.
-        maxConcurrency = 1 when writing means only 1 BufferAggregator will be accumulating at a time.
-        parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
-        only concurrency + 1 chunks at a time.
+         * Write to the pool and upload the output.
+         * maxConcurrency = 1 when writing means only 1 BufferAggregator will be accumulating at a time.
+         * parallelTransferOptions.getMaxConcurrency() appends will be happening at once, so we guarantee buffering of
+         * only concurrency + 1 chunks at a time.
          */
         return chunkedSource.flatMapSequential(stagingArea::write, 1, 1)
            .concatWith(Flux.defer(stagingArea::flush))
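The buffering bound described in that comment comes from the concurrency argument of flatMapSequential, which caps how many inner publishers are subscribed at once. A self-contained sketch of that behaviour (the chunk contents and names here are invented, not the SDK's):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustration: with maxConcurrency = 1 only one inner Mono runs at a time,
// mirroring the single accumulating BufferAggregator described above.
public final class FlatMapSequentialDemo {
    public static void main(String[] args) {
        AtomicInteger inFlight = new AtomicInteger();

        Flux.range(0, 5)
            .flatMapSequential(i ->
                Mono.fromCallable(() -> {
                    System.out.println("start chunk " + i + ", in flight = " + inFlight.incrementAndGet());
                    return i;
                })
                    .delayElement(Duration.ofMillis(50))
                    .doOnNext(x -> inFlight.decrementAndGet()),
                1, 1) // maxConcurrency = 1, prefetch = 1
            .blockLast();
    }
}

With maxConcurrency = 1 the printed in-flight count never exceeds one; the real pipeline then bounds total buffering at roughly concurrency + 1 chunks, as the comment states.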
@@ -799,10 +805,8 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
                     }
                     return responseMono;
                 })
-                // We only care about the stageBlock insofar as it was successful,
-                // but we need to collect the ids.
-                .map(x -> blockId)
-                .flux();
+                // We only care about the stageBlock insofar as it was successful, but we need to collect the ids.
+                .map(x -> blockId);
             }, parallelTransferOptions.getMaxConcurrency(), 1)
             .collect(Collectors.toList())
             .flatMap(ids ->
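For orientation, the surrounding pipeline follows a stage-then-commit shape: stage each chunk, keep its block id, and commit the collected ids in order. A rough sketch of that shape with hypothetical stand-ins (stageBlock and commitBlockList below are placeholders, not the real BlockBlobAsyncClient calls):

import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch of the stage-then-commit flow; the "service calls" are simulated locally.
public final class StageThenCommitSketch {
    static Mono<Void> stageBlock(String blockId, String chunk) {
        return Mono.fromRunnable(() -> System.out.println("staged " + blockId));
    }

    static Mono<String> commitBlockList(List<String> blockIds) {
        return Mono.just("committed " + blockIds);
    }

    public static void main(String[] args) {
        Flux.just("chunk-0", "chunk-1", "chunk-2")
            .index()
            .flatMapSequential(tuple -> {
                String blockId = "block-" + tuple.getT1();
                // We only care that staging succeeded, but we must keep the id for the commit.
                return stageBlock(blockId, tuple.getT2()).thenReturn(blockId);
            }, 2, 1)
            .collect(Collectors.toList())
            .flatMap(StageThenCommitSketch::commitBlockList)
            .subscribe(System.out::println);
    }
}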

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/Utility.java

Lines changed: 63 additions & 43 deletions
@@ -5,14 +5,15 @@

 import com.azure.core.exception.UnexpectedLengthException;
 import com.azure.core.util.CoreUtils;
+import com.azure.core.util.FluxUtil;
 import com.azure.core.util.UrlBuilder;
 import com.azure.core.util.logging.ClientLogger;
 import com.azure.storage.common.implementation.StorageImplUtils;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;

 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
@@ -188,7 +189,7 @@ public static Flux<ByteBuffer> convertStreamToByteBuffer(InputStream data, long
     /**
      * A utility method for converting the input stream to Flux of ByteBuffer. Will check the equality of entity length
      * and the input length.
-     *
+     * <p>
      * Using markAndReset=true to force a seekable stream implies a buffering strategy is not being used, in which case
      * length is still needed for whatever underlying REST call is being streamed to. If markAndReset=false and data is
      * being buffered, consider using {@link com.azure.core.util.FluxUtil#toFluxByteBuffer(InputStream, int)} which
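For reference, a minimal usage sketch of this helper, based only on the signature and javadoc shown in this diff (the payload, block size, and class name are illustrative):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.azure.storage.common.Utility;
import reactor.core.publisher.Flux;

// Illustration only: feed a mark-supporting stream of known length through the converter.
public final class ConvertStreamSketch {
    public static void main(String[] args) {
        byte[] bytes = "some payload".getBytes(StandardCharsets.UTF_8);
        InputStream data = new ByteArrayInputStream(bytes); // supports mark/reset

        // markAndReset = true: the stream is marked up front and reset on every (re)subscription,
        // so the same Flux can back a retried upload.
        Flux<ByteBuffer> buffers = Utility.convertStreamToByteBuffer(data, bytes.length, 4, true);

        buffers.subscribe(b -> System.out.println("emitted " + b.remaining() + " bytes"));
    }
}

If markAndReset=false and the data is buffered elsewhere, the javadoc above points at FluxUtil.toFluxByteBuffer(InputStream, int) as an alternative.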
@@ -205,78 +206,97 @@ public static Flux<ByteBuffer> convertStreamToByteBuffer(InputStream data, long
      * @throws RuntimeException When I/O error occurs.
      */
     public static Flux<ByteBuffer> convertStreamToByteBuffer(InputStream data, long length, int blockSize,
-        boolean markAndReset) {
+            boolean markAndReset) {
         if (markAndReset) {
             data.mark(Integer.MAX_VALUE);
         }
+
         if (length == 0) {
             try {
                 if (data.read() != -1) {
                     long totalLength = 1 + data.available();
-                    throw LOGGER.logExceptionAsError(new UnexpectedLengthException(
-                        String.format("Request body emitted %d bytes, more than the expected %d bytes.",
-                            totalLength, length), totalLength, length));
+                    return FluxUtil.fluxError(LOGGER, new UnexpectedLengthException(String.format(
+                        "Request body emitted %d bytes, more than the expected %d bytes.", totalLength, length),
+                        totalLength, length));
                 }
             } catch (IOException e) {
-                throw LOGGER.logExceptionAsError(new RuntimeException("I/O errors occurred", e));
+                return FluxUtil.fluxError(LOGGER, new UncheckedIOException(e));
             }
         }
+
         return Flux.defer(() -> {
             /*
-            If the request needs to be retried, the flux will be resubscribed to. The stream and counter must be
-            reset in order to correctly return the same data again.
+             * If the request needs to be retried, the flux will be resubscribed to. The stream and counter must be
+             * reset in order to correctly return the same data again.
              */
-            final long[] currentTotalLength = new long[1];
             if (markAndReset) {
                 try {
                     data.reset();
                 } catch (IOException e) {
-                    throw LOGGER.logExceptionAsError(new RuntimeException(e));
+                    return FluxUtil.fluxError(LOGGER, new UncheckedIOException(e));
                 }
             }
-            return Flux.range(0, (int) Math.ceil((double) length / (double) blockSize))
-                .map(i -> i * blockSize)
-                .concatMap(pos -> Mono.fromCallable(() -> {
-                    long count = pos + blockSize > length ? length - pos : blockSize;
-                    byte[] cache = new byte[(int) count];
-                    int numOfBytes = 0;
-                    int offset = 0;
-                    // Revise the casting if the max allowed network data transmission is over 2G.
-                    int len = (int) count;
-                    while (numOfBytes != -1 && offset < count) {
+
+            final long[] currentTotalLength = new long[1];
+            return Flux.generate(() -> data, (is, sink) -> {
+                long pos = currentTotalLength[0];
+
+                long count = (pos + blockSize) > length ? (length - pos) : blockSize;
+                byte[] cache = new byte[(int) count];
+
+                int numOfBytes = 0;
+                int offset = 0;
+                // Revise the casting if the max allowed network data transmission is over 2G.
+                int len = (int) count;
+
+                while (numOfBytes != -1 && offset < count) {
+                    try {
                         numOfBytes = data.read(cache, offset, len);
                         if (numOfBytes != -1) {
                             offset += numOfBytes;
                             len -= numOfBytes;
                             currentTotalLength[0] += numOfBytes;
                         }
+                    } catch (IOException e) {
+                        sink.error(e);
+                        return is;
                     }
-                    if (numOfBytes == -1 && currentTotalLength[0] < length) {
-                        throw LOGGER.logExceptionAsError(new UnexpectedLengthException(
-                            String.format("Request body emitted %d bytes, less than the expected %d bytes.",
-                                currentTotalLength[0], length), currentTotalLength[0], length));
-                    }
+                }

-                    // Validate that stream isn't longer.
-                    if (currentTotalLength[0] >= length) {
-                        try {
-                            if (data.read() != -1) {
-                                long totalLength = 1 + currentTotalLength[0] + data.available();
-                                throw LOGGER.logExceptionAsError(new UnexpectedLengthException(
-                                    String.format("Request body emitted %d bytes, more than the expected %d bytes.",
-                                        totalLength, length), totalLength, length));
-                            } else if (currentTotalLength[0] > length) {
-                                throw LOGGER.logExceptionAsError(new IllegalStateException(
-                                    String.format("Read more data than was requested. Size of data read: %d. Size of data"
-                                        + " requested: %d", currentTotalLength[0], length)));
-                            }
-                        } catch (IOException e) {
-                            throw LOGGER.logExceptionAsError(new RuntimeException("I/O errors occurred", e));
+                if (numOfBytes == -1 && currentTotalLength[0] < length) {
+                    sink.error(LOGGER.logExceptionAsError(new UnexpectedLengthException(String.format(
+                        "Request body emitted %d bytes, less than the expected %d bytes.",
+                        currentTotalLength[0], length), currentTotalLength[0], length)));
+                    return is;
+                }
+
+                // Validate that stream isn't longer.
+                if (currentTotalLength[0] >= length) {
+                    try {
+                        if (data.read() != -1) {
+                            long totalLength = 1 + currentTotalLength[0] + data.available();
+                            sink.error(LOGGER.logExceptionAsError(new UnexpectedLengthException(
+                                String.format("Request body emitted %d bytes, more than the expected %d bytes.",
+                                    totalLength, length), totalLength, length)));
+                            return is;
+                        } else if (currentTotalLength[0] > length) {
+                            sink.error(LOGGER.logExceptionAsError(new IllegalStateException(
+                                String.format("Read more data than was requested. Size of data read: %d. Size of data"
+                                    + " requested: %d", currentTotalLength[0], length))));
+                            return is;
                         }
+                    } catch (IOException e) {
+                        sink.error(LOGGER.logExceptionAsError(new RuntimeException("I/O errors occurred", e)));
+                        return is;
                     }
+                }

-                    return ByteBuffer.wrap(cache, 0, offset);
-                }));
+                sink.next(ByteBuffer.wrap(cache, 0, offset));
+                if (currentTotalLength[0] == length) {
+                    sink.complete();
+                }
+                return is;
+            });
         });
     }
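The reworked converter above replaces the Flux.range → map → concatMap(Mono.fromCallable(...)) pipeline with a single Flux.generate loop, so each emission allocates exactly one block-sized array and no per-block Mono or range bookkeeping, which is the memory angle the commit title is probing. A stripped-down sketch of the same pattern outside the SDK (the method name and the simplified error handling are mine; the real code above also validates the byte count against the expected length and handles mark/reset):

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;

// Simplified sketch of a Flux.generate-based InputStream-to-ByteBuffer conversion.
public final class GenerateConverterSketch {
    static Flux<ByteBuffer> streamToBuffers(InputStream data, long length, int blockSize) {
        long[] totalRead = new long[1];
        return Flux.generate(() -> data, (is, sink) -> {
            long remaining = length - totalRead[0];
            int count = (int) Math.min(blockSize, remaining);
            byte[] cache = new byte[count];
            int offset = 0;
            try {
                // Fill the block, tolerating short reads from the stream.
                while (offset < count) {
                    int read = is.read(cache, offset, count - offset);
                    if (read == -1) {
                        break;
                    }
                    offset += read;
                }
            } catch (IOException e) {
                sink.error(e);
                return is;
            }
            totalRead[0] += offset;
            sink.next(ByteBuffer.wrap(cache, 0, offset));
            if (offset < count || totalRead[0] == length) {
                sink.complete();
            }
            return is;
        });
    }
}

Flux.generate invokes the generator once per requested element, so blocks are produced on demand rather than scheduled up front, and a retried request simply re-enters the surrounding Flux.defer in the real code and restarts the loop from a reset stream.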
