Skip to content

Commit fcf8237

Browse files
authored
Removing inactive/timed out sessions (Azure#32365)
* Adding test case for non-retried session. * Adding test case for non-retried session. * Removing old session. * Add changelog entry.
1 parent 69fc168 commit fcf8237

File tree

3 files changed

+115
-23
lines changed

3 files changed

+115
-23
lines changed

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Removing inactive session when it has timed out, so `ReactorConnection.getSession(String)` does not return the same session.
12+
1113
### Other Changes
1214

1315
## 2.8.0 (2022-11-04)
@@ -19,8 +21,6 @@
1921

2022
### Other Changes
2123

22-
-
23-
2424
#### Dependency Updates
2525

2626
- Upgraded `azure-core` from `1.33.0` to `1.34.0`.

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.RejectedExecutionException;
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343

44+
import static com.azure.core.amqp.exception.AmqpErrorCondition.TIMEOUT_ERROR;
4445
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addShutdownSignal;
4546
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
4647
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
@@ -114,7 +115,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
114115
this.connectionOptions = connectionOptions;
115116
this.reactorProvider = reactorProvider;
116117
this.connectionId = connectionId;
117-
this.logger = new ClientLogger(ReactorConnection.class, createContextWithConnectionId(connectionId));
118+
this.logger = new ClientLogger(ReactorConnection.class, createContextWithConnectionId(connectionId));
118119
this.handlerProvider = handlerProvider;
119120
this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider,
120121
"'tokenManagerProvider' cannot be null.");
@@ -276,7 +277,7 @@ public Map<String, Object> getConnectionProperties() {
276277
@Override
277278
public Mono<AmqpSession> createSession(String sessionName) {
278279
return connectionMono.map(connection -> {
279-
final SessionSubscription sessionSubscription = sessionMap.computeIfAbsent(sessionName, key -> {
280+
return sessionMap.computeIfAbsent(sessionName, key -> {
280281
final SessionHandler sessionHandler = handlerProvider.createSessionHandler(connectionId,
281282
getFullyQualifiedNamespace(), key, connectionOptions.getRetry().getTryTimeout());
282283
final Session session = connection.session();
@@ -309,15 +310,27 @@ public Mono<AmqpSession> createSession(String sessionName) {
309310

310311
return new SessionSubscription(amqpSession, subscription);
311312
});
312-
313-
return sessionSubscription;
314313
}).flatMap(sessionSubscription -> {
315314
final Mono<AmqpEndpointState> activeSession = sessionSubscription.getSession().getEndpointStates()
316315
.filter(state -> state == AmqpEndpointState.ACTIVE)
317316
.next()
318317
.timeout(retryPolicy.getRetryOptions().getTryTimeout(), Mono.error(() -> new AmqpException(true,
319-
String.format("connectionId[%s] sessionName[%s] Timeout waiting for session to be active.",
320-
connectionId, sessionName), handler.getErrorContext())));
318+
TIMEOUT_ERROR, String.format(
319+
"connectionId[%s] sessionName[%s] Timeout waiting for session to be active.", connectionId,
320+
sessionName), handler.getErrorContext())))
321+
.doOnError(error -> {
322+
// Clean up the subscription if there was an error waiting for the session to become active.
323+
324+
if (!(error instanceof AmqpException)) {
325+
return;
326+
}
327+
328+
final AmqpException amqpException = (AmqpException) error;
329+
if (amqpException.getErrorCondition() == TIMEOUT_ERROR) {
330+
final SessionSubscription removed = sessionMap.remove(sessionName);
331+
removed.dispose();
332+
}
333+
});
321334

322335
return activeSession.thenReturn(sessionSubscription.getSession());
323336
});

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java

Lines changed: 94 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,14 @@
6060
import java.util.Map;
6161
import java.util.concurrent.TimeUnit;
6262
import java.util.concurrent.atomic.AtomicBoolean;
63+
import java.util.concurrent.atomic.AtomicInteger;
6364
import java.util.function.Consumer;
6465

6566
import static org.junit.jupiter.api.Assertions.assertEquals;
6667
import static org.junit.jupiter.api.Assertions.assertFalse;
6768
import static org.junit.jupiter.api.Assertions.assertNotNull;
6869
import static org.junit.jupiter.api.Assertions.assertNull;
70+
import static org.junit.jupiter.api.Assertions.assertSame;
6971
import static org.junit.jupiter.api.Assertions.assertTrue;
7072
import static org.mockito.ArgumentMatchers.any;
7173
import static org.mockito.ArgumentMatchers.anyInt;
@@ -99,6 +101,7 @@ class ReactorConnectionTest {
99101
private ReactorConnection connection;
100102
private ConnectionHandler connectionHandler;
101103
private SessionHandler sessionHandler;
104+
private SessionHandler sessionHandler2;
102105
private AutoCloseable mocksCloseable;
103106
private ConnectionOptions connectionOptions;
104107

@@ -155,8 +158,11 @@ void setup() throws IOException {
155158
.thenReturn(connectionHandler);
156159
sessionHandler = new SessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, reactorDispatcher,
157160
TEST_DURATION, AmqpMetricsProvider.noop());
161+
sessionHandler2 = new SessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, reactorDispatcher,
162+
TEST_DURATION, AmqpMetricsProvider.noop());
163+
158164
when(reactorHandlerProvider.createSessionHandler(anyString(), anyString(), anyString(), any(Duration.class)))
159-
.thenReturn(sessionHandler);
165+
.thenReturn(sessionHandler, sessionHandler2, null);
160166

161167
connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider,
162168
tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST);
@@ -170,6 +176,8 @@ void setup() throws IOException {
170176

171177
@AfterEach
172178
void teardown() throws Exception {
179+
System.err.println("Clean-up");
180+
173181
connectionHandler.close();
174182
sessionHandler.close();
175183

@@ -191,22 +199,22 @@ void createConnection() {
191199
final Map<String, Object> expectedProperties = new HashMap<>(connectionHandler.getConnectionProperties());
192200

193201
// Assert
194-
Assertions.assertNotNull(connection);
195-
Assertions.assertEquals(CONNECTION_ID, connection.getId());
196-
Assertions.assertEquals(FULLY_QUALIFIED_NAMESPACE, connection.getFullyQualifiedNamespace());
202+
assertNotNull(connection);
203+
assertEquals(CONNECTION_ID, connection.getId());
204+
assertEquals(FULLY_QUALIFIED_NAMESPACE, connection.getFullyQualifiedNamespace());
197205

198-
Assertions.assertEquals(connectionHandler.getMaxFrameSize(), connection.getMaxFrameSize());
206+
assertEquals(connectionHandler.getMaxFrameSize(), connection.getMaxFrameSize());
199207

200-
Assertions.assertNotNull(connection.getConnectionProperties());
201-
Assertions.assertEquals(expectedProperties.size(), connection.getConnectionProperties().size());
208+
assertNotNull(connection.getConnectionProperties());
209+
assertEquals(expectedProperties.size(), connection.getConnectionProperties().size());
202210

203211
expectedProperties.forEach((key, value) -> {
204212
final Object removed = connection.getConnectionProperties().remove(key);
205-
Assertions.assertNotNull(removed);
213+
assertNotNull(removed);
206214

207215
final String expected = String.valueOf(value);
208216
final String actual = String.valueOf(removed);
209-
Assertions.assertEquals(expected, actual);
217+
assertEquals(expected, actual);
210218
});
211219
assertTrue(connection.getConnectionProperties().isEmpty());
212220
}
@@ -227,26 +235,27 @@ void createSession() {
227235
when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE);
228236
connectionHandler.onConnectionRemoteOpen(connectionEvent);
229237

238+
230239
sessionHandler.onSessionRemoteOpen(sessionEvent);
231240

232241
// Act & Assert
233242
StepVerifier.create(connection.createSession(SESSION_NAME))
234243
.assertNext(s -> {
235-
Assertions.assertNotNull(s);
236-
Assertions.assertEquals(SESSION_NAME, s.getSessionName());
244+
assertNotNull(s);
245+
assertEquals(SESSION_NAME, s.getSessionName());
237246
assertTrue(s instanceof ReactorSession);
238-
Assertions.assertSame(session, ((ReactorSession) s).session());
247+
assertSame(session, ((ReactorSession) s).session());
239248
})
240249
.expectComplete()
241250
.verify(VERIFY_TIMEOUT);
242251

243252
// Assert that the same instance is obtained and we don't get a new session with the same name.
244253
StepVerifier.create(connection.createSession(SESSION_NAME))
245254
.assertNext(s -> {
246-
Assertions.assertNotNull(s);
247-
Assertions.assertEquals(SESSION_NAME, s.getSessionName());
255+
assertNotNull(s);
256+
assertEquals(SESSION_NAME, s.getSessionName());
248257
assertTrue(s instanceof ReactorSession);
249-
Assertions.assertSame(session, ((ReactorSession) s).session());
258+
assertSame(session, ((ReactorSession) s).session());
250259
})
251260
.expectComplete()
252261
.verify(VERIFY_TIMEOUT);
@@ -277,6 +286,76 @@ void createSessionWhenConnectionInactive() {
277286
.verify(VERIFY_TIMEOUT);
278287
}
279288

289+
@Test
290+
void createSessionFailureWorksWithRetry() {
291+
when(reactor.process()).then(invocation -> {
292+
TimeUnit.SECONDS.sleep(5);
293+
return true;
294+
});
295+
296+
final AtomicInteger numberOfInvocations = new AtomicInteger();
297+
298+
final Session session2 = mock(Session.class);
299+
final Record record2 = mock(Record.class);
300+
301+
when(session2.attachments()).thenReturn(record2);
302+
303+
when(session2.getRemoteState()).thenAnswer(invocation -> {
304+
if (numberOfInvocations.getAndIncrement() < 1) {
305+
return EndpointState.UNINITIALIZED;
306+
} else {
307+
return EndpointState.ACTIVE;
308+
}
309+
});
310+
311+
when(session.attachments()).thenReturn(record);
312+
when(session.getRemoteState()).thenAnswer(invocation -> {
313+
return EndpointState.UNINITIALIZED;
314+
});
315+
316+
when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(),
317+
connectionHandler)).thenReturn(connectionProtonJ);
318+
when(connectionProtonJ.session()).thenReturn(session, session2);
319+
320+
// We only want it to emit a session when it is active.
321+
when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE);
322+
connectionHandler.onConnectionRemoteOpen(connectionEvent);
323+
324+
final Event sessionEvent2 = mock(Event.class);
325+
when(sessionEvent2.getSession()).thenReturn(session2);
326+
327+
328+
// Act & Assert
329+
330+
// Assert that the first session timed out while being created.
331+
StepVerifier.create(connection.createSession(SESSION_NAME))
332+
.expectErrorSatisfies(error -> {
333+
assertTrue(error instanceof AmqpException);
334+
335+
final AmqpException exception = (AmqpException) error;
336+
assertTrue(exception.isTransient());
337+
})
338+
.verify();
339+
340+
// Assert that the second time, a new session is obtained.
341+
StepVerifier.create(connection.createSession(SESSION_NAME))
342+
.then(() -> {
343+
System.out.println("Pushing new session open downstream.");
344+
sessionHandler2.onSessionRemoteOpen(sessionEvent2);
345+
})
346+
.assertNext(s -> {
347+
assertNotNull(s);
348+
assertEquals(SESSION_NAME, s.getSessionName());
349+
assertTrue(s instanceof ReactorSession);
350+
assertSame(session2, ((ReactorSession) s).session());
351+
})
352+
.expectComplete()
353+
.verify();
354+
355+
verify(record).set(Handler.class, Handler.class, sessionHandler);
356+
verify(record2).set(Handler.class, Handler.class, sessionHandler2);
357+
}
358+
280359
/**
281360
* Creates a session with the given name and set handler.
282361
*/

0 commit comments

Comments
 (0)