Skip to content

Commit 87d97a9

Browse files
authored
[Service Bus] Shutdown scheduler before closing client (Azure#17738)
* Shutdown scheduler before closing client * Change equality check * Refactored close logic
1 parent 69088d0 commit 87d97a9

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,25 @@ public synchronized void start() {
120120
logger.info("Processor is already running");
121121
return;
122122
}
123+
124+
if (asyncClient.get() == null) {
125+
ServiceBusReceiverAsyncClient newReceiverClient = this.receiverBuilder == null
126+
? this.sessionReceiverBuilder.buildAsyncClientForProcessor()
127+
: this.receiverBuilder.buildAsyncClient();
128+
asyncClient.set(newReceiverClient);
129+
}
130+
123131
receiveMessages();
124132

125133
// Start an executor to periodically check if the client's connection is active
126-
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
127-
scheduledExecutor.scheduleWithFixedDelay(() -> {
128-
if (this.asyncClient.get().isConnectionClosed()) {
129-
restartMessageReceiver();
130-
}
131-
}, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
132-
134+
if (this.scheduledExecutor == null) {
135+
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
136+
scheduledExecutor.scheduleWithFixedDelay(() -> {
137+
if (this.asyncClient.get().isConnectionClosed()) {
138+
restartMessageReceiver();
139+
}
140+
}, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
141+
}
133142
}
134143

135144
/**
@@ -149,9 +158,16 @@ public synchronized void close() {
149158
isRunning.set(false);
150159
if (receiverSubscription.get() != null) {
151160
receiverSubscription.get().cancel();
161+
receiverSubscription.set(null);
162+
}
163+
if (scheduledExecutor != null) {
164+
scheduledExecutor.shutdown();
165+
scheduledExecutor = null;
166+
}
167+
if (asyncClient.get() != null) {
168+
asyncClient.get().close();
169+
asyncClient.set(null);
152170
}
153-
asyncClient.get().close();
154-
scheduledExecutor.shutdown();
155171
}
156172

157173
/**
@@ -301,6 +317,9 @@ private void handleError(Throwable throwable) {
301317
}
302318

303319
private void restartMessageReceiver() {
320+
if (!isRunning()) {
321+
return;
322+
}
304323
receiverSubscription.set(null);
305324
ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
306325
receiverClient.close();

0 commit comments

Comments
 (0)