Skip to content

Commit bbb09a6

Browse files
authored
Replay the terminal error to work (receive calls) that arrives after the underlying async client termination. (Azure#34330)
* Replay the terminal error to work (receive calls) that arrives after the underlying async client termination. * Adding sync equivalent sample for ServiceBusReceiverAsyncClientRetrySample * Using volatile for disposalReason instead of AtomicReference * Update changelog
1 parent 177964e commit bbb09a6

File tree

3 files changed

+190
-0
lines changed

3 files changed

+190
-0
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
### Bugs Fixed
1010
- Fixed the issue of not retrying for a new session after the last acquire-session timeout. ([#34314](https://github.com/Azure/azure-sdk-for-java/issues/34314))
11+
- Replay the terminal error to the backing work of sync receive calls that arrive after the inner async client has terminated. ([#34332](https://github.com/Azure/azure-sdk-for-java/issues/34332))
1112
### Other Changes
1213

1314
## 7.13.3 (2023-03-07)

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
*/
3030
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessage> {
3131
private static final ClientLogger LOGGER = new ClientLogger(SynchronousMessageSubscriber.class);
32+
private static final RuntimeException CLIENT_TERMINATED_ERROR = new RuntimeException("The receiver client is terminated. Re-create the client to continue receive attempt.");
3233
private final AtomicBoolean isDisposed = new AtomicBoolean();
34+
private volatile Throwable disposalReason;
3335
private final AtomicInteger wip = new AtomicInteger();
3436
private final ConcurrentLinkedQueue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
3537
private final ConcurrentLinkedDeque<ServiceBusReceivedMessage> bufferMessages = new ConcurrentLinkedDeque<>();
@@ -132,6 +134,14 @@ protected void hookOnNext(ServiceBusReceivedMessage message) {
132134
void queueWork(SynchronousReceiveWork work) {
133135
Objects.requireNonNull(work, "'work' cannot be null");
134136

137+
if (isTerminated()) {
138+
Throwable reason = disposalReason;
139+
if (reason == null) {
140+
reason = CLIENT_TERMINATED_ERROR;
141+
}
142+
work.complete("The receiver client is terminated. Re-create the client to continue receive attempt.", reason);
143+
return;
144+
}
135145
workQueue.add(work);
136146

137147
LoggingEventBuilder logBuilder = LOGGER.atVerbose()
@@ -362,6 +372,7 @@ private void dispose(String message, Throwable throwable) {
362372
if (isDisposed.getAndSet(true)) {
363373
return;
364374
}
375+
disposalReason = throwable;
365376

366377
synchronized (currentWorkLock) {
367378
if (currentWork != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.servicebus;
5+
6+
import com.azure.core.util.IterableStream;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
9+
import org.junit.jupiter.api.Test;
10+
11+
import java.time.Duration;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
15+
/**
16+
* The sample demonstrates how handle terminal error when enumerating the {@link com.azure.core.util.IterableStream} from
17+
* {@link ServiceBusReceiverClient#receiveMessages(int, Duration)} or {@link ServiceBusReceiverClient#receiveMessages(int)}
18+
* and recreate the client to continue receiving messages.
19+
*
20+
* <p>
21+
* Enumerating the {@link com.azure.core.util.IterableStream} from the receiveMessages APIs may emit a terminal error
22+
* (hence no longer emit messages) in the following cases -
23+
*
24+
* <ul>
25+
* <li>When the connection or link encounters a non-retriable error. A few examples of non-retriable errors are -
26+
* the app attempting to connect to a queue that does not exist, someone deleting the queue in the middle of receiving,
27+
* the user explicitly initiating Geo-DR, user disabling the queue. These are certain events where the Service Bus
28+
* service communicates to the SDK that a non-retriable error occurred.
29+
* </li>
30+
* <li>a series of connection or link recovery attempts fail in a row which exhausts the max-retry.</li>
31+
* </ul>
32+
*
33+
* <p>
34+
* When these cases happen, the usual pattern is to log the terminal error for auditing, close current client and create
35+
* a new client to continue receiving messages.
36+
*/
37+
public class ServiceBusReceiverClientRetrySample {
38+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverClientRetrySample.class);
39+
private final AtomicBoolean isRunning = new AtomicBoolean(false);
40+
String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
41+
String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
42+
43+
/**
44+
* Main method of the sample showing how to handle terminal error that {@link com.azure.core.util.IterableStream}
45+
* from receiveMessages API throws and recreates the client to continue receiving messages.
46+
*
47+
* @param args Unused arguments to the program.
48+
* @throws InterruptedException if the program is unable to sleep while waiting for the receive.
49+
*/
50+
public static void main(String[] args) throws InterruptedException {
51+
final ServiceBusReceiverClientRetrySample sample = new ServiceBusReceiverClientRetrySample();
52+
sample.run();
53+
}
54+
55+
/**
56+
* Run method to invoke this demo on how to handle terminal error that {@link com.azure.core.util.IterableStream}
57+
* from receiveMessages API throws and recreates the client to continue receiving messages.
58+
*/
59+
@Test
60+
public void run() throws InterruptedException {
61+
startReceive();
62+
}
63+
64+
void startReceive() {
65+
isRunning.set(true);
66+
receiveMessages();
67+
}
68+
69+
void stopReceive() {
70+
isRunning.set(false);
71+
}
72+
73+
private void receiveMessages() {
74+
final ServiceBusReceiverClient client = createClient();
75+
76+
final int maxMessages = 5;
77+
final Duration maxWaitTime = Duration.ofSeconds(30);
78+
79+
Exception terminalError = null;
80+
// The message loop
81+
while (isRunning.get()) {
82+
final IterableStream<ServiceBusReceivedMessage> messages = client.receiveMessages(maxMessages, maxWaitTime);
83+
try {
84+
messages.forEach(message -> {
85+
final boolean success = handleMessage(message);
86+
if (success) {
87+
completeMessage(client, message);
88+
} else {
89+
abandonMessage(client, message);
90+
}
91+
});
92+
} catch (Exception enumerationError) {
93+
terminalError = enumerationError;
94+
// Handle the terminal error while enumerating the messages. The 'messages' enumeration (forEach) can
95+
// throw if the client runs into a non-retriable error or retry exhausts. In such cases, the current
96+
// client reaches terminal state and can no longer receive, so exit the message loop,
97+
break;
98+
}
99+
}
100+
// close the client
101+
client.close();
102+
if (!isRunning.get()) {
103+
return;
104+
}
105+
// log the terminal error for auditing
106+
LOGGER.warning("Receiver client's retry exhausted or a non-retryable error occurred.", terminalError);
107+
// and attempt to receive using new client.
108+
receiveMessages();
109+
}
110+
111+
/**
112+
* A business domain specific logic taking 5 seconds to handle the message which randomly fails.
113+
*
114+
* @param message The message to handle.
115+
* @return {@code true} if message handled successfully, {@code false} otherwise.
116+
*/
117+
private boolean handleMessage(ServiceBusReceivedMessage message) {
118+
LOGGER.info("Handling message: " + message.getMessageId());
119+
try {
120+
// The sleep API is used only to demonstrate any external 'blocking' IO (e.g., network, DB)
121+
TimeUnit.SECONDS.sleep(5);
122+
} catch (InterruptedException e) {
123+
e.printStackTrace();
124+
}
125+
final boolean handlingSucceeded = Math.random() < 0.5;
126+
if (handlingSucceeded) {
127+
return true;
128+
} else {
129+
LOGGER.info("Business logic failed to handle message: " + message.getMessageId());
130+
return false;
131+
}
132+
}
133+
134+
/**
135+
* Completes the message using the given client and logs any exception during completion.
136+
*
137+
* @param client the client.
138+
* @param message the message to complete.
139+
*/
140+
private void completeMessage(ServiceBusReceiverClient client, ServiceBusReceivedMessage message) {
141+
try {
142+
client.complete(message);
143+
} catch (Throwable completionError) {
144+
LOGGER.warning("Couldn't complete message {}", message.getMessageId(), completionError);
145+
}
146+
}
147+
148+
/**
149+
* Abandon the message using the given client and logs any exception during abandoning.
150+
*
151+
* @param client the client.
152+
* @param message the message to complete.
153+
*/
154+
private void abandonMessage(ServiceBusReceiverClient client, ServiceBusReceivedMessage message) {
155+
try {
156+
client.abandon(message);
157+
} catch (Throwable abandonError) {
158+
LOGGER.warning("Couldn't abandon message {}", message.getMessageId(), abandonError);
159+
}
160+
}
161+
162+
/**
163+
* Creates a receiver client.
164+
*
165+
* @return the receiver client.
166+
*/
167+
private ServiceBusReceiverClient createClient() {
168+
return new ServiceBusClientBuilder()
169+
.connectionString(connectionString)
170+
.receiver()
171+
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
172+
.queueName(queueName)
173+
.disableAutoComplete()
174+
.maxAutoLockRenewDuration(Duration.ZERO)
175+
.prefetchCount(0)
176+
.buildClient();
177+
}
178+
}

0 commit comments

Comments
 (0)