Skip to content

Commit 6e09986

Browse files
authored
Make fromFlux read streams eagerly (Azure#23907)
* Make fromFlux read streams eagerly * Remove unused import
1 parent 750fe59 commit 6e09986

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

sdk/core/azure-core/src/main/java/com/azure/core/util/BinaryData.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.azure.core.util.implementation.BinaryDataContent;
77
import com.azure.core.util.implementation.ByteArrayContent;
88
import com.azure.core.util.implementation.FileContent;
9-
import com.azure.core.util.implementation.FluxByteBufferContent;
109
import com.azure.core.util.implementation.InputStreamContent;
1110
import com.azure.core.util.implementation.SerializableContent;
1211
import com.azure.core.util.implementation.StringContent;
@@ -149,8 +148,7 @@ public static Mono<BinaryData> fromStreamAsync(InputStream inputStream) {
149148
}
150149

151150
/**
152-
* Creates an instance of {@link BinaryData} from the given {@link Flux} of {@link ByteBuffer}. The source flux
153-
* is subscribed to as many times as the content is read. The flux, therefore, must be replayable.
151+
* Creates an instance of {@link BinaryData} from the given {@link Flux} of {@link ByteBuffer}.
154152
*
155153
* <p><strong>Create an instance from a Flux of ByteBuffer</strong></p>
156154
*
@@ -164,12 +162,12 @@ public static Mono<BinaryData> fromFlux(Flux<ByteBuffer> data) {
164162
if (data == null) {
165163
return monoError(LOGGER, new NullPointerException("'content' cannot be null."));
166164
}
167-
return Mono.just(new BinaryData(new FluxByteBufferContent(data)));
165+
return FluxUtil.collectBytesInByteBufferStream(data)
166+
.flatMap(bytes -> Mono.just(BinaryData.fromBytes(bytes)));
168167
}
169168

170169
/**
171-
* Creates an instance of {@link BinaryData} from the given {@link Flux} of {@link ByteBuffer}. The source flux
172-
* is subscribed to as many times as the content is read. The flux, therefore, must be replayable.
170+
* Creates an instance of {@link BinaryData} from the given {@link Flux} of {@link ByteBuffer}.
173171
*
174172
* <p><strong>Create an instance from a Flux of ByteBuffer</strong></p>
175173
*
@@ -185,10 +183,15 @@ public static Mono<BinaryData> fromFlux(Flux<ByteBuffer> data, Long length) {
185183
if (data == null) {
186184
return monoError(LOGGER, new NullPointerException("'content' cannot be null."));
187185
}
188-
if (length < 0) {
186+
if (length != null && length < 0) {
189187
return monoError(LOGGER, new IllegalArgumentException("'length' cannot be less than 0."));
190188
}
191-
return Mono.just(new BinaryData(new FluxByteBufferContent(data, length)));
189+
if (length != null) {
190+
return FluxUtil.collectBytesInByteBufferStream(data, length.intValue())
191+
.flatMap(bytes -> Mono.just(BinaryData.fromBytes(bytes)));
192+
}
193+
return FluxUtil.collectBytesInByteBufferStream(data)
194+
.flatMap(bytes -> Mono.just(BinaryData.fromBytes(bytes)));
192195
}
193196

194197
/**

0 commit comments

Comments
 (0)