1010import java .util .Locale ;
1111import java .util .concurrent .ScheduledExecutorService ;
1212
13+ import com .microsoft .azure .eventhubs .impl .IOObject .IOObjectState ;
14+
1315public class RequestResponseOpener implements Operation <RequestResponseChannel > {
1416 private static final Logger TRACE_LOGGER = LoggerFactory .getLogger (RequestResponseOpener .class );
1517
@@ -21,7 +23,11 @@ public class RequestResponseOpener implements Operation<RequestResponseChannel>
2123 private final AmqpConnection eventDispatcher ;
2224 private final ScheduledExecutorService executor ;
2325
24- private boolean isOpened ;
26+ private final String instanceName = StringUtil .getRandomString ("RRO" );
27+
28+ private RequestResponseChannel currentChannel = null ;
29+ private final Object isOpenedSynchronizer = new Object ();
30+ private volatile boolean isOpening = false ;
2531
2632 public RequestResponseOpener (final SessionProvider sessionProvider , final String clientId , final String sessionName , final String linkName ,
2733 final String endpointAddress , final AmqpConnection eventDispatcher , final ScheduledExecutorService executor ) {
@@ -36,8 +42,33 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String
3642
3743 @ Override
3844 public synchronized void run (OperationResult <RequestResponseChannel , Exception > operationCallback ) {
39- if (this .isOpened ) {
40- return ;
45+ synchronized (this .isOpenedSynchronizer ) {
46+ if (this .currentChannel != null ) {
47+ if ((this .currentChannel .getState () == IOObjectState .OPENED ) || (this .currentChannel .getState () == IOObjectState .OPENING )) {
48+ if (TRACE_LOGGER .isInfoEnabled ()) {
49+ TRACE_LOGGER .info (String .format (Locale .US , "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate" ,
50+ this .clientId , this .instanceName , this .currentChannel .getId (), this .currentChannel .getState ().toString ()));
51+ }
52+ return ;
53+ }
54+ }
55+
56+ // Inner channel doesn't exist or it is closing/closed. Do we need to start creation of a new one,
57+ // or is that already in progress?
58+ if (this .isOpening ) {
59+ if (TRACE_LOGGER .isInfoEnabled ()) {
60+ TRACE_LOGGER .info (String .format (Locale .US , "clientId[%s] rro[%s] inner channel creation already in progress" ,
61+ this .clientId , this .instanceName ));
62+ }
63+ return ;
64+ }
65+
66+ // Need to start creating an inner channel.
67+ this .isOpening = true ;
68+ if (TRACE_LOGGER .isInfoEnabled ()) {
69+ TRACE_LOGGER .info (String .format (Locale .US , "clientId[%s] rro[%s] opening inner channel" ,
70+ this .clientId , this .instanceName ));
71+ }
4172 }
4273
4374 final Session session = this .sessionProvider .getSession (
@@ -52,14 +83,23 @@ public synchronized void run(OperationResult<RequestResponseChannel, Exception>
5283 });
5384
5485 if (session == null ) {
86+ if (TRACE_LOGGER .isErrorEnabled ()) {
87+ TRACE_LOGGER .error (String .format (Locale .US , "clientId[%s] rro[%s] got a null session, inner channel recreation cannot continue" ,
88+ this .clientId , this .instanceName ));
89+ }
90+ synchronized (RequestResponseOpener .this .isOpenedSynchronizer ) {
91+ // Inner channel creation failed.
92+ // The next time run() is called should try again.
93+ isOpening = false ;
94+ }
5595 return ;
5696 }
5797 final RequestResponseChannel requestResponseChannel = new RequestResponseChannel (
5898 this .linkName ,
5999 this .endpointAddress ,
60100 session ,
61101 this .executor );
62-
102+ this . currentChannel = requestResponseChannel ;
63103 requestResponseChannel .open (
64104 new OperationResult <Void , Exception >() {
65105 @ Override
@@ -69,21 +109,35 @@ public void onComplete(Void result) {
69109
70110 operationCallback .onComplete (requestResponseChannel );
71111
72- isOpened = true ;
112+ synchronized (RequestResponseOpener .this .isOpenedSynchronizer ) {
113+ // Inner channel creation complete.
114+ RequestResponseOpener .this .isOpening = false ;
115+ }
73116
74117 if (TRACE_LOGGER .isInfoEnabled ()) {
75- TRACE_LOGGER .info (String .format (Locale .US , "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]" ,
76- clientId , sessionName , linkName , endpointAddress ));
118+ TRACE_LOGGER .info (String .format (Locale .US , "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]" ,
119+ RequestResponseOpener .this .clientId , RequestResponseOpener .this .sessionName , RequestResponseOpener .this .linkName ,
120+ RequestResponseOpener .this .endpointAddress , requestResponseChannel .getId ()));
77121 }
78122 }
79123
80124 @ Override
81125 public void onError (Exception error ) {
82126 operationCallback .onError (error );
83127
128+ synchronized (RequestResponseOpener .this .isOpenedSynchronizer ) {
129+ // Inner channel creation failed. The next time run() is called should try again.
130+ // Sometimes this.currentChannel ends up in a weird state that shows as OPENING (because
131+ // remote states are UNINITIALIZED) instead of CLOSED/CLOSING, which will still cause the
132+ // next attempt to short-circuit, so null out currentChannel to prevent that.
133+ RequestResponseOpener .this .currentChannel = null ;
134+ RequestResponseOpener .this .isOpening = false ;
135+ }
136+
84137 if (TRACE_LOGGER .isWarnEnabled ()) {
85138 TRACE_LOGGER .warn (String .format (Locale .US , "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s" ,
86- clientId , sessionName , linkName , endpointAddress , error ));
139+ RequestResponseOpener .this .clientId , RequestResponseOpener .this .sessionName , RequestResponseOpener .this .linkName ,
140+ RequestResponseOpener .this .endpointAddress , error .getMessage ()));
87141 }
88142 }
89143 },
@@ -93,11 +147,10 @@ public void onComplete(Void result) {
93147 eventDispatcher .deregisterForConnectionError (requestResponseChannel .getSendLink ());
94148 eventDispatcher .deregisterForConnectionError (requestResponseChannel .getReceiveLink ());
95149
96- isOpened = false ;
97-
98150 if (TRACE_LOGGER .isInfoEnabled ()) {
99- TRACE_LOGGER .info (String .format (Locale .US , "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]" ,
100- clientId , sessionName , linkName , endpointAddress ));
151+ TRACE_LOGGER .info (String .format (Locale .US , "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]" ,
152+ RequestResponseOpener .this .clientId , RequestResponseOpener .this .sessionName , RequestResponseOpener .this .linkName ,
153+ RequestResponseOpener .this .endpointAddress , requestResponseChannel .getId ()));
101154 }
102155 }
103156
@@ -106,11 +159,10 @@ public void onError(Exception error) {
106159 eventDispatcher .deregisterForConnectionError (requestResponseChannel .getSendLink ());
107160 eventDispatcher .deregisterForConnectionError (requestResponseChannel .getReceiveLink ());
108161
109- isOpened = false ;
110-
111162 if (TRACE_LOGGER .isWarnEnabled ()) {
112- TRACE_LOGGER .warn (String .format (Locale .US , "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s" ,
113- clientId , sessionName , linkName , endpointAddress , error ));
163+ TRACE_LOGGER .warn (String .format (Locale .US , "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s], error %s" ,
164+ RequestResponseOpener .this .clientId , RequestResponseOpener .this .sessionName , RequestResponseOpener .this .linkName ,
165+ RequestResponseOpener .this .endpointAddress , requestResponseChannel .getId (), error .getMessage ()));
114166 }
115167 }
116168 });
0 commit comments