Skip to content

Commit fb99570

Browse files
authored
Refactor service bus queue binder sample with functional approach (Azure#18920)
1 parent 1cc0be6 commit fb99570

File tree

10 files changed

+189
-85
lines changed

10 files changed

+189
-85
lines changed

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/README.md

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
## Key concepts
44

55
This code sample demonstrates how to use the Spring Cloud Stream binder for
6-
Azure Service Bus queue. The sample app exposes a RESTful API to receive string message.
7-
Then message is sent through Azure Service Bus to a `sink` which simply logs the message.
6+
Azure Service Bus queue. The sample app has two operating modes.
7+
One way is to expose a Restful API to receive string message, another way is to automatically provide string messages.
8+
These messages are published to a service bus queue. The sample will also consume messages from the same service bus queue.
89

910
## Getting started
1011

@@ -18,9 +19,12 @@ completed before the run.
1819

1920
### Create Azure resources
2021

21-
We have several ways to config the Spring Cloud Stream Binder for Azure
22-
Event Hub. You can choose anyone of them.
22+
We have several ways to config the Spring Cloud Stream Binder for Service
23+
Bus Queue. You can choose anyone of them.
2324

25+
>[!Important]
26+
>
27+
> When using the Restful API to send messages, the **Active profiles** must contain `manual`.
2428
2529
#### Method 1: Connection string based usage
2630

@@ -36,10 +40,15 @@ Event Hub. You can choose anyone of them.
3640
connection-string: [servicebus-namespace-connection-string]
3741
stream:
3842
bindings:
39-
input:
43+
consume-in-0:
4044
destination: [servicebus-queue-name]
41-
output:
45+
supply-out-0:
4246
destination: [servicebus-queue-name-same-as-above]
47+
function:
48+
definition: consume;supply;
49+
poller:
50+
fixed-delay: 1000
51+
initial-delay: 0
4352
```
4453
4554
#### Method 2: Service principal based usage
@@ -49,6 +58,10 @@ Event Hub. You can choose anyone of them.
4958
5059
1. Create Azure Service Bus namespace and queue.
5160
Please see [how to create][create-service-bus].
61+
62+
1. Add Role Assignment for Service Bus. See
63+
[Service principal for Azure resources with Service Bus][role-assignment]
64+
to add role assignment for Service Bus. Assign `Contributor` role for service bus.
5265

5366
1. Update [application-sp.yaml][application-sp.yaml].
5467
```yaml
@@ -63,12 +76,16 @@ Event Hub. You can choose anyone of them.
6376
namespace: [servicebus-namespace]
6477
stream:
6578
bindings:
66-
input:
79+
consume-in-0:
6780
destination: [servicebus-queue-name]
68-
output:
81+
supply-out-0:
6982
destination: [servicebus-queue-name-same-as-above]
83+
function:
84+
definition: consume;supply;
85+
poller:
86+
fixed-delay: 1000
87+
initial-delay: 0
7088
```
71-
7289
#### Method 3: MSI credential based usage
7390

7491
##### Set up managed identity
@@ -100,10 +117,15 @@ Please follow [create managed identity][create-managed-identity] to set up manag
100117
namespace: [servicebus-namespace]
101118
stream:
102119
bindings:
103-
input:
120+
consume-in-0:
104121
destination: [servicebus-queue-name]
105-
output:
122+
supply-out-0:
106123
destination: [servicebus-queue-name-same-as-above]
124+
function:
125+
definition: consume;supply;
126+
poller:
127+
fixed-delay: 1000
128+
initial-delay: 0
107129
```
108130
> We should specify `spring.profiles.active=mi` to run the Spring Boot application.
109131
For App Service, please add a configuration entry for this.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import reactor.core.publisher.EmitterProcessor;
13+
import reactor.core.publisher.Flux;
14+
15+
import java.util.function.Supplier;
16+
17+
@Configuration
18+
@Profile("manual")
19+
public class ManualServiceProducerConfiguration {
20+
21+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
22+
23+
@Bean
24+
public EmitterProcessor<Message<String>> emitter() {
25+
return EmitterProcessor.create();
26+
}
27+
28+
@Bean
29+
public Supplier<Flux<Message<String>>> supply(EmitterProcessor<Message<String>> emitter) {
30+
return () -> Flux.from(emitter)
31+
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
32+
.doOnError(t -> LOGGER.error("Error encountered", t));
33+
}
34+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/java/com/azure/spring/sample/servicebus/queue/binder/ServiceBusQueueBinderApplication.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,38 @@
33

44
package com.azure.spring.sample.servicebus.queue.binder;
55

6+
import com.azure.spring.integration.core.api.Checkpointer;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
69
import org.springframework.boot.SpringApplication;
710
import org.springframework.boot.autoconfigure.SpringBootApplication;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.messaging.Message;
13+
14+
import java.util.function.Consumer;
15+
16+
import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;
817

9-
/**
10-
* @author Warren Zhu
11-
*/
1218
@SpringBootApplication
1319
public class ServiceBusQueueBinderApplication {
1420

21+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
22+
1523
public static void main(String[] args) {
1624
SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
1725
}
26+
27+
@Bean
28+
public Consumer<Message<String>> consume() {
29+
return message -> {
30+
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
31+
LOGGER.info("New message received: '{}'", message);
32+
checkpointer.success().handle((r, ex) -> {
33+
if (ex == null) {
34+
LOGGER.info("Message '{}' successfully checkpointed", message);
35+
}
36+
return null;
37+
});
38+
};
39+
}
1840
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
14+
import java.util.function.Supplier;
15+
16+
@Configuration
17+
@Profile("!manual")
18+
public class ServiceProducerConfiguration {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
21+
22+
private int i = 0;
23+
24+
@Bean
25+
public Supplier<Message<String>> supply() {
26+
return () -> {
27+
LOGGER.info("Sending message, sequence " + i);
28+
return MessageBuilder.withPayload("Hello world, " + i++).build();
29+
};
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.context.annotation.Profile;
10+
import org.springframework.http.ResponseEntity;
11+
import org.springframework.messaging.Message;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
import org.springframework.web.bind.annotation.GetMapping;
14+
import org.springframework.web.bind.annotation.PostMapping;
15+
import org.springframework.web.bind.annotation.RequestParam;
16+
import org.springframework.web.bind.annotation.RestController;
17+
import reactor.core.publisher.EmitterProcessor;
18+
19+
@RestController
20+
@Profile("manual")
21+
public class ServiceProducerController {
22+
23+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
24+
25+
@Autowired
26+
private EmitterProcessor<Message<String>> emitterProcessor;
27+
28+
@PostMapping("/messages")
29+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
30+
LOGGER.info("Going to add message {} to emitter", message);
31+
emitterProcessor.onNext(MessageBuilder.withPayload(message).build());
32+
return ResponseEntity.ok("Sent!");
33+
}
34+
35+
@GetMapping("/")
36+
public String welcome() {
37+
return "welcome";
38+
}
39+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/java/com/azure/spring/sample/servicebus/queue/binder/SinkExample.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/java/com/azure/spring/sample/servicebus/queue/binder/SourceExample.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/resources/application-mi.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@ spring:
1616
namespace: [servicebus-namespace]
1717
stream:
1818
bindings:
19-
input:
19+
consume-in-0:
2020
destination: [servicebus-queue-name]
21-
output:
21+
supply-out-0:
2222
destination: [servicebus-queue-name-same-as-above]
2323
servicebus:
2424
queue:
2525
bindings:
26-
input:
26+
consume-in-0:
2727
consumer:
28-
checkpoint-mode: MANUAL
28+
checkpoint-mode: MANUAL
29+
function:
30+
definition: consume;supply;
31+
poller:
32+
fixed-delay: 1000
33+
initial-delay: 0

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/resources/application-sp.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ spring:
1717
namespace: [servicebus-namespace]
1818
stream:
1919
bindings:
20-
input:
20+
consume-in-0:
2121
destination: [servicebus-queue-name]
22-
output:
22+
supply-out-0:
2323
destination: [servicebus-queue-name-same-as-above]
2424
servicebus:
2525
queue:
2626
bindings:
27-
input:
27+
consume-in-0:
2828
consumer:
29-
checkpoint-mode: MANUAL
29+
checkpoint-mode: MANUAL
30+
function:
31+
definition: consume;supply;
32+
poller:
33+
fixed-delay: 1000
34+
initial-delay: 0

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-binder/src/main/resources/application.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ spring:
55
connection-string: [servicebus-namespace-connection-string]
66
stream:
77
bindings:
8-
input:
8+
consume-in-0:
99
destination: [servicebus-queue-name]
10-
output:
10+
supply-out-0:
1111
destination: [servicebus-queue-name-same-as-above]
1212
servicebus:
1313
queue:
1414
bindings:
15-
input:
15+
consume-in-0:
1616
consumer:
17-
checkpoint-mode: MANUAL
17+
checkpoint-mode: MANUAL
18+
function:
19+
definition: consume;supply;
20+
poller:
21+
fixed-delay: 1000
22+
initial-delay: 0

0 commit comments

Comments
 (0)