Skip to content

Commit 7786a39

Browse files
amarathavale (Amar Athavale)
and co-authored
Adding the QueryExecutor implementation (Azure#19515)
* Adding the QueryExecutor implementation, without using it. Refactored resource management
* Rebased off the latest main branch
* Changing the batch load params, and explicitly setting the PartKey in the queryItems
* Adding a configurable param for the bulkLoad batch size
* Resolving changes from the PR review

Co-authored-by: Amar Athavale <aathaval@linkedin.com>
1 parent e8505ad commit 7786a39

File tree

16 files changed

+593
-127
lines changed

16 files changed

+593
-127
lines changed

sdk/cosmos/azure-cosmos-benchmark/ctl/linkedin/run_benchmark.sh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
## - ctl_concurrency (optional: default 50)
1111
## - ctl_consistency_level (optional: default Session)
1212
## - ctl_number_of_precreated_documents (optional: default 100,000)
13+
## - ctl_bulk_load_batch_size (optional: default 200,000)
1314
## - ctl_number_of_operations (optional: default 1,000,000)
1415
## - ctl_max_running_time_duration (optional: default 10 minutes)
1516
## - ctl_printing_interval (optional: default 30 seconds)
@@ -63,6 +64,13 @@ else
6364
number_of_precreated_documents=$ctl_number_of_precreated_documents
6465
fi
6566

67+
if [ -z "$ctl_bulk_load_batch_size" ]
68+
then
69+
bulk_load_batch_size=200000
70+
else
71+
bulk_load_batch_size=$ctl_bulk_load_batch_size
72+
fi
73+
6674
if [ -z "$ctl_number_of_operations" ]
6775
then
6876
number_of_operations=-1
@@ -98,9 +106,9 @@ jvm_opt=""
98106

99107
if [ -z "$ctl_graphite_endpoint" ]
100108
then
101-
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
109+
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
102110
else
103-
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
111+
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
104112
fi
105113

106114
end=`date +%s`

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ public class Configuration {
8989
@Parameter(names = "-readWriteQueryPct", description = "Comma separated read write query workload percent")
9090
private String readWriteQueryPct = "90,9,1";
9191

92-
@Parameter(names = "-manageResources", description = "Control switch for creating/deleting underlying test resources")
93-
private boolean manageResources = false;
92+
@Parameter(names = "-manageDatabase", description = "Control switch for creating/deleting underlying database resource")
93+
private boolean manageDatabase = false;
9494

9595
@Parameter(names = "-operation", description = "Type of Workload:\n"
9696
+ "\tReadThroughput- run a READ workload that prints only throughput *\n"
@@ -161,6 +161,9 @@ public Duration convert(String value) {
161161
@Parameter(names = "-contentResponseOnWriteEnabled", description = "if set to false, does not returns content response on document write operations")
162162
private String contentResponseOnWriteEnabled = String.valueOf(true);
163163

164+
@Parameter(names = "-bulkloadBatchSize", description = "Control the number of documents uploaded in each BulkExecutor load iteration (Only supported for the LinkedInCtlWorkload)")
165+
private int bulkloadBatchSize = 200000;
166+
164167
@Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
165168
private boolean help = false;
166169

@@ -393,8 +396,12 @@ public String getReadWriteQueryPct() {
393396
return this.readWriteQueryPct;
394397
}
395398

396-
public boolean shouldManageResources() {
397-
return this.manageResources;
399+
public boolean shouldManageDatabase() {
400+
return this.manageDatabase;
401+
}
402+
403+
public int getBulkloadBatchSize() {
404+
return this.bulkloadBatchSize;
398405
}
399406

400407
public String toString() {

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/linkedin/ResourceManagerImpl.java renamed to sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/linkedin/CollectionResourceManager.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,83 +10,69 @@
1010
import com.azure.cosmos.benchmark.Configuration;
1111
import com.azure.cosmos.benchmark.linkedin.data.CollectionAttributes;
1212
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
13+
import com.azure.cosmos.benchmark.linkedin.impl.Constants;
1314
import com.azure.cosmos.models.CosmosContainerProperties;
14-
import com.azure.cosmos.models.ThroughputProperties;
1515
import com.google.common.base.Preconditions;
1616
import java.time.Duration;
1717
import java.util.List;
1818
import java.util.stream.Collectors;
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121

22-
import static com.azure.cosmos.benchmark.linkedin.impl.Constants.PARTITION_KEY_PATH;
23-
import static com.azure.cosmos.models.ThroughputProperties.createManualThroughput;
2422

25-
26-
public class ResourceManagerImpl implements ResourceManager {
27-
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerImpl.class);
23+
/**
24+
* Implementation for managing only the Collections for this test. This class facilitates
25+
* container creation after the CTL environment has provisioned the database with the
26+
* required throughput
27+
*/
28+
public class CollectionResourceManager implements ResourceManager {
29+
private static final Logger LOGGER = LoggerFactory.getLogger(CollectionResourceManager.class);
2830
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);
2931

3032
private final Configuration _configuration;
3133
private final EntityConfiguration _entityConfiguration;
3234
private final CosmosAsyncClient _client;
3335

34-
public ResourceManagerImpl(final Configuration configuration,
36+
public CollectionResourceManager(final Configuration configuration,
3537
final EntityConfiguration entityConfiguration,
3638
final CosmosAsyncClient client) {
3739
Preconditions.checkNotNull(configuration,
3840
"The Workload configuration defining the parameters can not be null");
3941
Preconditions.checkNotNull(entityConfiguration,
4042
"The Test Entity specific configuration can not be null");
4143
Preconditions.checkNotNull(client, "Need a non-null client for "
42-
+ "setting up the Database and containers for the test");
44+
+ "setting up the Database and collections for the test");
4345
_configuration = configuration;
4446
_entityConfiguration = entityConfiguration;
4547
_client = client;
4648
}
4749

4850
@Override
49-
public void createDatabase() throws CosmosException {
50-
try {
51-
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
52-
final ThroughputProperties throughputProperties = createManualThroughput(_configuration.getThroughput());
53-
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
54-
.block(RESOURCE_CRUD_WAIT_TIME);
55-
} catch (CosmosException e) {
56-
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
57-
throw e;
58-
}
59-
60-
deleteExistingContainers();
61-
}
62-
63-
@Override
64-
public void createContainer() throws CosmosException {
51+
public void createResources() throws CosmosException {
6552
final String containerName = _configuration.getCollectionId();
6653
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
6754
final CollectionAttributes collectionAttributes = _entityConfiguration.collectionAttributes();
6855
try {
6956
LOGGER.info("Creating container {} in the database {}", containerName, database.getId());
7057
final CosmosContainerProperties containerProperties =
71-
new CosmosContainerProperties(containerName, PARTITION_KEY_PATH)
58+
new CosmosContainerProperties(containerName, Constants.PARTITION_KEY_PATH)
7259
.setIndexingPolicy(collectionAttributes.indexingPolicy());
7360
database.createContainerIfNotExists(containerProperties)
7461
.block(RESOURCE_CRUD_WAIT_TIME);
7562
} catch (CosmosException e) {
76-
LOGGER.error("Exception while creating container {}", containerName, e);
63+
LOGGER.error("Exception while creating collection {}", containerName, e);
7764
throw e;
7865
}
7966
}
8067

8168
@Override
8269
public void deleteResources() {
83-
// Delete all the containers in the database
84-
deleteExistingContainers();
70+
deleteExistingCollections();
8571

86-
LOGGER.info("Resource cleanup completed");
72+
LOGGER.info("Collection resource cleanup completed");
8773
}
8874

89-
private void deleteExistingContainers() {
75+
private void deleteExistingCollections() {
9076
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
9177
final List<CosmosAsyncContainer> cosmosAsyncContainers = database.readAllContainers()
9278
.byPage()
@@ -98,12 +84,12 @@ private void deleteExistingContainers() {
9884

9985
// Run a best effort attempt to delete all existing containers and data there-in
10086
for (CosmosAsyncContainer cosmosAsyncContainer : cosmosAsyncContainers) {
101-
LOGGER.info("Deleting container {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
87+
LOGGER.info("Deleting collection {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
10288
try {
10389
cosmosAsyncContainer.delete()
10490
.block(RESOURCE_CRUD_WAIT_TIME);
10591
} catch (CosmosException e) {
106-
LOGGER.error("Error deleting container {} in the Database {}",
92+
LOGGER.error("Error deleting collection {} in the Database {}",
10793
cosmosAsyncContainer.getId(), _configuration.getDatabaseId(), e);
10894
}
10995
}

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/linkedin/DataGenerationIterator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,32 @@
99
import com.google.common.base.Preconditions;
1010
import java.util.Iterator;
1111
import java.util.Map;
12-
import java.util.stream.Collectors;
1312

1413

1514
/**
1615
* This class facilitates generating data in batches.
1716
*/
1817
public class DataGenerationIterator implements Iterator<Map<Key, ObjectNode>> {
1918

20-
public static final int BATCH_SIZE = 200000;
21-
2219
private final DataGenerator _dataGenerator;
2320
private final int _totalRecordCount;
2421
private int _totalDataGenerated;
22+
private final int _batchLoadBatchSize;
2523

2624
/**
2725
* @param dataGenerator The underlying DataGenerator capable of generating a batch of records
2826
* @param recordCount Number of records we want to generate generate for this test.
29-
* Actual data generation happens in pre-determined batch size
27+
* @param batchLoadBatchSize The number of documents to generate, and load, in each BulkExecutor iteration
3028
*/
31-
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount) {
29+
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount, int batchLoadBatchSize) {
30+
Preconditions.checkArgument(recordCount > 0,
31+
"The number of documents to generate must be greater than 0");
32+
Preconditions.checkArgument(batchLoadBatchSize > 0,
33+
"The number of documents to generate and load on each BulkExecutor load iteration must be greater than 0");
3234
_dataGenerator = Preconditions.checkNotNull(dataGenerator,
3335
"The underlying DataGenerator for this iterator can not be null");
3436
_totalRecordCount = recordCount;
37+
_batchLoadBatchSize = batchLoadBatchSize;
3538
_totalDataGenerated = 0;
3639
}
3740

@@ -42,7 +45,7 @@ public boolean hasNext() {
4245

4346
@Override
4447
public Map<Key, ObjectNode> next() {
45-
final int recordsToGenerate = Math.min(BATCH_SIZE, _totalRecordCount - _totalDataGenerated);
48+
final int recordsToGenerate = Math.min(_batchLoadBatchSize, _totalRecordCount - _totalDataGenerated);
4649

4750
// Filter Keys in case there are duplicates
4851
final Map<Key, ObjectNode> newDocuments = _dataGenerator.generate(recordsToGenerate);

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/linkedin/DataLoader.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
public class DataLoader {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(DataLoader.class);
3131

32-
private static final int MAX_BATCH_SIZE = 20000;
33-
private static final int BULK_OPERATION_CONCURRENCY = 5;
34-
private static final Duration BULK_LOAD_WAIT_DURATION = Duration.ofSeconds(120);
32+
private static final int MAX_BATCH_SIZE = 10000;
33+
private static final int BULK_OPERATION_CONCURRENCY = 10;
34+
private static final Duration VALIDATE_DATA_WAIT_DURATION = Duration.ofSeconds(120);
3535
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
3636
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";
3737

@@ -48,11 +48,13 @@ public DataLoader(final Configuration configuration,
4848
_client = Preconditions.checkNotNull(client,
4949
"The CosmosAsyncClient needed for data loading can not be null");
5050
_dataGenerator = new DataGenerationIterator(entityConfiguration.dataGenerator(),
51-
_configuration.getNumberOfPreCreatedDocuments());
51+
_configuration.getNumberOfPreCreatedDocuments(),
52+
_configuration.getBulkloadBatchSize());
5253
}
5354

5455
public void loadData() {
55-
LOGGER.info("Starting batched data loading, loading {} documents in each iteration", DataGenerationIterator.BATCH_SIZE);
56+
LOGGER.info("Starting batched data loading, loading {} documents in each iteration",
57+
_configuration.getBulkloadBatchSize());
5658
while (_dataGenerator.hasNext()) {
5759
final Map<Key, ObjectNode> newDocuments = _dataGenerator.next();
5860
bulkCreateItems(newDocuments);
@@ -71,11 +73,14 @@ private void bulkCreateItems(final Map<Key, ObjectNode> records) {
7173
database.getId(),
7274
containerName);
7375

76+
// We want to wait longer depending on the number of documents in each iteration
77+
final Duration blockingWaitTime = Duration.ofSeconds(120 *
78+
(((_configuration.getBulkloadBatchSize() - 1) / 200000) + 1));
7479
final BulkProcessingOptions<Object> bulkProcessingOptions = new BulkProcessingOptions<>(Object.class);
7580
bulkProcessingOptions.setMaxMicroBatchSize(MAX_BATCH_SIZE)
7681
.setMaxMicroBatchConcurrency(BULK_OPERATION_CONCURRENCY);
7782
container.processBulkOperations(Flux.fromIterable(cosmosItemOperations), bulkProcessingOptions)
78-
.blockLast(BULK_LOAD_WAIT_DURATION);
83+
.blockLast(blockingWaitTime);
7984

8085
LOGGER.info("Completed loading {} documents into [{}:{}]", cosmosItemOperations.size(),
8186
database.getId(),
@@ -93,7 +98,7 @@ private void validateDataCreation(int expectedSize) {
9398
.queryItems(COUNT_ALL_QUERY, ObjectNode.class)
9499
.byPage()
95100
.collectList()
96-
.block(BULK_LOAD_WAIT_DURATION);
101+
.block(VALIDATE_DATA_WAIT_DURATION);
97102
final int resultCount = Optional.ofNullable(queryItemsResponseList)
98103
.map(responseList -> responseList.get(0))
99104
.map(FeedResponse::getResults)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.benchmark.linkedin;
5+
6+
import com.azure.cosmos.CosmosAsyncClient;
7+
import com.azure.cosmos.CosmosAsyncDatabase;
8+
import com.azure.cosmos.CosmosException;
9+
import com.azure.cosmos.benchmark.Configuration;
10+
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
11+
import com.azure.cosmos.models.ThroughputProperties;
12+
import com.google.common.base.Preconditions;
13+
import java.time.Duration;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
18+
/**
19+
* For local testing, the database creation needs to happen as part of the Test setup. This class
20+
* manages the database AND collection setup, and useful for ensuring database and other resources
21+
* and not left unused after local testing
22+
*/
23+
public class DatabaseResourceManager implements ResourceManager {
24+
25+
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseResourceManager.class);
26+
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);
27+
28+
private final Configuration _configuration;
29+
private final CosmosAsyncClient _client;
30+
private final CollectionResourceManager _collectionResourceManager;
31+
32+
public DatabaseResourceManager(final Configuration configuration,
33+
final EntityConfiguration entityConfiguration,
34+
final CosmosAsyncClient client) {
35+
Preconditions.checkNotNull(configuration,
36+
"The Workload configuration defining the parameters can not be null");
37+
Preconditions.checkNotNull(entityConfiguration,
38+
"The Test Entity specific configuration can not be null");
39+
Preconditions.checkNotNull(client, "Need a non-null client for "
40+
+ "setting up the Database and collections for the test");
41+
_configuration = configuration;
42+
_client = client;
43+
_collectionResourceManager = new CollectionResourceManager(_configuration, entityConfiguration, _client);
44+
}
45+
46+
@Override
47+
public void createResources() throws CosmosException {
48+
try {
49+
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
50+
final ThroughputProperties throughputProperties =
51+
ThroughputProperties.createManualThroughput(_configuration.getThroughput());
52+
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
53+
.block(RESOURCE_CRUD_WAIT_TIME);
54+
} catch (CosmosException e) {
55+
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
56+
throw e;
57+
}
58+
59+
// Delete any existing collections/containers in this database
60+
_collectionResourceManager.deleteResources();
61+
62+
// And recreate the collections for this test
63+
_collectionResourceManager.createResources();
64+
}
65+
66+
@Override
67+
public void deleteResources() {
68+
// Followed by the main database used for testing
69+
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
70+
try {
71+
LOGGER.info("Deleting the main database {} used in this test. Collection", _configuration.getDatabaseId());
72+
database.delete()
73+
.block(RESOURCE_CRUD_WAIT_TIME);
74+
} catch (CosmosException e) {
75+
LOGGER.error("Exception deleting the database {}", _configuration.getDatabaseId(), e);
76+
throw e;
77+
}
78+
79+
LOGGER.info("Database resource cleanup completed");
80+
}
81+
}

0 commit comments

Comments
 (0)