Skip to content

Commit 4ea9fcf

Browse files
[azure-cosmos-benchmark] Linkedin CTL workload implementation (Azure#18689)
* Adding test setup implementations * Adding data gen for an entity * Adding modified version of the LI data accessor * Implement the GetTestRunner; fix the BulkLoad client creation * Adapting implementation to CTL metric accumulation strategy * Execute tests in parallel * Fixing the static and other imports * Adding copyright header * Addressing code review comments * Update sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/linkedin/data/TestInvitationDataGenerator.java Co-authored-by: Mohammad Derakhshani <moderakh@users.noreply.github.com> Co-authored-by: Mohammad Derakhshani <moderakh@users.noreply.github.com>
1 parent 1551f06 commit 4ea9fcf

36 files changed

+2466
-42
lines changed

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ public class Configuration {
108108
+ "\tReadMyWrites - run a workflow of writes followed by reads and queries attempting to read the write.*\n"
109109
+ "\tCtlWorkload - run a ctl workflow.*\n"
110110
+ "\tReadAllItemsOfLogicalPartition - run a workload that uses readAllItems for a logical partition and prints throughput\n"
111-
+ "\n\t* writes 10k documents initially, which are used in the reads", converter = OperationTypeConverter.class)
111+
+ "\n\t* writes 10k documents initially, which are used in the reads"
112+
+ "\tLinkedInCtlWorkload - ctl for LinkedIn workload.*\n",
113+
converter = OperationTypeConverter.class)
112114
private Operation operation = Operation.WriteThroughput;
113115

114116
@Parameter(names = "-concurrency", description = "Degree of Concurrency in Inserting Documents."
@@ -177,7 +179,8 @@ public enum Operation {
177179
ReadMyWrites,
178180
ReadThroughputWithMultipleClients,
179181
CtlWorkload,
180-
ReadAllItemsOfLogicalPartition;
182+
ReadAllItemsOfLogicalPartition,
183+
LinkedInCtlWorkload;
181184

182185
static Operation fromString(String code) {
183186

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44
package com.azure.cosmos.benchmark;
55

66
import com.azure.cosmos.benchmark.ctl.AsyncCtlWorkload;
7+
import com.azure.cosmos.benchmark.linkedin.LICtlWorkload;
78
import com.beust.jcommander.JCommander;
89
import com.beust.jcommander.ParameterException;
10+
import java.util.Optional;
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

1214
import static com.azure.cosmos.benchmark.Configuration.Operation.CtlWorkload;
15+
import static com.azure.cosmos.benchmark.Configuration.Operation.LinkedInCtlWorkload;
1316
import static com.azure.cosmos.benchmark.Configuration.Operation.ReadThroughputWithMultipleClients;
1417

1518
public class Main {
@@ -38,6 +41,8 @@ public static void main(String[] args) throws Exception {
3841
asyncMultiClientBenchmark(cfg);
3942
} else if(cfg.getOperationType().equals(CtlWorkload)) {
4043
asyncCtlWorkload(cfg);
44+
} else if (cfg.getOperationType().equals(LinkedInCtlWorkload)) {
45+
linkedInCtlWorkload(cfg);
4146
}
4247
else {
4348
asyncBenchmark(cfg);
@@ -181,4 +186,26 @@ private static void asyncCtlWorkload(Configuration cfg) throws Exception {
181186
}
182187
}
183188
}
189+
190+
private static void linkedInCtlWorkload(Configuration cfg) {
191+
LOGGER.info("Executing the LinkedIn ctl workload");
192+
LICtlWorkload workload = null;
193+
try {
194+
workload = new LICtlWorkload(cfg);
195+
196+
LOGGER.info("Setting up the LinkedIn ctl workload");
197+
workload.setup();
198+
199+
LOGGER.info("Starting the LinkedIn ctl workload");
200+
workload.run();
201+
} catch (Exception e) {
202+
LOGGER.error("Exception received while executing the LinkedIn ctl workload", e);
203+
throw e;
204+
}
205+
finally {
206+
Optional.ofNullable(workload)
207+
.ifPresent(LICtlWorkload::shutdown);
208+
}
209+
LOGGER.info("Completed LinkedIn ctl workload execution");
210+
}
184211
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.benchmark;
5+
6+
import com.codahale.metrics.ConsoleReporter;
7+
import com.codahale.metrics.CsvReporter;
8+
import com.codahale.metrics.MetricFilter;
9+
import com.codahale.metrics.MetricRegistry;
10+
import com.codahale.metrics.ScheduledReporter;
11+
import com.codahale.metrics.graphite.Graphite;
12+
import com.codahale.metrics.graphite.GraphiteReporter;
13+
import java.net.InetSocketAddress;
14+
import java.util.concurrent.TimeUnit;
15+
16+
17+
public class ScheduledReporterFactory {
18+
19+
private ScheduledReporterFactory() {
20+
}
21+
22+
/**
23+
* @param configuration CTL workload parameters
24+
* @param metricsRegistry MetricRegistry instance for tracking various execution metrics
25+
* @return ScheduledReporter for reporting the captured metrics
26+
*/
27+
public static ScheduledReporter create(final Configuration configuration,
28+
final MetricRegistry metricsRegistry) {
29+
if (configuration.getGraphiteEndpoint() != null) {
30+
final Graphite graphite = new Graphite(new InetSocketAddress(
31+
configuration.getGraphiteEndpoint(),
32+
configuration.getGraphiteEndpointPort()));
33+
return GraphiteReporter.forRegistry(metricsRegistry)
34+
.prefixedWith(configuration.getOperationType().name())
35+
.convertDurationsTo(TimeUnit.MILLISECONDS)
36+
.convertRatesTo(TimeUnit.SECONDS)
37+
.filter(MetricFilter.ALL)
38+
.build(graphite);
39+
} else if (configuration.getReportingDirectory() != null) {
40+
return CsvReporter.forRegistry(metricsRegistry)
41+
.convertDurationsTo(TimeUnit.MILLISECONDS)
42+
.convertRatesTo(TimeUnit.SECONDS)
43+
.build(configuration.getReportingDirectory());
44+
} else {
45+
return ConsoleReporter.forRegistry(metricsRegistry)
46+
.convertDurationsTo(TimeUnit.MILLISECONDS)
47+
.convertRatesTo(TimeUnit.SECONDS)
48+
.build();
49+
}
50+
}
51+
}

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,21 @@
1616
import com.azure.cosmos.benchmark.BenchmarkRequestSubscriber;
1717
import com.azure.cosmos.benchmark.Configuration;
1818
import com.azure.cosmos.benchmark.PojoizedJson;
19+
import com.azure.cosmos.benchmark.ScheduledReporterFactory;
1920
import com.azure.cosmos.implementation.HttpConstants;
2021
import com.azure.cosmos.implementation.OperationType;
2122
import com.azure.cosmos.implementation.RequestOptions;
2223
import com.azure.cosmos.models.CosmosQueryRequestOptions;
2324
import com.azure.cosmos.models.PartitionKey;
2425
import com.azure.cosmos.models.ThroughputProperties;
25-
import com.codahale.metrics.ConsoleReporter;
26-
import com.codahale.metrics.CsvReporter;
2726
import com.codahale.metrics.Meter;
28-
import com.codahale.metrics.MetricFilter;
2927
import com.codahale.metrics.MetricRegistry;
3028
import com.codahale.metrics.ScheduledReporter;
3129
import com.codahale.metrics.Timer;
32-
import com.codahale.metrics.graphite.Graphite;
33-
import com.codahale.metrics.graphite.GraphiteReporter;
3430
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
3531
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
3632
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
3733
import io.micrometer.core.instrument.MeterRegistry;
38-
import org.apache.commons.lang3.RandomStringUtils;
39-
import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
40-
import org.slf4j.Logger;
41-
import org.slf4j.LoggerFactory;
42-
import reactor.core.publisher.BaseSubscriber;
43-
import reactor.core.publisher.Flux;
44-
import reactor.core.publisher.Mono;
45-
import reactor.core.scheduler.Schedulers;
46-
47-
import java.net.InetSocketAddress;
4834
import java.util.ArrayList;
4935
import java.util.HashMap;
5036
import java.util.List;
@@ -54,6 +40,15 @@
5440
import java.util.concurrent.Semaphore;
5541
import java.util.concurrent.TimeUnit;
5642
import java.util.concurrent.atomic.AtomicLong;
43+
import org.apache.commons.lang3.RandomStringUtils;
44+
import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
45+
import org.slf4j.Logger;
46+
import org.slf4j.LoggerFactory;
47+
import reactor.core.publisher.BaseSubscriber;
48+
import reactor.core.publisher.Flux;
49+
import reactor.core.publisher.Mono;
50+
import reactor.core.scheduler.Schedulers;
51+
5752

5853
public class AsyncCtlWorkload {
5954
private final String PERCENT_PARSING_ERROR = "Unable to parse user provided readWriteQueryPct ";
@@ -124,7 +119,7 @@ public AsyncCtlWorkload(Configuration cfg) {
124119
metricsRegistry.register("memory", new MemoryUsageGaugeSet());
125120
}
126121

127-
initializeReporter(cfg);
122+
reporter = ScheduledReporterFactory.create(cfg, metricsRegistry);
128123

129124
MeterRegistry registry = configuration.getAzureMonitorMeterRegistry();
130125

@@ -346,28 +341,4 @@ private void createDatabaseAndContainers(Configuration cfg) {
346341
}
347342
}
348343
}
349-
350-
private void initializeReporter(Configuration configuration) {
351-
if (configuration.getGraphiteEndpoint() != null) {
352-
final Graphite graphite = new Graphite(new InetSocketAddress(
353-
configuration.getGraphiteEndpoint(),
354-
configuration.getGraphiteEndpointPort()));
355-
reporter = GraphiteReporter.forRegistry(metricsRegistry)
356-
.prefixedWith(configuration.getOperationType().name())
357-
.convertDurationsTo(TimeUnit.MILLISECONDS)
358-
.convertRatesTo(TimeUnit.SECONDS)
359-
.filter(MetricFilter.ALL)
360-
.build(graphite);
361-
} else if (configuration.getReportingDirectory() != null) {
362-
reporter = CsvReporter.forRegistry(metricsRegistry)
363-
.convertDurationsTo(TimeUnit.MILLISECONDS)
364-
.convertRatesTo(TimeUnit.SECONDS)
365-
.build(configuration.getReportingDirectory());
366-
} else {
367-
reporter = ConsoleReporter.forRegistry(metricsRegistry)
368-
.convertDurationsTo(TimeUnit.MILLISECONDS)
369-
.convertRatesTo(TimeUnit.SECONDS)
370-
.build();
371-
}
372-
}
373344
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.benchmark.linkedin;
5+
6+
import com.azure.cosmos.ConnectionMode;
7+
import com.azure.cosmos.ConsistencyLevel;
8+
import com.azure.cosmos.CosmosAsyncClient;
9+
import com.azure.cosmos.CosmosClientBuilder;
10+
import com.azure.cosmos.DirectConnectionConfig;
11+
import com.azure.cosmos.GatewayConnectionConfig;
12+
import com.azure.cosmos.ThrottlingRetryOptions;
13+
import com.azure.cosmos.benchmark.Configuration;
14+
import com.google.common.base.Preconditions;
15+
import java.time.Duration;
16+
17+
18+
/**
19+
* Factory for initializing an Async client for all CosmosDB operations
20+
*
21+
* Different values for the DirectConnection and GatewayConnectionConfigs are explicitly defined.
22+
* In most cases, the value is the default from the SDK at the time the class was implemented.
23+
* This will allow us to tweak each param, and observe performance changes + also decouple the CTL
24+
* from changes to the default values.
25+
*/
26+
public class AsyncClientFactory {
27+
28+
private static final DirectConnectionConfig DIRECT_CONNECTION_CONFIG = defaultDirectConfig();
29+
private static final GatewayConnectionConfig GATEWAY_CONNECTION_CONFIG = defaultGatewayConfig();
30+
private static final ThrottlingRetryOptions DEFAULT_THROTTLING_RETRY_OPTIONS = defaultThrottlingRetryOptions();
31+
private static final ThrottlingRetryOptions BULKLOAD_THROTTLING_RETRY_OPTIONS = bulkloadThrottlingRetryOptions();
32+
/**
33+
* Prevent direct initialization
34+
*/
35+
private AsyncClientFactory() {
36+
}
37+
38+
/**
39+
* Builds a Cosmos async client using the configuration options defined
40+
*
41+
* @param cfg Configuration encapsulating options for configuring the AsyncClient
42+
* @return CosmosAsyncClient initialized using the parameters in the Configuration
43+
*/
44+
public static CosmosAsyncClient buildAsyncClient(final Configuration cfg) {
45+
Preconditions.checkNotNull(cfg, "The Workload configuration defining the parameters can not be null");
46+
final CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
47+
.endpoint(cfg.getServiceEndpoint())
48+
.key(cfg.getMasterKey())
49+
.consistencyLevel(cfg.getConsistencyLevel())
50+
.throttlingRetryOptions(DEFAULT_THROTTLING_RETRY_OPTIONS)
51+
.contentResponseOnWriteEnabled(Boolean.parseBoolean(cfg.isContentResponseOnWriteEnabled()));
52+
53+
// Configure the Direct/Gateway mode
54+
if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
55+
cosmosClientBuilder.directMode(DIRECT_CONNECTION_CONFIG, GATEWAY_CONNECTION_CONFIG);
56+
} else {
57+
cosmosClientBuilder.gatewayMode(GATEWAY_CONNECTION_CONFIG);
58+
}
59+
60+
return cosmosClientBuilder
61+
.endpointDiscoveryEnabled(false)
62+
.multipleWriteRegionsEnabled(false)
63+
.buildAsyncClient();
64+
}
65+
66+
/**
67+
* Builds a Cosmos async client used for bulk loading the data in the collection. The throttling
68+
* and the direct connection configs will be set differently for this.
69+
*
70+
* @param cfg Configuration encapsulating options for configuring the Bulkload AsyncClient
71+
* @return CosmosAsyncClient for Bulk loading the data into the collection
72+
*/
73+
public static CosmosAsyncClient buildBulkLoadAsyncClient(final Configuration cfg) {
74+
Preconditions.checkNotNull(cfg, "The Workload configuration defining the parameters can not be null");
75+
final CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
76+
.endpoint(cfg.getServiceEndpoint())
77+
.key(cfg.getMasterKey())
78+
.consistencyLevel(ConsistencyLevel.EVENTUAL)
79+
.throttlingRetryOptions(BULKLOAD_THROTTLING_RETRY_OPTIONS)
80+
.contentResponseOnWriteEnabled(Boolean.parseBoolean(cfg.isContentResponseOnWriteEnabled()));
81+
82+
// Configure the Direct/Gateway mode
83+
if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
84+
cosmosClientBuilder.directMode(DIRECT_CONNECTION_CONFIG, GATEWAY_CONNECTION_CONFIG);
85+
} else {
86+
cosmosClientBuilder.gatewayMode(GATEWAY_CONNECTION_CONFIG);
87+
}
88+
89+
return cosmosClientBuilder
90+
.endpointDiscoveryEnabled(false)
91+
.multipleWriteRegionsEnabled(false)
92+
.buildAsyncClient();
93+
}
94+
95+
private static DirectConnectionConfig defaultDirectConfig() {
96+
return new DirectConnectionConfig()
97+
.setConnectTimeout(Duration.ofSeconds(5L)) // Default
98+
.setConnectionEndpointRediscoveryEnabled(true) // Custom
99+
.setIdleEndpointTimeout(Duration.ofHours(1L)) // Default
100+
.setIdleConnectionTimeout(Duration.ofSeconds(60)) // Custom
101+
.setMaxConnectionsPerEndpoint(130) // Default
102+
.setMaxRequestsPerConnection(30); // Default
103+
}
104+
105+
private static GatewayConnectionConfig defaultGatewayConfig() {
106+
return new GatewayConnectionConfig()
107+
.setMaxConnectionPoolSize(3200) // Custom
108+
.setIdleConnectionTimeout(Duration.ofSeconds(60)); // Default
109+
}
110+
111+
private static ThrottlingRetryOptions defaultThrottlingRetryOptions() {
112+
return new ThrottlingRetryOptions()
113+
.setMaxRetryAttemptsOnThrottledRequests(0) // Custom
114+
.setMaxRetryWaitTime(Duration.ofMillis(0)); // Custom
115+
}
116+
117+
private static ThrottlingRetryOptions bulkloadThrottlingRetryOptions() {
118+
return new ThrottlingRetryOptions()
119+
.setMaxRetryAttemptsOnThrottledRequests(5) // Custom
120+
.setMaxRetryWaitTime(Duration.ofSeconds(60)); // Custom
121+
}
122+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.benchmark.linkedin;
5+
6+
import com.azure.cosmos.benchmark.linkedin.data.InvitationDataGenerator;
7+
import com.azure.cosmos.benchmark.linkedin.data.Key;
8+
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import java.util.Map;
10+
11+
12+
public class DataGenerator {
13+
14+
private DataGenerator() {
15+
16+
}
17+
18+
/**
19+
* Generate N records modeling the Invitation Data stored in CosmosDB
20+
*
21+
* @param recordCount Number of records to generate
22+
* @return Map of Key to JsonNode representing each record
23+
*/
24+
public static Map<Key, ObjectNode> createInvitationRecords(int recordCount) {
25+
final InvitationDataGenerator invitationDataGenerator = new InvitationDataGenerator();
26+
return invitationDataGenerator.generate(recordCount);
27+
}
28+
}

0 commit comments

Comments
 (0)