Skip to content

Commit 81db0c2

Browse files
authored
Option to disable autocomplete for processor (Azure#17190)
1 parent 01d52ac commit 81db0c2

File tree

4 files changed

+98
-2
lines changed

4 files changed

+98
-2
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,20 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurr
771771
return this;
772772
}
773773

774+
/**
775+
* Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
776+
* {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
777+
* the message is processed, it is {@link ServiceBusReceivedMessageContext#abandon()
778+
* abandoned}.
779+
*
780+
* @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
781+
*/
782+
public ServiceBusSessionProcessorClientBuilder disableAutoComplete() {
783+
sessionReceiverClientBuilder.disableAutoComplete();
784+
processorClientOptions.setDisableAutoComplete(true);
785+
return this;
786+
}
787+
774788
/**
775789
* Creates a <b>session-aware</b> Service Bus processor responsible for reading
776790
* {@link ServiceBusReceivedMessage messages} from a specific queue or topic.
@@ -1191,6 +1205,20 @@ public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCall
11911205
return this;
11921206
}
11931207

1208+
/**
1209+
* Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
1210+
* {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
1211+
* the message is processed, it is {@link ServiceBusReceivedMessageContext#abandon()
1212+
* abandoned}.
1213+
*
1214+
* @return The modified {@link ServiceBusProcessorClientBuilder} object.
1215+
*/
1216+
public ServiceBusProcessorClientBuilder disableAutoComplete() {
1217+
serviceBusReceiverClientBuilder.disableAutoComplete();
1218+
processorClientOptions.setDisableAutoComplete(true);
1219+
return this;
1220+
}
1221+
11941222
/**
11951223
* Creates Service Bus message processor responsible for reading {@link ServiceBusReceivedMessage
11961224
* messages} from a specific queue or topic.

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,10 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
169169
processMessage.accept(serviceBusReceivedMessageContext);
170170
} catch (Exception ex) {
171171
handleError(new ServiceBusReceiverException(ex, ServiceBusErrorSource.USER_CALLBACK));
172-
logger.warning("Error when processing message. Abandoning message.", ex);
173-
abandonMessage(serviceBusMessageContext, receiverClient);
172+
if (!processorOptions.isDisableAutoComplete()) {
173+
logger.warning("Error when processing message. Abandoning message.", ex);
174+
abandonMessage(serviceBusMessageContext, receiverClient);
175+
}
174176
}
175177
}
176178
if (isRunning.get()) {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@
1313
public final class ServiceBusProcessorClientOptions {
1414

1515
private int maxConcurrentCalls = 1;
16+
private boolean disableAutoComplete;
17+
18+
/**
19+
* Returns true if the auto-complete and auto-abandon feature is disabled.
20+
* @return true if the auto-complete and auto-abandon feature is disabled.
21+
*/
22+
public boolean isDisableAutoComplete() {
23+
return disableAutoComplete;
24+
}
25+
26+
/**
27+
* Disables auto-complete and auto-abandon feature if this is set to {@code true}.
28+
* @param disableAutoComplete Disables auto-complete and auto-abandon feature if this is set to {@code true}.
29+
*/
30+
public ServiceBusProcessorClientOptions setDisableAutoComplete(boolean disableAutoComplete) {
31+
this.disableAutoComplete = disableAutoComplete;
32+
return this;
33+
}
1634

1735
/**
1836
* The max concurrent messages that should be processed by the processor.

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.ArgumentMatchers.any;
2525
import static org.mockito.Mockito.doNothing;
2626
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
2829
import static org.mockito.Mockito.verify;
2930
import static org.mockito.Mockito.when;
@@ -271,6 +272,53 @@ public void testUserMessageHandlerError() throws InterruptedException {
271272
verify(asyncClient, times(5)).abandon(any(ServiceBusReceivedMessage.class));
272273
}
273274

275+
@Test
276+
public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws InterruptedException {
277+
278+
Flux<ServiceBusMessageContext> messageFlux =
279+
Flux.create(emitter -> {
280+
for (int i = 0; i < 5; i++) {
281+
ServiceBusReceivedMessage serviceBusReceivedMessage =
282+
new ServiceBusReceivedMessage(BinaryData.fromString("hello"));
283+
serviceBusReceivedMessage.setMessageId(String.valueOf(i));
284+
ServiceBusMessageContext serviceBusMessageContext =
285+
new ServiceBusMessageContext(serviceBusReceivedMessage);
286+
emitter.next(serviceBusMessageContext);
287+
}
288+
});
289+
290+
ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder =
291+
mock(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder.class);
292+
293+
ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class);
294+
when(receiverBuilder.buildAsyncClient()).thenReturn(asyncClient);
295+
when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux);
296+
when(asyncClient.isConnectionClosed()).thenReturn(false);
297+
doNothing().when(asyncClient).close();
298+
299+
AtomicInteger messageId = new AtomicInteger();
300+
CountDownLatch countDownLatch = new CountDownLatch(5);
301+
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
302+
messageContext -> {
303+
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
304+
throw new IllegalStateException(); // throw error from user handler
305+
},
306+
error -> {
307+
assertTrue(error instanceof ServiceBusReceiverException);
308+
ServiceBusReceiverException exception = (ServiceBusReceiverException) error;
309+
assertTrue(exception.getErrorSource() == ServiceBusErrorSource.USER_CALLBACK);
310+
countDownLatch.countDown();
311+
},
312+
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setDisableAutoComplete(true));
313+
314+
serviceBusProcessorClient.start();
315+
boolean success = countDownLatch.await(5, TimeUnit.SECONDS);
316+
serviceBusProcessorClient.close();
317+
assertTrue(success, "Failed to receive all expected messages");
318+
319+
verify(asyncClient, never()).abandon(any(ServiceBusReceivedMessage.class));
320+
}
321+
274322
private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
275323
Flux<ServiceBusMessageContext> messageFlux) {
276324

0 commit comments

Comments
 (0)