Skip to content

Commit 6d827ec

Browse files
authored
Avoid error in one faulty receiver bypassing to parent connection hosting other healthy receivers, ensure ExceutionRejectedException is handled in all places that uses Dispatcher.invoke (Azure#27839)
1 parent 9a7c96b commit 6d827ec

File tree

7 files changed

+42
-15
lines changed

7 files changed

+42
-15
lines changed

eng/versioning/version_client.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ com.azure.tools:azure-sdk-build-tool;1.0.0-beta.1;1.0.0-beta.2
354354
unreleased_com.azure:azure-aot-graalvm-support;1.0.0-beta.1
355355
unreleased_com.azure:azure-aot-graalvm-support-netty;1.0.0-beta.1
356356
unreleased_com.azure:azure-aot-graalvm-perf;1.0.0-beta.1
357+
unreleased_com.azure:azure-core-amqp;2.5.0-beta.1
357358
unreleased_com.azure:azure-core;1.27.0-beta.1
358359

359360
# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
8383
return Mono.create(sink -> {
8484
try {
8585
this.dispatcher.invoke(() -> {
86+
if (isDisposed()) {
87+
sink.error(new IllegalStateException(
88+
"Cannot decode delivery when ReactorReceiver instance is closed."));
89+
return;
90+
}
8691
final Message message = decodeDelivery(delivery);
8792
final int creditsLeft = receiver.getRemoteCredit();
8893

sdk/eventhubs/azure-messaging-eventhubs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
<dependency>
4343
<groupId>com.azure</groupId>
4444
<artifactId>azure-core-amqp</artifactId>
45-
<version>2.4.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
45+
<version>2.5.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
4646
</dependency>
4747

4848
<!-- Test dependencies -->

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,10 +283,20 @@ public void onNext(AmqpReceiveLink next) {
283283
}),
284284
next.receive()
285285
.onBackpressureBuffer(maxQueueSize, BufferOverflowStrategy.ERROR)
286-
.subscribe(message -> {
287-
messageQueue.add(message);
288-
drain();
289-
}));
286+
.subscribe(
287+
message -> {
288+
messageQueue.add(message);
289+
drain();
290+
},
291+
error -> {
292+
// When the receive on AmqpReceiveLink (e.g., ReactorReceiver) terminates
293+
// with an error, we expect the recovery to happen in response to the terminal events
294+
// in link EndpointState Flux.
295+
logger.atVerbose()
296+
.addKeyValue(LINK_NAME_KEY, linkName)
297+
.addKeyValue(ENTITY_PATH_KEY, entityPath)
298+
.log("Receiver is terminated.", error);
299+
}));
290300
}
291301

292302
disposeReceiver(oldChannel);

sdk/servicebus/azure-messaging-servicebus/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<dependency>
5454
<groupId>com.azure</groupId>
5555
<artifactId>azure-core-amqp</artifactId>
56-
<version>2.4.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
56+
<version>2.5.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
5757
</dependency>
5858
<dependency>
5959
<groupId>com.azure</groupId>

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.StringJoiner;
5050
import java.util.UUID;
5151
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.RejectedExecutionException;
5253
import java.util.concurrent.atomic.AtomicBoolean;
5354
import java.util.concurrent.atomic.AtomicInteger;
5455

@@ -265,7 +266,7 @@ private Mono<Void> updateDispositionInternal(String lockToken, DeliveryState del
265266
unsettled.disposition(deliveryState);
266267
pendingUpdates.put(lockToken, workItem);
267268
});
268-
} catch (IOException error) {
269+
} catch (IOException | RejectedExecutionException error) {
269270
sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
270271
error, handler.getErrorContext(receiver)));
271272
}
@@ -348,7 +349,7 @@ private void updateOutcome(String lockToken, Delivery delivery) {
348349
workItem.resetStartTime();
349350
try {
350351
provider.getReactorDispatcher().invoke(() -> delivery.disposition(workItem.getDeliveryState()));
351-
} catch (IOException error) {
352+
} catch (IOException | RejectedExecutionException error) {
352353
final Throwable amqpException = logger.atError()
353354
.addKeyValue(LOCK_TOKEN_KEY, lockToken)
354355
.log(new AmqpException(false,

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,24 @@ public void onNext(ServiceBusReceiveLink next) {
217217
next.setEmptyCreditListener(() -> 0);
218218

219219
currentLinkSubscriptions = Disposables.composite(
220-
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
221-
synchronized (queueLock) {
222-
messageQueue.add(message);
223-
pendingMessages.incrementAndGet();
224-
}
220+
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(
221+
message -> {
222+
synchronized (queueLock) {
223+
messageQueue.add(message);
224+
pendingMessages.incrementAndGet();
225+
}
225226

226-
drain();
227-
}),
227+
drain();
228+
},
229+
error -> {
230+
// When the receive on AmqpReceiveLink (e.g., ServiceBusReactorReceiver) terminates
231+
// with an error, we expect the recovery to happen in response to the terminal events
232+
// in link EndpointState Flux.
233+
logger.atVerbose()
234+
.addKeyValue(LINK_NAME_KEY, linkName)
235+
.addKeyValue(ENTITY_PATH_KEY, entityPath)
236+
.log("Receiver is terminated.", error);
237+
}),
228238
next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(
229239
state -> {
230240
// Connection was successfully opened, we can reset the retry interval.

0 commit comments

Comments
 (0)