Skip to content

Commit def2a63

Browse files
authored
Fix an issue of creating multiple sessions for $management & $cbs channel for a single connection and improve logging (#443)
* Fix an issue of creating multiple sessions for $management & $cbs for a connection and improve logging
1 parent 4267307 commit def2a63

37 files changed

+265
-209
lines changed

azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.net.URISyntaxException;
1515
import java.security.InvalidKeyException;
16+
import java.util.Locale;
1617
import java.util.UUID;
1718
import java.util.concurrent.*;
1819
import java.util.concurrent.atomic.AtomicInteger;
@@ -556,7 +557,7 @@ public Thread newThread(Runnable r) {
556557
}
557558

558559
private String getNamePrefix() {
559-
return String.format("[%s|%s|%s]-%s-",
560+
return String.format(Locale.US, "[%s|%s|%s]-%s-",
560561
this.entityName, this.consumerGroupName, this.hostName, poolNumber.getAndIncrement());
561562
}
562563

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ConnectionStringBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ private void parseConnectionString(final String connectionString) {
407407
this.transportType = TransportType.fromString(values[valueIndex]);
408408
} catch (IllegalArgumentException exception) {
409409
throw new IllegalConnectionStringFormatException(
410-
String.format("Invalid value specified for property '%s' in the ConnectionString.", TransportTypeConfigName),
410+
String.format(Locale.US, "Invalid value specified for property '%s' in the ConnectionString.", TransportTypeConfigName),
411411
exception);
412412
}
413413
} else {

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public ActiveClientTokenManager(
4141
}
4242

4343
public void cancel() {
44+
if (TRACE_LOGGER.isInfoEnabled()) {
45+
TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - canceling ActiveClientLinkManager",
46+
clientEntity.getClientId()));
47+
}
4448

4549
synchronized (this.timerLock) {
4650
this.timer.cancel(false);
@@ -62,9 +66,8 @@ public void run() {
6266
} else {
6367

6468
if (TRACE_LOGGER.isInfoEnabled()) {
65-
TRACE_LOGGER.info(
66-
String.format(Locale.US,
67-
"clientEntity[%s] - closing ActiveClientLinkManager", clientEntity.getClientId()));
69+
TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - closing ActiveClientLinkManager",
70+
clientEntity.getClientId()));
6871
}
6972
}
7073
}

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
1111

12+
import java.util.Locale;
13+
1214
public class BaseLinkHandler extends BaseHandler {
1315
protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseLinkHandler.class);
1416

17+
private final String name;
1518
private final AmqpLink underlyingEntity;
1619

17-
public BaseLinkHandler(final AmqpLink amqpLink) {
20+
public BaseLinkHandler(final AmqpLink amqpLink, final String name) {
21+
this.name = name;
1822
this.underlyingEntity = amqpLink;
1923
}
2024

@@ -24,10 +28,8 @@ public void onLinkLocalClose(Event event) {
2428
final ErrorCondition condition = link.getCondition();
2529

2630
if (TRACE_LOGGER.isInfoEnabled()) {
27-
TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s], errorCondition[%s], errorDescription[%s]",
28-
link.getName(),
29-
condition != null ? condition.getCondition() : "n/a",
30-
condition != null ? condition.getDescription() : "n/a"));
31+
TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
32+
this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
3133
}
3234

3335
closeSession(link, link.getCondition());
@@ -39,10 +41,8 @@ public void onLinkRemoteClose(Event event) {
3941
final ErrorCondition condition = link.getRemoteCondition();
4042

4143
if (TRACE_LOGGER.isInfoEnabled()) {
42-
TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s], errorCondition[%s], errorDescription[%s]",
43-
link.getName(),
44-
condition != null ? condition.getCondition() : "n/a",
45-
condition != null ? condition.getDescription() : "n/a"));
44+
TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
45+
this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
4646
}
4747

4848
handleRemoteLinkClosed(event);
@@ -54,31 +54,26 @@ public void onLinkRemoteDetach(Event event) {
5454
final ErrorCondition condition = link.getCondition();
5555

5656
if (TRACE_LOGGER.isInfoEnabled()) {
57-
TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s], errorCondition[%s], errorDescription[%s]",
58-
link.getName(),
59-
condition != null ? condition.getCondition() : "n/a",
60-
condition != null ? condition.getDescription() : "n/a"));
57+
TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteDetach clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
58+
this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
6159
}
6260

6361
handleRemoteLinkClosed(event);
6462
}
6563

6664
public void processOnClose(Link link, ErrorCondition condition) {
6765
if (TRACE_LOGGER.isInfoEnabled()) {
68-
TRACE_LOGGER.info(String.format("processOnClose linkName[%s], errorCondition[%s], errorDescription[%s]",
69-
link.getName(),
70-
condition != null ? condition.getCondition() : "n/a",
71-
condition != null ? condition.getDescription() : "n/a"));
66+
TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
67+
this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
7268
}
7369

7470
this.underlyingEntity.onClose(condition);
7571
}
7672

7773
public void processOnClose(Link link, Exception exception) {
7874
if (TRACE_LOGGER.isInfoEnabled()) {
79-
TRACE_LOGGER.info(String.format("processOnClose linkName[%s], exception[%s]",
80-
link.getName(),
81-
exception != null ? exception.getMessage() : "n/a"));
75+
TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], exception[%s]",
76+
this.name, link.getName(), exception != null ? exception.getMessage() : "n/a"));
8277
}
8378

8479
this.underlyingEntity.onError(exception);
@@ -89,10 +84,8 @@ private void closeSession(Link link, ErrorCondition condition) {
8984

9085
if (session != null && session.getLocalState() != EndpointState.CLOSED) {
9186
if (TRACE_LOGGER.isInfoEnabled()) {
92-
TRACE_LOGGER.info(String.format("closeSession for linkName[%s], errorCondition[%s], errorDescription[%s]",
93-
link.getName(),
94-
condition != null ? condition.getCondition() : "n/a",
95-
condition != null ? condition.getDescription() : "n/a"));
87+
TRACE_LOGGER.info(String.format(Locale.US, "closeSession for clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
88+
this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
9689
}
9790

9891
session.setCondition(condition);

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ final class CBSChannel {
2020

2121
public CBSChannel(
2222
final SessionProvider sessionProvider,
23-
final AmqpConnection connection) {
23+
final AmqpConnection connection,
24+
final String clientId) {
2425

2526
this.sessionProvider = sessionProvider;
2627
this.connectionEventDispatcher = connection;
2728

2829
RequestResponseCloser closer = new RequestResponseCloser();
2930
this.innerChannel = new FaultTolerantObject<>(
30-
new RequestResponseOpener(sessionProvider, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection),
31+
new RequestResponseOpener(sessionProvider, clientId, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection),
3132
closer);
3233
closer.setInnerChannel(this.innerChannel);
3334
}

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public final class ClientConstants {
2828
public final static Duration TIMER_TOLERANCE = Duration.ofSeconds(1);
2929
public final static Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0);
3030
public final static Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30);
31-
public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins
31+
public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(5); // renew every 5 minutes, which expires 20 minutes
3232
public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20);
3333
public final static int DEFAULT_MAX_RETRY_COUNT = 10;
3434
public final static boolean DEFAULT_IS_TRANSIENT = true;

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@
1818
import java.util.Locale;
1919
import java.util.Map;
2020

21-
// ServiceBus <-> ProtonReactor interaction handles all
22-
// amqp_connection/transport related events from reactor
2321
public class ConnectionHandler extends BaseHandler {
2422

2523
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
2624

2725
private final AmqpConnection amqpConnection;
26+
private final String connectionId;
2827

29-
protected ConnectionHandler(final AmqpConnection amqpConnection) {
28+
protected ConnectionHandler(final AmqpConnection amqpConnection, final String connectionId) {
3029

3130
add(new Handshaker());
3231
this.amqpConnection = amqpConnection;
32+
this.connectionId = connectionId;
3333
}
3434

35-
static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
35+
static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection, String connectionId) {
3636
switch (transportType) {
3737
case AMQP_WEB_SOCKETS:
3838
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
@@ -42,7 +42,7 @@ static ConnectionHandler create(TransportType transportType, AmqpConnection amqp
4242
}
4343
case AMQP:
4444
default:
45-
return new ConnectionHandler(amqpConnection);
45+
return new ConnectionHandler(amqpConnection, connectionId);
4646
}
4747
}
4848

@@ -63,17 +63,18 @@ protected AmqpConnection getAmqpConnection() {
6363
@Override
6464
public void onConnectionInit(Event event) {
6565
if (TRACE_LOGGER.isInfoEnabled()) {
66-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s]", this.amqpConnection.getHostName()));
66+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s], connectionId[%s]",
67+
this.amqpConnection.getHostName(), this.connectionId));
6768
}
6869

6970
final Connection connection = event.getConnection();
7071
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
7172
.append(":")
72-
.append(String.valueOf(this.getProtocolPort()))
73+
.append(this.getProtocolPort())
7374
.toString();
7475

7576
connection.setHostname(hostName);
76-
connection.setContainer(StringUtil.getRandomString());
77+
connection.setContainer(this.connectionId);
7778

7879
final Map<Symbol, Object> connectionProperties = new HashMap<>();
7980
connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME);
@@ -137,7 +138,8 @@ protected int getMaxFrameSize() {
137138
@Override
138139
public void onConnectionBound(Event event) {
139140
if (TRACE_LOGGER.isInfoEnabled()) {
140-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s]", this.amqpConnection.getHostName()));
141+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s], connectionId[%s]",
142+
this.amqpConnection.getHostName(), this.connectionId));
141143
}
142144

143145
final Transport transport = event.getTransport();
@@ -150,8 +152,8 @@ public void onConnectionUnbound(Event event) {
150152

151153
final Connection connection = event.getConnection();
152154
if (TRACE_LOGGER.isInfoEnabled()) {
153-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound: hostname[%s], state[%s], remoteState[%s]",
154-
connection.getHostname(), connection.getLocalState(), connection.getRemoteState()));
155+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound hostname[%s], connectionId[%s], state[%s], remoteState[%s]",
156+
connection.getHostname(), this.connectionId, connection.getLocalState(), connection.getRemoteState()));
155157
}
156158

157159
// if failure happened while establishing transport - nothing to free up.
@@ -167,9 +169,8 @@ public void onTransportError(Event event) {
167169
final ErrorCondition condition = transport.getCondition();
168170

169171
if (TRACE_LOGGER.isWarnEnabled()) {
170-
TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError: hostname[%s], error[%s]",
171-
connection != null ? connection.getHostname() : "n/a",
172-
condition != null ? condition.getDescription() : "n/a"));
172+
TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError hostname[%s], connectionId[%s], error[%s]",
173+
connection != null ? connection.getHostname() : "n/a", this.connectionId, condition != null ? condition.getDescription() : "n/a"));
173174
}
174175

175176
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@@ -192,8 +193,8 @@ public void onTransportClosed(Event event) {
192193
final ErrorCondition condition = transport.getCondition();
193194

194195
if (TRACE_LOGGER.isInfoEnabled()) {
195-
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed: hostname[%s], error[%s]",
196-
connection != null ? connection.getHostname() : "n/a", (condition != null ? condition.getDescription() : "n/a")));
196+
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s], connectionId[%s], error[%s]",
197+
connection != null ? connection.getHostname() : "n/a", this.connectionId, (condition != null ? condition.getDescription() : "n/a")));
197198
}
198199

199200
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@@ -209,19 +210,17 @@ public void onConnectionLocalOpen(Event event) {
209210
final ErrorCondition error = connection.getCondition();
210211

211212
if (TRACE_LOGGER.isInfoEnabled()) {
212-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen: hostname[%s], errorCondition[%s], errorDescription[%s]",
213-
connection.getHostname(),
214-
error != null ? error.getCondition() : "n/a",
215-
error != null ? error.getDescription() : "n/a"));
213+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
214+
connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
216215
}
217216
}
218217

219218
@Override
220219
public void onConnectionRemoteOpen(Event event) {
221220

222221
if (TRACE_LOGGER.isInfoEnabled()) {
223-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen: hostname[%s], remoteContainer[%s]",
224-
event.getConnection().getHostname(), event.getConnection().getRemoteContainer()));
222+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen hostname[%s], connectionId[%s], remoteContainer[%s]",
223+
event.getConnection().getHostname(), this.connectionId, event.getConnection().getRemoteContainer()));
225224
}
226225

227226
this.amqpConnection.onOpenComplete(null);
@@ -234,10 +233,8 @@ public void onConnectionLocalClose(Event event) {
234233
final ErrorCondition error = connection.getCondition();
235234

236235
if (TRACE_LOGGER.isInfoEnabled()) {
237-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
238-
connection.getHostname(),
239-
error != null ? error.getCondition() : "n/a",
240-
error != null ? error.getDescription() : "n/a"));
236+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
237+
connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
241238
}
242239

243240
if (connection.getRemoteState() == EndpointState.CLOSED) {
@@ -256,10 +253,8 @@ public void onConnectionRemoteClose(Event event) {
256253
final ErrorCondition error = connection.getRemoteCondition();
257254

258255
if (TRACE_LOGGER.isInfoEnabled()) {
259-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
260-
connection.getHostname(),
261-
error != null ? error.getCondition() : "n/a",
262-
error != null ? error.getDescription() : "n/a"));
256+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
257+
connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
263258
}
264259

265260
this.amqpConnection.onConnectionError(error);
@@ -271,10 +266,8 @@ public void onConnectionFinal(Event event) {
271266
final ErrorCondition error = connection.getCondition();
272267

273268
if (TRACE_LOGGER.isInfoEnabled()) {
274-
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal: hostname[%s], errorCondition[%s], errorDescription[%s]",
275-
connection.getHostname(),
276-
error != null ? error.getCondition() : "n/a",
277-
error != null ? error.getDescription() : "n/a"));
269+
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
270+
connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
278271
}
279272
}
280273
}

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,20 @@ public class CustomIOHandler extends IOHandler {
1313

1414
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CustomIOHandler.class);
1515

16+
private final String name;
17+
18+
public CustomIOHandler(final String name) {
19+
this.name = name;
20+
}
21+
1622
@Override
1723
public void onTransportClosed(Event event) {
1824
final Transport transport = event.getTransport();
1925
final Connection connection = event.getConnection();
2026

2127
if (TRACE_LOGGER.isInfoEnabled()) {
22-
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s]",
23-
(connection != null ? connection.getHostname() : "n/a")));
28+
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed name[%s], hostname[%s]",
29+
this.name, (connection != null ? connection.getHostname() : "n/a")));
2430
}
2531

2632
if (transport != null && connection != null && connection.getTransport() != null) {

0 commit comments

Comments
 (0)