1010import com .azure .core .http .HttpPipelineNextPolicy ;
1111import com .azure .core .http .HttpRequest ;
1212import com .azure .core .http .HttpResponse ;
13+ import com .azure .core .implementation .AccessibleByteArrayOutputStream ;
14+ import com .azure .core .implementation .ImplUtils ;
1315import com .azure .core .implementation .http .HttpPipelineCallContextHelper ;
1416import com .azure .core .implementation .jackson .ObjectMapperShim ;
1517import com .azure .core .util .Context ;
1618import com .azure .core .util .CoreUtils ;
19+ import com .azure .core .util .FluxUtil ;
1720import com .azure .core .util .UrlBuilder ;
1821import com .azure .core .util .logging .ClientLogger ;
1922import com .azure .core .util .logging .LogLevel ;
23+ import reactor .core .publisher .Flux ;
2024import reactor .core .publisher .Mono ;
2125
22- import java .io .ByteArrayOutputStream ;
2326import java .io .IOException ;
24- import java .io .UnsupportedEncodingException ;
27+ import java .io .UncheckedIOException ;
2528import java .net .URL ;
2629import java .nio .ByteBuffer ;
27- import java .nio .channels . Channels ;
28- import java .nio .channels . WritableByteChannel ;
30+ import java .nio .charset . Charset ;
31+ import java .nio .charset . StandardCharsets ;
2932import java .time .Duration ;
3033import java .util .Collections ;
3134import java .util .Locale ;
@@ -122,14 +125,14 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
122125 private HttpRequestLoggingContext getRequestLoggingOptions (HttpPipelineCallContext callContext ) {
123126 return new HttpRequestLoggingContext (callContext .getHttpRequest (),
124127 HttpPipelineCallContextHelper .getContext (callContext ),
125- getRequestRetryCount (HttpPipelineCallContextHelper .getContext (callContext ), LOGGER ));
128+ getRequestRetryCount (HttpPipelineCallContextHelper .getContext (callContext )));
126129 }
127130
128131 private HttpResponseLoggingContext getResponseLoggingOptions (HttpResponse httpResponse , long startNs ,
129132 HttpPipelineCallContext callContext ) {
130133 return new HttpResponseLoggingContext (httpResponse , Duration .ofNanos (System .nanoTime () - startNs ),
131134 HttpPipelineCallContextHelper .getContext (callContext ),
132- getRequestRetryCount (HttpPipelineCallContextHelper .getContext (callContext ), LOGGER ));
135+ getRequestRetryCount (HttpPipelineCallContextHelper .getContext (callContext )));
133136 }
134137
135138 private final class DefaultHttpRequestLogger implements HttpRequestLogger {
@@ -164,7 +167,8 @@ public Mono<Void> logRequest(ClientLogger logger, HttpRequestLoggingContext logg
164167 }
165168
166169 if (!httpLogDetailLevel .shouldLogBody ()) {
167- return logAndReturn (logger , logLevel , requestLogMessage , null );
170+ logMessage (logger , logLevel , requestLogMessage );
171+ return Mono .empty ();
168172 }
169173
170174 if (request .getBody () == null ) {
@@ -174,35 +178,40 @@ public Mono<Void> logRequest(ClientLogger logger, HttpRequestLoggingContext logg
174178 .append (request .getHttpMethod ())
175179 .append (System .lineSeparator ());
176180
177- return logAndReturn (logger , logLevel , requestLogMessage , null );
181+ logMessage (logger , logLevel , requestLogMessage );
182+ return Mono .empty ();
178183 }
179184
180185 String contentType = request .getHeaders ().getValue ("Content-Type" );
181186 long contentLength = getContentLength (logger , request .getHeaders ());
182187
183188 if (shouldBodyBeLogged (contentType , contentLength )) {
184- ByteArrayOutputStream outputStream = new ByteArrayOutputStream ((int ) contentLength );
185- WritableByteChannel bodyContentChannel = Channels .newChannel (outputStream );
189+ AccessibleByteArrayOutputStream stream = new AccessibleByteArrayOutputStream ((int ) contentLength );
186190
187191 // Add non-mutating operators to the data stream.
188192 request .setBody (
189193 request .getBody ()
190- .flatMap (byteBuffer -> writeBufferToBodyStream (bodyContentChannel , byteBuffer ))
194+ .doOnNext (byteBuffer -> {
195+ try {
196+ ImplUtils .writeByteBufferToStream (byteBuffer .duplicate (), stream );
197+ } catch (IOException ex ) {
198+ throw LOGGER .logExceptionAsError (new UncheckedIOException (ex ));
199+ }
200+ })
191201 .doFinally (ignored -> {
192202 requestLogMessage .append (contentLength )
193203 .append ("-byte body:" )
194204 .append (System .lineSeparator ())
195205 .append (prettyPrintIfNeeded (logger , prettyPrintBody , contentType ,
196- convertStreamToString ( outputStream , logger )))
206+ new String ( stream . toByteArray (), 0 , stream . count (), StandardCharsets . UTF_8 )))
197207 .append (System .lineSeparator ())
198208 .append ("--> END " )
199209 .append (request .getHttpMethod ())
200210 .append (System .lineSeparator ());
201211
202- logAndReturn (logger , logLevel , requestLogMessage , null );
212+ logMessage (logger , logLevel , requestLogMessage );
203213 }));
204214
205- return Mono .empty ();
206215 } else {
207216 requestLogMessage .append (contentLength )
208217 .append ("-byte body: (content not logged)" )
@@ -211,8 +220,10 @@ public Mono<Void> logRequest(ClientLogger logger, HttpRequestLoggingContext logg
211220 .append (request .getHttpMethod ())
212221 .append (System .lineSeparator ());
213222
214- return logAndReturn (logger , logLevel , requestLogMessage , null );
223+ logMessage (logger , logLevel , requestLogMessage );
215224 }
225+
226+ return Mono .empty ();
216227 }
217228 }
218229
@@ -251,40 +262,28 @@ public Mono<HttpResponse> logResponse(ClientLogger logger, HttpResponseLoggingCo
251262
252263 if (!httpLogDetailLevel .shouldLogBody ()) {
253264 responseLogMessage .append ("<-- END HTTP" );
254- return logAndReturn (logger , logLevel , responseLogMessage , response );
265+ logMessage (logger , logLevel , responseLogMessage );
266+ return Mono .justOrEmpty (response );
255267 }
256268
257269 String contentTypeHeader = response .getHeaderValue ("Content-Type" );
258270 long contentLength = getContentLength (logger , response .getHeaders ());
259271
260272 if (shouldBodyBeLogged (contentTypeHeader , contentLength )) {
261- HttpResponse bufferedResponse = response .buffer ();
262- ByteArrayOutputStream outputStream = new ByteArrayOutputStream ((int ) contentLength );
263- WritableByteChannel bodyContentChannel = Channels .newChannel (outputStream );
264- return bufferedResponse .getBody ()
265- .flatMap (byteBuffer -> writeBufferToBodyStream (bodyContentChannel , byteBuffer ))
266- .doFinally (ignored -> {
267- responseLogMessage .append ("Response body:" )
268- .append (System .lineSeparator ())
269- .append (prettyPrintIfNeeded (logger , prettyPrintBody , contentTypeHeader ,
270- convertStreamToString (outputStream , logger )))
271- .append (System .lineSeparator ())
272- .append ("<-- END HTTP" );
273-
274- logAndReturn (logger , logLevel , responseLogMessage , response );
275- }).then (Mono .just (bufferedResponse ));
273+ return Mono .just (new LoggingHttpResponse (response , responseLogMessage , logger , logLevel ,
274+ (int ) contentLength , contentTypeHeader , prettyPrintBody ));
276275 } else {
277276 responseLogMessage .append ("(body content not logged)" )
278277 .append (System .lineSeparator ())
279278 .append ("<-- END HTTP" );
280279
281- return logAndReturn (logger , logLevel , responseLogMessage , response );
280+ logMessage (logger , logLevel , responseLogMessage );
281+ return Mono .just (response );
282282 }
283283 }
284284 }
285285
286- private static <T > Mono <T > logAndReturn (ClientLogger logger , LogLevel logLevel , StringBuilder logMessageBuilder ,
287- T data ) {
286+ private static void logMessage (ClientLogger logger , LogLevel logLevel , StringBuilder logMessageBuilder ) {
288287 switch (logLevel ) {
289288 case VERBOSE :
290289 logger .verbose (logMessageBuilder .toString ());
@@ -305,8 +304,6 @@ private static <T> Mono<T> logAndReturn(ClientLogger logger, LogLevel logLevel,
305304 default :
306305 break ;
307306 }
308-
309- return Mono .justOrEmpty (data );
310307 }
311308
312309 /*
@@ -450,36 +447,13 @@ private static boolean shouldBodyBeLogged(String contentTypeHeader, long content
450447 && contentLength < MAX_BODY_LOG_SIZE ;
451448 }
452449
453- /*
454- * Helper function which converts a ByteArrayOutputStream to a String without duplicating the internal buffer.
455- */
456- private static String convertStreamToString (ByteArrayOutputStream stream , ClientLogger logger ) {
457- try {
458- return stream .toString ("UTF-8" );
459- } catch (UnsupportedEncodingException ex ) {
460- throw logger .logExceptionAsError (new RuntimeException (ex ));
461- }
462- }
463-
464- /*
465- * Helper function which writes body ByteBuffers into the body message channel.
466- */
467- private static Mono <ByteBuffer > writeBufferToBodyStream (WritableByteChannel channel , ByteBuffer byteBuffer ) {
468- try {
469- channel .write (byteBuffer .duplicate ());
470- return Mono .just (byteBuffer );
471- } catch (IOException ex ) {
472- return Mono .error (ex );
473- }
474- }
475-
476450 /*
477451 * Gets the request retry count to include in logging.
478452 *
479453 * If there is no value set or it isn't a valid number null will be returned indicating that retry count won't be
480454 * logged.
481455 */
482- private static Integer getRequestRetryCount (Context context , ClientLogger logger ) {
456+ private static Integer getRequestRetryCount (Context context ) {
483457 Object rawRetryCount = context .getData (RETRY_COUNT_CONTEXT ).orElse (null );
484458 if (rawRetryCount == null ) {
485459 return null ;
@@ -488,7 +462,7 @@ private static Integer getRequestRetryCount(Context context, ClientLogger logger
488462 try {
489463 return Integer .valueOf (rawRetryCount .toString ());
490464 } catch (NumberFormatException ex ) {
491- logger .warning ("Could not parse the request retry count: '{}'." , rawRetryCount );
465+ LOGGER .warning ("Could not parse the request retry count: '{}'." , rawRetryCount );
492466 return null ;
493467 }
494468 }
@@ -503,4 +477,81 @@ private static ClientLogger getOrCreateMethodLogger(String methodName) {
503477
504478 return CALLER_METHOD_LOGGER_CACHE .computeIfAbsent (methodName , ClientLogger ::new );
505479 }
480+
481+ private static final class LoggingHttpResponse extends HttpResponse {
482+ private final HttpResponse actualResponse ;
483+ private final StringBuilder responseLogMessage ;
484+ private final int contentLength ;
485+ private final ClientLogger logger ;
486+ private final boolean prettyPrintBody ;
487+ private final String contentTypeHeader ;
488+ private final LogLevel logLevel ;
489+
490+ private LoggingHttpResponse (HttpResponse actualResponse , StringBuilder responseLogMessage ,
491+ ClientLogger logger , LogLevel logLevel , int contentLength , String contentTypeHeader ,
492+ boolean prettyPrintBody ) {
493+ super (actualResponse .getRequest ());
494+ this .actualResponse = actualResponse ;
495+ this .responseLogMessage = responseLogMessage ;
496+ this .logger = logger ;
497+ this .logLevel = logLevel ;
498+ this .contentLength = contentLength ;
499+ this .contentTypeHeader = contentTypeHeader ;
500+ this .prettyPrintBody = prettyPrintBody ;
501+ }
502+
503+ @ Override
504+ public int getStatusCode () {
505+ return actualResponse .getStatusCode ();
506+ }
507+
508+ @ Override
509+ public String getHeaderValue (String name ) {
510+ return actualResponse .getHeaderValue (name );
511+ }
512+
513+ @ Override
514+ public HttpHeaders getHeaders () {
515+ return actualResponse .getHeaders ();
516+ }
517+
518+ @ Override
519+ public Flux <ByteBuffer > getBody () {
520+ AccessibleByteArrayOutputStream stream = new AccessibleByteArrayOutputStream (contentLength );
521+
522+ return actualResponse .getBody ()
523+ .doOnNext (byteBuffer -> {
524+ try {
525+ ImplUtils .writeByteBufferToStream (byteBuffer .duplicate (), stream );
526+ } catch (IOException ex ) {
527+ throw LOGGER .logExceptionAsError (new UncheckedIOException (ex ));
528+ }
529+ })
530+ .doFinally (ignored -> {
531+ responseLogMessage .append ("Response body:" )
532+ .append (System .lineSeparator ())
533+ .append (prettyPrintIfNeeded (logger , prettyPrintBody , contentTypeHeader ,
534+ new String (stream .toByteArray (), 0 , stream .count (), StandardCharsets .UTF_8 )))
535+ .append (System .lineSeparator ())
536+ .append ("<-- END HTTP" );
537+
538+ logMessage (logger , logLevel , responseLogMessage );
539+ });
540+ }
541+
542+ @ Override
543+ public Mono <byte []> getBodyAsByteArray () {
544+ return FluxUtil .collectBytesFromNetworkResponse (getBody (), actualResponse .getHeaders ());
545+ }
546+
547+ @ Override
548+ public Mono <String > getBodyAsString () {
549+ return getBodyAsByteArray ().map (String ::new );
550+ }
551+
552+ @ Override
553+ public Mono <String > getBodyAsString (Charset charset ) {
554+ return getBodyAsByteArray ().map (bytes -> new String (bytes , charset ));
555+ }
556+ }
506557}
0 commit comments