Skip to content

Commit 0a7005f

Browse files
Service bus stream binder should configure partitionkey by default. (Azure#23193)
* * Fixed an issue where the session id and partition key are used simultaneously and they are different. * ServiceBus related partition key logic changes.
1 parent 25abad9 commit 0a7005f

File tree

14 files changed

+480
-51
lines changed

14 files changed

+480
-51
lines changed

sdk/spring/azure-spring-cloud-autoconfigure/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
66
- Fixed service bus cleint factory destroyed, resource not released bug.([#23195](https://github.com/Azure/azure-sdk-for-java/pull/23195))
77
### Dependency Upgrades
88
- Upgrade to [spring-boot-dependencies:2.5.3](https://repo.maven.apache.org/maven2/org/springframework/boot/spring-boot-dependencies/2.5.3/spring-boot-dependencies-2.5.3.pom).
9-
9+
### Breaking Changes
10+
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
11+
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
1012

1113
## 2.7.0 (2021-07-20)
1214
### Key Bug Fixes

sdk/spring/azure-spring-cloud-starter-servicebus/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
66
- Fixed service bus cleint factory destroyed, resource not released bug.([#23195](https://github.com/Azure/azure-sdk-for-java/pull/23195))
77
### Dependency Upgrades
88
- Upgrade to [spring-boot-dependencies:2.5.3](https://repo.maven.apache.org/maven2/org/springframework/boot/spring-boot-dependencies/2.5.3/spring-boot-dependencies-2.5.3.pom).
9-
9+
### Breaking Changes
10+
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
11+
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
1012

1113
## 2.7.0 (2021-07-20)
1214
### Key Bug Fixes

sdk/spring/azure-spring-cloud-starter-servicebus/README.md

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,88 @@ SessionID | com.azure.spring.integration.servicebus.converter.ServiceBusMessageH
4747
CorrelationId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.CORRELATION_ID | String | N/A
4848
To | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TO | String | N/A
4949
ReplyToSessionId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.REPLY_TO_SESSION_ID | String | N/A
50-
PartitionKey | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | N/A
50+
**PartitionKey** | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | 1
51+
**PartitionKey** | com.azure.spring.integration.core.AzureHeaders.PARTITION_KEY | String | 2
5152

5253
## Examples
54+
## Usage examples
55+
**Example: Manually set the partition key for the message**
56+
57+
This example demonstrates how to manually set the partition key for the message in the application.
58+
59+
**Way 1:**
60+
This example requires that `spring.cloud.stream.default.producer.partitionKeyExpression` be set `"'partitionKey-' + headers[<message-header-key>]"`.
61+
```yaml
62+
spring:
63+
cloud:
64+
azure:
65+
servicebus:
66+
connection-string: [servicebus-namespace-connection-string]
67+
stream:
68+
default:
69+
producer:
70+
partitionKeyExpression: "'partitionKey-' + headers[<message-header-key>]"
71+
```
72+
```java
73+
@PostMapping("/messages")
74+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
75+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
76+
many.emitNext(MessageBuilder.withPayload(message)
77+
.setHeader("<message-header-key>", "Customize partirion key")
78+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
79+
return ResponseEntity.ok("Sent!");
80+
}
81+
```
82+
83+
> **NOTE:** When using `application.yml` to configure the partition key, its priority will be the lowest.
84+
> It will take effect only when the `ServiceBusMessageHeaders.SESSION_ID`, `ServiceBusMessageHeaders.PARTITION_KEY`, `AzureHeaders.PARTITION_KEY` are not configured.
85+
86+
**Way 2:**
87+
Manually add the partition Key in the message header by code.
88+
89+
*Recommended:* Use `ServiceBusMessageHeaders.PARTITION_KEY` as the key of the header.
90+
```java
91+
@PostMapping("/messages")
92+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
93+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
94+
many.emitNext(MessageBuilder.withPayload(message)
95+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partirion key")
96+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
97+
return ResponseEntity.ok("Sent!");
98+
}
99+
```
100+
101+
*Not recommended but currently supported:* `AzureHeaders.PARTITION_KEY` as the key of the header.
102+
```java
103+
@PostMapping("/messages")
104+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
105+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
106+
many.emitNext(MessageBuilder.withPayload(message)
107+
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partirion key")
108+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
109+
return ResponseEntity.ok("Sent!");
110+
}
111+
```
112+
> **NOTE:** When both `ServiceBusMessageHeaders.PARTITION_KEY` and `AzureHeaders.PARTITION_KEY` are set in the message headers,
113+
> `ServiceBusMessageHeaders.PARTITION_KEY` is preferred.
114+
115+
**Example: Set the session id for the message**
116+
117+
This example demonstrates how to manually set the session id of a message in the application.
118+
119+
```java
120+
@PostMapping("/messages")
121+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
122+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
123+
many.emitNext(MessageBuilder.withPayload(message)
124+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id")
125+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
126+
return ResponseEntity.ok("Sent!");
127+
}
128+
```
53129

130+
> **NOTE:** When the `ServiceBusMessageHeaders.SESSION_ID` is set in the message headers, and a different `ServiceBusMessageHeaders.PARTITION_KEY` (or `AzureHeaders.PARTITION_KEY`) header is also set,
131+
> the value of the session id will eventually be used to overwrite the value of the partition key.
54132
55133
## Troubleshooting
56134
### Enable Spring logging

sdk/spring/azure-spring-cloud-stream-binder-servicebus-core/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusMessageChannelBinder.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,20 @@
33

44
package com.azure.spring.servicebus.stream.binder;
55

6-
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
7-
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusExtendedBindingProperties;
8-
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusProducerProperties;
9-
import com.azure.spring.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
106
import com.azure.spring.integration.core.DefaultMessageHandler;
117
import com.azure.spring.integration.core.api.CheckpointConfig;
128
import com.azure.spring.integration.core.api.SendOperation;
139
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
10+
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
11+
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusExtendedBindingProperties;
12+
import com.azure.spring.servicebus.stream.binder.properties.ServiceBusProducerProperties;
13+
import com.azure.spring.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
1414
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
15+
import org.springframework.cloud.stream.binder.BinderHeaders;
16+
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
1517
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
1618
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
1719
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
18-
import org.springframework.cloud.stream.binder.BinderHeaders;
19-
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
2020
import org.springframework.cloud.stream.provisioning.ProducerDestination;
2121
import org.springframework.integration.expression.FunctionExpression;
2222
import org.springframework.integration.support.DefaultErrorMessageStrategy;
@@ -31,9 +31,9 @@
3131
*/
3232
public abstract class ServiceBusMessageChannelBinder<T extends ServiceBusExtendedBindingProperties> extends
3333
AbstractMessageChannelBinder<ExtendedConsumerProperties<ServiceBusConsumerProperties>,
34-
ExtendedProducerProperties<ServiceBusProducerProperties>,
34+
ExtendedProducerProperties<ServiceBusProducerProperties>,
3535
ServiceBusChannelProvisioner>
36-
implements
36+
implements
3737
ExtendedPropertiesBinder<MessageChannel, ServiceBusConsumerProperties, ServiceBusProducerProperties> {
3838

3939
protected T bindingProperties;
@@ -59,7 +59,7 @@ protected MessageHandler createProducerMessageHandler(
5959
handler.setSendFailureChannel(errorChannel);
6060
if (producerProperties.isPartitioned()) {
6161
handler.setPartitionKeyExpressionString(
62-
"'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
62+
"'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
6363
} else {
6464
handler.setPartitionKeyExpression(new FunctionExpression<Message<?>>(m -> m.getPayload().hashCode()));
6565
}
@@ -97,15 +97,15 @@ public void setBindingProperties(T bindingProperties) {
9797
}
9898

9999
protected CheckpointConfig buildCheckpointConfig(
100-
ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
100+
ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
101101

102102
return CheckpointConfig.builder()
103103
.checkpointMode(properties.getExtension().getCheckpointMode())
104104
.build();
105105
}
106106

107107
protected ServiceBusClientConfig buildClientConfig(
108-
ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
108+
ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
109109

110110
ServiceBusConsumerProperties consumerProperties = properties.getExtension();
111111
return ServiceBusClientConfig.builder()

sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
66
- Fixed service bus cleint factory destroyed, resource not released bug.([#23195](https://github.com/Azure/azure-sdk-for-java/pull/23195))
77
### Dependency Upgrades
88
- Upgrade to [spring-boot-dependencies:2.5.3](https://repo.maven.apache.org/maven2/org/springframework/boot/spring-boot-dependencies/2.5.3/spring-boot-dependencies-2.5.3.pom).
9-
9+
### Breaking Changes
10+
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
11+
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
1012

1113
## 2.7.0 (2021-07-20)
1214
### Key Bug Fixes

sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/README.md

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,90 @@ SessionID | com.azure.spring.integration.servicebus.converter.ServiceBusMessageH
126126
CorrelationId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.CORRELATION_ID | String | N/A
127127
To | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TO | String | N/A
128128
ReplyToSessionId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.REPLY_TO_SESSION_ID | String | N/A
129-
PartitionKey | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | N/A
129+
**PartitionKey** | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | 1
130+
**PartitionKey** | com.azure.spring.integration.core.AzureHeaders.PARTITION_KEY | String | 2
130131

131132
## Examples
133+
## Usage examples
134+
**Example: Manually set the partition key for the message**
135+
136+
This example demonstrates how to manually set the partition key for the message in the application.
137+
138+
**Way 1:**
139+
This example requires that `spring.cloud.stream.default.producer.partitionKeyExpression` be set `"'partitionKey-' + headers[<message-header-key>]"`.
140+
```yaml
141+
spring:
142+
cloud:
143+
azure:
144+
servicebus:
145+
connection-string: [servicebus-namespace-connection-string]
146+
stream:
147+
default:
148+
producer:
149+
partitionKeyExpression: "'partitionKey-' + headers[<message-header-key>]"
150+
```
151+
```java
152+
@PostMapping("/messages")
153+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
154+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
155+
many.emitNext(MessageBuilder.withPayload(message)
156+
.setHeader("<message-header-key>", "Customize partirion key")
157+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
158+
return ResponseEntity.ok("Sent!");
159+
}
160+
```
161+
162+
> **NOTE:** When using `application.yml` to configure the partition key, its priority will be the lowest.
163+
> It will take effect only when the `ServiceBusMessageHeaders.SESSION_ID`, `ServiceBusMessageHeaders.PARTITION_KEY`, `AzureHeaders.PARTITION_KEY` are not configured.
164+
165+
**Way 2:**
166+
Manually add the partition Key in the message header by code.
167+
168+
*Recommended:* Use `ServiceBusMessageHeaders.PARTITION_KEY` as the key of the header.
169+
```java
170+
@PostMapping("/messages")
171+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
172+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
173+
many.emitNext(MessageBuilder.withPayload(message)
174+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partirion key")
175+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
176+
return ResponseEntity.ok("Sent!");
177+
}
178+
```
179+
180+
*Not recommended but currently supported:* `AzureHeaders.PARTITION_KEY` as the key of the header.
181+
```java
182+
@PostMapping("/messages")
183+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
184+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
185+
many.emitNext(MessageBuilder.withPayload(message)
186+
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partirion key")
187+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
188+
return ResponseEntity.ok("Sent!");
189+
}
190+
```
191+
> **NOTE:** When both `ServiceBusMessageHeaders.PARTITION_KEY` and `AzureHeaders.PARTITION_KEY` are set in the message headers,
192+
> `ServiceBusMessageHeaders.PARTITION_KEY` is preferred.
193+
194+
**Example: Set the session id for the message**
195+
196+
This example demonstrates how to manually set the session id of a message in the application.
197+
198+
```java
199+
@PostMapping("/messages")
200+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
201+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
202+
many.emitNext(MessageBuilder.withPayload(message)
203+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id")
204+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
205+
return ResponseEntity.ok("Sent!");
206+
}
207+
```
208+
209+
> **NOTE:** When the `ServiceBusMessageHeaders.SESSION_ID` is set in the message headers, and a different `ServiceBusMessageHeaders.PARTITION_KEY` (or `AzureHeaders.PARTITION_KEY`) header is also set,
210+
> the value of the session id will eventually be used to overwrite the value of the partition key.
132211
133-
Please use this `sample` as a reference for how to use this binder in your projects.
212+
Please use this `sample` as a reference to learn more about how to use this binder in your project.
134213
- [Service Bus Queue][spring_cloud_stream_binder_service_bus_queue]
135214

136215
## Troubleshooting

sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
66
- Fixed service bus cleint factory destroyed, resource not released bug.([#23195](https://github.com/Azure/azure-sdk-for-java/pull/23195))
77
### Dependency Upgrades
88
- Upgrade to [spring-boot-dependencies:2.5.3](https://repo.maven.apache.org/maven2/org/springframework/boot/spring-boot-dependencies/2.5.3/spring-boot-dependencies-2.5.3.pom).
9-
9+
### Breaking Changes
10+
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
11+
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
1012

1113
## 2.7.0 (2021-07-20)
1214
### Key Bug Fixes

0 commit comments

Comments
 (0)