Skip to content

Commit b029c99

Browse files
authored
Adding the maxDegreeOfParallelism property to the Spring Code (Azure#28756)
* Initial changes for adding the maxDegreeOfParallelism property to the spring code base. * Initial changes for adding the maxDegreeOfParallelism property to the spring code base. * Fixing param to default to 0 and fixing application.properties files. * Fixing more maxDegreeOfParallelism properties. * Fixing unit test that is broken. * Adding some integration tests. * Changing unit tests. * Fixing read me. * Removing unused import. * Fixing constructors. * Removing unneeded code. * Fixing unit tests to better test the functionality. * Fixing comments: * Changing setMaxDegreeOfParallelism to maxDegreeOfParallelism * Removing un-needed constructor * Fixing comments setMaxDegreeOfParallelism in spring-test directory. * Updating the CHANGELOG.md * Updating the CHANGELOG.md to link PR * Updating the CHANGELOG.md to link PR
1 parent 163979e commit b029c99

File tree

16 files changed

+184
-9
lines changed

16 files changed

+184
-9
lines changed

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/core/CosmosTemplateIT.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.springframework.data.repository.query.parser.Part;
5252
import org.springframework.test.context.ContextConfiguration;
5353
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
54+
import org.springframework.test.util.ReflectionTestUtils;
55+
import reactor.core.publisher.Flux;
5456

5557
import java.lang.reflect.InvocationTargetException;
5658
import java.lang.reflect.Method;
@@ -708,6 +710,38 @@ public void createDatabaseWithThroughput() throws ClassNotFoundException {
708710
assertEquals(expectedRequestUnits, response.getProperties().getManualThroughput());
709711
}
710712

713+
@Test
714+
public void queryWithMaxDegreeOfParallelism() throws ClassNotFoundException {
715+
final CosmosConfig config = CosmosConfig.builder()
716+
.maxDegreeOfParallelism(20)
717+
.build();
718+
final CosmosTemplate maxDegreeOfParallelismCosmosTemplate = createCosmosTemplate(config, TestConstants.DB_NAME);
719+
720+
final Criteria criteria = Criteria.getInstance(CriteriaType.IS_EQUAL, "firstName",
721+
Collections.singletonList(TEST_PERSON.getFirstName()), Part.IgnoreCaseType.NEVER);
722+
final CosmosQuery query = new CosmosQuery(criteria);
723+
724+
final long count = maxDegreeOfParallelismCosmosTemplate.count(query, containerName);
725+
726+
assertEquals((int) ReflectionTestUtils.getField(maxDegreeOfParallelismCosmosTemplate, "maxDegreeOfParallelism"), 20);
727+
}
728+
729+
@Test
730+
public void queryDatabaseWithQueryMerticsEnabled() throws ClassNotFoundException {
731+
final CosmosConfig config = CosmosConfig.builder()
732+
.enableQueryMetrics(true)
733+
.build();
734+
final CosmosTemplate queryMetricsEnabledCosmosTemplate = createCosmosTemplate(config, TestConstants.DB_NAME);
735+
736+
final Criteria criteria = Criteria.getInstance(CriteriaType.IS_EQUAL, "firstName",
737+
Collections.singletonList(TEST_PERSON.getFirstName()), Part.IgnoreCaseType.NEVER);
738+
final CosmosQuery query = new CosmosQuery(criteria);
739+
740+
final long count = queryMetricsEnabledCosmosTemplate.count(query, containerName);
741+
742+
assertEquals((boolean) ReflectionTestUtils.getField(queryMetricsEnabledCosmosTemplate, "queryMetricsEnabled"), true);
743+
}
744+
711745
@Test
712746
public void userAgentSpringDataCosmosSuffix() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
713747
// getUserAgentSuffix method from CosmosClientBuilder

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/core/ReactiveCosmosTemplateIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.data.repository.query.parser.Part;
4949
import org.springframework.test.context.ContextConfiguration;
5050
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
51+
import org.springframework.test.util.ReflectionTestUtils;
5152
import reactor.core.publisher.Flux;
5253
import reactor.core.publisher.Mono;
5354
import reactor.test.StepVerifier;
@@ -574,6 +575,46 @@ public void createDatabaseWithThroughput() throws ClassNotFoundException {
574575
assertEquals(expectedRequestUnits, response.getProperties().getManualThroughput());
575576
}
576577

578+
@Test
579+
public void queryWithMaxDegreeOfParallelism() throws ClassNotFoundException {
580+
final CosmosConfig config = CosmosConfig.builder()
581+
.maxDegreeOfParallelism(20)
582+
.build();
583+
final ReactiveCosmosTemplate maxDegreeOfParallelismCosmosTemplate = createReactiveCosmosTemplate(config, TestConstants.DB_NAME);
584+
585+
final AuditableEntity entity = new AuditableEntity();
586+
entity.setId(UUID.randomUUID().toString());
587+
588+
auditableRepository.save(entity);
589+
590+
Criteria equals = Criteria.getInstance(CriteriaType.IS_EQUAL, "id", Collections.singletonList(entity.getId()), Part.IgnoreCaseType.NEVER);
591+
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(new CosmosQuery(equals));
592+
final Flux<AuditableEntity> flux = maxDegreeOfParallelismCosmosTemplate.runQuery(sqlQuerySpec, AuditableEntity.class, AuditableEntity.class);
593+
594+
StepVerifier.create(flux).expectNextCount(1).verifyComplete();
595+
assertEquals((int) ReflectionTestUtils.getField(maxDegreeOfParallelismCosmosTemplate, "maxDegreeOfParallelism"), 20);
596+
}
597+
598+
@Test
599+
public void queryWithQueryMerticsEnabled() throws ClassNotFoundException {
600+
final CosmosConfig config = CosmosConfig.builder()
601+
.enableQueryMetrics(true)
602+
.build();
603+
final ReactiveCosmosTemplate queryMetricsEnabledCosmosTemplate = createReactiveCosmosTemplate(config, TestConstants.DB_NAME);
604+
605+
final AuditableEntity entity = new AuditableEntity();
606+
entity.setId(UUID.randomUUID().toString());
607+
608+
auditableRepository.save(entity);
609+
610+
Criteria equals = Criteria.getInstance(CriteriaType.IS_EQUAL, "id", Collections.singletonList(entity.getId()), Part.IgnoreCaseType.NEVER);
611+
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(new CosmosQuery(equals));
612+
final Flux<AuditableEntity> flux = queryMetricsEnabledCosmosTemplate.runQuery(sqlQuerySpec, AuditableEntity.class, AuditableEntity.class);
613+
614+
StepVerifier.create(flux).expectNextCount(1).verifyComplete();
615+
assertEquals((boolean) ReflectionTestUtils.getField(queryMetricsEnabledCosmosTemplate, "queryMetricsEnabled"), true);
616+
}
617+
577618
@Test
578619
public void userAgentSpringDataCosmosSuffix() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
579620
// getUserAgentSuffix method from CosmosClientBuilder

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/SecondaryTestRepositoryConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public class SecondaryTestRepositoryConfig {
3434
@Value("${cosmos.secondary.queryMetricsEnabled}")
3535
private boolean queryMetricsEnabled;
3636

37+
@Value("${cosmos.secondary.maxDegreeOfParallelism}")
38+
private int maxDegreeOfParallelism;
39+
3740
@Bean
3841
public CosmosClientBuilder secondaryCosmosClientBuilder() {
3942
return new CosmosClientBuilder()
@@ -57,6 +60,7 @@ public ReactiveCosmosTemplate secondaryReactiveCosmosTemplate(@Qualifier("second
5760

5861
CosmosConfig config = CosmosConfig.builder()
5962
.enableQueryMetrics(queryMetricsEnabled)
63+
.maxDegreeOfParallelism(maxDegreeOfParallelism)
6064
.build();
6165

6266
return new ReactiveCosmosTemplate(new CosmosFactory(client, getFirstDatabase()), config, mappingCosmosConverter);
@@ -73,6 +77,7 @@ public ReactiveCosmosTemplate secondaryReactiveCosmosTemplate1(@Qualifier("secon
7377

7478
CosmosConfig config = CosmosConfig.builder()
7579
.enableQueryMetrics(queryMetricsEnabled)
80+
.maxDegreeOfParallelism(maxDegreeOfParallelism)
7681
.build();
7782

7883
return new ReactiveCosmosTemplate(new CosmosFactory(client, getSecondDatabase()), config, mappingCosmosConverter);

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/TestRepositoryConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class TestRepositoryConfig extends AbstractCosmosConfiguration {
3838
@Value("${cosmos.queryMetricsEnabled}")
3939
private boolean queryMetricsEnabled;
4040

41+
@Value("${cosmos.maxDegreeOfParallelism}")
42+
private int maxDegreeOfParallelism;
43+
4144
@Bean
4245
public ResponseDiagnosticsTestUtils responseDiagnosticsTestUtils() {
4346
return new ResponseDiagnosticsTestUtils();
@@ -56,6 +59,7 @@ public CosmosClientBuilder cosmosClientBuilder() {
5659
public CosmosConfig cosmosConfig() {
5760
return CosmosConfig.builder()
5861
.enableQueryMetrics(queryMetricsEnabled)
62+
.maxDegreeOfParallelism(maxDegreeOfParallelism)
5963
.responseDiagnosticsProcessor(responseDiagnosticsTestUtils().getResponseDiagnosticsProcessor())
6064
.build();
6165
}

sdk/cosmos/azure-spring-data-cosmos-test/src/test/resources/application.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ cosmos.secondaryKey=${SECONDARY_ACCOUNT_KEY}
55
dynamic.collection.name=spel-property-collection
66
# Populate query metrics
77
cosmos.queryMetricsEnabled=true
8+
# Max Degree of Parallelism allowed
9+
cosmos.maxDegreeOfParallelism=0
810

911
# Secondary DataSource Config
1012
cosmos.secondary.uri=${NEW_ACCOUNT_HOST}
@@ -13,6 +15,7 @@ cosmos.secondary.secondaryKey=${NEW_SECONDARY_ACCOUNT_KEY}
1315

1416
# Populate query metrics
1517
cosmos.secondary.queryMetricsEnabled=true
16-
18+
# Max Degree of Parallelism allowed
19+
cosmos.secondary.maxDegreeOfParallelism=0
1720

1821

sdk/cosmos/azure-spring-data-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 3.22.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Exposed `maxDegreeOfParallelism` feature from CosmosQueryRequestOptions through application.properties flag - See [PR 28756](https://github.com/Azure/azure-sdk-for-java/pull/28756)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-spring-data-cosmos/README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,10 @@ SLF4J is only needed if you plan to use logging, please also download an SLF4J b
8686
### Setup Configuration Class
8787
- In order to set up configuration class, you'll need to extend `AbstractCosmosConfiguration`
8888

89-
- Azure-spring-data-cosmos also supports `Response Diagnostics String` and `Query Metrics`.
89+
- Azure-spring-data-cosmos also supports `Response Diagnostics String`, `Query Metrics` and `Max Degree of Parallelism`.
9090
Set `queryMetricsEnabled` flag to true in application.properties to enable query metrics.
9191
In addition to setting the flag, implement `ResponseDiagnosticsProcessor` to log diagnostics information.
92+
Set `maxDegreeOfParallelism` flag to an integer in application.properties to allow parallel processing; setting the value to -1 will lead to the SDK deciding the optimal value.
9293

9394
```java readme-sample-AppConfiguration
9495
@Configuration
@@ -111,6 +112,9 @@ public class AppConfiguration extends AbstractCosmosConfiguration {
111112

112113
@Value("${azure.cosmos.queryMetricsEnabled}")
113114
private boolean queryMetricsEnabled;
115+
116+
@Value("${azure.cosmos.maxDegreeOfParallelism}")
117+
private int maxDegreeOfParallelism;
114118

115119
private AzureKeyCredential azureKeyCredential;
116120

@@ -129,6 +133,7 @@ public class AppConfiguration extends AbstractCosmosConfiguration {
129133
public CosmosConfig cosmosConfig() {
130134
return CosmosConfig.builder()
131135
.enableQueryMetrics(queryMetricsEnabled)
136+
.maxDegreeOfParallelism(maxDegreeOfParallelism)
132137
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
133138
.build();
134139
}
@@ -170,6 +175,7 @@ public CosmosClientBuilder getCosmosClientBuilder() {
170175
public CosmosConfig cosmosConfig() {
171176
return CosmosConfig.builder()
172177
.enableQueryMetrics(queryMetricsEnabled)
178+
.maxDegreeOfParallelism(maxDegreeOfParallelism)
173179
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
174180
.build();
175181
}
@@ -646,6 +652,7 @@ public class SecondaryDatasourceConfiguration {
646652
public CosmosConfig getCosmosConfig() {
647653
return CosmosConfig.builder()
648654
.enableQueryMetrics(true)
655+
.maxDegreeOfParallelism(0)
649656
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
650657
.build();
651658
}
@@ -681,18 +688,20 @@ public CosmosAsyncClient getCosmosAsyncClient(@Qualifier("secondary") CosmosProp
681688
public CosmosConfig getCosmosConfig() {
682689
return CosmosConfig.builder()
683690
.enableQueryMetrics(true)
691+
.maxDegreeOfParallelism(0)
684692
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
685693
.build();
686694
}
687695
```
688696

689-
- Besides, if you want to define `queryMetricsEnabled` or `ResponseDiagnosticsProcessor` , you can create the `CosmosConfig` for your cosmos template.
697+
- Besides, if you want to define `queryMetricsEnabled`, `ResponseDiagnosticsProcessor` or `maxDegreeOfParallelism` , you can create the `CosmosConfig` for your cosmos template.
690698

691699
```java
692700
@Bean("secondaryCosmosConfig")
693701
public CosmosConfig getCosmosConfig() {
694702
return CosmosConfig.builder()
695703
.enableQueryMetrics(true)
704+
.maxDegreeOfParallelism(0)
696705
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
697706
.build();
698707
}

sdk/cosmos/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/config/CosmosConfig.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class CosmosConfig {
1717

1818
private final boolean queryMetricsEnabled;
1919

20+
private final int maxDegreeOfParallelism;
21+
2022
/**
2123
* Initialization
2224
*
@@ -33,7 +35,7 @@ public CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
3335
* Initialization
3436
*
3537
* @param responseDiagnosticsProcessor must not be {@literal null}
36-
* @param databaseThroughputConfig may be @{literal null}
38+
* @param databaseThroughputConfig may be {@literal null}
3739
* @param queryMetricsEnabled must not be {@literal null}
3840
*/
3941
@ConstructorProperties({"responseDiagnosticsProcessor", "databaseThroughputConfig", "queryMetricsEnabled"})
@@ -43,6 +45,26 @@ public CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
4345
this.responseDiagnosticsProcessor = responseDiagnosticsProcessor;
4446
this.databaseThroughputConfig = databaseThroughputConfig;
4547
this.queryMetricsEnabled = queryMetricsEnabled;
48+
this.maxDegreeOfParallelism = 0;
49+
}
50+
51+
/**
52+
* Initialization
53+
*
54+
* @param responseDiagnosticsProcessor must not be {@literal null}
55+
* @param databaseThroughputConfig may be {@literal null}
56+
* @param queryMetricsEnabled must not be {@literal null}
57+
* @param maxDegreeOfParallelism must not be {@literal null}
58+
*/
59+
@ConstructorProperties({"responseDiagnosticsProcessor", "databaseThroughputConfig", "queryMetricsEnabled", "maxDegreeOfParallelism"})
60+
public CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
61+
DatabaseThroughputConfig databaseThroughputConfig,
62+
boolean queryMetricsEnabled,
63+
int maxDegreeOfParallelism) {
64+
this.responseDiagnosticsProcessor = responseDiagnosticsProcessor;
65+
this.databaseThroughputConfig = databaseThroughputConfig;
66+
this.queryMetricsEnabled = queryMetricsEnabled;
67+
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
4668
}
4769

4870
/**
@@ -63,6 +85,15 @@ public boolean isQueryMetricsEnabled() {
6385
return queryMetricsEnabled;
6486
}
6587

88+
/**
89+
* Gets the value of maxDegreeOfParallelism
90+
*
91+
* @return int, value of maxDegreeOfParallelism
92+
*/
93+
public int getMaxDegreeOfParallelism() {
94+
return maxDegreeOfParallelism;
95+
}
96+
6697
/**
6798
* Gets the database throughput configuration.
6899
*
@@ -88,6 +119,7 @@ public static class CosmosConfigBuilder {
88119
private ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
89120
private DatabaseThroughputConfig databaseThroughputConfig;
90121
private boolean queryMetricsEnabled;
122+
private int maxDegreeOfParallelism;
91123
CosmosConfigBuilder() {
92124
}
93125

@@ -115,6 +147,17 @@ public CosmosConfigBuilder enableQueryMetrics(boolean queryMetricsEnabled) {
115147
return this;
116148
}
117149

150+
/**
151+
* Set maxDegreeOfParallelism
152+
*
153+
* @param maxDegreeOfParallelism value to initialize
154+
* @return CosmosConfigBuilder
155+
*/
156+
public CosmosConfigBuilder maxDegreeOfParallelism(int maxDegreeOfParallelism) {
157+
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
158+
return this;
159+
}
160+
118161
/**
119162
* Enable database throughput
120163
*
@@ -133,7 +176,7 @@ public CosmosConfigBuilder enableDatabaseThroughput(boolean autoscale, int reque
133176
* @return CosmosConfig
134177
*/
135178
public CosmosConfig build() {
136-
return new CosmosConfig(this.responseDiagnosticsProcessor, this.databaseThroughputConfig, this.queryMetricsEnabled);
179+
return new CosmosConfig(this.responseDiagnosticsProcessor, this.databaseThroughputConfig, this.queryMetricsEnabled, this.maxDegreeOfParallelism);
137180
}
138181

139182
@Override
@@ -142,6 +185,7 @@ public String toString() {
142185
+ "responseDiagnosticsProcessor=" + responseDiagnosticsProcessor
143186
+ ", databaseThroughputConfig=" + databaseThroughputConfig
144187
+ ", queryMetricsEnabled=" + queryMetricsEnabled
188+
+ ", maxDegreeOfParallelism=" + maxDegreeOfParallelism
145189
+ '}';
146190
}
147191
}

0 commit comments

Comments
 (0)