77import com .azure .core .http .HttpResponse ;
88import com .azure .core .util .CoreUtils ;
99import com .azure .core .util .FluxUtil ;
10+ import com .azure .core .util .logging .ClientLogger ;
1011import reactor .core .publisher .Flux ;
1112import reactor .core .publisher .Mono ;
1213
1314import java .nio .ByteBuffer ;
1415import java .nio .charset .Charset ;
16+ import java .util .List ;
17+ import java .util .concurrent .atomic .AtomicLong ;
18+
19+ import static com .azure .core .util .FluxUtil .monoError ;
1520
1621/**
1722 * HTTP response which will buffer the response's body when/if it is read.
1823 */
1924public final class BufferedHttpResponse extends HttpResponse {
25+ private final ClientLogger logger = new ClientLogger (BufferedHttpResponse .class );
26+
2027 private final HttpResponse innerHttpResponse ;
21- private final Flux <ByteBuffer > cachedBody ;
28+ private final Mono <List <ByteBuffer >> cachedBody ;
29+ private final AtomicLong cachedBodySize = new AtomicLong ();
2230
2331 /**
2432 * Creates a buffered HTTP response.
@@ -28,12 +36,13 @@ public final class BufferedHttpResponse extends HttpResponse {
2836 public BufferedHttpResponse (HttpResponse innerHttpResponse ) {
2937 super (innerHttpResponse .getRequest ());
3038 this .innerHttpResponse = innerHttpResponse ;
31- this .cachedBody = FluxUtil .collectBytesFromNetworkResponse (innerHttpResponse .getBody (),
32- innerHttpResponse .getHeaders ())
33- .map (ByteBuffer ::wrap )
34- .flux ()
35- .cache ()
36- .map (ByteBuffer ::duplicate );
39+ this .cachedBody = innerHttpResponse .getBody ()
40+ .map (buffer -> {
41+ cachedBodySize .addAndGet (buffer .remaining ());
42+ return ByteBuffer .wrap (FluxUtil .byteBufferToArray (buffer ));
43+ })
44+ .collectList ()
45+ .cache ();
3746 }
3847
3948 @ Override
@@ -53,23 +62,35 @@ public HttpHeaders getHeaders() {
5362
5463 @ Override
5564 public Flux <ByteBuffer > getBody () {
56- return cachedBody ;
65+ return cachedBody . flatMapMany ( Flux :: fromIterable ). map ( ByteBuffer :: duplicate ) ;
5766 }
5867
5968 @ Override
6069 public Mono <byte []> getBodyAsByteArray () {
61- return cachedBody .next ().map (ByteBuffer ::array );
70+ // Check that the body would fit into a byte array before spending time to create the merged byte array.
71+ return (cachedBodySize .get () > Integer .MAX_VALUE )
72+ ? monoError (logger , new IllegalStateException (
73+ "Response with body size " + cachedBodySize .get () + " doesn't fit into a byte array." ))
74+ : FluxUtil .collectBytesInByteBufferStream (getBody (), (int ) cachedBodySize .get ());
6275 }
6376
6477 @ Override
6578 public Mono <String > getBodyAsString () {
66- return getBodyAsByteArray ().map (bytes ->
67- CoreUtils .bomAwareToString (bytes , innerHttpResponse .getHeaderValue ("Content-Type" )));
79+ // Check that the body would fit into a String before spending the time to create the String.
80+ return (cachedBodySize .get () > Integer .MAX_VALUE )
81+ ? monoError (logger , new IllegalStateException (
82+ "Response with body size " + cachedBodySize .get () + " doesn't fit into a String." ))
83+ : getBodyAsByteArray ().map (bytes ->
84+ CoreUtils .bomAwareToString (bytes , innerHttpResponse .getHeaderValue ("Content-Type" )));
6885 }
6986
7087 @ Override
7188 public Mono <String > getBodyAsString (Charset charset ) {
72- return getBodyAsByteArray ().map (bytes -> new String (bytes , charset ));
89+ // Check that the body would fit into a String before spending the time to create the String.
90+ return (cachedBodySize .get () > Integer .MAX_VALUE )
91+ ? monoError (logger , new IllegalStateException (
92+ "Response with body size " + cachedBodySize .get () + " doesn't fit into a String." ))
93+ : getBodyAsByteArray ().map (bytes -> new String (bytes , charset ));
7394 }
7495
7596 @ Override
0 commit comments