
Commit eb14385

Fixing hangs when bulk ingesting via Cosmos Spark into containers with >255 physical partitions (Azure#26017)

* Fixing hangs when bulk ingesting via Cosmos Spark into containers with >255 physical partitions
* Update BulkExecutor.java
* Reacting to code review feedback
* Reacting to CR feedback

1 parent e5b28c9 commit eb14385

File tree

7 files changed: +474, -153 lines


sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 100 additions & 30 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 19 additions & 3 deletions

@@ -59,6 +59,7 @@ private[spark] object CosmosConfigNames {
   val DiagnosticsMode = "spark.cosmos.diagnostics"
   val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
   val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
+  val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
   val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
   val WriteStrategy = "spark.cosmos.write.strategy"
   val WriteMaxRetryCount = "spark.cosmos.write.maxRetryCount"
@@ -106,6 +107,7 @@ private[spark] object CosmosConfigNames {
     DiagnosticsMode,
     WriteBulkEnabled,
     WriteBulkMaxPendingOperations,
+    WriteBulkMaxConcurrentPartitions,
     WritePointMaxConcurrency,
     WriteStrategy,
     WriteMaxRetryCount,
@@ -458,8 +460,9 @@ private object ItemWriteStrategy extends Enumeration {
 private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
                                      maxRetryCount: Int,
                                      bulkEnabled: Boolean,
-                                     bulkMaxPendingOperations: Option[Int] = Option.empty,
-                                     pointMaxConcurrency: Option[Int] = Option.empty)
+                                     bulkMaxPendingOperations: Option[Int] = None,
+                                     pointMaxConcurrency: Option[Int] = None,
+                                     maxConcurrentCosmosPartitions: Option[Int] = None)
 
 private object CosmosWriteConfig {
   private val bulkEnabled = CosmosConfigEntry[Boolean](key = CosmosConfigNames.WriteBulkEnabled,
@@ -476,6 +479,18 @@ private object CosmosWriteConfig {
     helpMessage = s"Cosmos DB Item Write Max Pending Operations." +
       s" If not specified it will be determined based on the Spark executor VM Size")
 
+  private val bulkMaxConcurrentPartitions = CosmosConfigEntry[Int](
+    key = CosmosConfigNames.WriteBulkMaxConcurrentPartitions,
+    mandatory = false,
+    parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
+    helpMessage = s"Cosmos DB Item Write Max Concurrent Cosmos Partitions." +
+      s" If not specified it will be determined based on the number of the container's physical partitions -" +
+      s" which would indicate every Spark partition is expected to have data from all Cosmos physical partitions." +
+      s" If specified it indicates from at most how many Cosmos Physical Partitions each Spark partition contains" +
+      s" data. So this config can be used to make bulk processing more efficient when input data in Spark has been" +
+      s" repartitioned to balance to how many Cosmos partitions each Spark partition needs to write. This is mainly" +
+      s" useful for very large containers (with hundreds of physical partitions).")
+
   private val pointWriteConcurrency = CosmosConfigEntry[Int](key = CosmosConfigNames.WritePointMaxConcurrency,
     mandatory = false,
     parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -517,7 +532,8 @@ private object CosmosWriteConfig {
       maxRetryCountOpt.get,
      bulkEnabled = bulkEnabledOpt.get,
       bulkMaxPendingOperations = CosmosConfigEntry.parse(cfg, bulkMaxPendingOperations),
-      pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency))
+      pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
+      maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions))
   }
 }
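
The help message above describes the intended usage: when the input DataFrame has been repartitioned so that each Spark partition only carries rows for a bounded set of Cosmos physical partitions, the new option tells the writer that bound. A minimal sketch of such a write follows; the account/database/container option names, the cosmos.oltp format name, and the ItemOverwrite strategy value are assumed from the connector's general configuration surface rather than from this diff, and all endpoint, key, and sizing values are placeholders.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: assumes `df` has already been repartitioned so that each Spark partition
// holds data for only a handful of Cosmos physical partitions.
def bulkIngest(df: DataFrame): Unit = {
  val writeCfg = Map(
    "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/", // placeholder
    "spark.cosmos.accountKey" -> "<account-key>",                                   // placeholder
    "spark.cosmos.database" -> "<database>",
    "spark.cosmos.container" -> "<container>",
    "spark.cosmos.write.strategy" -> "ItemOverwrite",
    "spark.cosmos.write.bulk.enabled" -> "true",
    // New in this commit: each Spark partition is expected to contain data from
    // at most this many Cosmos physical partitions.
    "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions" -> "4"
  )

  df.write
    .format("cosmos.oltp")
    .options(writeCfg)
    .mode(SaveMode.Append)
    .save()
}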

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 2 additions & 1 deletion

@@ -7,6 +7,7 @@
 import com.azure.cosmos.implementation.Configs;
 import com.azure.cosmos.implementation.Constants;
 import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
+import com.azure.cosmos.implementation.CosmosSchedulers;
 import com.azure.cosmos.implementation.Document;
 import com.azure.cosmos.implementation.DocumentCollection;
 import com.azure.cosmos.implementation.HttpConstants;
@@ -865,7 +866,7 @@ public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperati
         return Flux.deferContextual(context -> {
             final BulkExecutor<TContext> executor = new BulkExecutor<>(this, operations, cosmosBulkExecutionOptions);
 
-            return executor.execute();
+            return executor.execute().publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC);
         });
     }
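
The functional change here is the publishOn hop: responses emitted by the bulk executor are re-published on the dedicated bounded-elastic scheduler, so whatever the subscriber does with them no longer runs on (and can no longer starve) the threads the executor itself needs to make progress. A standalone Reactor sketch of the operator's behavior, independent of any SDK internals:

import java.util.function.Consumer
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

// Logs which thread handles each element.
val logThread: Consumer[Integer] =
  i => println(s"item $i handled on ${Thread.currentThread().getName}")

Flux.range(1, 3)
  .doOnNext(logThread)                     // runs on the emitting (subscribing) thread
  .publishOn(Schedulers.boundedElastic())  // downstream operators hop to boundedElastic workers
  .doOnNext(logThread)                     // runs on a boundedElastic worker thread
  .blockLast()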

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java

Lines changed: 10 additions & 0 deletions

@@ -9,6 +9,7 @@
 public class CosmosSchedulers {
     private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel";
     private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic";
+    private final static String BULK_EXECUTOR_BOUNDED_ELASTIC_THREAD_NAME = "bulk-executor-bounded-elastic";
     private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
 
     // Using a custom parallel scheduler to be able to schedule retries etc.
@@ -27,4 +28,13 @@ public class CosmosSchedulers {
         TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
         true
     );
+
+    // Custom bounded elastic scheduler process bulk execution tasks
+    public final static Scheduler BULK_EXECUTOR_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
+        2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+        Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+        BULK_EXECUTOR_BOUNDED_ELASTIC_THREAD_NAME,
+        TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
+        true
+    );
 }
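
For reference, the newBoundedElastic arguments used above are the worker-thread cap, the pending-task queue capacity, the thread-name prefix, the idle-worker TTL in seconds, and whether the workers are daemon threads. A small standalone sketch with illustrative values (nothing here comes from the SDK):

import java.util.concurrent.CountDownLatch
import reactor.core.scheduler.Schedulers

// threadCap = 4, queuedTaskCap = 1000, name prefix, 60s idle TTL, daemon threads
val demoScheduler = Schedulers.newBoundedElastic(4, 1000, "demo-bounded-elastic", 60, true)

val done = new CountDownLatch(1)
demoScheduler.schedule(() => {
  println(s"task ran on ${Thread.currentThread().getName}") // name starts with "demo-bounded-elastic-"
  done.countDown()
})
done.await()
demoScheduler.dispose() // the SDK never disposes its scheduler; it lives for the process lifetime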

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 5 additions & 0 deletions

@@ -288,6 +288,11 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate(
 
         int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options);
 
+        Integer getMaxConcurrentCosmosPartitions(CosmosBulkExecutionOptions options);
+
+        CosmosBulkExecutionOptions setMaxConcurrentCosmosPartitions(
+            CosmosBulkExecutionOptions options, int mxConcurrentCosmosPartitions);
+
         Duration getMaxMicroBatchInterval(CosmosBulkExecutionOptions options);
     }
 }
