Skip to content

Commit 79b2c9b

Browse files
authored
updated cosmos spark configs (Azure#21004)
updated cosmos spark configs
1 parent 67c297e commit 79b2c9b

19 files changed

+130
-103
lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark Sample.ipynb renamed to sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark-Sample.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
" \"spark.cosmos.accountKey\" : cosmosMasterKey,\n",
3232
" \"spark.cosmos.database\" : cosmosDatabaseName,\n",
3333
" \"spark.cosmos.container\" : cosmosContainerName,\n",
34-
" \"spark.cosmos.read.inferSchemaEnabled\" : \"true\" \n",
34+
" \"spark.cosmos.read.inferSchema.enabled\" : \"true\" \n",
3535
"}"
3636
]
3737
},

sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,8 @@
236236
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
237237
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
238238
" \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n",
239-
" \"spark.cosmos.write.bulkEnabled\": \"true\",\n",
240-
" \"spark.cosmos.throughputControlEnabled\": \"true\",\n",
239+
" \"spark.cosmos.write.bulk.enabled\": \"true\",\n",
240+
" \"spark.cosmos.throughputControl.enabled\": \"true\",\n",
241241
" \"spark.cosmos.throughputControl.name\": \"NYCGreenTaxiDataIngestion\",\n",
242242
" \"spark.cosmos.throughputControl.targetThroughputThreshold\": \"0.95\",\n",
243243
" \"spark.cosmos.throughputControl.globalControl.database\": \"SampleDatabase\",\n",
@@ -321,7 +321,7 @@
321321
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
322322
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
323323
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
324-
" \"spark.cosmos.read.inferSchemaEnabled\" : \"false\"\n",
324+
" \"spark.cosmos.read.inferSchema.enabled\" : \"false\"\n",
325325
"}\n",
326326
"\n",
327327
"query_df = spark.read.format(\"cosmos.items\").options(**readCfg).load()\n",
@@ -366,7 +366,7 @@
366366
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
367367
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
368368
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
369-
" \"spark.cosmos.read.inferSchemaEnabled\" : \"false\",\n",
369+
" \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n",
370370
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
371371
" \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n",
372372
"}\n",
@@ -506,7 +506,7 @@
506506
"OPTIONS (\n",
507507
" spark.cosmos.database = 'SampleDatabase',\n",
508508
" spark.cosmos.container = 'GreenTaxiRecords',\n",
509-
" spark.cosmos.read.inferSchemaEnabled = 'False',\n",
509+
" spark.cosmos.read.inferSchema.enabled = 'False',\n",
510510
" spark.cosmos.read.inferSchemaIncludeSystemProperties = 'True',\n",
511511
" spark.cosmos.partitioning.strategy = 'Aggressive');\n",
512512
"\n",
@@ -547,7 +547,7 @@
547547
"OPTIONS (\n",
548548
" spark.cosmos.database = 'SampleDatabase',\n",
549549
" spark.cosmos.container = 'GreenTaxiRecords',\n",
550-
" spark.cosmos.read.inferSchemaEnabled = 'True',\n",
550+
" spark.cosmos.read.inferSchema.enabled = 'True',\n",
551551
" spark.cosmos.read.inferSchemaIncludeSystemProperties = 'False',\n",
552552
" spark.cosmos.partitioning.strategy = 'Restrictive');\n",
553553
"\n",

sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/02_StructuredStreaming.ipynb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@
243243
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
244244
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
245245
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
246-
" \"spark.cosmos.read.inferSchemaEnabled\" : \"true\",\n",
247-
" \"spark.cosmos.read.inferSchemaForceNullableProperties\" : \"true\",\n",
246+
" \"spark.cosmos.read.inferSchema.enabled\" : \"true\",\n",
247+
" \"spark.cosmos.read.inferSchema.forceNullableProperties\" : \"true\",\n",
248248
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
249249
" \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n",
250250
" #\"spark.cosmos.changeFeed.maxItemCountPerTriggerHint\" : \"50000\"\n",
@@ -256,7 +256,7 @@
256256
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
257257
" \"spark.cosmos.container\": \"GreenTaxiRecordsCFSink\",\n",
258258
" \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n",
259-
" \"spark.cosmos.write.bulkEnabled\": \"true\",\n",
259+
" \"spark.cosmos.write.bulk.enabled\": \"true\",\n",
260260
" \"checkpointLocation\": \"/tmp/\" + runId + \"/\"\n",
261261
"}\n",
262262
"\n",
@@ -337,7 +337,7 @@
337337
"OPTIONS (\n",
338338
" spark.cosmos.database = 'SampleDatabase',\n",
339339
" spark.cosmos.container = 'GreenTaxiRecordsCFSink',\n",
340-
" spark.cosmos.read.inferSchemaEnabled = 'False',\n",
340+
" spark.cosmos.read.inferSchema.enabled = 'False',\n",
341341
" spark.cosmos.partitioning.strategy = 'Default');\n",
342342
"\n",
343343
"SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView"

sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala Sample.scala renamed to sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala-Sample.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEnd
1515
"spark.cosmos.accountKey" -> cosmosMasterKey,
1616
"spark.cosmos.database" -> cosmosDatabaseName,
1717
"spark.cosmos.container" -> cosmosContainerName,
18-
"spark.cosmos.read.inferSchemaEnabled" -> "true"
18+
"spark.cosmos.read.inferSchema.enabled" -> "true"
1919
)
2020

2121
// COMMAND ----------

sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ Configuration Reference:
2020
| `spark.cosmos.useGatewayMode` | `false` | Use gateway mode for the client operations |
2121
| `spark.cosmos.read.forceEventualConsistency` | `true` | Makes the client use Eventual consistency for read operations instead of using the default account level consistency |
2222
| `spark.cosmos.applicationName` | None | Application name |
23-
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[eastus,westus]`) provided preferred regions will be used as hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) |
23+
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`); the provided preferred regions will be used as a hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) |
2424

2525
### Write Config
2626

2727
| Config Property Name | Default | Description |
2828
| :--- | :---- | :--- |
29-
| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore Conflicts) |
30-
| `spark.cosmos.write.maxRetryCount` | `3` | Cosmos DB Write Max Retry Attempts on failure |
31-
| `spark.cosmos.write.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
32-
| `spark.cosmos.write.bulkEnabled` | `true` | Cosmos DB Item Write bulk enabled |
29+
| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts) |
30+
| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection errors) |
31+
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
32+
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size |
33+
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
3334

3435
### Query Config
3536

@@ -39,19 +40,19 @@ When doing read operations, users can specify a custom schema or allow the conne
3940

4041
| Config Property Name | Default | Description |
4142
| :--- | :---- | :--- |
42-
| `spark.cosmos.read.inferSchemaEnabled` | `true` | When schema inference is disabled and user is not providing a schema, raw json will be returned. |
43-
| `spark.cosmos.read.inferSchemaQuery` | `SELECT * FROM r` | When schema inference is enabled, used as custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. |
44-
| `spark.cosmos.read.inferSchemaSamplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
45-
| `spark.cosmos.read.inferSchemaIncludeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
46-
| `spark.cosmos.read.inferSchemaIncludeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchemaIncludeSystemProperties` is enabled, as it will already include all system properties. |
43+
| `spark.cosmos.read.inferSchema.enabled` | `true` | Whether schema inference is enabled. When inference is disabled and the user is not providing a schema, raw json will be returned. |
44+
| `spark.cosmos.read.inferSchema.query` | `SELECT * FROM r` | When schema inference is enabled, used as custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. |
45+
| `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
46+
| `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
47+
| `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. |
4748

4849
#### Json conversion configuration
4950

50-
When reading json documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (Relaxed) or an exception (Strict).
5151

5252
| Config Property Name | Default | Description |
5353
| :--- | :---- | :--- |
54-
| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (Relaxed, Strict) |
54+
| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (`Relaxed`, `Strict`). When reading json documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (Relaxed) or an exception (Strict). |
55+
5556

5657
#### Partitioning Strategy Config
5758

@@ -65,7 +66,7 @@ When reading json documents, if a document contains an attribute that does not m
6566

6667
| Config Property Name | Default | Description |
6768
| :--- | :---- | :--- |
68-
| `spark.cosmos.throughputControlEnabled` | `false` | Whether throughput control is enabled |
69+
| `spark.cosmos.throughputControl.enabled` | `false` | Whether throughput control is enabled |
6970
| `spark.cosmos.throughputControl.name` | None | Throughput control group name |
7071
| `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput |
7172
| `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold |

sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ see [Write Configuration](https://github.com/Azure/azure-sdk-for-java/blob/maste
102102
from pyspark.sql.functions import col
103103

104104
df = spark.read.format("cosmos.items").options(**cfg)\
105-
.option("spark.cosmos.read.inferSchemaEnabled", "true")\
105+
.option("spark.cosmos.read.inferSchema.enabled", "true")\
106106
.load()
107107

108108
df.filter(col("isAlive") == True)\
@@ -112,7 +112,7 @@ df.filter(col("isAlive") == True)\
112112
see [Query Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#query-config) for more detail.
113113

114114
Note that when running queries, unless you are interested in getting back the raw json payload,
115-
we recommend setting `spark.cosmos.read.inferSchemaEnabled` to be `true`.
115+
we recommend setting `spark.cosmos.read.inferSchema.enabled` to be `true`.
116116

117117
see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#schema-inference-config) for more detail.
118118

@@ -122,7 +122,7 @@ see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java
122122
```python
123123
# Show the inferred schema from Cosmos DB
124124
df = spark.read.format("cosmos.items").options(**cfg)\
125-
.option("spark.cosmos.read.inferSchemaEnabled", "true")\
125+
.option("spark.cosmos.read.inferSchema.enabled", "true")\
126126
.load()
127127

128128
df.printSchema()

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package com.azure.cosmos.spark
44

55
import com.azure.cosmos.implementation.guava25.base.Preconditions
66
import com.azure.cosmos.models.PartitionKey
7-
import com.azure.cosmos.spark.BulkWriter.MaxNumberOfThreadsPerCPUCore
7+
import com.azure.cosmos.spark.BulkWriter.DefaultMaxPendingOperationPerCore
88
import com.azure.cosmos.{BulkOperations, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation}
99
import com.fasterxml.jackson.databind.node.ObjectNode
1010
import reactor.core.Disposable
@@ -34,8 +34,8 @@ class BulkWriter(container: CosmosAsyncContainer,
3434

3535
// TODO: moderakh this requires tuning.
3636
// TODO: moderakh should we do a max on the max memory to ensure we don't run out of memory?
37-
private val maxConcurrency = writeConfig.maxConcurrencyOpt
38-
.getOrElse(SparkUtils.getNumberOfHostCPUCores * MaxNumberOfThreadsPerCPUCore)
37+
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
38+
.getOrElse(SparkUtils.getNumberOfHostCPUCores * DefaultMaxPendingOperationPerCore)
3939

4040
private val closed = new AtomicBoolean(false)
4141
private val lock = new ReentrantLock
@@ -57,7 +57,7 @@ class BulkWriter(container: CosmosAsyncContainer,
5757
// TODO: moderakh once that is added in the core SDK, drop activeOperations and rely on the core SDK
5858
// context passing for bulk
5959
private val activeOperations = new TrieMap[CosmosItemOperation, OperationContext]()
60-
private val semaphore = new Semaphore(maxConcurrency)
60+
private val semaphore = new Semaphore(maxPendingOperations)
6161

6262
private val totalScheduledMetrics = new AtomicLong(0)
6363
private val totalSuccessfulIngestionMetrics = new AtomicLong(0)
@@ -220,7 +220,7 @@ class BulkWriter(container: CosmosAsyncContainer,
220220

221221
assume(activeTasks.get() == 0)
222222
assume(activeOperations.isEmpty)
223-
assume(semaphore.availablePermits() == maxConcurrency)
223+
assume(semaphore.availablePermits() == maxPendingOperations)
224224

225225
logInfo(s"flushAndClose completed with no error. " +
226226
s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=${totalScheduledMetrics}")
@@ -286,7 +286,7 @@ private object BulkWriter {
286286
// hence we want 2MB/ 1KB items per partition to be buffered
287287
// 2 * 1024 * 167 items should get buffered on a 16 CPU core VM
288288
// so per CPU core we want (2 * 1024 * 167 / 16) max items to be buffered
289-
val MaxNumberOfThreadsPerCPUCore = 2 * 1024 * 167 / 16
289+
val DefaultMaxPendingOperationPerCore = 2 * 1024 * 167 / 16
290290

291291
val emitFailureHandler: EmitFailureHandler =
292292
(signalType, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false

0 commit comments

Comments
 (0)