Skip to content

Commit d836145

Browse files
Read many optimization (Azure#31723)
* added functionality to choose point reads for single item partition requests * added transformations for CosmosQueryRequestOptions and ResourceResponse * refactor * added functionality to choose point reads for single item partition requests * refactor * added tests * reformat * added tests * added diagnostics to aggregated feed response * replaced var with explicit type * enhanced unit tests * enhanced unit tests * enhanced unit tests * updated CHANGELOG.md * attempt at fixing failing build * attempt at fixing failing build * attempt at fixing failing build * address review comments * address review comments * address review comments * suppress warning * remove unused method * attempt at fixing failing build * attempt at fixing failing build * attempt at fixing failing build * attempt at fixing failing build * added benchmark for readMany * fixed deserialization for readMany * added test for request charge comparison * removed unused imports * revert * Fix leaky abstraction * Added tupleSize for readMany-specific benchmarking * Add tupleSize for readMany-specific benchmarking * Addressed review comments * Added code changes for few code review comments on renaming * Update CHANGELOG.md Co-authored-by: Kushagra Thapar <kuthapar@microsoft.com>
1 parent 6da6052 commit d836145

File tree

13 files changed

+855
-24
lines changed

13 files changed

+855
-24
lines changed

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ private boolean latencyAwareOperations(Configuration.Operation operation) {
313313
case QueryTopOrderby:
314314
case Mixed:
315315
case ReadAllItemsOfLogicalPartition:
316+
case ReadManyLatency:
316317
return true;
317318
default:
318319
return false;
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.benchmark;
5+
6+
import com.azure.cosmos.models.CosmosItemIdentity;
7+
import com.azure.cosmos.models.FeedResponse;
8+
import com.azure.cosmos.models.PartitionKey;
9+
import com.codahale.metrics.Timer;
10+
import org.reactivestreams.Subscription;
11+
import reactor.core.publisher.BaseSubscriber;
12+
import reactor.core.publisher.Mono;
13+
import reactor.core.scheduler.Schedulers;
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.Random;
17+
18+
class AsyncReadManyBenchmark extends AsyncBenchmark<FeedResponse<PojoizedJson>> {
19+
20+
private final Random r;
21+
22+
static class LatencySubscriber<T> extends BaseSubscriber<T> {
23+
24+
Timer.Context context;
25+
BaseSubscriber<T> baseSubscriber;
26+
27+
LatencySubscriber(BaseSubscriber<T> baseSubscriber) { this.baseSubscriber = baseSubscriber; }
28+
29+
@Override
30+
protected void hookOnSubscribe(Subscription subscription) {
31+
super.hookOnSubscribe(subscription);
32+
}
33+
34+
@Override
35+
protected void hookOnNext(T value) {}
36+
37+
@Override
38+
protected void hookOnComplete() {
39+
context.stop();
40+
baseSubscriber.onComplete();
41+
}
42+
43+
@Override
44+
protected void hookOnError(Throwable throwable) {
45+
context.stop();
46+
super.hookOnError(throwable);
47+
}
48+
}
49+
50+
AsyncReadManyBenchmark(Configuration cfg) {
51+
super(cfg);
52+
r = new Random();
53+
}
54+
55+
@Override
56+
protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber, long i) throws Exception {
57+
int tupleSize = configuration.getTupleSize();
58+
int randomIdx = r.nextInt(configuration.getNumberOfPreCreatedDocuments());
59+
List<CosmosItemIdentity> cosmosItemIdentities = new ArrayList<>();
60+
61+
for (int idx = randomIdx; idx < randomIdx + tupleSize; idx++) {
62+
int index = idx % configuration.getNumberOfPreCreatedDocuments();
63+
PojoizedJson doc = docsToRead.get(index);
64+
String partitionKeyValue = (String) doc.getProperty(partitionKey);
65+
PartitionKey partitionKey = new PartitionKey(partitionKeyValue);
66+
67+
cosmosItemIdentities.add(new CosmosItemIdentity(partitionKey, doc.getId()));
68+
}
69+
70+
Mono<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.readMany(cosmosItemIdentities, PojoizedJson.class);
71+
72+
concurrencyControlSemaphore.acquire();
73+
74+
switch (configuration.getOperationType()) {
75+
case ReadManyLatency:
76+
readManyLatency(obs, baseSubscriber);
77+
break;
78+
case ReadManyThroughput:
79+
readManyThroughput(obs, baseSubscriber);
80+
break;
81+
default:
82+
throw new IllegalArgumentException("invalid workload type " + configuration.getOperationType());
83+
}
84+
}
85+
86+
private void readManyLatency(Mono<FeedResponse<PojoizedJson>> obs, BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber) {
87+
LatencySubscriber<FeedResponse<PojoizedJson>> latencySubscriber = new LatencySubscriber<>(baseSubscriber);
88+
latencySubscriber.context = latency.time();
89+
90+
obs.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber);
91+
}
92+
93+
private void readManyThroughput(Mono<FeedResponse<PojoizedJson>> obs, BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber) {
94+
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
95+
}
96+
}

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ public class Configuration {
109109
@Parameter(names = "-encryptionEnabled", description = "Control switch to enable the encryption operation")
110110
private boolean encryptionEnabled = false;
111111

112+
@Parameter(names = "-tupleSize", description = "Number of cosmos identity tuples to be queried using readMany")
113+
private int tupleSize = 1;
114+
112115
@Parameter(names = "-operation", description = "Type of Workload:\n"
113116
+ "\tReadThroughput- run a READ workload that prints only throughput *\n"
114117
+ "\tReadThroughputWithMultipleClients - run a READ workload that prints throughput and latency for multiple client read.*\n"
@@ -129,7 +132,9 @@ public class Configuration {
129132
+ "\tCtlWorkload - run a ctl workflow.*\n"
130133
+ "\tReadAllItemsOfLogicalPartition - run a workload that uses readAllItems for a logical partition and prints throughput\n"
131134
+ "\n\t* writes 10k documents initially, which are used in the reads"
132-
+ "\tLinkedInCtlWorkload - ctl for LinkedIn workload.*\n",
135+
+ "\tLinkedInCtlWorkload - ctl for LinkedIn workload.*\n"
136+
+ "\tReadManyLatency - run a workload for readMany for a finite number of cosmos identity tuples that prints both throughput and latency*\n"
137+
+ "\tReadManyThroughput - run a workload for readMany for a finite no of cosmos identity tuples that prints throughput*\n",
133138
converter = Operation.OperationTypeConverter.class)
134139
private Operation operation = Operation.WriteThroughput;
135140

@@ -238,7 +243,9 @@ public enum Operation {
238243
ReadThroughputWithMultipleClients,
239244
CtlWorkload,
240245
ReadAllItemsOfLogicalPartition,
241-
LinkedInCtlWorkload;
246+
LinkedInCtlWorkload,
247+
ReadManyLatency,
248+
ReadManyThroughput;
242249

243250
static Operation fromString(String code) {
244251

@@ -511,6 +518,10 @@ public int getClientTelemetrySchedulingInSeconds() {
511518
return clientTelemetrySchedulingInSeconds;
512519
}
513520

521+
public Integer getTupleSize() {
522+
return tupleSize;
523+
}
524+
514525
public void tryGetValuesFromSystem() {
515526
serviceEndpoint = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("SERVICE_END_POINT")),
516527
serviceEndpoint);
@@ -568,6 +579,10 @@ public void tryGetValuesFromSystem() {
568579
encryptionEnabled = Boolean.parseBoolean(StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
569580
"ENCRYPTED_ENABLED")),
570581
Boolean.toString(encryptionEnabled)));
582+
583+
tupleSize = Integer.parseInt(
584+
StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("COSMOS_IDENTITY_TUPLE_SIZE")),
585+
Integer.toString(tupleSize)));
571586
}
572587

573588
private synchronized MeterRegistry azureMonitorMeterRegistry(String instrumentationKey) {

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,10 @@ private static void asyncBenchmark(Configuration cfg) throws Exception {
145145
case ReadAllItemsOfLogicalPartition:
146146
benchmark = new AsyncQueryBenchmark(cfg);
147147
break;
148-
148+
case ReadManyLatency:
149+
case ReadManyThroughput:
150+
benchmark = new AsyncReadManyBenchmark(cfg);
151+
break;
149152
case Mixed:
150153
benchmark = new AsyncMixedBenchmark(cfg);
151154
break;

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#### Other Changes
1212
* Fixed issue on noisy `CancellationException` log - See [PR 31882](https://github.com/Azure/azure-sdk-for-java/pull/31882)
1313
* Added `retyrAfterInMs` to `StoreResult` in `CosmosDiagnostics` - See [31219](https://github.com/Azure/azure-sdk-for-java/pull/31219)
14+
* Optimized the `readMany` API to make use of point reads when a single item is requested for a given physical partition - See [PR 31723](https://github.com/Azure/azure-sdk-for-java/pull/31723)
1415

1516
### 4.39.0 (2022-11-16)
1617

sdk/cosmos/azure-cosmos/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ Licensed under the MIT License.
258258
<version>4.5.1</version> <!-- {x-version-update;org.mockito:mockito-inline;external_dependency} -->
259259
<scope>test</scope>
260260
</dependency>
261-
262261
</dependencies>
263262

264263
<build>

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ public interface CosmosQueryRequestOptionsAccessor {
237237
<T> Function<JsonNode, T> getItemFactoryMethod(CosmosQueryRequestOptions queryRequestOptions, Class<T> classOfT);
238238
CosmosQueryRequestOptions setItemFactoryMethod(CosmosQueryRequestOptions queryRequestOptions, Function<JsonNode, ?> factoryMethod);
239239
String getQueryNameOrDefault(CosmosQueryRequestOptions queryRequestOptions, String defaultQueryName);
240+
RequestOptions toRequestOptions(CosmosQueryRequestOptions queryRequestOptions);
240241
}
241242
}
242243

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.azure.cosmos.models.CosmosBatchResponse;
5858
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
5959
import com.azure.cosmos.models.CosmosItemIdentity;
60+
import com.azure.cosmos.models.CosmosItemResponse;
6061
import com.azure.cosmos.models.CosmosPatchOperations;
6162
import com.azure.cosmos.models.CosmosQueryRequestOptions;
6263
import com.azure.cosmos.models.FeedRange;
@@ -82,6 +83,7 @@
8283
import java.nio.ByteBuffer;
8384
import java.time.Instant;
8485
import java.util.ArrayList;
86+
import java.util.Arrays;
8587
import java.util.Collections;
8688
import java.util.EnumSet;
8789
import java.util.HashMap;
@@ -2230,9 +2232,9 @@ public <T> Mono<FeedResponse<T>> readMany(
22302232
collection.getResourceId(),
22312233
null,
22322234
null);
2235+
22332236
return valueHolderMono.flatMap(collectionRoutingMapValueHolder -> {
2234-
Map<PartitionKeyRange, List<CosmosItemIdentity>> partitionRangeItemKeyMap =
2235-
new HashMap<>();
2237+
Map<PartitionKeyRange, List<CosmosItemIdentity>> partitionRangeItemKeyMap = new HashMap<>();
22362238
CollectionRoutingMap routingMap = collectionRoutingMapValueHolder.v;
22372239
if (routingMap == null) {
22382240
throw new IllegalStateException("Failed to get routing map.");
@@ -2267,21 +2269,29 @@ public <T> Mono<FeedResponse<T>> readMany(
22672269

22682270
//Create the range query map that contains the query to be run for that
22692271
// partitionkeyrange
2270-
Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap;
2271-
rangeQueryMap = getRangeQueryMap(partitionRangeItemKeyMap,
2272-
collection.getPartitionKey());
2272+
Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap = getRangeQueryMap(partitionRangeItemKeyMap, collection.getPartitionKey());
2273+
2274+
// create point reads
2275+
Flux<FeedResponse<Document>> pointReads = createPointReadOperations(
2276+
partitionRangeItemKeyMap,
2277+
resourceLink,
2278+
options,
2279+
klass);
22732280

22742281
// create the executable query
2275-
return createReadManyQuery(
2282+
Flux<FeedResponse<Document>> queries = createReadManyQuery(
22762283
resourceLink,
22772284
new SqlQuerySpec(DUMMY_SQL_QUERY),
22782285
options,
22792286
Document.class,
22802287
ResourceType.Document,
22812288
collection,
2282-
Collections.unmodifiableMap(rangeQueryMap))
2283-
.collectList() // aggregating the result construct a FeedResponse and
2284-
// aggregate RUs.
2289+
Collections.unmodifiableMap(rangeQueryMap));
2290+
2291+
// merge results from point reads and queries
2292+
return Flux.merge(pointReads, queries)
2293+
.collectList()
2294+
// aggregating the result to construct a FeedResponse and aggregate RUs.
22852295
.map(feedList -> {
22862296
List<T> finalList = new ArrayList<>();
22872297
HashMap<String, String> headers = new HashMap<>();
@@ -2334,14 +2344,16 @@ private Map<PartitionKeyRange, SqlQuerySpec> getRangeQueryMap(
23342344
for(Map.Entry<PartitionKeyRange, List<CosmosItemIdentity>> entry: partitionRangeItemKeyMap.entrySet()) {
23352345

23362346
SqlQuerySpec sqlQuerySpec;
2337-
if (partitionKeySelector.equals("[\"id\"]")) {
2338-
sqlQuerySpec = createReadManyQuerySpecPartitionKeyIdSame(entry.getValue(), partitionKeySelector);
2339-
} else {
2340-
sqlQuerySpec = createReadManyQuerySpec(entry.getValue(), partitionKeySelector);
2347+
List<CosmosItemIdentity> cosmosItemIdentityList = entry.getValue();
2348+
if (cosmosItemIdentityList.size() > 1) {
2349+
if (partitionKeySelector.equals("[\"id\"]")) {
2350+
sqlQuerySpec = createReadManyQuerySpecPartitionKeyIdSame(cosmosItemIdentityList, partitionKeySelector);
2351+
} else {
2352+
sqlQuerySpec = createReadManyQuerySpec(cosmosItemIdentityList, partitionKeySelector);
2353+
}
2354+
// Add query for this partition to rangeQueryMap
2355+
rangeQueryMap.put(entry.getKey(), sqlQuerySpec);
23412356
}
2342-
// Add query for this partition to rangeQueryMap
2343-
rangeQueryMap.put(entry.getKey(), sqlQuerySpec);
2344-
23452357
}
23462358

23472359
return rangeQueryMap;
@@ -2436,6 +2448,10 @@ private <T extends Resource> Flux<FeedResponse<T>> createReadManyQuery(
24362448
DocumentCollection collection,
24372449
Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap) {
24382450

2451+
if (rangeQueryMap.isEmpty()) {
2452+
return Flux.empty();
2453+
}
2454+
24392455
UUID activityId = Utils.randomUUID();
24402456
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this, getOperationContextAndListenerTuple(options));
24412457
Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
@@ -2451,6 +2467,34 @@ private <T extends Resource> Flux<FeedResponse<T>> createReadManyQuery(
24512467
return executionContext.flatMap(IDocumentQueryExecutionContext<T>::executeAsync);
24522468
}
24532469

2470+
private <T> Flux<FeedResponse<Document>> createPointReadOperations(
2471+
Map<PartitionKeyRange, List<CosmosItemIdentity>> singleItemPartitionRequestMap,
2472+
String resourceLink,
2473+
CosmosQueryRequestOptions queryRequestOptions,
2474+
Class<T> klass
2475+
) {
2476+
return Flux.fromIterable(singleItemPartitionRequestMap.values())
2477+
.flatMap(cosmosItemIdentityList -> {
2478+
if (cosmosItemIdentityList.size() == 1) {
2479+
CosmosItemIdentity firstIdentity = cosmosItemIdentityList.get(0);
2480+
RequestOptions requestOptions = ImplementationBridgeHelpers
2481+
.CosmosQueryRequestOptionsHelper
2482+
.getCosmosQueryRequestOptionsAccessor()
2483+
.toRequestOptions(queryRequestOptions);
2484+
requestOptions.setPartitionKey(firstIdentity.getPartitionKey());
2485+
return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions);
2486+
}
2487+
return Mono.empty();
2488+
})
2489+
.flatMap(resourceResponse -> {
2490+
CosmosItemResponse<T> cosmosItemResponse =
2491+
ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer());
2492+
FeedResponse<Document> feedResponse = ModelBridgeInternal.createFeedResponse(Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())), cosmosItemResponse.getResponseHeaders());
2493+
BridgeInternal.addClientSideDiagnosticsToFeed(feedResponse.getCosmosDiagnostics(), Arrays.asList(BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics())));
2494+
return Mono.just(feedResponse);
2495+
});
2496+
}
2497+
24542498
@Override
24552499
public <T> Flux<FeedResponse<T>> queryDocuments(
24562500
String collectionLink, String query, CosmosQueryRequestOptions options, Class<T> classOfT) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.azure.cosmos.ConsistencyLevel;
77
import com.azure.cosmos.implementation.Configs;
88
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
9+
import com.azure.cosmos.implementation.RequestOptions;
910
import com.azure.cosmos.implementation.Strings;
1011
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
1112
import com.azure.cosmos.util.Beta;
@@ -731,6 +732,26 @@ public String getQueryNameOrDefault(CosmosQueryRequestOptions queryRequestOption
731732

732733
return queryRequestOptions.getQueryNameOrDefault(defaultQueryName);
733734
}
735+
736+
@Override
737+
public RequestOptions toRequestOptions(CosmosQueryRequestOptions queryRequestOptions) {
738+
RequestOptions requestOptions = new RequestOptions();
739+
requestOptions.setConsistencyLevel(queryRequestOptions.getConsistencyLevel());
740+
requestOptions.setSessionToken(queryRequestOptions.getSessionToken());
741+
requestOptions.setPartitionKey(queryRequestOptions.getPartitionKey());
742+
requestOptions.setThroughputControlGroupName(queryRequestOptions.getThroughputControlGroupName());
743+
requestOptions.setOperationContextAndListenerTuple(queryRequestOptions.getOperationContextAndListenerTuple());
744+
requestOptions.setDedicatedGatewayRequestOptions(queryRequestOptions.getDedicatedGatewayRequestOptions());
745+
requestOptions.setThresholdForDiagnosticsOnTracer(queryRequestOptions.getThresholdForDiagnosticsOnTracer());
746+
747+
if (queryRequestOptions.customOptions != null) {
748+
for(Map.Entry<String, String> entry : queryRequestOptions.customOptions.entrySet()) {
749+
requestOptions.setHeader(entry.getKey(), entry.getValue());
750+
}
751+
}
752+
753+
return requestOptions;
754+
}
734755
});
735756
}
736757

0 commit comments

Comments
 (0)