@@ -89,7 +89,7 @@ final class ReactorNettyClient implements Client {
8989 requireNonNull (context , "context must not be null" );
9090 requireNonNull (ssl , "ssl must not be null" );
9191 require (responseProcessor .asFlux () instanceof Subscriber ,
92- "responseProcessor(" + responseProcessor + ") must be a Subscriber" );
92+ "responseProcessor(" + responseProcessor + ") must be a Subscriber" );
9393
9494 this .connection = connection ;
9595 this .context = context ;
@@ -105,54 +105,54 @@ final class ReactorNettyClient implements Client {
105105 logger .debug ("Connection tracking logging is enabled" );
106106
107107 connection .addHandlerFirst (LoggingHandler .class .getSimpleName (),
108- new LoggingHandler (ReactorNettyClient .class , LogLevel .TRACE ));
108+ new LoggingHandler (ReactorNettyClient .class , LogLevel .TRACE ));
109109 }
110110
111111 ResponseSink sink = new ResponseSink ();
112112
113113 connection .inbound ().receiveObject ()
114- .doOnNext (it -> {
115- if (it instanceof ServerMessage ) {
116- if (it instanceof ReferenceCounted ) {
117- ((ReferenceCounted ) it ).retain ();
118- }
119- sink .next ((ServerMessage ) it );
120- } else {
121- // ReferenceCounted will be released by Netty.
122- throw ClientExceptions .unsupportedProtocol (it .getClass ().getTypeName ());
123- }
124- })
125- .onErrorResume (this ::resumeError )
126- .subscribe (new ResponseSubscriber (sink ));
114+ .doOnNext (it -> {
115+ if (it instanceof ServerMessage ) {
116+ if (it instanceof ReferenceCounted ) {
117+ ((ReferenceCounted ) it ).retain ();
118+ }
119+ sink .next ((ServerMessage ) it );
120+ } else {
121+ // ReferenceCounted will be released by Netty.
122+ throw ClientExceptions .unsupportedProtocol (it .getClass ().getTypeName ());
123+ }
124+ })
125+ .onErrorResume (this ::resumeError )
126+ .subscribe (new ResponseSubscriber (sink ));
127127
128128 this .requests .asFlux ()
129- .concatMap (message -> {
130- if (DEBUG_ENABLED ) {
131- logger .debug ("Request: {}" , message );
132- }
133-
134- if (message == ExitMessage .INSTANCE ) {
135- if (STATE_UPDATER .compareAndSet (this , ST_CONNECTED , ST_CLOSING )) {
136- logger .debug ("Exit message sent" );
137- } else {
138- logger .debug ("Exit message sent (duplicated / connection already closed)" );
139- }
140- }
141-
142- if (message .isSequenceReset ()) {
143- resetSequence (connection );
144- }
145-
146- return connection .outbound ().sendObject (message );
147- })
148- .onErrorResume (this ::resumeError )
149- .doAfterTerminate (this ::handleClose )
150- .subscribe ();
129+ .concatMap (message -> {
130+ if (DEBUG_ENABLED ) {
131+ logger .debug ("Request: {}" , message );
132+ }
133+
134+ if (message == ExitMessage .INSTANCE ) {
135+ if (STATE_UPDATER .compareAndSet (this , ST_CONNECTED , ST_CLOSING )) {
136+ logger .debug ("Exit message sent" );
137+ } else {
138+ logger .debug ("Exit message sent (duplicated / connection already closed)" );
139+ }
140+ }
141+
142+ if (message .isSequenceReset ()) {
143+ resetSequence (connection );
144+ }
145+
146+ return connection .outbound ().sendObject (message );
147+ })
148+ .onErrorResume (this ::resumeError )
149+ .doAfterTerminate (this ::handleClose )
150+ .subscribe ();
151151 }
152152
153153 @ Override
154154 public <T > Flux <T > exchange (ClientMessage request ,
155- BiConsumer <ServerMessage , SynchronousSink <T >> handler ) {
155+ BiConsumer <ServerMessage , SynchronousSink <T >> handler ) {
156156 requireNonNull (request , "request must not be null" );
157157
158158 return Mono .<Flux <T >>create (sink -> {
@@ -166,9 +166,9 @@ public <T> Flux<T> exchange(ClientMessage request,
166166
167167 Flux <T > responses = OperatorUtils .discardOnCancel (
168168 responseProcessor .asFlux ()
169- .doOnSubscribe (ignored -> emitNextRequest (request ))
170- .handle (handler )
171- .doOnTerminate (requestQueue )
169+ .doOnSubscribe (ignored -> emitNextRequest (request ))
170+ .handle (handler )
171+ .doOnTerminate (requestQueue )
172172 ).doOnDiscard (ReferenceCounted .class , ReferenceCounted ::release );
173173
174174 requestQueue .submit (RequestTask .wrap (request , sink , responses ));
@@ -203,9 +203,8 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
203203 });
204204
205205 requestQueue .submit (RequestTask .wrap (exchangeable , sink , OperatorUtils .discardOnCancel (responses )
206- .doOnDiscard (ReferenceCounted .class ,
207- ReferenceCounted ::release )
208- .doOnCancel (exchangeable ::dispose )));
206+ .doOnDiscard (ReferenceCounted .class , ReferenceCounted ::release )
207+ .doOnCancel (exchangeable ::dispose )));
209208 }).flatMapMany (Function .identity ());
210209 }
211210
0 commit comments