Skip to content

Commit 00cba53

Browse files
authored
[Service Bus] Use processor in the readme (Azure#17386)
The processor client is what we should share in the getting started scenario and not the sync/async receiver clients. In this PR I have, - used the processor client to receive messages instead of sync/async receiver clients - removed the part on renewing locks as this is now done by default. These are remnants from the time when people had to opt in explicitly - merged settling messages into the processor sample - removed the session receiver example, put in just a note that it is similar to the non sessions case and listed out differences. @srnagar I put together this PR in a haste, please make corrections as you see fit. I'd like to see the basic version checked in ASAP and you can make improvements in a different PR
1 parent 8da4b10 commit 00cba53

File tree

1 file changed

+34
-106
lines changed
  • sdk/servicebus/azure-messaging-servicebus

1 file changed

+34
-106
lines changed

sdk/servicebus/azure-messaging-servicebus/README.md

Lines changed: 34 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -169,96 +169,44 @@ sender.sendMessages(messages);
169169
sender.close();
170170
```
171171

172-
### Receive messages and renew lock
172+
### Receive messages
173173

174-
You'll need to create an asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] or a synchronous
175-
[`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages. Each receiver can consume messages from
176-
either a queue or a topic subscription.
174+
To receive messages, you will need to create a `ServiceBusProcessorClient` with callbacks for incoming messages and any error that occurs in the process. You can then start and stop the client as required.
177175

178-
By default, the receive mode is [`ReceiveMode.PEEK_LOCK`][ReceiveMode]. This tells the broker that the receiving client
179-
wants to manage settlement of received messages explicitly. The message is made available for the receiver to process,
180-
while held under an exclusive lock in the service so that other, competing receivers cannot see it.
181-
`ServiceBusReceivedMessage.getLockedUntil()` indicates when the lock expires and can be extended by the client using
182-
`receiver.renewMessageLock()`. A session lock can be extended by `receiver.renewSessionLock()` for a session enabled
183-
queue or topic/subscriber.
176+
By default, the `autoComplete` feature is enabled on the processor client which means that after executing your callback for the message, the client will complete the message i.e. remove it from the queue/subscription. If your callback throws an error, then the client will abandon the message i.e. make it available to be received again. You can disable this feature when creating the processor client.
184177

185-
#### Receive a batch of messages
186-
187-
The snippet below creates a [`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages from a topic
188-
subscription.
189-
190-
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L84-L101 -->
191-
```java
192-
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
193-
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
194-
.receiver()
195-
.topicName("<< TOPIC NAME >>")
196-
.subscriptionName("<< SUBSCRIPTION NAME >>")
197-
.buildClient();
198-
199-
// Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
200-
// happens first.
201-
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
202-
messages.forEach(message -> {
203-
204-
System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(),
205-
message.getBody().toString());
206-
});
207-
208-
// When you are done using the receiver, dispose of it.
209-
receiver.close();
210-
```
211-
212-
#### Receive a stream of messages
213-
214-
The asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] continuously fetches messages until
215-
the `subscription` is disposed.
216-
217-
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L108-L130 -->
218-
```java
219-
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
220-
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
221-
.receiver()
222-
.queueName("<< QUEUE NAME >>")
223-
.buildAsyncClient();
224-
225-
// receive() operation continuously fetches messages until the subscription is disposed.
226-
// The stream is infinite, and completes when the subscription or receiver is closed.
227-
Disposable subscription = receiver.receiveMessages().subscribe(message -> {
228-
229-
System.out.printf("Id: %s%n", message.getMessageId());
230-
System.out.printf("Contents: %s%n", message.getBody().toString());
231-
}, error -> {
232-
System.err.println("Error occurred while receiving messages: " + error);
233-
}, () -> {
234-
System.out.println("Finished receiving messages.");
235-
});
236-
237-
// Continue application processing. When you are finished receiving messages, dispose of the subscription.
238-
subscription.dispose();
239-
240-
// When you are done using the receiver, dispose of it.
241-
receiver.close();
242-
```
243-
244-
### Settle messages
245-
246-
When a message is received, it can be settled using any of the `complete()`, `abandon()`, `defer()`, or `deadLetter()`
247-
overloads. The sample below completes a received message from synchronous
248-
[`ServiceBusReceiverClient`][ServiceBusReceiverClient].
249-
250-
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L145-L151 -->
251178
```java
252-
// This fetches a batch of 10 messages or until the default operation timeout has elapsed.
253-
receiver.receiveMessages(10).forEach(message -> {
254-
// Process message and then complete it.
255-
System.out.println("Completing message " + message.getLockToken());
256-
257-
receiver.complete(message);
258-
});
179+
// Sample code that processes a single message
180+
Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
181+
try {
182+
System.out.println(messageContext.getMessage().getMessageId());
183+
// other message processing code
184+
messageContext.complete();
185+
} catch (Exception ex) {
186+
messageContext.abandon();
187+
}
188+
}
189+
190+
// Sample code that gets called if there's an error
191+
Consumer<Throwable> processError = throwable -> {
192+
logError(throwable);
193+
metrics.recordError(throwable);
194+
}
195+
196+
// create the processor client via the builder and its sub-builder
197+
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
198+
.connectionString("connection-string")
199+
.processor()
200+
.queueName("queue-name")
201+
.processMessage(processMessage)
202+
.processError(processError)
203+
.buildProcessorClient();
204+
205+
// Starts the processor in the background and returns immediately
206+
processorClient.start();
259207
```
260208

261-
There are four ways of settling messages:
209+
There are four ways of settling messages using the methods on the message context passed to your callback.
262210
- Complete - causes the message to be deleted from the queue or topic.
263211
- Abandon - releases the receiver's lock on the message allowing for the message to be received by other receivers.
264212
- Defer - defers the message from being received by normal means. In order to receive deferred messages, the sequence
@@ -298,29 +246,9 @@ sender.sendMessage(message);
298246

299247
#### Receive messages from a session
300248

301-
Receivers can fetch messages from a specific session or the first available, unlocked session.
249+
Receiving messages from sessions is similar to receiving messages from a non session enabled queue or subscription. The difference is in the builder and the class you use.
302250

303-
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L175-L181 -->
304-
```java
305-
// Creates a session-enabled receiver that gets messages from the session "greetings".
306-
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
307-
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
308-
.sessionReceiver()
309-
.queueName("<< QUEUE NAME >>")
310-
.buildAsyncClient();
311-
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptSession("greetings");
312-
```
313-
314-
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L188-L194 -->
315-
```java
316-
// Creates a session-enabled receiver that gets messages from the first available session.
317-
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
318-
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
319-
.sessionReceiver()
320-
.queueName("<< QUEUE NAME >>")
321-
.buildAsyncClient();
322-
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptNextSession();
323-
```
251+
In non-session case, you would use the sub builder `processor()`. In case of sessions, you would use the sub builder `sessionProcessor()`. Both sub builders will create an instance of `ServiceBusProcessorClient` configured to work on a session or a non-session Service Bus entity. In the case of the session processor, you can pass the maximum number of sessions you want the processor to process concurrently as well.
324252

325253
### Create a dead-letter queue Receiver
326254

0 commit comments

Comments
 (0)