@@ -96,7 +96,7 @@ public class RequestResponseChannel implements AsyncCloseable {
9696 private final AtomicBoolean isDisposed = new AtomicBoolean ();
9797 // Tracks all subscriptions listening for events from various endpoints (sender, receiver & connection),
9898 // those subscriptions should be disposed when the request-response-channel terminates.
99- private final Disposable .Composite subscriptions ;
99+ private final Disposable .Composite subscriptions = Disposables . composite () ;
100100
101101 private final AmqpRetryOptions retryOptions ;
102102 private final String replyTo ;
@@ -179,46 +179,46 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
179179
180180 // Subscribe to the events from endpoints (Sender, Receiver & Connection) and track the subscriptions.
181181 //
182- //@formatter:off
183- this . subscriptions = Disposables . composite (
184- receiveLinkHandler . getDeliveredMessages ()
185- . map ( this :: decodeDelivery )
186- . subscribe ( message -> {
187- logger . atVerbose ()
188- . addKeyValue ( "messageId" , message . getCorrelationId ())
189- . log ( "Settling message." );
190-
191- settleMessage ( message );
192- }),
193-
194- receiveLinkHandler . getEndpointStates (). subscribe (state -> {
195- updateEndpointState ( null , AmqpEndpointStateUtil . getConnectionState ( state ));
196- }, error -> {
197- handleError ( error , "Error in ReceiveLinkHandler. " );
198- onTerminalState ( "ReceiveLinkHandler" );
199- }, () -> {
200- closeAsync (). subscribe ( );
201- onTerminalState ( "ReceiveLinkHandler" );
202- }),
203-
204- sendLinkHandler .getEndpointStates ().subscribe (state -> {
205- updateEndpointState (AmqpEndpointStateUtil .getConnectionState (state ), null );
206- }, error -> {
207- handleError (error , "Error in SendLinkHandler." );
208- onTerminalState ("SendLinkHandler" );
209- }, () -> {
210- closeAsync ().subscribe ();
211- onTerminalState ("SendLinkHandler" );
212- }),
213-
214- // To ensure graceful closure of request-response-channel instance that won the race between
215- // its creation and its parent connection close.
216- amqpConnection . getShutdownSignals (). next (). flatMap ( signal -> {
217- logger . verbose ( "Shutdown signal received." );
218- return closeAsync ( );
219- }). subscribe ()
220- );
221- //@formatter:on
182+ final Disposable receiveMessagesDisposable = receiveLinkHandler . getDeliveredMessages ()
183+ . map ( this :: decodeDelivery )
184+ . subscribe ( message -> {
185+ logger . atVerbose ( )
186+ . addKeyValue ( "messageId" , message . getCorrelationId ())
187+ . log ( "Settling message." );
188+
189+ settleMessage ( message );
190+ });
191+ this . subscriptions . add ( receiveMessagesDisposable );
192+
193+ final Disposable receiveEndpointDisposable = receiveLinkHandler . getEndpointStates (). subscribe ( state -> {
194+ updateEndpointState ( null , AmqpEndpointStateUtil . getConnectionState (state ));
195+ }, error -> {
196+ handleError ( error , "Error in ReceiveLinkHandler." );
197+ onTerminalState ( " ReceiveLinkHandler" );
198+ }, () -> {
199+ closeAsync (). subscribe ();
200+ onTerminalState ( "ReceiveLinkHandler" );
201+ } );
202+ this . subscriptions . add ( receiveEndpointDisposable );
203+
204+ final Disposable sendEndpointDisposable = sendLinkHandler .getEndpointStates ().subscribe (state -> {
205+ updateEndpointState (AmqpEndpointStateUtil .getConnectionState (state ), null );
206+ }, error -> {
207+ handleError (error , "Error in SendLinkHandler." );
208+ onTerminalState ("SendLinkHandler" );
209+ }, () -> {
210+ closeAsync ().subscribe ();
211+ onTerminalState ("SendLinkHandler" );
212+ });
213+ this . subscriptions . add ( sendEndpointDisposable );
214+
215+ // To ensure graceful closure of request-response-channel instance that won the race between
216+ // its creation and its parent connection close.
217+ final Disposable shutdownDisposable = amqpConnection . getShutdownSignals (). next (). flatMap ( signal -> {
218+ logger . verbose ( "Shutdown signal received." );
219+ return closeAsync ();
220+ }). subscribe ( );
221+ this . subscriptions . add ( shutdownDisposable );
222222
223223 // Open send and receive links.
224224 //
0 commit comments