77import com .azure .core .amqp .AmqpEndpointState ;
88import com .azure .core .amqp .AmqpRetryOptions ;
99import com .azure .core .amqp .exception .AmqpErrorContext ;
10+ import com .azure .core .amqp .exception .AmqpException ;
1011import com .azure .core .amqp .implementation .handler .ReceiveLinkHandler ;
1112import com .azure .core .amqp .implementation .handler .SendLinkHandler ;
1213import com .azure .core .util .AsyncCloseable ;
3738import reactor .core .scheduler .Schedulers ;
3839
3940import java .io .IOException ;
41+ import java .util .Map ;
4042import java .util .UUID ;
4143import java .util .concurrent .ConcurrentSkipListMap ;
4244import java .util .concurrent .RejectedExecutionException ;
5052
5153/**
5254 * Represents a bidirectional link between the message broker and the client. Allows client to send a request to the
53- * broker and receive the associated response.
55+ * broker and receive the associated response. The {@link RequestResponseChannel} composes a proton-j {@link Sender}
56+ * link and {@link Receiver} link.
5457 */
5558public class RequestResponseChannel implements AsyncCloseable {
56- private final ConcurrentSkipListMap <UnsignedLong , MonoSink <Message >> unconfirmedSends =
57- new ConcurrentSkipListMap <>();
58- private final AtomicBoolean hasError = new AtomicBoolean ();
59- private final Sinks .Many <AmqpEndpointState > endpointStates = Sinks .many ().multicast ().onBackpressureBuffer ();
6059 private final ClientLogger logger = new ClientLogger (RequestResponseChannel .class );
6160
62- // The request response channel is closed when both the receive and send link component are disposed of.
63- private final AtomicInteger pendingDisposes = new AtomicInteger (2 );
64- private final Sinks .One <Void > closeMono = Sinks .one ();
65-
6661 private final Sender sendLink ;
6762 private final Receiver receiveLink ;
68- private final String replyTo ;
69- private final MessageSerializer messageSerializer ;
70- private final AmqpRetryOptions retryOptions ;
71- private final ReactorProvider provider ;
72- private final AtomicBoolean isDisposed = new AtomicBoolean ();
73- private final AtomicLong requestId = new AtomicLong (0 );
7463 private final SendLinkHandler sendLinkHandler ;
7564 private final ReceiveLinkHandler receiveLinkHandler ;
76- private final Disposable .Composite subscriptions ;
7765 private final SenderSettleMode senderSettleMode ;
78- private final String linkName ;
66+ // The request-response-channel endpoint states derived from the latest state of the send and receive links.
67+ private final Sinks .Many <AmqpEndpointState > endpointStates = Sinks .many ().multicast ().onBackpressureBuffer ();
68+ // The latest state of the send and receive links.
69+ private volatile AmqpEndpointState sendLinkState ;
70+ private volatile AmqpEndpointState receiveLinkState ;
71+ // Generates unique Id for each message send over the request-response-channel.
72+ private final AtomicLong requestId = new AtomicLong (0 );
73+ // Tracks the sends that are not yet acknowledged by the broker. Map key is the unique Id
74+ // of the send and value is the MonoSink to notify upon broker acknowledgment.
75+ private final ConcurrentSkipListMap <UnsignedLong , MonoSink <Message >> unconfirmedSends =
76+ new ConcurrentSkipListMap <>();
77+
78+ // Tracks the count of links that is not terminated yet. Once both the receive and send links
79+ // are terminated (i.e. pendingLinkTerminations is zero), the request-response-channel is
80+ // considered as terminated.
81+ private final AtomicInteger pendingLinkTerminations = new AtomicInteger (2 );
82+ // The Mono that completes once the request-response-channel is terminated.
83+ private final Sinks .One <Void > closeMono = Sinks .one ();
84+ // A flag indicating that an error in either of the links caused link to terminate.
85+ private final AtomicBoolean hasError = new AtomicBoolean ();
86+ // A flag indicating that the request-response-channel is closed (after the call to closeAsync()).
87+ private final AtomicBoolean isDisposed = new AtomicBoolean ();
88+ // Tracks all subscriptions listening for events from various endpoints (sender, receiver & connection),
89+ // those subscriptions should be disposed when the request-response-channel terminates.
90+ private final Disposable .Composite subscriptions ;
91+
7992 private final String connectionId ;
93+ private final String linkName ;
94+ private final AmqpRetryOptions retryOptions ;
95+ private final String replyTo ;
8096 private final String activeEndpointTimeoutMessage ;
81-
82- private volatile AmqpEndpointState sendLinkEndpoint ;
83- private volatile AmqpEndpointState receiveLinkEndpoint ;
97+ private final MessageSerializer messageSerializer ;
98+ // The API calls on proton-j entities (e.g., Sender, Receiver) must happen in the non-blocking thread
99+ // (aka ReactorThread) assigned to the connection's org.apache.qpid.proton.reactor.Reactor object.
100+ // The provider exposes ReactorDispatcher that can schedule such calls on the ReactorThread.
101+ private final ReactorProvider provider ;
84102
85103 /**
86104 * Creates a new instance of {@link RequestResponseChannel} to send and receive responses from the {@code
@@ -113,22 +131,24 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
113131
114132 this .replyTo = entityPath .replace ("$" , "" ) + "-client-reply-to" ;
115133 this .messageSerializer = messageSerializer ;
134+
135+ // Setup send (request) link.
116136 this .sendLink = session .sender (linkName + ":sender" );
117- final Target target = new Target ();
118- target .setAddress (entityPath );
119- this .sendLink .setTarget (target );
120- sendLink .setSource (new Source ());
137+ final Target senderTarget = new Target ();
138+ senderTarget .setAddress (entityPath );
139+ this .sendLink .setTarget (senderTarget );
140+ this . sendLink .setSource (new Source ());
121141 this .sendLink .setSenderSettleMode (senderSettleMode );
122142
123143 this .sendLinkHandler = handlerProvider .createSendLinkHandler (connectionId , fullyQualifiedNamespace , linkName ,
124144 entityPath );
125-
126145 BaseHandler .setHandler (sendLink , sendLinkHandler );
127146
147+ // Setup receive (response) link.
128148 this .receiveLink = session .receiver (linkName + ":receiver" );
129- final Source source = new Source ();
130- source .setAddress (entityPath );
131- this .receiveLink .setSource (source );
149+ final Source receiverSource = new Source ();
150+ receiverSource .setAddress (entityPath );
151+ this .receiveLink .setSource (receiverSource );
132152
133153 final Target receiverTarget = new Target ();
134154 receiverTarget .setAddress (replyTo );
@@ -138,8 +158,10 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
138158
139159 this .receiveLinkHandler = handlerProvider .createReceiveLinkHandler (connectionId , fullyQualifiedNamespace ,
140160 linkName , entityPath );
141- BaseHandler .setHandler (this . receiveLink , receiveLinkHandler );
161+ BaseHandler .setHandler (receiveLink , receiveLinkHandler );
142162
163+ // Subscribe to the events from endpoints (Sender, Receiver & Connection) and track the subscriptions.
164+ //
143165 //@formatter:off
144166 this .subscriptions = Disposables .composite (
145167 receiveLinkHandler .getDeliveredMessages ()
@@ -180,12 +202,13 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
180202 );
181203 //@formatter:on
182204
183- // If we try to do proton-j API calls such as opening/closing/sending on AMQP links, it may
184- // encounter a race condition. So, we are forced to use the dispatcher.
205+ // Open send and receive links.
206+ //
207+ // Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
185208 try {
186- provider .getReactorDispatcher ().invoke (() -> {
187- sendLink .open ();
188- receiveLink .open ();
209+ this . provider .getReactorDispatcher ().invoke (() -> {
210+ this . sendLink .open ();
211+ this . receiveLink .open ();
189212 });
190213 } catch (IOException e ) {
191214 throw logger .logExceptionAsError (new RuntimeException (String .format (
@@ -194,9 +217,9 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
194217 }
195218
196219 /**
197- * Gets the endpoint states for the request/ response channel.
220+ * Gets the endpoint states for the request- response- channel.
198221 *
199- * @return The endpoint states for the request/ response channel.
222+ * @return The endpoint states for the request- response- channel.
200223 */
201224 public Flux <AmqpEndpointState > getEndpointStates () {
202225 return endpointStates .asFlux ();
@@ -227,6 +250,7 @@ public Mono<Void> closeAsync() {
227250
228251 return Mono .fromRunnable (() -> {
229252 try {
253+ // Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
230254 provider .getReactorDispatcher ().invoke (() -> {
231255 logger .verbose ("connectionId[{}] linkName[{}] Closing send link and receive link." ,
232256 connectionId , linkName );
@@ -286,19 +310,18 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
286310 message .setMessageId (messageId );
287311 message .setReplyTo (replyTo );
288312
289- final Mono <Void > activeEndpoints = Mono .when (
313+ final Mono <Void > onActiveEndpoints = Mono .when (
290314 sendLinkHandler .getEndpointStates ().takeUntil (x -> x == EndpointState .ACTIVE ),
291315 receiveLinkHandler .getEndpointStates ().takeUntil (x -> x == EndpointState .ACTIVE ));
292316
293- return RetryUtil .withRetry (activeEndpoints , retryOptions , activeEndpointTimeoutMessage )
317+ return RetryUtil .withRetry (onActiveEndpoints , retryOptions , activeEndpointTimeoutMessage )
294318 .then (Mono .create (sink -> {
295319 try {
296320 logger .verbose ("connectionId[{}], linkName[{}]: Scheduling on dispatcher. MessageId[{}]" ,
297321 connectionId , linkName , messageId );
298322 unconfirmedSends .putIfAbsent (messageId , sink );
299323
300- // If we try to do proton-j API calls such as sending on AMQP links, it may encounter a race
301- // condition. So, we are forced to use the dispatcher.
324+ // Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
302325 provider .getReactorDispatcher ().invoke (() -> {
303326 final Delivery delivery = sendLink .delivery (UUID .randomUUID ().toString ()
304327 .replace ("-" , "" ).getBytes (UTF_8 ));
@@ -379,25 +402,28 @@ private void handleError(Throwable error, String message) {
379402 return false ;
380403 });
381404
382- unconfirmedSends .forEach ((key , value ) -> value .error (error ));
383- unconfirmedSends .clear ();
405+ terminateUnconfirmedSends (error );
384406
385407 closeAsync ().subscribe ();
386408 }
387409
388410 private void onTerminalState (String handlerName ) {
389- if (pendingDisposes .get () = = 0 ) {
411+ if (pendingLinkTerminations .get () < = 0 ) {
390412 logger .verbose ("connectionId[{}] linkName[{}]: Already disposed send/receive links." );
391413 return ;
392414 }
393415
394- final int remaining = pendingDisposes .decrementAndGet ();
416+ final int remaining = pendingLinkTerminations .decrementAndGet ();
395417 logger .verbose ("connectionId[{}] linkName[{}]: {} disposed. Remaining: {}" ,
396418 connectionId , linkName , handlerName , remaining );
397419
398420 if (remaining == 0 ) {
399421 subscriptions .dispose ();
400422
423+ terminateUnconfirmedSends (new AmqpException (true ,
424+ "The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination." ,
425+ null ));
426+
401427 endpointStates .emitComplete (((signalType , emitResult ) -> onEmitSinkFailure (signalType , emitResult ,
402428 "Could not emit complete signal." )));
403429
@@ -413,18 +439,36 @@ private boolean onEmitSinkFailure(SignalType signalType, Sinks.EmitResult emitRe
413439 return false ;
414440 }
415441
416- private synchronized void updateEndpointState (AmqpEndpointState newSendLink , AmqpEndpointState newReceiveLink ) {
417- if (newSendLink != null ) {
418- sendLinkEndpoint = newSendLink ;
419- } else if (newReceiveLink != null ) {
420- receiveLinkEndpoint = newReceiveLink ;
442+ // Derive and emits the endpoint state for this RequestResponseChannel from the current endpoint state
443+ // of send and receive links.
444+ private synchronized void updateEndpointState (AmqpEndpointState sendLinkState , AmqpEndpointState receiveLinkState ) {
445+ if (sendLinkState != null ) {
446+ this .sendLinkState = sendLinkState ;
447+ } else if (receiveLinkState != null ) {
448+ this .receiveLinkState = receiveLinkState ;
421449 }
422450
423451 logger .verbose ("connectionId[{}] linkName[{}] sendState[{}] receiveState[{}] Updating endpoint states." ,
424- connectionId , linkName , sendLinkEndpoint , receiveLinkEndpoint );
452+ connectionId , linkName , this .sendLinkState , this .receiveLinkState );
453+
454+ if (this .sendLinkState == this .receiveLinkState ) {
455+ this .endpointStates .emitNext (this .sendLinkState , Sinks .EmitFailureHandler .FAIL_FAST );
456+ }
457+ }
425458
426- if (sendLinkEndpoint == receiveLinkEndpoint ) {
427- endpointStates .emitNext (sendLinkEndpoint , Sinks .EmitFailureHandler .FAIL_FAST );
459+ // Terminate the unconfirmed MonoSinks by notifying the given error.
460+ private void terminateUnconfirmedSends (Throwable error ) {
461+ logger .verbose ("connectionId[{}] linkName[{}] terminating {} unconfirmed sends (reason: {})." ,
462+ connectionId , linkName , unconfirmedSends .size (), error .getMessage ());
463+ Map .Entry <UnsignedLong , MonoSink <Message >> next ;
464+ int count = 0 ;
465+ while ((next = unconfirmedSends .pollFirstEntry ()) != null ) {
466+ // pollFirstEntry: atomic retrieve and remove of each entry.
467+ next .getValue ().error (error );
468+ count ++;
428469 }
470+ // The below log can also help debug if the external code that error() calls into never return.
471+ logger .verbose ("connectionId[{}] linkName[{}] completed the termination of {} unconfirmed sends (reason: {})." ,
472+ connectionId , linkName , count , error .getMessage ());
429473 }
430474}
0 commit comments