Commit b705cbc
BulkWriter fix for bufferTimeout issue (Azure#37072)
* Adding MaxMicroBatchSize config and switching BulkWriter to use bufferUntil (instead of bufferTimeout, which has issues when backpressure happens) * Changelogs
1 parent 71acaae commit b705cbc

File tree: 11 files changed (+317, -96 lines)

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -3,10 +3,12 @@
 ### 4.23.0-beta.1 (Unreleased)

 #### Features Added
+* Added a configuration option to control the maximum batch size used. By default the batch size is determined automatically based on the throttling rate and will auto-adjust to meet the throughput control limits when applied. This setting is mostly added to simplify Spark 2.4 migrations, where it was possible to specify a fixed batch size. It should only be used when throughput control is not enabled; for new workloads not migrated from Spark 2.4, using throughput control is preferred. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Breaking Changes

 #### Bugs Fixed
+* Fixed a backpressure issue when using WriteStrategy `ItemBulkUpdate`. With this write strategy the Reactor operator `bufferTimeout` was used, which has problems under backpressure and can result in the error `OverflowException: Could not emit buffer due to lack of requests`. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Other Changes
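For illustration only (not part of this commit): a minimal sketch of setting the new `spark.cosmos.write.bulk.maxBatchSize` option on a DataFrame write with the Cosmos DB Spark connector. The endpoint, key, database, container, and the DataFrame `df` are placeholders.

```scala
// Hedged sketch: cap the bulk micro batch at 20 operations for this write.
// Values above the direct-mode limit (100) are clamped by the connector.
df.write
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  .option("spark.cosmos.write.bulk.enabled", "true")
  .option("spark.cosmos.write.bulk.maxBatchSize", "20")
  .mode("append")
  .save()
```

As the changelog notes, prefer throughput control over this cap unless you are migrating a Spark 2.4 workload that relied on a fixed batch size.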

sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -3,10 +3,12 @@
 ### 4.23.0-beta.1 (Unreleased)

 #### Features Added
+* Added a configuration option to control the maximum batch size used. By default the batch size is determined automatically based on the throttling rate and will auto-adjust to meet the throughput control limits when applied. This setting is mostly added to simplify Spark 2.4 migrations, where it was possible to specify a fixed batch size. It should only be used when throughput control is not enabled; for new workloads not migrated from Spark 2.4, using throughput control is preferred. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Breaking Changes

 #### Bugs Fixed
+* Fixed a backpressure issue when using WriteStrategy `ItemBulkUpdate`. With this write strategy the Reactor operator `bufferTimeout` was used, which has problems under backpressure and can result in the error `OverflowException: Could not emit buffer due to lack of requests`. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Other Changes

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -3,10 +3,12 @@
 ### 4.23.0-beta.1 (Unreleased)

 #### Features Added
+* Added a configuration option to control the maximum batch size used. By default the batch size is determined automatically based on the throttling rate and will auto-adjust to meet the throughput control limits when applied. This setting is mostly added to simplify Spark 2.4 migrations, where it was possible to specify a fixed batch size. It should only be used when throughput control is not enabled; for new workloads not migrated from Spark 2.4, using throughput control is preferred. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Breaking Changes

 #### Bugs Fixed
+* Fixed a backpressure issue when using WriteStrategy `ItemBulkUpdate`. With this write strategy the Reactor operator `bufferTimeout` was used, which has problems under backpressure and can result in the error `OverflowException: Could not emit buffer due to lack of requests`. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Other Changes

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -3,10 +3,12 @@
 ### 4.23.0-beta.1 (Unreleased)

 #### Features Added
+* Added a configuration option to control the maximum batch size used. By default the batch size is determined automatically based on the throttling rate and will auto-adjust to meet the throughput control limits when applied. This setting is mostly added to simplify Spark 2.4 migrations, where it was possible to specify a fixed batch size. It should only be used when throughput control is not enabled; for new workloads not migrated from Spark 2.4, using throughput control is preferred. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Breaking Changes

 #### Bugs Fixed
+* Fixed a backpressure issue when using WriteStrategy `ItemBulkUpdate`. With this write strategy the Reactor operator `bufferTimeout` was used, which has problems under backpressure and can result in the error `OverflowException: Could not emit buffer due to lack of requests`. See [PR 37072](https://github.com/Azure/azure-sdk-for-java/pull/37072)

 #### Other Changes

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 204 additions & 60 deletions
Large diffs are not rendered by default.
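The rendered diff is omitted above, but per the commit message the core change is replacing Reactor's `bufferTimeout` with `bufferUntil` for flushing micro batches. Below is a minimal, illustrative sketch of that operator switch; the counter-based predicate is an assumption made for the sketch, and the real BulkWriter also flushes on elapsed time and payload size.

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.function.{Consumer, Predicate}

import reactor.core.publisher.Flux

// Illustrative sketch only, not the actual BulkWriter code. `bufferTimeout` keeps an
// emission timer running regardless of downstream demand, which is what can surface as
// "OverflowException: Could not emit buffer due to lack of requests" under backpressure.
// A predicate-driven `bufferUntil` only closes buffers in response to emitted items and
// therefore honors downstream requests.
object BufferUntilSketch extends App {
  private val maxBatchSize = 100 // mirrors the default max micro batch size
  private val enqueued = new AtomicLong(0)

  // Close the current buffer once maxBatchSize items have been collected.
  private val closeBufferEveryMaxBatchSize: Predicate[Integer] =
    (_: Integer) => enqueued.incrementAndGet() % maxBatchSize == 0

  private val flushBatch: Consumer[java.util.List[Integer]] =
    batch => println(s"flushing micro batch of ${batch.size()} operations")

  Flux
    .range(1, 1000)                            // stand-in for the stream of bulk operations
    .bufferUntil(closeBufferEveryMaxBatchSize) // instead of bufferTimeout(maxBatchSize, duration)
    .subscribe(flushBatch)
}
```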

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 18 additions & 2 deletions
@@ -80,6 +80,7 @@ private[spark] object CosmosConfigNames {
   val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint"
   val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
   val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
+  val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
   val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
   val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
   val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
@@ -169,6 +170,7 @@
     WriteBulkMaxConcurrentPartitions,
     WriteBulkPayloadSizeInBytes,
     WriteBulkInitialBatchSize,
+    WriteBulkMaxBatchSize,
     WritePointMaxConcurrency,
     WritePatchDefaultOperationType,
     WritePatchColumnConfigs,
@@ -823,7 +825,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
                                      patchConfigs: Option[CosmosPatchConfigs] = None,
                                      throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
                                      maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
-                                     initialMicroBatchSize: Option[Int] = None)
+                                     initialMicroBatchSize: Option[Int] = None,
+                                     maxMicroBatchSize: Option[Int] = None)

 private object CosmosWriteConfig {
   private val DefaultMaxRetryCount = 10
@@ -854,6 +857,17 @@
     "initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume " +
     "too many RUs.")

+  private val maxMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxBatchSize,
+    defaultValue = Option.apply(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
+    mandatory = false,
+    parseFromStringFunction = maxBatchSizeString => Math.min(maxBatchSizeString.toInt, BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
+    helpMessage = "Cosmos DB max bulk micro batch size - a micro batch will be flushed to the backend " +
+      "when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch " +
+      "size is automatically tuned based on the throttling rate. By default the " +
+      "max micro batch size is 100. Reduce this when you want to prevent requests from consuming " +
+      "too many RUs and you cannot enable throughput control. NOTE: using throughput control is preferred and will " +
+      "result in better throughput while still limiting the RU/s used.")
+
   private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
     mandatory = false,
     parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -1066,6 +1080,7 @@
     val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
     val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
     val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
+    val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)

     assert(bulkEnabledOpt.isDefined)

@@ -1095,7 +1110,8 @@
       patchConfigs = patchConfigsOpt,
       throughputControlConfig = throughputControlConfigOpt,
       maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
-      initialMicroBatchSize = initialBatchSizeOpt)
+      initialMicroBatchSize = initialBatchSizeOpt,
+      maxMicroBatchSize = maxBatchSizeOpt)
   }

   def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
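As a small, standalone illustration of the clamping applied by `parseFromStringFunction` above (assuming `MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST` is 100, as the help text states):

```scala
// Sketch of how spark.cosmos.write.bulk.maxBatchSize values are parsed; the constant
// value 100 is taken from the help text above, not verified against the SDK source.
object MaxBatchSizeParseSketch extends App {
  private val MaxOperationsInDirectModeBatchRequest = 100

  private def parseMaxBatchSize(raw: String): Int =
    math.min(raw.toInt, MaxOperationsInDirectModeBatchRequest)

  assert(parseMaxBatchSize("5") == 5)     // small values pass through unchanged
  assert(parseMaxBatchSize("500") == 100) // oversized values are clamped to the direct-mode limit
}
```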

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala

Lines changed: 49 additions & 30 deletions
@@ -25,44 +25,35 @@ class SparkE2EWriteITest
 //scalastyle:off magic.number
 //scalastyle:off null

-  private case class UpsertParameterTest(bulkEnabled: Boolean, itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true, initialBatchSize: Option[Int] = None)
+  private case class UpsertParameterTest(
+    bulkEnabled: Boolean,
+    itemWriteStrategy: ItemWriteStrategy,
+    hasId: Boolean = true,
+    initialBatchSize: Option[Int] = None,
+    maxBatchSize: Option[Int] = None)

   private val upsertParameterTest = Seq(
-    UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
-    UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1)),
-    UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
-    UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None)
+    UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
+    UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None),
+    UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5)),
+    UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
+    UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None)
   )

-  for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize) <- upsertParameterTest) {
-    it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize" in {
+  for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize) <- upsertParameterTest) {
+    it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize" in {
       val cosmosEndpoint = TestConfigurations.HOST
       val cosmosMasterKey = TestConfigurations.MASTER_KEY

-      val cfg = {
-
-        initialBatchSize match {
-          case Some(customInitialBatchSize) =>
-            Map(
-              "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
-              "spark.cosmos.accountKey" -> cosmosMasterKey,
-              "spark.cosmos.database" -> cosmosDatabase,
-              "spark.cosmos.container" -> cosmosContainer,
-              "spark.cosmos.serialization.inclusionMode" -> "NonDefault",
-              "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
-            )
-          case None =>
-            Map (
-              "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
-              "spark.cosmos.accountKey" -> cosmosMasterKey,
-              "spark.cosmos.database" -> cosmosDatabase,
-              "spark.cosmos.container" -> cosmosContainer,
-              "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
-            )
-        }
-      }
+      val configMapBuilder = scala.collection.mutable.Map(
+        "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+        "spark.cosmos.accountKey" -> cosmosMasterKey,
+        "spark.cosmos.database" -> cosmosDatabase,
+        "spark.cosmos.container" -> cosmosContainer,
+        "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
+      )

-      val cfgOverwrite = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      val configOverrideMapBuilder = scala.collection.mutable.Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
         "spark.cosmos.accountKey" -> cosmosMasterKey,
         "spark.cosmos.database" -> cosmosDatabase,
         "spark.cosmos.container" -> cosmosContainer,
@@ -71,6 +62,34 @@ class SparkE2EWriteITest
         "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
       )

+      initialBatchSize match {
+        case Some(customInitialBatchSize) =>
+          configMapBuilder += (
+            "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
+          )
+
+          configOverrideMapBuilder += (
+            "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
+          )
+        case None =>
+      }
+
+      maxBatchSize match {
+        case Some(customMaxBatchSize) =>
+          configMapBuilder += (
+            "spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString,
+          )
+
+          configOverrideMapBuilder += (
+            "spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString,
+          )
+        case None =>
+      }
+
+      val cfg = configMapBuilder.toMap
+
+      val cfgOverwrite = configOverrideMapBuilder.toMap
+
       val newSpark = getSpark

       // scalastyle:off underscore.import

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 3 additions & 0 deletions
@@ -452,6 +452,9 @@ CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecut

         Map<String, String> getCustomOptions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);
         List<String> getExcludeRegions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);
+        int getMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);
+
+        void setMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, int maxMicroBatchSize);
     }
 }

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation>
             partitionKeyRangeId,
             operations,
             maxMicroBatchPayloadSizeInBytes,
-            BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
+            Math.min(operations.size(), BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST));
     }

     static void setRetryPolicyForBulk(

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java

Lines changed: 9 additions & 3 deletions
@@ -28,6 +28,7 @@ public class PartitionScopeThresholds {
     private final double minRetryRate;
     private final double maxRetryRate;
     private final double avgRetryRate;
+    private final int maxMicroBatchSize;

     public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions options) {
         checkNotNull(pkRangeId, "expected non-null pkRangeId");
@@ -46,6 +47,11 @@ public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions opt
             .getCosmosBulkExecutionOptionsAccessor()
             .getMaxTargetedMicroBatchRetryRate(options);
         this.avgRetryRate = ((this.maxRetryRate + this.minRetryRate)/2);
+        this.maxMicroBatchSize = Math.min(
+            ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
+                .getCosmosBulkExecutionOptionsAccessor()
+                .getMaxMicroBatchSize(options),
+            BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
     }

     public String getPartitionKeyRangeId() {
@@ -103,12 +109,12 @@ private void reevaluateThresholds(
         int microBatchSizeBefore = this.targetMicroBatchSize.get();
         int microBatchSizeAfter = microBatchSizeBefore;

-        if (retryRate < this.minRetryRate && microBatchSizeBefore < BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) {
+        if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) {
             int targetedNewBatchSize = Math.min(
                 Math.min(
                     microBatchSizeBefore * 2,
-                    microBatchSizeBefore + (int)(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST * this.avgRetryRate)),
-                BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
+                    microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)),
+                maxMicroBatchSize);
             if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) {
                 microBatchSizeAfter = targetedNewBatchSize;
             }
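A short, worked sketch of the growth rule above, in Scala for consistency with the connector code. Only the formula comes from the diff; the retry-rate value of 0.2 is an assumption for illustration.

```scala
// How the new maxMicroBatchSize cap bounds batch-size growth in reevaluateThresholds.
object BatchSizeGrowthSketch extends App {
  private def grow(before: Int, avgRetryRate: Double, maxMicroBatchSize: Int): Int =
    math.min(
      math.min(before * 2, before + (maxMicroBatchSize * avgRetryRate).toInt),
      maxMicroBatchSize)

  // With the default cap of 100, growth per step is limited by doubling or by 100 * avgRetryRate.
  println(grow(before = 50, avgRetryRate = 0.2, maxMicroBatchSize = 100)) // prints 70
  // With spark.cosmos.write.bulk.maxBatchSize = 5, growth can no longer exceed the user cap.
  println(grow(before = 4, avgRetryRate = 0.2, maxMicroBatchSize = 5))    // prints 5
}
```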
