Skip to content

Commit 0ab856a

Browse files
authored
Resuming query from a continuation token after split (Azure#18698)
* Adds support for resuming query from a continuation token after partition split
1 parent a2c5465 commit 0ab856a

File tree

4 files changed

+183
-20
lines changed

4 files changed

+183
-20
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,14 @@ private ImmutablePair<Integer, FormattedFilterInfo> getFiltersForPartitions(
231231
List<PartitionKeyRange> partitionKeyRanges,
232232
List<SortOrder> sortOrders,
233233
Collection<String> orderByExpressions) {
234+
235+
ValueHolder<Map<String, OrderByContinuationToken>> valueHolder = new ValueHolder<>();
236+
valueHolder.v = this.targetRangeToOrderByContinuationTokenMap;
234237
// Find the partition key range we left off on
235238
int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges,
236-
orderByContinuationToken.getCompositeContinuationToken().getRange());
239+
orderByContinuationToken.getCompositeContinuationToken().getRange(),
240+
valueHolder,
241+
orderByContinuationToken);
237242

238243
// Get the filters.
239244
FormattedFilterInfo formattedFilterInfo = this.getFormattedFilters(orderByExpressions,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
*/
4343
public class ParallelDocumentQueryExecutionContext<T extends Resource>
4444
extends ParallelDocumentQueryExecutionContextBase<T> {
45-
private CosmosQueryRequestOptions cosmosQueryRequestOptions;
45+
private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
46+
private final Map<PartitionKeyRange, String> partitionKeyRangeToContinuationTokenMap;
4647

4748
private ParallelDocumentQueryExecutionContext(
4849
DiagnosticsClientContext diagnosticsClientContext,
@@ -61,6 +62,7 @@ private ParallelDocumentQueryExecutionContext(
6162
super(diagnosticsClientContext, client, partitionKeyRanges, resourceTypeEnum, resourceType, query, cosmosQueryRequestOptions, resourceLink,
6263
rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId);
6364
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
65+
partitionKeyRangeToContinuationTokenMap = new HashMap<>();
6466
}
6567

6668
public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
@@ -133,7 +135,6 @@ private void initialize(
133135
int initialPageSize,
134136
String continuationToken) {
135137
// Generate the corresponding continuation token map.
136-
Map<PartitionKeyRange, String> partitionKeyRangeToContinuationTokenMap = new HashMap<PartitionKeyRange, String>();
137138
if (continuationToken == null) {
138139
// If the user does not give a continuation token,
139140
// then just start the query from the first partition.
@@ -152,7 +153,7 @@ private void initialize(
152153
// then we know that partitions 0, 1, 2 are fully drained.
153154

154155
// Check to see if composite continuation token is a valid JSON.
155-
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<CompositeContinuationToken>();
156+
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<>();
156157
if (!CompositeContinuationToken.tryParse(continuationToken,
157158
outCompositeContinuationToken)) {
158159
String message = String.format("INVALID JSON in continuation token %s for Parallel~Context",
@@ -163,20 +164,17 @@ private void initialize(
163164

164165
CompositeContinuationToken compositeContinuationToken = outCompositeContinuationToken.v;
165166

166-
// Get the right hand side of the query ranges:
167+
// Get the right hand side of the query ranges and set continuation token for relevant ranges in the
168+
// partitionKeyRangeToContinuationTokenMap
167169
List<PartitionKeyRange> filteredPartitionKeyRanges = this.getPartitionKeyRangesForContinuation(
168170
compositeContinuationToken,
169171
targetRanges);
170172

171-
// The first partition is the one we left off on and have a backend continuation
172-
// token for.
173-
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(0),
174-
compositeContinuationToken.getToken());
175-
176173
// The remaining partitions we have yet to touch / have null continuation tokens
177174
for (int i = 1; i < filteredPartitionKeyRanges.size(); i++) {
178-
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i),
179-
null);
175+
if (!partitionKeyRangeToContinuationTokenMap.containsKey(filteredPartitionKeyRanges.get(i))) {
176+
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i), null);
177+
}
180178
}
181179
}
182180

@@ -187,14 +185,21 @@ private void initialize(
187185
}
188186

189187
private List<PartitionKeyRange> getPartitionKeyRangesForContinuation(
190-
CompositeContinuationToken compositeContinuationToken,
191-
List<PartitionKeyRange> partitionKeyRanges) {
192-
// Find the partition key range we left off on
188+
CompositeContinuationToken compositeContinuationToken,
189+
List<PartitionKeyRange> partitionKeyRanges) {
190+
Map<String, String> partitionRangeIdToTokenMap = new HashMap<>();
191+
ValueHolder<Map<String, String>> outPartitionRangeIdToTokenMap = new ValueHolder<>(partitionRangeIdToTokenMap);
192+
// Find the partition key range we left off on and fill the range to continuation token map
193193
int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges,
194-
compositeContinuationToken.getRange());
195-
194+
compositeContinuationToken.getRange(),
195+
outPartitionRangeIdToTokenMap,
196+
compositeContinuationToken.getToken());
196197
List<PartitionKeyRange> rightHandSideRanges = new ArrayList<PartitionKeyRange>();
197198
for (int i = startIndex; i < partitionKeyRanges.size(); i++) {
199+
PartitionKeyRange range = partitionKeyRanges.get(i);
200+
if (partitionRangeIdToTokenMap.containsKey(range.getId())) {
201+
this.partitionKeyRangeToContinuationTokenMap.put(range, compositeContinuationToken.getToken());
202+
}
198203
rightHandSideRanges.add(partitionKeyRanges.get(i));
199204
}
200205

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.implementation.ResourceType;
1212
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
1313
import com.azure.cosmos.implementation.Strings;
14+
import com.azure.cosmos.implementation.Utils;
1415
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
1516
import com.azure.cosmos.implementation.routing.Range;
1617
import com.azure.cosmos.models.CosmosQueryRequestOptions;
@@ -23,12 +24,14 @@
2324
import reactor.core.publisher.Mono;
2425

2526
import java.util.ArrayList;
27+
import java.util.Comparator;
2628
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.UUID;
3032
import java.util.concurrent.Callable;
3133
import java.util.function.Function;
34+
import java.util.stream.Collectors;
3235

3336
/**
3437
* While this class is public, but it is not part of our published public APIs.
@@ -103,7 +106,9 @@ protected void initialize(String collectionRid,
103106
}
104107

105108
protected <TContinuationToken> int findTargetRangeAndExtractContinuationTokens(
106-
List<PartitionKeyRange> partitionKeyRanges, Range<String> range) {
109+
List<PartitionKeyRange> partitionKeyRanges, Range<String> range,
110+
Utils.ValueHolder<Map<String, TContinuationToken>> outPartitionRangeToContinuation,
111+
TContinuationToken continuation) {
107112
if (partitionKeyRanges == null) {
108113
throw new IllegalArgumentException("partitionKeyRanges can not be null.");
109114
}
@@ -132,12 +137,29 @@ protected <TContinuationToken> int findTargetRangeAndExtractContinuationTokens(
132137
String.format("Could not find partition key range for continuation token: {0}", needle));
133138
}
134139

140+
List<PartitionKeyRange> replacementRanges;
141+
142+
// find what ranges make up the supplied continuation token
143+
replacementRanges = partitionKeyRanges.stream()
144+
.filter(p -> range.getMin().compareTo(p.getMinInclusive()) <= 0 &&
145+
range.getMax().compareTo(p.getMaxExclusive()) >= 0)
146+
.sorted(Comparator.comparing(PartitionKeyRange::getId))
147+
.collect(Collectors.toList());
148+
149+
if (replacementRanges.isEmpty()) {
150+
throw BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST,
151+
String.format("Cannot find ranges for continuation {}", continuation));
152+
}
153+
154+
replacementRanges.forEach(r -> outPartitionRangeToContinuation.v.put(r.getId(), continuation));
155+
135156
return minIndex;
136157
}
137158

138159
abstract protected DocumentProducer<T> createDocumentProducer(String collectionRid, PartitionKeyRange targetRange,
139-
String initialContinuationToken
140-
, int initialPageSize, CosmosQueryRequestOptions cosmosQueryRequestOptions, SqlQuerySpec querySpecForInit,
160+
String initialContinuationToken, int initialPageSize,
161+
CosmosQueryRequestOptions cosmosQueryRequestOptions,
162+
SqlQuerySpec querySpecForInit,
141163
Map<String, String> commonRequestHeaders,
142164
TriFunction<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc,
143165
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,

sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.rx;
44

5+
import com.azure.cosmos.BridgeInternal;
56
import com.azure.cosmos.CosmosAsyncClient;
67
import com.azure.cosmos.CosmosAsyncContainer;
78
import com.azure.cosmos.CosmosAsyncDatabase;
@@ -11,8 +12,10 @@
1112
import com.azure.cosmos.implementation.AsyncDocumentClient;
1213
import com.azure.cosmos.implementation.Configs;
1314
import com.azure.cosmos.implementation.HttpConstants;
15+
import com.azure.cosmos.implementation.PartitionKeyRange;
1416
import com.azure.cosmos.models.CosmosContainerProperties;
1517
import com.azure.cosmos.models.CosmosContainerRequestOptions;
18+
import com.azure.cosmos.models.CosmosContainerResponse;
1619
import com.azure.cosmos.models.CosmosDatabaseProperties;
1720
import com.azure.cosmos.models.CosmosPermissionProperties;
1821
import com.azure.cosmos.models.CosmosQueryRequestOptions;
@@ -26,16 +29,21 @@
2629
import com.azure.cosmos.models.SqlParameter;
2730
import com.azure.cosmos.models.SqlQuerySpec;
2831
import com.azure.cosmos.models.ThroughputProperties;
32+
import com.azure.cosmos.models.ThroughputResponse;
2933
import com.azure.cosmos.models.TriggerOperation;
3034
import com.azure.cosmos.models.TriggerType;
3135
import com.azure.cosmos.util.CosmosPagedFlux;
36+
import com.fasterxml.jackson.databind.JsonNode;
3237
import io.reactivex.subscribers.TestSubscriber;
38+
import org.jetbrains.annotations.NotNull;
3339
import org.testng.annotations.BeforeClass;
3440
import org.testng.annotations.DataProvider;
3541
import org.testng.annotations.Factory;
3642
import org.testng.annotations.Test;
43+
import reactor.core.publisher.Flux;
3744

3845
import java.util.ArrayList;
46+
import java.util.Arrays;
3947
import java.util.Collections;
4048
import java.util.Comparator;
4149
import java.util.List;
@@ -317,6 +325,129 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {
317325

318326
}
319327

328+
@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
329+
public void splitQueryContinuationToken() throws Exception {
330+
String containerId = "splittestcontainer_" + UUID.randomUUID();
331+
int itemCount = 20;
332+
333+
//Create container
334+
CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk");
335+
CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block();
336+
CosmosAsyncContainer container = createdDatabase.getContainer(containerId);
337+
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client);
338+
339+
//Insert some documents
340+
List<TestObject> testObjects = insertDocuments(itemCount, Arrays.asList("CA", "US"), container);
341+
342+
List<String> sortedObjects = testObjects.stream()
343+
.sorted(Comparator.comparing(TestObject::getProp))
344+
.map(TestObject::getId)
345+
.collect(Collectors.toList());
346+
347+
String query = "Select * from c";
348+
String orderByQuery = "select * from c order by c.prop";
349+
350+
List<PartitionKeyRange> partitionKeyRanges = getPartitionKeyRanges(containerId, asyncDocumentClient);
351+
String requestContinuation = null;
352+
String orderByRequestContinuation = null;
353+
int preferredPageSize = 15;
354+
ArrayList<TestObject> resultList = new ArrayList<>();
355+
ArrayList<TestObject> orderByResultList = new ArrayList<>();
356+
357+
// Query
358+
FeedResponse<TestObject> jsonNodeFeedResponse = container
359+
.queryItems(query, new CosmosQueryRequestOptions(), TestObject.class)
360+
.byPage(preferredPageSize).blockFirst();
361+
assert jsonNodeFeedResponse != null;
362+
resultList.addAll(jsonNodeFeedResponse.getResults());
363+
requestContinuation = jsonNodeFeedResponse.getContinuationToken();
364+
365+
// Orderby query
366+
FeedResponse<TestObject> orderByFeedResponse = container
367+
.queryItems(orderByQuery, new CosmosQueryRequestOptions(),
368+
TestObject.class)
369+
.byPage(preferredPageSize).blockFirst();
370+
assert orderByFeedResponse != null;
371+
orderByResultList.addAll(orderByFeedResponse.getResults());
372+
orderByRequestContinuation = orderByFeedResponse.getContinuationToken();
373+
374+
// Scale up the throughput for a split
375+
logger.info("Scaling up throughput for split");
376+
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(16000);
377+
ThroughputResponse throughputResponse = container.replaceThroughput(throughputProperties).block();
378+
logger.info("Throughput replace request submitted for {} ",
379+
throughputResponse.getProperties().getManualThroughput());
380+
throughputResponse = container.readThroughput().block();
381+
382+
383+
// Wait for the throughput update to complete so that we get the partition split
384+
while (true) {
385+
assert throughputResponse != null;
386+
if (!throughputResponse.isReplacePending()) {
387+
break;
388+
}
389+
logger.info("Waiting for split to complete");
390+
Thread.sleep(10 * 1000);
391+
throughputResponse = container.readThroughput().block();
392+
}
393+
394+
logger.info("Resuming query from the continuation");
395+
// Read number of partitions. Should be greater than one
396+
List<PartitionKeyRange> partitionKeyRangesAfterSplit = getPartitionKeyRanges(containerId, asyncDocumentClient);
397+
assertThat(partitionKeyRangesAfterSplit.size()).isGreaterThan(partitionKeyRanges.size())
398+
.as("Partition ranges should increase after split");
399+
logger.info("After split num partitions = {}", partitionKeyRangesAfterSplit.size());
400+
401+
// Reading item to refresh cache
402+
container.readItem(testObjects.get(0).getId(), new PartitionKey(testObjects.get(0).getMypk()),
403+
JsonNode.class).block();
404+
405+
// Resume the query with continuation token saved above and make sure you get all the documents
406+
Flux<FeedResponse<TestObject>> feedResponseFlux = container
407+
.queryItems(query, new CosmosQueryRequestOptions(),
408+
TestObject.class)
409+
.byPage(requestContinuation, preferredPageSize);
410+
411+
for (FeedResponse<TestObject> nodeFeedResponse : feedResponseFlux.toIterable()) {
412+
resultList.addAll(nodeFeedResponse.getResults());
413+
}
414+
415+
// Resume the orderby query with continuation token saved above and make sure you get all the documents
416+
Flux<FeedResponse<TestObject>> orderfeedResponseFlux = container
417+
.queryItems(orderByQuery, new CosmosQueryRequestOptions(),
418+
TestObject.class)
419+
.byPage(orderByRequestContinuation, preferredPageSize);
420+
421+
for (FeedResponse<TestObject> nodeFeedResponse : orderfeedResponseFlux.toIterable()) {
422+
orderByResultList.addAll(nodeFeedResponse.getResults());
423+
}
424+
425+
List<String> sourceIds = testObjects.stream().map(obj -> obj.getId()).collect(Collectors.toList());
426+
List<String> resultIds = resultList.stream().map(obj -> obj.getId()).collect(Collectors.toList());
427+
List<String> orderResultIds = orderByResultList.stream().map(obj -> obj.getId()).collect(Collectors.toList());
428+
429+
assertThat(resultIds).containsExactlyInAnyOrderElementsOf(sourceIds)
430+
.as("Resuming query from continuation token after split validated");
431+
432+
assertThat(orderResultIds).containsExactlyElementsOf(sortedObjects)
433+
.as("Resuming orderby query from continuation token after split validated");
434+
435+
container.delete().block();
436+
}
437+
438+
@NotNull
439+
private List<PartitionKeyRange> getPartitionKeyRanges(
440+
String containerId, AsyncDocumentClient asyncDocumentClient) {
441+
List<PartitionKeyRange> partitionKeyRanges = new ArrayList<>();
442+
List<FeedResponse<PartitionKeyRange>> partitionFeedResponseList = asyncDocumentClient
443+
.readPartitionKeyRanges("/dbs/" + createdDatabase.getId()
444+
+ "/colls/" + containerId,
445+
new CosmosQueryRequestOptions())
446+
.collectList().block();
447+
partitionFeedResponseList.forEach(f -> partitionKeyRanges.addAll(f.getResults()));
448+
return partitionKeyRanges;
449+
}
450+
320451
private <T> List<T> queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class<T> type) {
321452
CosmosPagedFlux<T> queryPagedFlux = createdContainer.queryItems(querySpec, options, type);
322453
TestSubscriber<T> testSubscriber = new TestSubscriber<>();

0 commit comments

Comments
 (0)