Skip to content

Commit f711924

Browse files
authored
fix scs sources cannot be appended to Kafka binder context (Azure#31715)
1 parent 91aa36c commit f711924

File tree

4 files changed

+189
-25
lines changed

4 files changed

+189
-25
lines changed

sdk/spring/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
1212
- Fix bug: RestOperations is not well configured when jwkResolver is null. [#31218](https://github.com/Azure/azure-sdk-for-java/issues/31218).
1313
- Fix bug: Duplicated "scope" parameter. [#31191](https://github.com/Azure/azure-sdk-for-java/issues/31191).
1414
- Fix bug: NimbusJwtDecoder still uses `RestTemplate()` instead `RestTemplateBuilder` [#31233](https://github.com/Azure/azure-sdk-for-java/issues/31233)
15-
- Fix bug: Proxy setting not work in Azure AD B2C web application. [31593](https://github.com/Azure/azure-sdk-for-java/issues/31593)
15+
- Fix bug: Proxy setting not work in Azure AD B2C web application [31593](https://github.com/Azure/azure-sdk-for-java/issues/31593)
16+
- Fix bug: `spring.main.sources` configuration from Spring Cloud Stream Kafka binder cannot take effect. [#31715](https://github.com/Azure/azure-sdk-for-java/pull/31715)
1617
- Fix Bug: NoClassDefFoundError for JSONArray. [31716](https://github.com/Azure/azure-sdk-for-java/issues/31716)
1718

1819
## 4.4.0 (2022-09-26)

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/kafka/BindingServicePropertiesBeanPostProcessor.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import org.springframework.beans.factory.config.BeanPostProcessor;
77
import org.springframework.cloud.stream.config.BinderProperties;
88
import org.springframework.cloud.stream.config.BindingServiceProperties;
9+
import org.springframework.util.StringUtils;
910

1011
import java.util.HashMap;
12+
import java.util.LinkedHashMap;
1113
import java.util.Map;
1214

1315
/**
@@ -28,7 +30,7 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
2830
BindingServiceProperties bindingServiceProperties = (BindingServiceProperties) bean;
2931
if (bindingServiceProperties.getBinders().isEmpty()) {
3032
BinderProperties kafkaBinderSourceProperty = new BinderProperties();
31-
configureBinderSources(kafkaBinderSourceProperty, AzureKafkaSpringCloudStreamConfiguration.AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS);
33+
configureBinderSources(readSpringMainPropertiesMap(kafkaBinderSourceProperty.getEnvironment()));
3234

3335
Map<String, BinderProperties> kafkaBinderPropertyMap = new HashMap<>();
3436
kafkaBinderPropertyMap.put(KAKFA_BINDER_DEFAULT_NAME, kafkaBinderSourceProperty);
@@ -39,23 +41,39 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
3941
if (entry.getKey() != null && entry.getValue() != null
4042
&& (KAKFA_BINDER_TYPE.equalsIgnoreCase(entry.getValue().getType())
4143
|| KAKFA_BINDER_DEFAULT_NAME.equalsIgnoreCase(entry.getKey()))) {
42-
configureBinderSources(entry.getValue(), buildKafkaBinderSources(entry.getValue()));
44+
configureBinderSources(readSpringMainPropertiesMap(entry.getValue().getEnvironment()));
4345
}
4446
}
4547
}
4648
}
4749
return bean;
4850
}
4951

50-
private String buildKafkaBinderSources(BinderProperties binderProperties) {
52+
void configureBinderSources(Map<String, Object> originalSources) {
5153
StringBuilder sources = new StringBuilder(AzureKafkaSpringCloudStreamConfiguration.AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS);
52-
if (binderProperties.getEnvironment().get(SPRING_MAIN_SOURCES_PROPERTY) != null) {
53-
sources.append("," + binderProperties.getEnvironment().get(SPRING_MAIN_SOURCES_PROPERTY));
54+
if (StringUtils.hasText((String) originalSources.get("sources"))) {
55+
sources.append("," + originalSources.get("sources"));
5456
}
55-
return sources.toString();
57+
originalSources.put("sources", sources.toString());
5658
}
5759

58-
private void configureBinderSources(BinderProperties binderProperties, String sources) {
59-
binderProperties.getEnvironment().put(SPRING_MAIN_SOURCES_PROPERTY, sources);
60+
@SuppressWarnings("unchecked")
61+
Map<String, Object> readSpringMainPropertiesMap(Map<String, Object> map) {
62+
if (map.containsKey("spring")) {
63+
Map<String, Object> spring = (Map<String, Object>) map.get("spring");
64+
if (spring.containsKey("main")) {
65+
return (Map<String, Object>) spring.get("main");
66+
} else {
67+
LinkedHashMap<String, Object> main = new LinkedHashMap<>();
68+
spring.put("main", main);
69+
return main;
70+
}
71+
} else {
72+
Map<String, Object> main = new LinkedHashMap<>();
73+
Map<String, Object> spring = new LinkedHashMap<>();
74+
spring.put("main", main);
75+
map.put("spring", spring);
76+
return main;
77+
}
6078
}
6179
}

sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfigurationTest.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
// Licensed under the MIT License.
33
package com.azure.spring.cloud.autoconfigure.kafka;
44

5+
import java.util.Map;
6+
57
import org.junit.jupiter.api.Test;
68
import org.springframework.boot.autoconfigure.AutoConfigurations;
79
import org.springframework.boot.test.context.FilteredClassLoader;
810
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
911
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
1012
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
11-
import org.springframework.cloud.stream.config.BinderProperties;
1213
import org.springframework.cloud.stream.config.BindingServiceProperties;
1314
import org.springframework.context.support.ConversionServiceFactoryBean;
1415
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -28,6 +29,7 @@ class AzureEventHubsKafkaBinderOAuth2AutoConfigurationTest {
2829
// Required by the init method of BindingServiceProperties
2930
.withBean(IntegrationUtils.INTEGRATION_CONVERSION_SERVICE_BEAN_NAME, ConversionServiceFactoryBean.class,
3031
ConversionServiceFactoryBean::new);
32+
private final BindingServicePropertiesBeanPostProcessor bpp = new BindingServicePropertiesBeanPostProcessor();
3133

3234
@Test
3335
void shouldNotConfigureWithoutKafkaBinderConfigurationClass() {
@@ -69,6 +71,39 @@ void shouldConfigureWhenBinderNameSpecified() {
6971
assertThat(context).hasSingleBean(BindingServiceProperties.class);
7072

7173
testBinderSources(context.getBean(BindingServiceProperties.class), "kafka", AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS);
74+
assertEquals("value", context.getBean(BindingServiceProperties.class).getBinders().get("kafka").getEnvironment().get("key"));
75+
});
76+
}
77+
78+
@Test
79+
@SuppressWarnings("unchecked")
80+
void shouldConfigureWhenOtherSpringEnvironmentSpecified() {
81+
this.contextRunner
82+
.withPropertyValues("spring.cloud.stream.binders.kafka.environment.spring.profiles.active=value")
83+
.run(context -> {
84+
assertThat(context).hasSingleBean(AzureEventHubsKafkaBinderOAuth2AutoConfiguration.class);
85+
assertThat(context).hasSingleBean(BindingServicePropertiesBeanPostProcessor.class);
86+
assertThat(context).hasSingleBean(BindingServiceProperties.class);
87+
88+
testBinderSources(context.getBean(BindingServiceProperties.class), "kafka", AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS);
89+
assertEquals("value", ((Map<String, Map<String, Object>>) context.getBean(BindingServiceProperties.class).getBinders().get("kafka").getEnvironment().get("spring"))
90+
.get("profiles").get("active"));
91+
});
92+
}
93+
94+
@Test
95+
@SuppressWarnings("unchecked")
96+
void shouldConfigureWhenOtherSpringMainEnvironmentSpecified() {
97+
this.contextRunner
98+
.withPropertyValues("spring.cloud.stream.binders.kafka.environment.spring.main.banner-mode=console")
99+
.run(context -> {
100+
assertThat(context).hasSingleBean(AzureEventHubsKafkaBinderOAuth2AutoConfiguration.class);
101+
assertThat(context).hasSingleBean(BindingServicePropertiesBeanPostProcessor.class);
102+
assertThat(context).hasSingleBean(BindingServiceProperties.class);
103+
104+
testBinderSources(context.getBean(BindingServiceProperties.class), "kafka", AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS);
105+
assertEquals("console", ((Map<String, Map<String, Object>>) context.getBean(BindingServiceProperties.class).getBinders().get("kafka").getEnvironment().get("spring"))
106+
.get("main").get("banner-mode"));
72107
});
73108
}
74109

@@ -107,32 +142,21 @@ void shouldConfigureWithMultipleBinders() {
107142

108143
@Test
109144
void shouldAppendOriginalSources() {
110-
111-
new ApplicationContextRunner()
112-
.withConfiguration(AutoConfigurations.of(AzureEventHubsKafkaBinderOAuth2AutoConfiguration.class))
113-
.withBean(IntegrationUtils.INTEGRATION_CONVERSION_SERVICE_BEAN_NAME, ConversionServiceFactoryBean.class,
114-
ConversionServiceFactoryBean::new)
115-
.withBean(BindingServiceProperties.class, () -> {
116-
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
117-
BinderProperties kafkaBinderSourceProperty = new BinderProperties();
118-
kafkaBinderSourceProperty.getEnvironment().put(SPRING_MAIN_SOURCES_PROPERTY, "test");
119-
bindingServiceProperties.getBinders().put("kafka", kafkaBinderSourceProperty);
120-
return bindingServiceProperties;
121-
})
145+
this.contextRunner
146+
.withPropertyValues("spring.cloud.stream.binders.kafka.environment.spring.main.sources=value")
122147
.run(context -> {
123148
assertThat(context).hasSingleBean(AzureEventHubsKafkaBinderOAuth2AutoConfiguration.class);
124149
assertThat(context).hasSingleBean(BindingServicePropertiesBeanPostProcessor.class);
125150
assertThat(context).hasSingleBean(BindingServiceProperties.class);
126151

127-
testBinderSources(context.getBean(BindingServiceProperties.class), "kafka", AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS + ",test");
152+
testBinderSources(context.getBean(BindingServiceProperties.class), "kafka", AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS + ",value");
128153
});
129154
}
130155

131156
private void testBinderSources(BindingServiceProperties bindingServiceProperties, String binderName, String binderSources) {
132157
assertFalse(bindingServiceProperties.getBinders().isEmpty());
133158
assertNotNull(bindingServiceProperties.getBinders().get(binderName));
134-
assertEquals(binderSources,
135-
bindingServiceProperties.getBinders().get(binderName).getEnvironment().get(SPRING_MAIN_SOURCES_PROPERTY));
159+
assertEquals(binderSources, bpp.readSpringMainPropertiesMap(bindingServiceProperties.getBinders().get(binderName).getEnvironment()).get("sources"));
136160
}
137161

138162

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.spring.cloud.autoconfigure.kafka;
4+
5+
import java.util.HashMap;
6+
import java.util.LinkedHashMap;
7+
import java.util.Map;
8+
9+
import org.junit.jupiter.api.Test;
10+
11+
import org.springframework.cloud.stream.config.BinderProperties;
12+
import org.springframework.cloud.stream.config.BindingServiceProperties;
13+
import org.springframework.util.StringUtils;
14+
15+
import static com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration.AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertSame;
18+
19+
@SuppressWarnings("unchecked")
20+
class BindingServicePropertiesBeanPostProcessorTest {
21+
22+
private final BindingServicePropertiesBeanPostProcessor bpp = new BindingServicePropertiesBeanPostProcessor();
23+
24+
@Test
25+
void testReadSpringMainPropertiesMapWithoutOriginalValues() {
26+
Map<String, Object> env = new LinkedHashMap<>();
27+
Map<String, Object> mainPropertiesMap = buildSpringMainPropertiesMap(env, null, null, null);
28+
assertSame(mainPropertiesMap, ((Map<String, Object>) env.get("spring")).get("main"));
29+
}
30+
31+
@Test
32+
void testReadSpringMainPropertiesMapWithSpringProp() {
33+
Map<String, Object> env = new LinkedHashMap<>();
34+
Map<String, Object> mainPropertiesMap = buildSpringMainPropertiesMap(env, "profiles", "active", "dev");
35+
36+
assertEquals("dev", ((Map<String, Map<String, Object>>) env.get("spring")).get("profiles").get("active"));
37+
assertSame(mainPropertiesMap, ((Map<String, Object>) env.get("spring")).get("main"));
38+
}
39+
40+
@Test
41+
void testReadSpringMainPropertiesMapWithMainProp() {
42+
Map<String, Object> env = new LinkedHashMap<>();
43+
Map<String, Object> mainPropertiesMap = buildSpringMainPropertiesMap(env, "main", "banner-mode", "test");
44+
45+
assertEquals("test", ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("banner-mode"));
46+
assertSame(mainPropertiesMap, ((Map<String, Object>) env.get("spring")).get("main"));
47+
}
48+
49+
@Test
50+
void testReadSpringMainPropertiesMapWithSourcesProp() {
51+
Map<String, Object> env = new LinkedHashMap<>();
52+
Map<String, Object> mainPropertiesMap = buildSpringMainPropertiesMap(env, "main", "sources", "test");
53+
54+
assertEquals("test", ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
55+
assertSame(mainPropertiesMap, ((Map<String, Object>) env.get("spring")).get("main"));
56+
}
57+
58+
@Test
59+
void testConfigureBinderSources() {
60+
Map<String, Object> env = new LinkedHashMap<>();
61+
Map<String, Object> mainPropertiesMap = buildSpringMainPropertiesMap(env, "main", "sources", "test");
62+
bpp.configureBinderSources(mainPropertiesMap);
63+
assertEquals(AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS + ",test", ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
64+
65+
env.clear();
66+
mainPropertiesMap = buildSpringMainPropertiesMap(env, "main", "profiles", "active");
67+
bpp.configureBinderSources(mainPropertiesMap);
68+
assertEquals(AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS, ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
69+
}
70+
71+
@Test
72+
void testBindKafkaByDefault() {
73+
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
74+
bpp.postProcessBeforeInitialization(bindingServiceProperties, null);
75+
Map<String, Object> env = bindingServiceProperties.getBinders().get("kafka")
76+
.getEnvironment();
77+
assertEquals(AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS, ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
78+
79+
}
80+
81+
@Test
82+
void testBindKafkaByName() {
83+
BinderProperties binderProperties = new BinderProperties();
84+
Map<String, BinderProperties> binders = new HashMap<>();
85+
binders.put("kafka", binderProperties);
86+
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
87+
bindingServiceProperties.setBinders(binders);
88+
89+
bpp.postProcessBeforeInitialization(bindingServiceProperties, null);
90+
Map<String, Object> env = bindingServiceProperties.getBinders().get("kafka")
91+
.getEnvironment();
92+
assertEquals(AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS, ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
93+
}
94+
95+
@Test
96+
void testBindKafkaByType() {
97+
BinderProperties binderProperties = new BinderProperties();
98+
Map<String, BinderProperties> binders = new HashMap<>();
99+
binders.put("test", binderProperties);
100+
binderProperties.setType("kafka");
101+
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
102+
bindingServiceProperties.setBinders(binders);
103+
104+
bpp.postProcessBeforeInitialization(bindingServiceProperties, null);
105+
Map<String, Object> env = bindingServiceProperties.getBinders().get("test")
106+
.getEnvironment();
107+
assertEquals(AZURE_KAFKA_SPRING_CLOUD_STREAM_CONFIGURATION_CLASS, ((Map<String, Map<String, Object>>) env.get("spring")).get("main").get("sources"));
108+
}
109+
110+
private Map<String, Object> buildSpringMainPropertiesMap(Map<String, Object> env, String secondProperty, String thirdProperty, String value) {
111+
if (StringUtils.hasText(secondProperty)) {
112+
Map<String, Object> second = new LinkedHashMap<>();
113+
second.put(thirdProperty, value);
114+
Map<String, Object> first = new LinkedHashMap<>();
115+
first.put(secondProperty, second);
116+
env.put("spring", first);
117+
}
118+
return bpp.readSpringMainPropertiesMap(env);
119+
}
120+
121+
}

0 commit comments

Comments
 (0)