Skip to content

Commit 81af98c

Browse files
authored
Ensure BinaryData responses are durable in polling operations (Azure#36999)
Ensure BinaryData responses are durable in polling operations
1 parent d0cfcce commit 81af98c

File tree

3 files changed

+101
-22
lines changed

3 files changed

+101
-22
lines changed

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.Objects;
1919
import java.util.concurrent.atomic.AtomicReference;
2020
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21-
import java.util.function.BiConsumer;
2221

2322
/**
2423
* A {@link BinaryDataContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}.
@@ -125,31 +124,47 @@ public BinaryDataContent toReplayableContent() {
125124
return replayableContent;
126125
}
127126

128-
Flux<ByteBuffer> bufferedFlux = content
129-
.map(buffer -> {
130-
// deep copy direct buffers
131-
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
132-
copy.put(buffer);
133-
copy.flip();
134-
return copy;
135-
})
136-
// collectList() uses ArrayList. We don't want to be bound by array capacity
137-
// and we don't need random access.
138-
.collect(LinkedList::new, (BiConsumer<LinkedList<ByteBuffer>, ByteBuffer>) LinkedList::add)
139-
.cache()
140-
.flatMapMany(
141-
// Duplicate buffers on re-subscription.
142-
listOfBuffers -> Flux.fromIterable(listOfBuffers).map(ByteBuffer::duplicate));
143-
replayableContent = new FluxByteBufferContent(bufferedFlux, length, true);
144-
cachedReplayableContent.set(replayableContent);
145-
return replayableContent;
127+
return bufferContent().map(bufferedData -> {
128+
FluxByteBufferContent bufferedContent = new FluxByteBufferContent(Flux.fromIterable(bufferedData)
129+
.map(ByteBuffer::duplicate), length, true);
130+
cachedReplayableContent.set(bufferedContent);
131+
132+
return bufferedContent;
133+
}).block();
146134
}
147135

148136
@Override
149137
public Mono<BinaryDataContent> toReplayableContentAsync() {
150-
return Mono.fromCallable(this::toReplayableContent);
138+
if (isReplayable) {
139+
return Mono.just(this);
140+
}
141+
142+
FluxByteBufferContent replayableContent = cachedReplayableContent.get();
143+
if (replayableContent != null) {
144+
return Mono.just(replayableContent);
145+
}
146+
147+
return bufferContent().cache().map(bufferedData -> {
148+
Flux<ByteBuffer> bufferedFluxData = Flux.fromIterable(bufferedData).map(ByteBuffer::asReadOnlyBuffer);
149+
FluxByteBufferContent bufferedBinaryDataContent = new FluxByteBufferContent(bufferedFluxData, length, true);
150+
cachedReplayableContent.set(bufferedBinaryDataContent);
151+
152+
return bufferedBinaryDataContent;
153+
});
151154
}
152155

156+
private Mono<LinkedList<ByteBuffer>> bufferContent() {
157+
// collectList() uses ArrayList, we don't want to be bound by array capacity and don't need random access
158+
return content.map(buffer -> {
159+
// deep copy direct buffers
160+
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
161+
copy.put(buffer);
162+
copy.flip();
163+
return copy;
164+
}).collect(LinkedList::new, LinkedList::add);
165+
}
166+
167+
153168
@Override
154169
public BinaryDataContentType getContentType() {
155170
return BinaryDataContentType.BINARY;

sdk/core/azure-core/src/main/java/com/azure/core/util/polling/implementation/PollingUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static BinaryData serializeResponseSync(Object response, ObjectSerializer
6363
public static <T> Mono<T> deserializeResponse(BinaryData binaryData, ObjectSerializer serializer,
6464
TypeReference<T> typeReference) {
6565
if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) {
66-
return Mono.just((T) binaryData);
66+
return (Mono<T>) binaryData.toReplayableBinaryDataAsync();
6767
} else {
6868
return binaryData.toObjectAsync(typeReference, serializer);
6969
}
@@ -83,7 +83,7 @@ public static <T> Mono<T> deserializeResponse(BinaryData binaryData, ObjectSeria
8383
public static <T> T deserializeResponseSync(BinaryData binaryData, ObjectSerializer serializer,
8484
TypeReference<T> typeReference) {
8585
if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) {
86-
return (T) binaryData;
86+
return (T) binaryData.toReplayableBinaryData();
8787
} else {
8888
return binaryData.toObject(typeReference, serializer);
8989
}

sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.List;
5858
import java.util.Objects;
5959
import java.util.UUID;
60+
import java.util.concurrent.ThreadLocalRandom;
6061
import java.util.concurrent.atomic.AtomicBoolean;
6162
import java.util.concurrent.atomic.AtomicInteger;
6263
import java.util.concurrent.atomic.AtomicLong;
@@ -68,6 +69,7 @@
6869
import static com.azure.core.CoreTestUtils.fillArray;
6970
import static com.azure.core.CoreTestUtils.readStream;
7071
import static com.azure.core.implementation.util.BinaryDataContent.STREAM_READ_SIZE;
72+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
7173
import static org.junit.jupiter.api.Assertions.assertEquals;
7274
import static org.junit.jupiter.api.Assertions.assertFalse;
7375
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1049,6 +1051,68 @@ public void binaryDataAsPropertyDeserialization() throws IOException {
10491051
assertEquals(expected.getProperty().toString(), actual.getProperty().toString());
10501052
}
10511053

1054+
@Test
1055+
public void emptyFluxByteBufferToReplayable() {
1056+
BinaryData binaryData = BinaryData.fromFlux(Flux.empty()).block();
1057+
1058+
BinaryData replayable = assertDoesNotThrow(() -> binaryData.toReplayableBinaryData());
1059+
assertEquals("", replayable.toString());
1060+
}
1061+
1062+
@Test
1063+
public void emptyFluxByteBufferToReplayableAsync() {
1064+
StepVerifier.create(BinaryData.fromFlux(Flux.empty())
1065+
.flatMap(BinaryData::toReplayableBinaryDataAsync))
1066+
.assertNext(replayable -> assertEquals("", replayable.toString()))
1067+
.verifyComplete();
1068+
}
1069+
1070+
/**
1071+
* Tests that {@link FluxByteBufferContent#toReplayableContent()} eagerly makes the {@link FluxByteBufferContent}
1072+
* replayable. Before, this method wouldn't make the content replayable until the return
1073+
* {@link FluxByteBufferContent} was consumed, which defeated the purpose of the method as the underlying data could
1074+
* be reclaimed or consumed before it was made replayable.
1075+
*/
1076+
@Test
1077+
public void fluxByteBufferToReplayableEagerlyConvertsToReplayable() {
1078+
byte[] data = new byte[1024];
1079+
ThreadLocalRandom.current().nextBytes(data);
1080+
byte[] expectedData = CoreUtils.clone(data);
1081+
1082+
BinaryDataContent binaryDataContent = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data)))
1083+
.toReplayableContent();
1084+
1085+
Arrays.fill(data, (byte) 0);
1086+
1087+
assertArraysEqual(expectedData, binaryDataContent.toBytes());
1088+
}
1089+
1090+
/**
1091+
* Tests that {@link FluxByteBufferContent} returned by {@link FluxByteBufferContent#toReplayableContentAsync()}
1092+
* won't attempt to access the original {@link Flux Flux&lt;ByteBuffer&gt;} as the initial duplicated is cached as a
1093+
* stream of {@link ByteBuffer ByteBuffers} that are shared to all subscribers, and duplicated in each subscription
1094+
* so that the underlying content cannot be modified.
1095+
*/
1096+
@Test
1097+
public void multipleSubscriptionsToReplayableAsyncFluxByteBufferAreConsistent() {
1098+
byte[] data = new byte[1024];
1099+
ThreadLocalRandom.current().nextBytes(data);
1100+
byte[] expectedData = CoreUtils.clone(data);
1101+
1102+
Mono<BinaryDataContent> binaryDataContentMono = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data)))
1103+
.toReplayableContentAsync();
1104+
1105+
StepVerifier.create(binaryDataContentMono)
1106+
.assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes()))
1107+
.verifyComplete();
1108+
1109+
Arrays.fill(data, (byte) 0);
1110+
1111+
StepVerifier.create(binaryDataContentMono)
1112+
.assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes()))
1113+
.verifyComplete();
1114+
}
1115+
10521116
public static final class BinaryDataAsProperty {
10531117
@JsonProperty("property")
10541118
private BinaryData property;

0 commit comments

Comments
 (0)