@@ -89,7 +89,7 @@ final class ReactorNettyClient implements Client {
8989 requireNonNull (context , "context must not be null" );
9090 requireNonNull (ssl , "ssl must not be null" );
9191 require (responseProcessor .asFlux () instanceof Subscriber ,
92- "responseProcessor(" + responseProcessor + ") must be a Subscriber" );
92+ "responseProcessor(" + responseProcessor + ") must be a Subscriber" );
9393
9494 this .connection = connection ;
9595 this .context = context ;
@@ -105,54 +105,54 @@ final class ReactorNettyClient implements Client {
105105 logger .debug ("Connection tracking logging is enabled" );
106106
107107 connection .addHandlerFirst (LoggingHandler .class .getSimpleName (),
108- new LoggingHandler (ReactorNettyClient .class , LogLevel .TRACE ));
108+ new LoggingHandler (ReactorNettyClient .class , LogLevel .TRACE ));
109109 }
110110
111111 ResponseSink sink = new ResponseSink ();
112112
113113 connection .inbound ().receiveObject ()
114- .doOnNext (it -> {
115- if (it instanceof ServerMessage ) {
116- if (it instanceof ReferenceCounted ) {
117- ((ReferenceCounted ) it ).retain ();
118- }
119- sink .next ((ServerMessage ) it );
120- } else {
121- // ReferenceCounted will be released by Netty.
122- throw ClientExceptions .unsupportedProtocol (it .getClass ().getTypeName ());
123- }
124- })
125- .onErrorResume (this ::resumeError )
126- .subscribe (new ResponseSubscriber (sink ));
114+ .doOnNext (it -> {
115+ if (it instanceof ServerMessage ) {
116+ if (it instanceof ReferenceCounted ) {
117+ ((ReferenceCounted ) it ).retain ();
118+ }
119+ sink .next ((ServerMessage ) it );
120+ } else {
121+ // ReferenceCounted will be released by Netty.
122+ throw ClientExceptions .unsupportedProtocol (it .getClass ().getTypeName ());
123+ }
124+ })
125+ .onErrorResume (this ::resumeError )
126+ .subscribe (new ResponseSubscriber (sink ));
127127
128128 this .requests .asFlux ()
129- .concatMap (message -> {
130- if (DEBUG_ENABLED ) {
131- logger .debug ("Request: {}" , message );
132- }
133-
134- if (message == ExitMessage .INSTANCE ) {
135- if (STATE_UPDATER .compareAndSet (this , ST_CONNECTED , ST_CLOSING )) {
136- logger .debug ("Exit message sent" );
137- } else {
138- logger .debug ("Exit message sent (duplicated / connection already closed)" );
139- }
140- }
141-
142- if (message .isSequenceReset ()) {
143- resetSequence (connection );
144- }
145-
146- return connection .outbound ().sendObject (message );
147- })
148- .onErrorResume (this ::resumeError )
149- .doAfterTerminate (this ::handleClose )
150- .subscribe ();
129+ .concatMap (message -> {
130+ if (DEBUG_ENABLED ) {
131+ logger .debug ("Request: {}" , message );
132+ }
133+
134+ if (message == ExitMessage .INSTANCE ) {
135+ if (STATE_UPDATER .compareAndSet (this , ST_CONNECTED , ST_CLOSING )) {
136+ logger .debug ("Exit message sent" );
137+ } else {
138+ logger .debug ("Exit message sent (duplicated / connection already closed)" );
139+ }
140+ }
141+
142+ if (message .isSequenceReset ()) {
143+ resetSequence (connection );
144+ }
145+
146+ return connection .outbound ().sendObject (message );
147+ })
148+ .onErrorResume (this ::resumeError )
149+ .doAfterTerminate (this ::handleClose )
150+ .subscribe ();
151151 }
152152
153153 @ Override
154154 public <T > Flux <T > exchange (ClientMessage request ,
155- BiConsumer <ServerMessage , SynchronousSink <T >> handler ) {
155+ BiConsumer <ServerMessage , SynchronousSink <T >> handler ) {
156156 requireNonNull (request , "request must not be null" );
157157
158158 return Mono .<Flux <T >>create (sink -> {
@@ -166,9 +166,9 @@ public <T> Flux<T> exchange(ClientMessage request,
166166
167167 Flux <T > responses = OperatorUtils .discardOnCancel (
168168 responseProcessor .asFlux ()
169- .doOnSubscribe (ignored -> emitNextRequest (request ))
170- .handle (handler )
171- .doOnTerminate (requestQueue )
169+ .doOnSubscribe (ignored -> emitNextRequest (request ))
170+ .handle (handler )
171+ .doOnTerminate (requestQueue )
172172 ).doOnDiscard (ReferenceCounted .class , ReferenceCounted ::release );
173173
174174 requestQueue .submit (RequestTask .wrap (request , sink , responses ));
@@ -203,8 +203,9 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
203203 });
204204
205205 requestQueue .submit (RequestTask .wrap (exchangeable , sink , OperatorUtils .discardOnCancel (responses )
206- .doOnDiscard (ReferenceCounted .class , ReferenceCounted ::release )
207- .doOnCancel (exchangeable ::dispose )));
206+ .doOnDiscard (ReferenceCounted .class ,
207+ ReferenceCounted ::release )
208+ .doOnCancel (exchangeable ::dispose )));
208209 }).flatMapMany (Function .identity ());
209210 }
210211
@@ -272,7 +273,7 @@ private static void resetSequence(Connection connection) {
272273 @ Override
273274 public String toString () {
274275 return String .format ("ReactorNettyClient(%s){connectionId=%d}" ,
275- isConnected () ? "activating" : "closing or closed" , context .getConnectionId ());
276+ isConnected ()? "activating" : "closing or closed" , context .getConnectionId ());
276277 }
277278
278279 private void emitNextRequest (ClientMessage request ) {
@@ -379,17 +380,17 @@ public void error(Throwable e) {
379380
380381 @ Override
381382 public void next (ServerMessage message ) {
382- if (message instanceof WarningMessage ) {
383- int warnings = (( WarningMessage ) message ). getWarnings ();
384- if ( warnings == 0 ) {
385- if (DEBUG_ENABLED ) {
383+ if (DEBUG_ENABLED ) {
384+ if ( message instanceof WarningMessage ) {
385+ int warnings = (( WarningMessage ) message ). getWarnings ();
386+ if (warnings == 0 ) {
386387 logger .debug ("Response: {}" , message );
388+ } else {
389+ logger .debug ("Response: {}, reports {} warning(s)" , message , warnings );
387390 }
388- } else if ( INFO_ENABLED ) {
389- logger .info ("Response: {}, reports {} warning(s) " , message , warnings );
391+ } else {
392+ logger .debug ("Response: {}" , message );
390393 }
391- } else if (DEBUG_ENABLED ) {
392- logger .debug ("Response: {}" , message );
393394 }
394395
395396 responseProcessor .emitNext (message , EmitFailureHandler .FAIL_FAST );
0 commit comments