Skip to content

Commit 155e942

Browse files
Enrich EH stream binder sample with usage of StreamBridge. (Azure#22561)
* * Enrich EH stream binder sample with usage of StreamBridge.
1 parent 2279742 commit 155e942

File tree

6 files changed

+80
-12
lines changed

6 files changed

+80
-12
lines changed

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/README.md

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,18 +189,43 @@ spring:
189189
region: [region]
190190
```
191191

192+
#### Enable sync message
193+
To enable message sending in a synchronized way with Spring Cloud Stream 3.x,
194+
azure-spring-cloud-stream-binder-eventhubs supports the sync producer mode to get responses for sent messages.
195+
By enabling following configuration, you could use [StreamBridge][StreamBridge] for the synchronized message producing.
196+
197+
```yaml
198+
spring:
199+
cloud:
200+
stream:
201+
eventhub:
202+
bindings:
203+
supply-out-0:
204+
producer:
205+
sync: true
206+
```
192207

193208
## Examples
194209

195210
1. Run the `mvn spring-boot:run` in the root of the code sample to get the app running.
196211

197212
1. Send a POST request
198-
199-
$ curl -X POST http://localhost:8080/messages?message=hello
200-
213+
214+
$ ### Send messages through imperative.
215+
$ curl -X POST http://localhost:8080/messages/imperative/staticalDestination?message=hello
216+
$ curl -X POST http://localhost:8080/messages/imperative/dynamicDestination?message=hello
217+
218+
$ ### Send messages through reactive.
219+
$ curl -X POST http://localhost:8080/messages/reactive?message=hello
220+
201221
or when the app runs on App Service or VM
202222

203-
$ curl -d -X POST https://[your-app-URL]/messages?message=hello
223+
$ ### Send messages through imperative.
224+
$ curl -d -X POST https://[your-app-URL]/messages/imperative/staticalDestination?message=hello
225+
$ curl -d -X POST https://[your-app-URL]/messages/imperative/dynamicDestination?message=hello
226+
227+
$ ### Send messages through reactive.
228+
$ curl -d -X POST https://[your-app-URL]/messages/reactive?message=hello
204229

205230
1. Verify in your app’s logs that a similar message was posted:
206231

@@ -230,3 +255,4 @@ spring:
230255
[application-mi.yaml]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-mi.yaml
231256
[application.yaml]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application.yaml
232257
[application-sp.yaml]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-sp.yaml
258+
[StreamBridge]: https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.sample.eventhubs.binder;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.cloud.stream.function.StreamBridge;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.http.ResponseEntity;
12+
import org.springframework.web.bind.annotation.PostMapping;
13+
import org.springframework.web.bind.annotation.RequestParam;
14+
import org.springframework.web.bind.annotation.RestController;
15+
16+
@RestController
17+
@Profile("manual")
18+
public class ImperativeEventProducerController {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(ImperativeEventProducerController.class);
21+
22+
//TODO Add output destination
23+
private String bindingName = "<output-destination>";
24+
25+
@Autowired
26+
private StreamBridge streamBridge;
27+
28+
@PostMapping("/messages/imperative/staticalDestination")
29+
public ResponseEntity<String> sendMessageToStaticalDestination(@RequestParam String message) {
30+
LOGGER.info("Imperative method to send message: {} to static destination.", message);
31+
streamBridge.send("supply-out-0", message);
32+
LOGGER.info("Sent {}.", message);
33+
return ResponseEntity.ok(message);
34+
}
35+
36+
@PostMapping("/messages/imperative/dynamicDestination")
37+
public ResponseEntity<String> syncSendMessageToDynamicDestination(@RequestParam String message) {
38+
LOGGER.info("Imperative method to send message: {} to dynamic destination.", message);
39+
streamBridge.send(bindingName, message);
40+
LOGGER.info("Sent {}.", message);
41+
return ResponseEntity.ok(message);
42+
}
43+
44+
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/java/com/azure/spring/sample/eventhubs/binder/EventProducerController.java renamed to sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/java/com/azure/spring/sample/eventhubs/binder/ReactiveEventProducerController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
*/
2222
@RestController
2323
@Profile("manual")
24-
public class EventProducerController {
24+
public class ReactiveEventProducerController {
2525

26-
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
26+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveEventProducerController.class);
2727

2828
@Autowired
2929
private Sinks.Many<Message<String>> many;
3030

31-
@PostMapping("/messages")
32-
public ResponseEntity<String> sendMessage(@RequestParam String message) {
33-
LOGGER.info("Going to add message {} to sendMessage.", message);
31+
@PostMapping("/messages/reactive")
32+
public ResponseEntity<String> reactiveSendMessage(@RequestParam String message) {
33+
LOGGER.info("Reactive method to send message: {} to destination.", message);
3434
many.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);
3535
return ResponseEntity.ok(message);
3636
}

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-mi.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,3 @@ spring:
3737
poller:
3838
initial-delay: 0
3939
fixed-delay: 1000
40-

sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder/src/main/resources/application-sp.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,3 @@ spring:
3838
poller:
3939
initial-delay: 0
4040
fixed-delay: 1000
41-

sdk/spring/azure-spring-cloud-stream-binder-eventhubs/src/main/java/com/azure/spring/eventhub/stream/binder/properties/EventHubProducerProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class EventHubProducerProperties {
1212
* If true, the producer will wait for a response from Event Hub after a send operation before sending next message.
1313
* If false, the producer will keep sending without waiting response
1414
* <p>
15-
* Default: true
15+
* Default: false
1616
*/
1717
private boolean sync;
1818

0 commit comments

Comments
 (0)