Skip to content

Commit 64474e3

Browse files
authored
Refactor service bus queue multi binder sample (Azure#18946)
1 parent fb99570 commit 64474e3

File tree

11 files changed

+236
-152
lines changed

11 files changed

+236
-152
lines changed

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-multibinders/README.md

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
## Key concepts
44
This code sample demonstrates how to use the Spring Cloud Stream Binder for
5-
multiple Azure Service Bus namespaces.
6-
In this sample you will bind to two Service Bus namespaces separately through
7-
two queue binders.
8-
The sample app exposes RESTful APIs to receive string message.
9-
Then message is sent through Azure Service Bus to a `sink` which simply logs the message.
5+
multiple Azure Service Bus namespaces. In this sample you will bind to two Service Bus namespaces separately through
6+
two queue binders.The sample app has two operating modes. One way is to expose a Restful API to receive string message,
7+
another way is to automatically provide string messages. These messages are published to a service bus.
8+
The sample will also consume messages from the same service bus.
109

1110
## Getting started
1211

@@ -42,14 +41,14 @@ is completed before the run.
4241
cloud:
4342
stream:
4443
bindings:
45-
input:
44+
consume1-in-0:
4645
destination: [servicebus-queue-1-name]
47-
output:
46+
supply1-out-0:
4847
destination: [servicebus-queue-1-name-same-as-above]
49-
input1:
48+
consume2-in-0:
5049
binder: servicebus-2
5150
destination: [servicebus-queue-2-name]
52-
output1:
51+
supply2-out-0:
5352
binder: servicebus-2
5453
destination: [servicebus-queue-2-name-same-as-above]
5554

@@ -71,7 +70,15 @@ is completed before the run.
7170
cloud:
7271
azure:
7372
servicebus:
74-
connection-string: [servicebus-namespace-2-connection-string]
73+
connection-string: [servicebus-namespace-2-connection-string]
74+
75+
#To specify which functional bean to bind to the external destination(s) exposed by the bindings
76+
function:
77+
definition: consume1;supply1;consume2;supply2;
78+
poller:
79+
initial-delay: 0
80+
fixed-delay: 1000
81+
7582
```
7683

7784
> The **defaultCandidate** configuration item:
@@ -80,26 +87,30 @@ default binder, or can be used only when explicitly referenced. This
8087
allows adding binder configurations without interfering with the default
8188
processing.
8289

90+
>[!Important]
91+
>
92+
> When using the Restful API to send messages, the **Active profiles** must contain `manual`.
93+
8394
1. Run the `mvn clean spring-boot:run` in the root of the code sample
8495
to get the app running.
8596

8697
1. Send a POST request to test the default binder
8798

88-
$ curl -X POST http://localhost:8080/messages?message=hello
99+
$ curl -X POST http://localhost:8080/messages1?message=hello
89100

90101
1. Verify in your app’s logs that a similar message was posted:
91102

92-
[1] New message received: 'hello'
93-
[1] Message 'hello' successfully checkpointed
103+
[1] New message1 received: 'hello'
104+
[1] Message1 'hello' successfully checkpointed
94105

95106
1. Send another POST request to test the other binder
96107

97-
$ curl -X POST http://localhost:8080/messages1?message=hello
108+
$ curl -X POST http://localhost:8080/messages2?message=hello
98109

99110
1. Verify in your app’s logs that a similar message was posted:
100111

101-
[2] New message received: 'hello'
102-
[2] Message 'hello' successfully checkpointed
112+
[2] New message2 received: 'hello'
113+
[2] Message2 'hello' successfully checkpointed
103114

104115
6. Delete the resources on [Azure Portal][azure-portal]
105116
to avoid unexpected charges.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-multibinders/src/main/java/com/azure/spring/sample/servicebus/queue/multibinders/CustomProcessor.java

Lines changed: 0 additions & 36 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.multibinders;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import reactor.core.publisher.EmitterProcessor;
13+
import reactor.core.publisher.Flux;
14+
15+
import java.util.function.Supplier;
16+
17+
@Configuration
18+
@Profile("manual")
19+
public class ManualServiceProducerConfiguration {
20+
21+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueMultiBindersApplication.class);
22+
23+
@Bean
24+
public EmitterProcessor<Message<String>> emitterProcessor1() {
25+
return EmitterProcessor.create();
26+
}
27+
28+
@Bean
29+
public EmitterProcessor<Message<String>> emitterProcessor2() {
30+
return EmitterProcessor.create();
31+
}
32+
33+
@Bean
34+
public Supplier<Flux<Message<String>>> supply1(EmitterProcessor<Message<String>> emitterProcessor1) {
35+
return () -> Flux.from(emitterProcessor1)
36+
.doOnNext(m -> LOGGER.info("Manually sending message1 {}", m))
37+
.doOnError(t -> LOGGER.error("Error encountered", t));
38+
}
39+
40+
@Bean
41+
public Supplier<Flux<Message<String>>> supply2(EmitterProcessor<Message<String>> emitterProcessor2) {
42+
return () -> Flux.from(emitterProcessor2)
43+
.doOnNext(m -> LOGGER.info("Manually sending message2 {}", m))
44+
.doOnError(t -> LOGGER.error("Error encountered", t));
45+
}
46+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-multibinders/src/main/java/com/azure/spring/sample/servicebus/queue/multibinders/ServiceBusQueueMultiBindersApplication.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,56 @@
33

44
package com.azure.spring.sample.servicebus.queue.multibinders;
55

6+
import com.azure.spring.integration.core.api.Checkpointer;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
69
import org.springframework.boot.SpringApplication;
710
import org.springframework.boot.autoconfigure.SpringBootApplication;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.messaging.Message;
13+
14+
import java.util.function.Consumer;
15+
16+
import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;
817

918
/**
1019
* @author Yi Liu, 2020-4-30.
1120
*/
1221
@SpringBootApplication
1322
public class ServiceBusQueueMultiBindersApplication {
1423

24+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueMultiBindersApplication.class);
25+
1526
public static void main(String[] args) {
1627
SpringApplication.run(ServiceBusQueueMultiBindersApplication.class, args);
1728
}
1829

30+
@Bean
31+
public Consumer<Message<String>> consume1() {
32+
return message -> {
33+
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
34+
LOGGER.info("New message1 received: '{}'", message);
35+
checkpointer.success().handle((r, ex) -> {
36+
if (ex == null) {
37+
LOGGER.info("Message1 '{}' successfully checkpointed", message);
38+
}
39+
return null;
40+
});
41+
};
42+
}
43+
44+
@Bean
45+
public Consumer<Message<String>> consume2() {
46+
return message -> {
47+
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
48+
LOGGER.info("New message2 received: '{}'", message);
49+
checkpointer.success().handle((r, ex) -> {
50+
if (ex == null) {
51+
LOGGER.info("Message2 '{}' successfully checkpointed", message);
52+
}
53+
return null;
54+
});
55+
};
56+
}
57+
1958
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.multibinders;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
14+
import java.util.function.Supplier;
15+
16+
@Configuration
17+
@Profile("!manual")
18+
public class ServiceProducerConfiguration {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueMultiBindersApplication.class);
21+
22+
private int i = 0;
23+
private int j = 0;
24+
25+
@Bean
26+
public Supplier<Message<String>> supply1() {
27+
return () -> {
28+
LOGGER.info("Sending message1, sequence1 " + i);
29+
return MessageBuilder.withPayload("Hello world1, " + i++).build();
30+
};
31+
}
32+
33+
@Bean
34+
public Supplier<Message<String>> supply2() {
35+
return () -> {
36+
LOGGER.info("Sending message2, sequence2 " + j);
37+
return MessageBuilder.withPayload("Hello world2, " + j++).build();
38+
};
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.servicebus.queue.multibinders;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Profile;
9+
import org.springframework.http.ResponseEntity;
10+
import org.springframework.messaging.Message;
11+
import org.springframework.messaging.support.MessageBuilder;
12+
import org.springframework.web.bind.annotation.PostMapping;
13+
import org.springframework.web.bind.annotation.RequestParam;
14+
import org.springframework.web.bind.annotation.RestController;
15+
import reactor.core.publisher.EmitterProcessor;
16+
17+
import javax.annotation.Resource;
18+
19+
@RestController
20+
@Profile("manual")
21+
public class ServiceProducerController {
22+
23+
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueMultiBindersApplication.class);
24+
25+
@Resource(name = "emitterProcessor1")
26+
private EmitterProcessor<Message<String>> emitterProcessor1;
27+
28+
@Resource(name = "emitterProcessor2")
29+
private EmitterProcessor<Message<String>> emitterProcessor2;
30+
31+
@PostMapping("/messages1")
32+
public ResponseEntity<String> sendMessage1(@RequestParam String message) {
33+
LOGGER.info("Going to add message {} to emitter1", message);
34+
emitterProcessor1.onNext(MessageBuilder.withPayload(message).build());
35+
return ResponseEntity.ok("Sent1!");
36+
}
37+
38+
@PostMapping("/messages2")
39+
public ResponseEntity<String> sendMessage2(@RequestParam String message) {
40+
LOGGER.info("Going to add message {} to emitter2", message);
41+
emitterProcessor2.onNext(MessageBuilder.withPayload(message).build());
42+
return ResponseEntity.ok("Sent2!");
43+
}
44+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-multibinders/src/main/java/com/azure/spring/sample/servicebus/queue/multibinders/SinkExample.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-servicebus-queue-multibinders/src/main/java/com/azure/spring/sample/servicebus/queue/multibinders/SourceExample.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)