Skip to content

Commit 82eed31

Browse files
xinlian12annie-mac
andauthored
add multi-orderby continuation token support (Azure#26267)
* add multi-orderby continuation token support Co-authored-by: annie-mac <annie-mac@MININT-8V4CL0S.redmond.corp.microsoft.com>
1 parent 30b8fb7 commit 82eed31

File tree

13 files changed

+407
-72
lines changed

13 files changed

+407
-72
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public static class HttpHeaders {
251251
public static final String QUERY_METRICS = "x-ms-documentdb-query-metrics";
252252
public static final String POPULATE_INDEX_METRICS = "x-ms-cosmos-populateindexmetrics";
253253
public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization";
254+
public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info";
254255

255256
// Batch operations
256257
public static final String IS_BATCH_ATOMIC = "x-ms-cosmos-batch-atomic";

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ public static class BackendHeaders {
7676
public static final String IS_USER_REQUEST = "x-ms-cosmos-internal-is-user-request";
7777
public static final String BACKEND_REQUEST_DURATION_MILLISECONDS = "x-ms-request-duration-ms";
7878
public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization";
79+
public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info";
7980
}
8081
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ public enum RntbdResponseHeader implements RntbdHeader {
842842
IsRUPerMinuteUsed((short) 0x0027, RntbdTokenType.Byte, false),
843843
QueryMetrics((short) 0x0028, RntbdTokenType.String, false),
844844
IndexUtilization((short) 0x0044, RntbdTokenType.String, false),
845+
QueryExecutionInfo((short) 0x0045, RntbdTokenType.String, false),
845846
GlobalCommittedLSN((short) 0x0029, RntbdTokenType.LongLong, false),
846847
NumberOfReadRegions((short) 0x0030, RntbdTokenType.ULong, false),
847848
OfferReplacePending((short) 0x0031, RntbdTokenType.Byte, false),

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ class RntbdResponseHeaders extends RntbdTokenStream<RntbdResponseHeader> {
9898
@JsonProperty
9999
private final RntbdToken indexUtilization;
100100
@JsonProperty
101+
private final RntbdToken queryExecutionInfo;
102+
@JsonProperty
101103
private final RntbdToken quorumAckedLSN;
102104
@JsonProperty
103105
private final RntbdToken quorumAckedLocalLSN;
@@ -174,6 +176,7 @@ private RntbdResponseHeaders(ByteBuf in) {
174176
this.queriesPerformed = this.get(RntbdResponseHeader.QueriesPerformed);
175177
this.queryMetrics = this.get(RntbdResponseHeader.QueryMetrics);
176178
this.indexUtilization = this.get(RntbdResponseHeader.IndexUtilization);
179+
this.queryExecutionInfo = this.get(RntbdResponseHeader.QueryExecutionInfo);
177180
this.quorumAckedLSN = this.get(RntbdResponseHeader.QuorumAckedLSN);
178181
this.quorumAckedLocalLSN = this.get(RntbdResponseHeader.QuorumAckedLocalLSN);
179182
this.readsPerformed = this.get(RntbdResponseHeader.ReadsPerformed);
@@ -276,6 +279,7 @@ public void setValues(final Map<String, String> headers) {
276279
this.mapValue(this.partitionKeyRangeId, BackendHeaders.PARTITION_KEY_RANGE_ID, String::toString, headers);
277280
this.mapValue(this.queryMetrics, BackendHeaders.QUERY_METRICS, String::toString, headers);
278281
this.mapValue(this.indexUtilization, BackendHeaders.INDEX_UTILIZATION, String::toString, headers);
282+
this.mapValue(this.queryExecutionInfo, BackendHeaders.QUERY_EXECUTION_INFO, String::toString, headers);
279283
this.mapValue(this.quorumAckedLSN, BackendHeaders.QUORUM_ACKED_LSN, Long::parseLong, headers);
280284
this.mapValue(this.quorumAckedLocalLSN, BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long::parseLong, headers);
281285
this.mapValue(this.requestCharge, HttpHeaders.REQUEST_CHARGE, Double::parseDouble, headers);
@@ -422,6 +426,9 @@ private void collectEntries(final BiConsumer<RntbdToken, Function<RntbdToken, Ma
422426
toStringEntry(BackendHeaders.INDEX_UTILIZATION, token)
423427
);
424428

429+
collector.accept(this.queryExecutionInfo, tokens ->
430+
toStringEntry(BackendHeaders.QUERY_EXECUTION_INFO, tokens));
431+
425432
collector.accept(this.quorumAckedLSN, token ->
426433
toLongEntry(BackendHeaders.QUORUM_ACKED_LSN, token)
427434
);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java

Lines changed: 152 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import com.azure.cosmos.implementation.Undefined;
2020
import com.azure.cosmos.implementation.Utils;
2121
import com.azure.cosmos.implementation.Utils.ValueHolder;
22-
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
2322
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
2423
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
2524
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
25+
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonFilters;
26+
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonWithDefinedFilters;
27+
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonWithUndefinedFilters;
2628
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
2729
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
2830
import com.azure.cosmos.implementation.routing.Range;
@@ -279,47 +281,26 @@ private OrderByDocumentQueryExecutionContext<T>.FormattedFilterInfo getFormatted
279281
SortOrder sortOrder = sortOrders[0];
280282
QueryItem orderByItem = orderByItems[0];
281283
Object rawItem = orderByItem.getItem();
282-
String orderByItemToString;
283-
if (rawItem instanceof String) {
284-
orderByItemToString = "\"" + rawItem.toString().replaceAll("\"",
285-
"\\\"") + "\"";
286-
} else {
287-
if (rawItem != null) {
288-
orderByItemToString = rawItem.toString();
289-
} else {
290-
orderByItemToString = "null";
291-
}
292-
}
293-
if (rawItem == Undefined.value()) {
294-
// Handling undefined needs filter literals
295-
// What we really want is to support expression > undefined,
296-
// but the engine evaluates to undefined instead of true or false,
297-
// so we work around this by using the IS_DEFINED() system function.
298284

299-
left.append(sortOrder == SortOrder.Descending ? "false" : "IS_DEFINED(" + expression + ") ");
300-
target.append(sortOrder == SortOrder.Descending ? "NOT IS_DEFINED(" + expression + ")" : "true ");
301-
right.append(sortOrder == SortOrder.Descending ? "NOT IS_DEFINED(" + expression + ")" : "true ");
285+
this.appendToBuilders(left, target, right, "(");
302286

303-
} else {
287+
String orderByItemToString = this.getOrderByItemString(rawItem);
304288

305-
left.append(getFilterString(expression,
306-
(sortOrder == SortOrder.Descending ? "<" : ">"),
307-
orderByItemToString));
308-
309-
if (inclusive) {
310-
target.append(getFilterString(expression,
311-
(sortOrder == SortOrder.Descending ? "<=" : ">="),
312-
orderByItemToString));
313-
} else {
314-
target.append(getFilterString(expression,
315-
(sortOrder == SortOrder.Descending ? "<" : ">"),
316-
orderByItemToString));
317-
}
289+
// Handling undefined needs filter literals
290+
// What we really want is to support expression > undefined,
291+
// but the engine evaluates to undefined instead of true or false,
292+
// so we work around this by using the IS_DEFINED() system function
293+
// ComparisonWithUndefinedFilters is used to handle the logic mentioned above
294+
ComparisonFilters filters =
295+
rawItem == Undefined.value() ? new ComparisonWithUndefinedFilters(expression) : new ComparisonWithDefinedFilters(expression, orderByItemToString);
318296

319-
right.append(getFilterString(expression,
320-
(sortOrder == SortOrder.Descending ? "<=" : ">="),
321-
orderByItemToString));
297+
left.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
298+
if (inclusive) {
299+
target.append(sortOrder == SortOrder.Descending ? filters.lessThanOrEqualTo() : filters.greaterThanOrEqualTo());
300+
} else {
301+
target.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
322302
}
303+
right.append(sortOrder == SortOrder.Descending ? filters.lessThanOrEqualTo() : filters.greaterThanOrEqualTo());
323304

324305
// Now we need to include all the types that match the sort order.
325306
List<String> definedFunctions =
@@ -336,20 +317,148 @@ private OrderByDocumentQueryExecutionContext<T>.FormattedFilterInfo getFormatted
336317
target.append(isDefinedFunctions);
337318
right.append(isDefinedFunctions);
338319

320+
this.appendToBuilders(left, target, right, ")");
321+
339322
} else {
340-
// This code path needs to be implemented, but it's error prone and needs
341-
// testing.
342-
// You can port the implementation from the .net SDK and it should work if
343-
// ported right.
344-
throw new NotImplementedException(
345-
"Resuming a multi order by query from a continuation token is not supported yet.");
323+
// For a multi order by query
324+
// Suppose the query is SELECT* FROM c ORDER BY c.string ASC, c.number ASC
325+
// And we left off on partition N with the value("A", 1)
326+
// Then
327+
// All the partitions to the left will have finished reading("A", 1)
328+
// Partition N is still reading("A", 1)
329+
// All the partitions to the right have let to read a "(A", 1)
330+
// The filters are harder to derive since there are multiple columns
331+
// But the problem reduces to "How do you know one document comes after another in a multi order by query"
332+
// The answer is to just look at it one column at a time.
333+
// For this particular scenario:
334+
// If a first column is greater ex. ("B", blah), then the document comes later in the sort order
335+
// Therefore we want all documents where the first column is greater than "A" which means > "A"
336+
// Or if the first column is a tie, then you look at the second column ex. ("A", blah).
337+
// Therefore we also want all documents where the first column was a tie but the second column is greater which means = "A" AND > 1
338+
// Therefore the filters should be
339+
// (> "A") OR (= "A" AND > 1), (> "A") OR (= "A" AND >= 1), (> "A") OR (= "A" AND >= 1)
340+
// Notice that if we repeated the same logic we for single order by we would have gotten
341+
// > "A" AND > 1, >= "A" AND >= 1, >= "A" AND >= 1
342+
// which is wrong since we missed some documents
343+
// Repeat the same logic for ASC, DESC
344+
// (> "A") OR (= "A" AND < 1), (> "A") OR (= "A" AND <= 1), (> "A") OR (= "A" AND <= 1)
345+
// Again for DESC, ASC
346+
// (< "A") OR (= "A" AND > 1), (< "A") OR (= "A" AND >= 1), (< "A") OR (= "A" AND >= 1)
347+
// And again for DESC DESC
348+
// (< "A") OR (= "A" AND < 1), (< "A") OR (= "A" AND <= 1), (< "A") OR (= "A" AND <= 1)
349+
// The general we look at all prefixes of the order by columns to look for tie breakers.
350+
// Except for the full prefix whose last column follows the rules for single item order by
351+
// And then you just OR all the possibilities together
352+
353+
for (int prefixLength = 1; prefixLength <= numOrderByItems; prefixLength++) {
354+
boolean lastPrefix = prefixLength == numOrderByItems;
355+
356+
this.appendToBuilders(left, target, right, "(");
357+
358+
for (int index = 0; index < prefixLength; index++) {
359+
String expression = expressions[index];
360+
SortOrder sortOrder = sortOrders[index];
361+
QueryItem orderbyItem = orderByItems[index];
362+
Object orderbyRawItem = orderbyItem.getItem();
363+
364+
boolean lastItem = index == prefixLength - 1;
365+
366+
this.appendToBuilders(left, target, right, "(");
367+
String orderByItemToString = getOrderByItemString(orderbyRawItem);
368+
ComparisonFilters filters =
369+
orderbyRawItem == Undefined.value() ? new ComparisonWithUndefinedFilters((expression)) : new ComparisonWithDefinedFilters(expression, orderByItemToString);
370+
371+
if (lastItem) {
372+
if (lastPrefix) {
373+
left.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
374+
375+
if (inclusive) {
376+
target.append(sortOrder == SortOrder.Descending ? filters.lessThanOrEqualTo() : filters.greaterThanOrEqualTo());
377+
} else {
378+
target.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
379+
}
380+
381+
right.append(sortOrder == SortOrder.Descending ? filters.lessThanOrEqualTo() : filters.greaterThanOrEqualTo());
382+
} else {
383+
left.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
384+
target.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
385+
right.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
386+
}
387+
388+
} else {
389+
left.append(filters.equalTo());
390+
target.append(filters.equalTo());
391+
right.append(filters.equalTo());
392+
}
393+
394+
if (lastItem) {
395+
// Now we need to include all the types that match the sort order.
396+
List<String> definedFunctions =
397+
IsSystemFunctions.getIsDefinedFunctions(ItemTypeHelper.getOrderByItemType(orderbyRawItem),
398+
sortOrder == SortOrder.Ascending);
399+
StringBuilder isDefinedFuncBuilder = new StringBuilder();
400+
for (String idf : definedFunctions) {
401+
isDefinedFuncBuilder.append(" OR ");
402+
isDefinedFuncBuilder.append(String.format("%s(%s)", idf, expression));
403+
}
404+
405+
String isDefinedFunctions = isDefinedFuncBuilder.toString();
406+
left.append(isDefinedFunctions);
407+
target.append(isDefinedFunctions);
408+
right.append(isDefinedFunctions);
409+
}
410+
411+
this.appendToBuilders(left, target, right, ")");
412+
if (!lastItem) {
413+
this.appendToBuilders(left, target, right, " AND ");
414+
}
415+
}
416+
417+
this.appendToBuilders(left, target, right, ")");
418+
if (!lastPrefix) {
419+
this.appendToBuilders(left, target, right, " OR ");
420+
}
421+
}
346422
}
347423

348424
return new FormattedFilterInfo(left.toString(),
349425
target.toString(),
350426
right.toString());
351427
}
352428

429+
private String getOrderByItemString(Object orderbyRawItem) {
430+
String orderByItemToString;
431+
if (orderbyRawItem instanceof String) {
432+
orderByItemToString = "\"" + orderbyRawItem.toString().replaceAll("\"",
433+
"\\\"") + "\"";
434+
} else {
435+
if (orderbyRawItem != null) {
436+
orderByItemToString = orderbyRawItem.toString();
437+
} else {
438+
orderByItemToString = "null";
439+
}
440+
}
441+
442+
return orderByItemToString;
443+
}
444+
445+
private void appendToBuilders(StringBuilder leftBuilder, StringBuilder targetBuilder, StringBuilder rightBuilder, String appendText) {
446+
this.appendToBuilders(leftBuilder, targetBuilder, rightBuilder, appendText, appendText, appendText);
447+
}
448+
449+
private void appendToBuilders(
450+
StringBuilder leftBuilder,
451+
StringBuilder targetBuilder,
452+
StringBuilder rightBuilder,
453+
String leftAppendText,
454+
String targetAppendText,
455+
String rightAppendText) {
456+
457+
leftBuilder.append(leftAppendText);
458+
targetBuilder.append(targetAppendText);
459+
rightBuilder.append(rightAppendText);
460+
}
461+
353462
static class IsSystemFunctions {
354463
static final String Defined = "IS_DEFINED";
355464
static final String NotDefined = "NOT IS_DEFINED";
@@ -420,10 +529,6 @@ List<String> getExtendedTypesIsDefinedFunctions(int index, boolean isAscending)
420529
}
421530
}
422531

423-
private String getFilterString(String s1, String s2, String s3) {
424-
return String.format("%s %s %s", s1, s2, s3);
425-
}
426-
427532
@Override
428533
protected OrderByDocumentProducer<T> createDocumentProducer(
429534
String collectionRid,

0 commit comments

Comments
 (0)