Skip to content

Commit ad184d3

Browse files
authored
[Service Bus] Allow 0 prefetch and dynamically use batch size to request link credits (Azure#17546)
1 parent b7bba68 commit ad184d3

File tree

14 files changed

+146
-108
lines changed

14 files changed

+146
-108
lines changed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public interface AmqpReceiveLink extends AmqpLink {
2525
Flux<Message> receive();
2626

2727
/**
28-
* Adds the specified number of credits to the link.
28+
* Schedule to adds the specified number of credits to the link.
2929
*
3030
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
3131
* allow the receiver to receive {@code credits} more deliveries.
@@ -34,6 +34,21 @@ public interface AmqpReceiveLink extends AmqpLink {
3434
*/
3535
void addCredits(int credits);
3636

37+
/**
38+
* Adds the specified number of credits to the link.
39+
*
40+
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
41+
* allow the receiver to receive {@code credits} more deliveries.
42+
*
43+
* It will update the credits in local memory instantly so {@link #getCredits()} will get
44+
* the updated credits immediately. But the service side may get the credits added with a latency.
45+
* As a contrast, {@link #getCredits()} may return an unchanged value for a short while after
46+
* {@link #addCredits(int)} is called to schedule the credit addition and before the job dispatcher executes it.
47+
*
48+
* @param credits Number of credits to add to the receive link.
49+
*/
50+
void addCreditsInstantly(int credits);
51+
3752
/**
3853
* Gets the current number of credits this link has.
3954
*

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public void addCredits(int credits) {
107107
}
108108
}
109109

110+
@Override
111+
public void addCreditsInstantly(int credits) {
112+
receiver.flow(credits);
113+
}
114+
110115
@Override
111116
public int getCredits() {
112117
return receiver.getRemoteCredit();

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
3434
this.linkProcessor = linkProcessor;
3535
this.messageSerializer = messageSerializer;
3636
this.processor = linkProcessor
37-
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
38-
.publish(receiverOptions.getPrefetchCount())
39-
.autoConnect(1);
37+
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
4038
}
4139

4240
/**

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public final class ServiceBusClientBuilder {
9494

9595
// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
9696
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
97-
private static final int DEFAULT_PREFETCH_COUNT = 1;
97+
private static final int DEFAULT_PREFETCH_COUNT = 0;
9898
private static final String NAME_KEY = "name";
9999
private static final String VERSION_KEY = "version";
100100
private static final String UNKNOWN = "UNKNOWN";
@@ -671,11 +671,13 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc
671671

672672
/**
673673
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
674-
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
674+
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0.
675675
*
676676
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
677677
* and before the application starts the processor.
678678
* Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
679+
* Using a non-zero prefetch risks of losing messages even though it has better performance.
680+
* @see <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch">Service Bus Prefetch</a>
679681
*
680682
* @param prefetchCount The prefetch count.
681683
*
@@ -1446,9 +1448,9 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
14461448
}
14471449

14481450
private void validateAndThrow(int prefetchCount) {
1449-
if (prefetchCount < 1) {
1451+
if (prefetchCount < 0) {
14501452
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
1451-
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
1453+
"prefetchCount (%s) cannot be less than 0.", prefetchCount)));
14521454
}
14531455
}
14541456

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,18 @@ Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNum
573573
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
574574
*/
575575
public Flux<ServiceBusReceivedMessage> receiveMessages() {
576-
return receiveMessagesWithContext()
576+
// Without limitRate(), if the user calls receiveMessages().subscribe(), it will call
577+
// ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE.
578+
// We turn this one-time non-backpressure request to continuous requests with backpressure.
579+
// If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor
580+
// to auto-refill the prefetch buffer. A request will retrieve one message from this buffer.
581+
// If receiverOptions.prefetchCount is 0 (default value),
582+
// the request will add a link credit so one message is retrieved from the service.
583+
return receiveMessagesNoBackPressure().limitRate(1, 0);
584+
}
585+
586+
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
587+
return receiveMessagesWithContext(0)
577588
.handle((serviceBusMessageContext, sink) -> {
578589
if (serviceBusMessageContext.hasError()) {
579590
sink.error(serviceBusMessageContext.getThrowable());
@@ -598,6 +609,10 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
598609
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
599610
*/
600611
Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
612+
return receiveMessagesWithContext(1);
613+
}
614+
615+
Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
601616
final Flux<ServiceBusMessageContext> messageFlux = sessionManager != null
602617
? sessionManager.receive()
603618
: getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
@@ -610,16 +625,19 @@ Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
610625
withAutoLockRenewal = messageFlux;
611626
}
612627

613-
final Flux<ServiceBusMessageContext> withAutoComplete;
628+
Flux<ServiceBusMessageContext> result;
614629
if (receiverOptions.isEnableAutoComplete()) {
615-
withAutoComplete = new FluxAutoComplete(withAutoLockRenewal, completionLock,
630+
result = new FluxAutoComplete(withAutoLockRenewal, completionLock,
616631
context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(),
617632
context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty());
618633
} else {
619-
withAutoComplete = withAutoLockRenewal;
634+
result = withAutoLockRenewal;
620635
}
621636

622-
return withAutoComplete
637+
if (highTide > 0) {
638+
result = result.limitRate(highTide, 0);
639+
}
640+
return result
623641
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
624642
}
625643

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -575,12 +575,12 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
575575
*/
576576
@Override
577577
public void close() {
578-
asyncClient.close();
579-
580578
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
581579
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
582580
messageSubscriber.dispose();
583581
}
582+
583+
asyncClient.close();
584584
}
585585

586586
/**
@@ -590,19 +590,20 @@ public void close() {
590590
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
591591
FluxSink<ServiceBusReceivedMessage> emitter) {
592592
final long id = idGenerator.getAndIncrement();
593-
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);
594-
593+
final int prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
594+
final int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount;
595+
final SynchronousReceiveWork work = new SynchronousReceiveWork(id,
596+
toRequest,
597+
maxWaitTime, emitter);
595598
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
596599
if (messageSubscriber == null) {
597-
long prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
598-
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work);
599-
600+
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work);
600601
if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
601602
newSubscriber.dispose();
602603
SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get();
603604
existing.queueWork(work);
604605
} else {
605-
asyncClient.receiveMessages().subscribeWith(newSubscriber);
606+
asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber);
606607
}
607608
} else {
608609
messageSubscriber.queueWork(work);

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
309309
onSessionRequest(1L);
310310
}
311311
}))
312-
.publishOn(scheduler);
312+
.publishOn(scheduler, 1);
313313
}
314314

315315
private Mono<ServiceBusManagementNode> getManagementNode() {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,23 @@ class ServiceBusSessionReceiver implements AutoCloseable {
6363
this.receiveLink = receiveLink;
6464
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);
6565

66-
receiveLink.setEmptyCreditListener(() -> 1);
66+
receiveLink.setEmptyCreditListener(() -> 0);
6767

6868
final Flux<ServiceBusMessageContext> receivedMessagesFlux = receiveLink
6969
.receive()
7070
.publishOn(scheduler)
7171
.doOnSubscribe(subscription -> {
7272
logger.verbose("Adding prefetch to receive link.");
73-
receiveLink.addCredits(prefetch);
73+
if (prefetch > 0) {
74+
receiveLink.addCredits(prefetch);
75+
}
76+
})
77+
.doOnRequest(request -> { // request is of type long.
78+
if (prefetch == 0) { // add "request" number of credits
79+
receiveLink.addCredits((int) request);
80+
} else { // keep total credits "prefetch" if prefetch is not 0.
81+
receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits()));
82+
}
7483
})
7584
.takeUntilOther(cancelReceiveProcessor)
7685
.map(message -> {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ protected void hookOnSubscribe(Subscription subscription) {
5757

5858
if (Operators.setOnce(UPSTREAM, this, subscription)) {
5959
this.subscription = subscription;
60-
remaining.addAndGet(requested);
61-
subscription.request(requested);
6260
subscriberInitialized = true;
6361
drain();
6462
} else {
@@ -140,7 +138,6 @@ private void drainQueue() {
140138

141139
while ((currentWork = workQueue.peek()) != null
142140
&& (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {
143-
144141
// Additional check for safety, but normally this work should never be terminal
145142
if (currentWork.isTerminal()) {
146143
// This work already finished by either timeout or no more messages to send, process next work.
@@ -155,6 +152,9 @@ private void drainQueue() {
155152
// timer to complete the currentWork in case of timeout trigger
156153
currentTimeoutOperation = getTimeoutOperation(currentWork);
157154
currentWork.startedProcessing();
155+
final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get();
156+
remaining.addAndGet(calculatedRequest);
157+
subscription.request(calculatedRequest);
158158
}
159159

160160
// Send messages to currentWork from buffer
@@ -174,15 +174,6 @@ private void drainQueue() {
174174
currentTimeoutOperation.dispose();
175175
}
176176
logger.verbose("The work [{}] is complete.", currentWork.getId());
177-
} else {
178-
// Since this work is not complete, find out how much we should request from upstream
179-
long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size());
180-
if (creditToAdd > 0) {
181-
remaining.addAndGet(creditToAdd);
182-
subscription.request(creditToAdd);
183-
logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd,
184-
currentWork.getId());
185-
}
186177
}
187178
}
188179
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
4545
private final Object queueLock = new Object();
4646
private final AtomicBoolean isTerminated = new AtomicBoolean();
4747
private final AtomicInteger retryAttempts = new AtomicInteger();
48-
private final AtomicBoolean linkCreditsAdded = new AtomicBoolean();
4948
private final AtomicReference<String> linkName = new AtomicReference<>();
5049

5150
// Queue containing all the prefetched messages.
@@ -200,12 +199,7 @@ public void onNext(ServiceBusReceiveLink next) {
200199
oldSubscription = currentLinkSubscriptions;
201200

202201
currentLink = next;
203-
next.setEmptyCreditListener(() -> {
204-
final int creditsToAdd = getCreditsToAdd(0);
205-
linkCreditsAdded.set(creditsToAdd > 0);
206-
207-
return creditsToAdd;
208-
});
202+
next.setEmptyCreditListener(() -> 0);
209203

210204
currentLinkSubscriptions = Disposables.composite(
211205
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
@@ -499,6 +493,9 @@ private void drainQueue() {
499493
if (receiveMode != ServiceBusReceiveMode.PEEK_LOCK) {
500494
pendingMessages.decrementAndGet();
501495
}
496+
if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured.
497+
checkAndAddCredits(currentLink);
498+
}
502499
} catch (Exception e) {
503500
logger.error("Exception occurred while handling downstream onNext operation.", e);
504501
throw logger.logExceptionAsError(Exceptions.propagate(
@@ -545,18 +542,14 @@ private void checkAndAddCredits(AmqpReceiveLink link) {
545542
return;
546543
}
547544

548-
// Credits have already been added to the link. We won't try again.
549-
if (linkCreditsAdded.getAndSet(true)) {
550-
return;
551-
}
552-
553-
final int credits = getCreditsToAdd(link.getCredits());
554-
linkCreditsAdded.set(credits > 0);
555-
556-
logger.info("Link credits to add. Credits: '{}'", credits);
545+
synchronized (lock) {
546+
final int linkCredits = link.getCredits();
547+
final int credits = getCreditsToAdd(linkCredits);
548+
logger.info("Link credits='{}', Link credits to add: '{}'", linkCredits, credits);
557549

558-
if (credits > 0) {
559-
link.addCredits(credits);
550+
if (credits > 0) {
551+
link.addCredits(credits);
552+
}
560553
}
561554
}
562555

@@ -571,22 +564,40 @@ private int getCreditsToAdd(int linkCredits) {
571564
}
572565

573566
final int creditsToAdd;
574-
if (messageQueue.isEmpty() && !hasBackpressure) {
575-
creditsToAdd = prefetch;
567+
final int expectedTotalCredit;
568+
if (prefetch == 0) {
569+
if (r <= Integer.MAX_VALUE) {
570+
expectedTotalCredit = (int) r;
571+
} else {
572+
//This won't really happen in reality.
573+
//For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
574+
//So it will request one by one from this link processor, even though the user's request has no
575+
//back pressure.
576+
//For sync client, the sync subscriber has back pressure.
577+
//The request count uses the the argument of method receiveMessages(int maxMessages).
578+
//It's at most Integer.MAX_VALUE.
579+
expectedTotalCredit = Integer.MAX_VALUE;
580+
}
576581
} else {
577-
synchronized (queueLock) {
578-
final int queuedMessages = pendingMessages.get();
579-
final int pending = queuedMessages + linkCredits;
580-
581-
if (hasBackpressure) {
582-
creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0);
583-
} else {
584-
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
585-
creditsToAdd = minimumNumberOfMessages >= queuedMessages
586-
? Math.max(prefetch - pending, 1)
587-
: 0;
588-
}
582+
expectedTotalCredit = prefetch;
583+
}
584+
logger.info("linkCredits: '{}', expectedTotalCredit: '{}'", linkCredits, expectedTotalCredit);
585+
586+
synchronized (queueLock) {
587+
final int queuedMessages = pendingMessages.get();
588+
final int pending = queuedMessages + linkCredits;
589+
590+
if (hasBackpressure) {
591+
creditsToAdd = Math.max(expectedTotalCredit - pending, 0);
592+
} else {
593+
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
594+
creditsToAdd = minimumNumberOfMessages >= queuedMessages
595+
? Math.max(expectedTotalCredit - pending, 0)
596+
: 0;
589597
}
598+
logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:"
599+
+ "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits,
600+
expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size());
590601
}
591602

592603
return creditsToAdd;

0 commit comments

Comments
 (0)