Skip to content

Commit 4dcd3ab

Browse files
authored
[EH Perf improvement] Translate Reactor's no-backpressure to prefetch in amqp Flow (Azure#22819)
* Translate Reactor's no-backpressure to prefetch in amqp Flow * Clarified the credit for no-backpressure case in the getCreditsToAdd(..) method javadoc
1 parent 86e6d62 commit 4dcd3ab

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,8 @@ private void addCreditsToLink(String message) {
572572

573573
/**
574574
* Gets the number of credits to add based on {@link #requested} and how many messages are still in queue.
575-
* If {@link #requested} is {@link Long#MAX_VALUE}, then we add credits 1 by 1. Similar to Track 1's behaviour.
575+
* If {@link #requested} is {@link Long#MAX_VALUE}, which indicates no-backpressure,
576+
* then we use the {@link #prefetch} value as credit.
576577
*
577578
* @return The number of credits to add.
578579
*/
@@ -584,7 +585,7 @@ private int getCreditsToAdd() {
584585
if (subscriber == null || request == 0) {
585586
credits = 0;
586587
} else if (request == Long.MAX_VALUE) {
587-
credits = 1;
588+
credits = prefetch;
588589
} else {
589590
final int remaining = Long.valueOf(request).intValue() - messageQueue.size();
590591
credits = Math.max(remaining, 0);

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ void createNewLink() {
153153
Assertions.assertNotNull(value);
154154

155155
final Integer creditValue = value.get();
156-
// Expecting 1 because it is Long.MAX_VALUE.
157-
Assertions.assertEquals(1, creditValue);
156+
// Expecting PREFETCH because it is Long.MAX_VALUE.
157+
Assertions.assertEquals(PREFETCH, creditValue);
158158
}
159159

160160
/**
@@ -462,8 +462,8 @@ void receivesUntilFirstLinkClosed() {
462462
Assertions.assertNotNull(value);
463463

464464
final Integer creditValue = value.get();
465-
// Expecting 1 because it is Long.MAX_VALUE.
466-
Assertions.assertEquals(1, creditValue);
465+
// Expecting PREFETCH because it is Long.MAX_VALUE.
466+
Assertions.assertEquals(PREFETCH, creditValue);
467467
}
468468

469469
@Test
@@ -496,8 +496,8 @@ void receivesFromFirstLink() {
496496
Assertions.assertNotNull(value);
497497

498498
final Integer creditValue = value.get();
499-
// Expecting 1 because it is Long.MAX_VALUE.
500-
Assertions.assertEquals(1, creditValue);
499+
// Expecting PREFETCH because it is Long.MAX_VALUE.
500+
Assertions.assertEquals(PREFETCH, creditValue);
501501
}
502502

503503
/**

0 commit comments

Comments
 (0)