Skip to content

Commit b21e8a8

Browse files
xinlian12annie-mac
andauthored
alwaysUsePagedFluxMaxItemCount (Azure#36847)
* alwaysUsePagedFluxMaxItemCount --------- Co-authored-by: annie-mac <xinlian@microsoft.com>
1 parent 409a5ac commit b21e8a8

File tree

13 files changed

+404
-166
lines changed

13 files changed

+404
-166
lines changed

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java

Lines changed: 182 additions & 98 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.encryption.implementation;
4+
5+
import com.azure.cosmos.BridgeInternal;
6+
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
7+
import com.azure.cosmos.implementation.ItemDeserializer;
8+
import com.azure.cosmos.implementation.query.Transformer;
9+
import com.azure.cosmos.models.FeedResponse;
10+
import com.azure.cosmos.models.ModelBridgeInternal;
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
import reactor.core.scheduler.Scheduler;
15+
16+
import java.util.List;
17+
import java.util.function.Function;
18+
import java.util.stream.Collectors;
19+
20+
public class CosmosEncryptionQueryTransformer<T> implements Transformer<T> {
21+
private final Scheduler encryptionScheduler;
22+
private final EncryptionProcessor encryptionProcessor;
23+
private final ItemDeserializer itemDeserializer;
24+
private final Class<T> classType;
25+
private final boolean isChangeFeed;
26+
27+
public CosmosEncryptionQueryTransformer(
28+
Scheduler encryptionScheduler,
29+
EncryptionProcessor encryptionProcessor,
30+
ItemDeserializer itemDeserializer,
31+
Class<T> classType,
32+
Boolean isChangeFeed) {
33+
this.encryptionScheduler = encryptionScheduler;
34+
this.encryptionProcessor = encryptionProcessor;
35+
this.itemDeserializer = itemDeserializer;
36+
this.classType = classType;
37+
this.isChangeFeed = isChangeFeed;
38+
}
39+
40+
@Override
41+
public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
42+
return queryDecryptionTransformer(this.classType, this.isChangeFeed, func);
43+
}
44+
45+
private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecryptionTransformer(
46+
Class<T> classType,
47+
boolean isChangeFeed,
48+
Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
49+
return func.andThen(flux ->
50+
flux.publishOn(encryptionScheduler)
51+
.flatMap(
52+
page -> {
53+
boolean useEtagAsContinuation = isChangeFeed;
54+
boolean isNoChangesResponse = isChangeFeed ?
55+
ModelBridgeInternal.getNoChangesFromFeedResponse(page)
56+
: false;
57+
List<Mono<JsonNode>> jsonNodeArrayMonoList =
58+
page.getResults().stream().map(jsonNode -> decryptResponseNode(jsonNode)).collect(Collectors.toList());
59+
return Flux.concat(jsonNodeArrayMonoList).map(
60+
item -> this.itemDeserializer.convert(classType, item)
61+
).collectList().map(itemList -> BridgeInternal.createFeedResponseWithQueryMetrics(itemList,
62+
page.getResponseHeaders(),
63+
BridgeInternal.queryMetricsFromFeedResponse(page),
64+
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page),
65+
useEtagAsContinuation,
66+
isNoChangesResponse,
67+
page.getCosmosDiagnostics())
68+
);
69+
}
70+
)
71+
);
72+
}
73+
74+
Mono<JsonNode> decryptResponseNode(
75+
JsonNode jsonNode) {
76+
77+
if (jsonNode == null) {
78+
return Mono.empty();
79+
}
80+
81+
return this.encryptionProcessor.decryptJsonNode(
82+
jsonNode);
83+
}
84+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.rx;
5+
6+
import com.azure.cosmos.CosmosAsyncClient;
7+
import com.azure.cosmos.CosmosAsyncContainer;
8+
import com.azure.cosmos.CosmosClientBuilder;
9+
import com.azure.cosmos.TestObject;
10+
import com.azure.cosmos.implementation.FeedResponseListValidator;
11+
import com.azure.cosmos.models.CosmosQueryRequestOptions;
12+
import com.azure.cosmos.models.PartitionKey;
13+
import com.azure.cosmos.util.CosmosPagedFlux;
14+
import org.testng.annotations.AfterClass;
15+
import org.testng.annotations.BeforeClass;
16+
import org.testng.annotations.Factory;
17+
import org.testng.annotations.Test;
18+
19+
import java.util.UUID;
20+
21+
public class CosmosReadAllItemsTests extends TestSuiteBase {
22+
private final static int TIMEOUT = 30000;
23+
private CosmosAsyncClient client;
24+
private CosmosAsyncContainer container;
25+
26+
@Factory(dataProvider = "clientBuildersWithSessionConsistency")
27+
public CosmosReadAllItemsTests(CosmosClientBuilder clientBuilder) {
28+
super(clientBuilder);
29+
}
30+
31+
@Test(groups = { "query" }, timeOut = 2 * TIMEOUT)
32+
public void readMany_UsePageSizeInPagedFluxOption() {
33+
34+
// first creating few items
35+
String pkValue = UUID.randomUUID().toString();
36+
int itemCount = 10;
37+
for (int i = 0; i < itemCount; i++) {
38+
TestObject testObject = TestObject.create(pkValue);
39+
this.container.createItem(testObject).block();
40+
}
41+
42+
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
43+
44+
FeedResponseListValidator<TestObject> validator1 =
45+
new FeedResponseListValidator
46+
.Builder<TestObject>()
47+
.totalSize(itemCount)
48+
.numberOfPages(2)
49+
.build();
50+
CosmosPagedFlux<TestObject> queryObservable1 =
51+
this.container.readAllItems(new PartitionKey(pkValue), cosmosQueryRequestOptions, TestObject.class);
52+
53+
validateQuerySuccess(queryObservable1.byPage(5), validator1, TIMEOUT);
54+
55+
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
56+
options.setMaxDegreeOfParallelism(2);
57+
58+
FeedResponseListValidator<TestObject> validator2 =
59+
new FeedResponseListValidator
60+
.Builder<TestObject>()
61+
.totalSize(itemCount)
62+
.numberOfPages(1)
63+
.build();
64+
validateQuerySuccess(queryObservable1.byPage(), validator2, TIMEOUT);
65+
}
66+
67+
@AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
68+
public void afterClass() {
69+
safeClose(client);
70+
}
71+
72+
@BeforeClass(groups = { "query" }, timeOut = SETUP_TIMEOUT)
73+
public void before_TopQueryTests() {
74+
this.client = getClientBuilder().buildAsyncClient();
75+
this.container = getSharedSinglePartitionCosmosContainer(client);
76+
}
77+
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,9 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
445445
options.setPartitionKey(new PartitionKey("duplicatePartitionKeyValue"));
446446
CosmosPagedFlux<InternalObjectNode> queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
447447

448+
int preferredPageSize = 3;
448449
TestSubscriber<FeedResponse<InternalObjectNode>> subscriber = new TestSubscriber<>();
449-
queryObservable.byPage(3).take(1).subscribe(subscriber);
450+
queryObservable.byPage(preferredPageSize).take(1).subscribe(subscriber);
450451

451452
subscriber.awaitTerminalEvent();
452453
subscriber.assertComplete();
@@ -464,8 +465,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
464465
List<InternalObjectNode> expectedDocs = createdDocuments.stream()
465466
.filter(d -> (StringUtils.equals("duplicatePartitionKeyValue", ModelBridgeInternal.getStringFromJsonSerializable(d,"mypk"))))
466467
.filter(d -> (ModelBridgeInternal.getIntFromJsonSerializable(d,"propScopedPartitionInt") > 2)).collect(Collectors.toList());
467-
Integer maxItemCount = ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(options);
468-
int expectedPageSize = (expectedDocs.size() + maxItemCount - 1) / maxItemCount;
468+
int expectedPageSize = (expectedDocs.size() + preferredPageSize - 1) / preferredPageSize;
469469

470470
assertThat(expectedDocs).hasSize(10 - 3);
471471

@@ -481,7 +481,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
481481
.requestChargeGreaterThanOrEqualTo(1.0).build())
482482
.build();
483483

484-
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator);
484+
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), preferredPageSize), validator);
485485
}
486486

487487
@Test(groups = { "query" }, timeOut = TIMEOUT)

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ public void queryMetricEquality() throws Exception {
115115
options.setQueryMetricsEnabled(true);
116116
options.setMaxDegreeOfParallelism(0);
117117

118+
int preferredPageSize = 5;
119+
118120
CosmosPagedFlux<InternalObjectNode> queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
119-
List<FeedResponse<InternalObjectNode>> resultList1 = queryObservable.byPage(5).collectList().block();
121+
List<FeedResponse<InternalObjectNode>> resultList1 = queryObservable.byPage(preferredPageSize).collectList().block();
120122

121123
options.setMaxDegreeOfParallelism(4);
122124
CosmosPagedFlux<InternalObjectNode> threadedQueryObs = createdCollection.queryItems(query, options, InternalObjectNode.class);
123-
List<FeedResponse<InternalObjectNode>> resultList2 = threadedQueryObs.byPage().collectList().block();
125+
List<FeedResponse<InternalObjectNode>> resultList2 = threadedQueryObs.byPage(preferredPageSize).collectList().block();
124126

125127
assertThat(resultList1.size()).isEqualTo(resultList2.size());
126128
for(int i = 0; i < resultList1.size(); i++){

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public void continuationToken() throws Exception {
281281
.allPagesSatisfy(new FeedResponseValidator.Builder<InternalObjectNode>()
282282
.requestChargeGreaterThanOrEqualTo(1.0).build())
283283
.build();
284-
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator);
284+
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), maxItemCount), validator);
285285
}
286286

287287
@Test(groups = { "query" }, timeOut = TIMEOUT)

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception {
5858

5959
int expectedTotalSize = 20;
6060
int expectedNumberOfPages = 3;
61+
int pageSize = 9;
6162
int[] expectedPageLengths = new int[] { 9, 9, 2 };
6263

6364
for (int i = 0; i < 2; i++) {
@@ -81,11 +82,24 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception {
8182

8283
CosmosPagedFlux<InternalObjectNode> queryObservable3 = createdCollection.queryItems("SELECT TOP 20 * from c", options, InternalObjectNode.class);
8384

85+
// validate the pageSize in byPage() will be honored
8486
FeedResponseListValidator<InternalObjectNode> validator3 = new FeedResponseListValidator.Builder<InternalObjectNode>()
8587
.totalSize(expectedTotalSize).numberOfPages(expectedNumberOfPages).pageLengths(expectedPageLengths)
8688
.hasValidQueryMetrics(qmEnabled).build();
8789

88-
validateQuerySuccess(queryObservable3.byPage(), validator3, TIMEOUT);
90+
validateQuerySuccess(queryObservable3.byPage(pageSize), validator3, TIMEOUT);
91+
92+
// validate default value will be used for byPage
93+
FeedResponseListValidator<InternalObjectNode> validator4 =
94+
new FeedResponseListValidator
95+
.Builder<InternalObjectNode>()
96+
.totalSize(expectedTotalSize)
97+
.numberOfPages(1)
98+
.pageLengths(new int[] { expectedTotalSize })
99+
.hasValidQueryMetrics(qmEnabled)
100+
.build();
101+
102+
validateQuerySuccess(queryObservable3.byPage(), validator4, TIMEOUT);
89103

90104
if (i == 0) {
91105
options.setPartitionKey(new PartitionKey(firstPk));

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
#### Bugs Fixed
1212
* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599).
13+
* Fixed an issue where `pageSize` from `byPage` is not always being honored. This only happens when the same `CosmosQueryRequestOptions` being used through different
14+
requests, and different pageSize being used. See [PR 36847](https://github.com/Azure/azure-sdk-for-java/pull/36847)
1315

1416
#### Other Changes
1517
* Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -652,8 +652,6 @@ <T> CosmosPagedFlux<T> readAllItems(CosmosQueryRequestOptions options, Class<T>
652652
queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true)
653653
: nonNullOptions;
654654

655-
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
656-
657655
pagedFluxOptions.setTracerAndTelemetryInformation(
658656
this.readAllItemsSpanName,
659657
database.getId(),
@@ -956,8 +954,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
956954
: nonNullOptions;
957955
String spanName = this.queryItemsSpanName;
958956

959-
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
960-
961957
pagedFluxOptions.setTracerAndTelemetryInformation(
962958
spanName,
963959
database.getId(),
@@ -994,7 +990,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
994990
queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true)
995991
: nonNullOptions;
996992
String spanName = this.queryItemsSpanName;
997-
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
998993
pagedFluxOptions.setTracerAndTelemetryInformation(
999994
spanName,
1000995
database.getId(),
@@ -1084,7 +1079,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryChangeFeedInter
10841079

10851080
CosmosAsyncClient client = this.getDatabase().getClient();
10861081
String spanName = this.queryChangeFeedSpanName;
1087-
cfOptionsAccessor.applyMaxItemCount(cosmosChangeFeedRequestOptions, pagedFluxOptions);
10881082
pagedFluxOptions.setTracerAndTelemetryInformation(
10891083
spanName,
10901084
database.getId(),
@@ -1555,7 +1549,6 @@ public <T> CosmosPagedFlux<T> readAllItems(
15551549
requestOptions.setPartitionKey(partitionKey);
15561550

15571551
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
1558-
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
15591552
pagedFluxOptions.setTracerAndTelemetryInformation(
15601553
this.readAllItemsOfLogicalPartitionSpanName,
15611554
database.getId(),
@@ -2709,6 +2702,25 @@ public <T> Mono<FeedResponse<T>> readMany(
27092702

27102703
return cosmosAsyncContainer.readMany(itemIdentityList, requestOptions, classType);
27112704
}
2705+
2706+
@Override
2707+
public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFunc(
2708+
CosmosAsyncContainer cosmosAsyncContainer,
2709+
SqlQuerySpec sqlQuerySpec,
2710+
CosmosQueryRequestOptions cosmosQueryRequestOptions,
2711+
Class<T> classType) {
2712+
2713+
return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpec, cosmosQueryRequestOptions, classType);
2714+
}
2715+
2716+
@Override
2717+
public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFuncWithMonoSqlQuerySpec(
2718+
CosmosAsyncContainer cosmosAsyncContainer,
2719+
Mono<SqlQuerySpec> sqlQuerySpecMono,
2720+
CosmosQueryRequestOptions cosmosQueryRequestOptions,
2721+
Class<T> classType) {
2722+
return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType);
2723+
}
27122724
});
27132725
}
27142726

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosBridgeInternal.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,13 @@
55

66
import com.azure.cosmos.implementation.AsyncDocumentClient;
77
import com.azure.cosmos.implementation.ConnectionPolicy;
8-
import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot;
98
import com.azure.cosmos.implementation.Strings;
109
import com.azure.cosmos.implementation.Warning;
1110
import com.azure.cosmos.implementation.query.Transformer;
1211
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
13-
import com.azure.cosmos.models.CosmosQueryRequestOptions;
14-
import com.azure.cosmos.models.SqlQuerySpec;
1512
import com.azure.cosmos.util.CosmosPagedFlux;
1613
import com.azure.cosmos.util.UtilBridgeInternal;
1714
import com.fasterxml.jackson.databind.JsonNode;
18-
import reactor.core.publisher.Mono;
1915

2016
import static com.azure.cosmos.implementation.Warning.INTERNAL_USE_ONLY_WARNING;
2117

@@ -133,28 +129,6 @@ public static CosmosException cosmosException(int statusCode, Exception innerExc
133129
return new CosmosException(statusCode, innerException);
134130
}
135131

136-
@Warning(value = INTERNAL_USE_ONLY_WARNING)
137-
public static <T> CosmosPagedFlux<T> queryItemsInternal(CosmosAsyncContainer container,
138-
SqlQuerySpec sqlQuerySpec,
139-
CosmosQueryRequestOptions cosmosQueryRequestOptions,
140-
Transformer<T> transformer) {
141-
return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc(
142-
sqlQuerySpec,
143-
cosmosQueryRequestOptions,
144-
JsonNode.class)));
145-
}
146-
147-
@Warning(value = INTERNAL_USE_ONLY_WARNING)
148-
public static <T> CosmosPagedFlux<T> queryItemsInternal(CosmosAsyncContainer container,
149-
Mono<SqlQuerySpec> sqlQuerySpecMono,
150-
CosmosQueryRequestOptions cosmosQueryRequestOptions,
151-
Transformer<T> transformer) {
152-
return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc(
153-
sqlQuerySpecMono,
154-
cosmosQueryRequestOptions,
155-
JsonNode.class)));
156-
}
157-
158132
@Warning(value = INTERNAL_USE_ONLY_WARNING)
159133
public static <T> CosmosPagedFlux<T> queryChangeFeedInternal(
160134
CosmosAsyncContainer container,

0 commit comments

Comments
 (0)