Skip to content

Commit 627b9c8

Browse files
authored
Replace @Streamlistener with functional approach in eventhub-binder sample (Azure#18800)
1 parent 957bfc5 commit 627b9c8

File tree

10 files changed

+195
-103
lines changed

10 files changed

+195
-103
lines changed

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/README.md

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33
## Key concepts
44

5-
This code sample demonstrates how to use the Spring Cloud Stream Binder
6-
for Azure Event Hub. The sample app exposes a RESTful API to receive string
7-
message. Then message is sent through Azure Event Hub to a `sink` which
8-
simply logs the message.
5+
This code sample demonstrates how to use the Spring Cloud Stream Binder for Azure Event Hub.The sample app has two operating modes.
6+
One way is to expose a Restful API to receive string message, another way is to automatically provide string messages.
7+
These messages are published to an event hub. The sample will also consume messages from the same event hub.
98

109
## Getting started
1110

@@ -22,6 +21,10 @@ completed before the run.
2221
We have several ways to config the Spring Cloud Stream Binder for Azure
2322
Event Hub. You can choose anyone of them.
2423

24+
>[!Important]
25+
>
26+
> When using the Restful API to send messages, the **Active profiles** must contain `manual`.
27+
2528

2629
#### Method 1: Connection string based usage
2730

@@ -46,10 +49,10 @@ Event Hub. You can choose anyone of them.
4649
checkpoint-container: [checkpoint-container]
4750
stream:
4851
bindings:
49-
input:
52+
consume-in-0:
5053
destination: [eventhub-name]
5154
group: [consumer-group]
52-
output:
55+
supply-out-0:
5356
destination: [the-same-eventhub-name-as-above]
5457
```
5558
@@ -63,6 +66,15 @@ Event Hub. You can choose anyone of them.
6366
can create your own Consumer Group or use the default "$Default" Consumer Group.
6467

6568
1. Create [Azure Storage][create-azure-storage] for checkpoint use.
69+
70+
1. Add Role Assignment for Event Hub, Storage Account and Resource group. See
71+
[Service principal for Azure resources with Event Hubs][role-assignment]
72+
to add role assignment for Event Hub, Storage Account, Resource group is similar.
73+
74+
- Resource group: assign `Contributor` role for service principal.
75+
- Event Hub: assign `Contributor` role for service principal.
76+
- Storage Account: assign `Storage Account Key Operator Service Role`
77+
role for service principal.
6678

6779
1. Update [application-sp.yaml][application-sp.yaml].
6880
```yaml
@@ -79,12 +91,13 @@ Event Hub. You can choose anyone of them.
7991
checkpoint-container: [checkpoint-container]
8092
stream:
8193
bindings:
82-
input:
94+
consume-in-0:
8395
destination: [eventhub-name]
8496
group: [consumer-group]
85-
output:
97+
supply-out-0:
8698
destination: [the-same-eventhub-name-as-above]
8799
```
100+
> We should specify `spring.profiles.active=sp` to run the Spring Boot application.
88101

89102
#### Method 3: MSI credential based usage
90103

@@ -127,10 +140,10 @@ Please follow [create managed identity][create-managed-identity] to set up manag
127140
checkpoint-container: [checkpoint-container]
128141
stream:
129142
bindings:
130-
input:
143+
consume-in-0:
131144
destination: [eventhub-name]
132145
group: [consumer-group]
133-
output:
146+
supply-out-0:
134147
destination: [the-same-eventhub-name-as-above]
135148
```
136149
> We should specify `spring.profiles.active=mi` to run the Spring Boot application.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/java/com/azure/spring/sample/eventhubs/binder/EventHubBinderApplication.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,53 @@
33

44
package com.azure.spring.sample.eventhubs.binder;
55

6+
import com.azure.spring.integration.core.api.reactor.Checkpointer;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
69
import org.springframework.boot.SpringApplication;
710
import org.springframework.boot.autoconfigure.SpringBootApplication;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.integration.annotation.ServiceActivator;
13+
import org.springframework.messaging.Message;
14+
15+
import java.util.function.Consumer;
16+
17+
import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;
818

919
/**
1020
* @author Warren Zhu
1121
*/
1222
@SpringBootApplication
1323
public class EventHubBinderApplication {
1424

25+
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
26+
1527
public static void main(String[] args) {
1628
SpringApplication.run(EventHubBinderApplication.class, args);
1729
}
30+
31+
@Bean
32+
public Consumer<Message<String>> consume() {
33+
return message -> {
34+
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
35+
LOGGER.info("New message received: '{}'", message);
36+
checkpointer.success()
37+
.doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message))
38+
.doOnError(error -> LOGGER.error("Exception: {}", error.getMessage()))
39+
.subscribe();
40+
};
41+
}
42+
43+
// Replace destination with spring.cloud.stream.bindings.consume-in-0.destination
44+
// Replace group with spring.cloud.stream.bindings.consume-in-0.group
45+
@ServiceActivator(inputChannel = "{destination}.{group}.errors")
46+
public void consumerError(Message<?> message) {
47+
LOGGER.error("Handling customer ERROR: " + message);
48+
}
49+
50+
// Replace destination with spring.cloud.stream.bindings.supply-out-0.destination
51+
@ServiceActivator(inputChannel = "{destination}.errors")
52+
public void producerError(Message<?> message) {
53+
LOGGER.error("Handling Producer ERROR: " + message);
54+
}
1855
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.eventhubs.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
14+
import java.util.function.Supplier;
15+
16+
@Configuration
17+
@Profile("!manual")
18+
public class EventProducerConfiguration {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
21+
22+
private int i = 0;
23+
24+
@Bean
25+
public Supplier<Message<String>> supply() {
26+
return () -> {
27+
LOGGER.info("Sending message, sequence " + i);
28+
return MessageBuilder.withPayload("Hello world, " + i++).build();
29+
};
30+
}
31+
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.eventhubs.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.context.annotation.Profile;
10+
import org.springframework.http.ResponseEntity;
11+
import org.springframework.messaging.Message;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
import org.springframework.web.bind.annotation.GetMapping;
14+
import org.springframework.web.bind.annotation.PostMapping;
15+
import org.springframework.web.bind.annotation.RequestParam;
16+
import org.springframework.web.bind.annotation.RestController;
17+
import reactor.core.publisher.EmitterProcessor;
18+
19+
/**
20+
* @author Warren Zhu
21+
*/
22+
@RestController
23+
@Profile("manual")
24+
public class EventProducerController {
25+
26+
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
27+
28+
@Autowired
29+
private EmitterProcessor<Message<String>> emitterProcessor;
30+
31+
@PostMapping("/messages")
32+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
33+
LOGGER.info("Going to add message {} to emitter", message);
34+
emitterProcessor.onNext(MessageBuilder.withPayload(message).build());
35+
return ResponseEntity.ok("Sent!");
36+
}
37+
38+
@GetMapping("/")
39+
public String welcome() {
40+
return "welcome";
41+
}
42+
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.eventhubs.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.messaging.Message;
12+
import reactor.core.publisher.EmitterProcessor;
13+
import reactor.core.publisher.Flux;
14+
15+
import java.util.function.Supplier;
16+
17+
@Configuration
18+
@Profile("manual")
19+
public class ManualEventProducerConfiguration {
20+
21+
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
22+
23+
@Bean
24+
public EmitterProcessor<Message<String>> emitter() {
25+
return EmitterProcessor.create();
26+
}
27+
28+
@Bean
29+
public Supplier<Flux<Message<String>>> supply(EmitterProcessor<Message<String>> emitter) {
30+
return () -> Flux.from(emitter)
31+
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
32+
.doOnError(t -> LOGGER.error("Error encountered", t));
33+
}
34+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/java/com/azure/spring/sample/eventhubs/binder/SinkExample.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/java/com/azure/spring/sample/eventhubs/binder/SourceExample.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-mi.yaml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@ spring:
1818
checkpoint-container: [checkpoint-container]
1919
stream:
2020
bindings:
21-
input:
21+
consume-in-0:
2222
destination: [eventhub-name]
2323
group: [consumer-group]
24-
output:
24+
supply-out-0:
2525
destination: [the-same-eventhub-name-as-above]
2626

2727
eventhub:
2828
bindings:
29-
input:
29+
consume-in-0:
3030
consumer:
3131
checkpoint-mode: MANUAL
3232
default:
3333
producer:
3434
errorChannelEnabled: true
35+
function:
36+
definition: consume;supply;
37+
poller:
38+
initial-delay: 0
39+
fixed-delay: 1000
40+

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-sp.yaml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@ spring:
1919
checkpoint-container: [checkpoint-container]
2020
stream:
2121
bindings:
22-
input:
22+
consume-in-0:
2323
destination: [eventhub-name]
2424
group: [consumer-group]
25-
output:
25+
supply-out-0:
2626
destination: [the-same-eventhub-name-as-above]
2727

2828
eventhub:
2929
bindings:
30-
input:
30+
consume-in-0:
3131
consumer:
3232
checkpoint-mode: MANUAL
3333
default:
3434
producer:
3535
errorChannelEnabled: true
36-
36+
function:
37+
definition: consume;supply;
38+
poller:
39+
initial-delay: 0
40+
fixed-delay: 1000
3741

0 commit comments

Comments
 (0)