
Commit b2234d6

Making max integrated cache staleness configurable for Spark connector (Azure#32592)
* Add benchmark code
* Add tupleSize for readMany-specific benchmarking
* Added configurability for MaxIntegratedCacheStaleness for reads
* Fixed compilation error
* Added edge case handling for tuning integrated cache staleness.
* Modified property name for tuning integrated cache staleness.
* Modified spec test to verify parsing of max integrated cache staleness.
* Add more duration granularities for max integrated cache staleness.
* Reduced duration granularities for max integrated cache staleness.
* Modified spark config name for max integrated cache staleness.
* Modified CHANGELOG.md and configuration-reference.md
* Modified CHANGELOG.md
* Removed locale information link verification issue.
* Modified CHANGELOG.md
* Used case-based equality check.
* Addressed review comments.
* Modified CHANGELOG.md
* Updated configuration-reference.md
1 parent 15014b8 commit b2234d6

File tree: 10 files changed, +63 −12 lines

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -3,6 +3,8 @@
 ### 4.16.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the `spark.cosmos.read.maxIntegratedCacheStalenessInMS` configuration key
+  to make `MaxIntegratedCacheStaleness` tunable for caching queries. - See [PR 32592](https://github.com/Azure/azure-sdk-for-java/pull/32592)
 
 #### Breaking Changes
 
sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -3,6 +3,8 @@
 ### 4.16.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the `spark.cosmos.read.maxIntegratedCacheStalenessInMS` configuration key
+  to make `MaxIntegratedCacheStaleness` tunable for caching queries. - See [PR 32592](https://github.com/Azure/azure-sdk-for-java/pull/32592)
 
 #### Breaking Changes
 
sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 2 additions & 1 deletion

@@ -3,6 +3,8 @@
 ### 4.16.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the `spark.cosmos.read.maxIntegratedCacheStalenessInMS` configuration key
+  to make `MaxIntegratedCacheStaleness` tunable for caching queries. - See [PR 32592](https://github.com/Azure/azure-sdk-for-java/pull/32592)
 
 #### Breaking Changes
 
@@ -12,7 +14,6 @@
 
 ### 4.15.0 (2022-11-16)
 
-
 #### Features Added
 Spark 3.3 support: - See [PR 31666](https://github.com/Azure/azure-sdk-for-java/pull/31666).
 #### Other Changes
sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md

Lines changed: 6 additions & 4 deletions

@@ -38,10 +38,12 @@
 | `spark.cosmos.write.patch.filter` | None | Used for [Conditional patch](https://docs.microsoft.com/azure/cosmos-db/partial-document-update-getting-started#java) |
 
 ### Query Config
 | Config Property Name | Default | Description |
 | :--- | :--- | :--- |
 | `spark.cosmos.read.customQuery` | None | When provided, the custom query will be processed against the Cosmos endpoint instead of dynamically generating the query via predicate push down. Usually it is recommended to rely on Spark's predicate push down because that will generate the most efficient set of filters based on the query plan. But there are a couple of predicates like aggregates (count, group by, avg, sum etc.) that cannot be pushed down yet (at least in Spark 3.1) - so the custom query is a fallback to allow them to be pushed into the query sent to Cosmos. If specified, with schema inference enabled, the custom query will also be used to infer the schema. |
 | `spark.cosmos.read.maxItemCount` | `1000` | Overrides the maximum number of documents that can be returned for a single query or change feed request. The default value is `1000` - consider increasing this only for average document sizes significantly smaller than 1KB or when projection reduces the number of properties selected in queries significantly (like when only selecting "id" of documents etc.). |
+| `spark.cosmos.read.maxIntegratedCacheStalenessInMS` | None | Sets the max time window in milliseconds for which query results remain cached in the integrated cache in the dedicated gateway connectivity mode. Learn more about `MaxIntegratedCacheStaleness` [here](https://learn.microsoft.com/azure/cosmos-db/integrated-cache). |
 
 #### Schema Inference Config
 When doing read operations, users can specify a custom schema or allow the connector to infer it. Schema inference is enabled by default.

(The remaining churn in this hunk is whitespace-only re-alignment of the table's column padding, so the unchanged rows are shown as context above.)
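For a sense of how the new key is used in practice, here is a minimal usage sketch in Scala; the account endpoint, key, database, and container values are placeholders, and the 5000 ms bound is an arbitrary example. The setting only takes effect when requests are routed through a dedicated gateway with the integrated cache enabled:

```scala
// Minimal usage sketch - endpoint, key, database, and container are placeholders.
// With the integrated cache enabled on a dedicated gateway, queries may be served
// from the cache as long as the cached entry is at most 5000 ms old.
val df = spark.read
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "https://<account>.sqlx.cosmos.azure.com:443/") // placeholder dedicated gateway endpoint
  .option("spark.cosmos.accountKey", "<account-key>")                                     // placeholder
  .option("spark.cosmos.database", "myDatabase")
  .option("spark.cosmos.container", "myContainer")
  .option("spark.cosmos.useGatewayMode", "true") // integrated cache requires gateway connectivity
  .option("spark.cosmos.read.maxIntegratedCacheStalenessInMS", "5000")
  .load()
```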

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 27 additions & 1 deletion

@@ -5,7 +5,7 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings}
 import com.azure.cosmos.implementation.routing.LocationHelper
-import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, FeedRange}
+import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange}
 import com.azure.cosmos.spark.ChangeFeedModes.ChangeFeedMode
 import com.azure.cosmos.spark.ChangeFeedStartFromModes.{ChangeFeedStartFromMode, PointInTime}
 import com.azure.cosmos.spark.CosmosPatchOperationTypes.CosmosPatchOperationTypes
@@ -86,6 +86,7 @@ private[spark] object CosmosConfigNames {
   val ThroughputControlPreferredRegionsList = "spark.cosmos.throughputControl.preferredRegionsList"
   val ThroughputControlDisableTcpConnectionEndpointRediscovery = "spark.cosmos.throughputControl.disableTcpConnectionEndpointRediscovery"
   val ThroughputControlUseGatewayMode = "spark.cosmos.throughputControl.useGatewayMode"
+  val ReadMaxIntegratedCacheStalenessInMilliseconds = "spark.cosmos.read.maxIntegratedCacheStalenessInMS"
   val ThroughputControlName = "spark.cosmos.throughputControl.name"
   val ThroughputControlTargetThroughput = "spark.cosmos.throughputControl.targetThroughput"
   val ThroughputControlTargetThroughputThreshold = "spark.cosmos.throughputControl.targetThroughputThreshold"
@@ -153,6 +154,7 @@ private[spark] object CosmosConfigNames {
     ThroughputControlPreferredRegionsList,
     ThroughputControlDisableTcpConnectionEndpointRediscovery,
     ThroughputControlUseGatewayMode,
+    ReadMaxIntegratedCacheStalenessInMilliseconds,
     ThroughputControlName,
     ThroughputControlTargetThroughput,
     ThroughputControlTargetThroughputThreshold,
@@ -405,6 +407,7 @@ private case class CosmosReadConfig(forceEventualConsistency: Boolean,
                                     schemaConversionMode: SchemaConversionMode,
                                     maxItemCount: Int,
                                     prefetchBufferSize: Int,
+                                    dedicatedGatewayRequestOptions: DedicatedGatewayRequestOptions,
                                     customQuery: Option[CosmosParameterizedQuery])
 
 private object SchemaConversionModes extends Enumeration {
@@ -465,12 +468,34 @@ private object CosmosReadConfig {
     "See `reactor.util.concurrent.Queues.get(int)` for more details. This means by the max. memory used for " +
     "buffering is 5 MB multiplied by the effective prefetch buffer size for each Executor/CPU-Core.")
 
+  private val MaxIntegratedCacheStalenessInMilliseconds = CosmosConfigEntry[Duration](
+    key = CosmosConfigNames.ReadMaxIntegratedCacheStalenessInMilliseconds,
+    mandatory = false,
+    defaultValue = None,
+    parseFromStringFunction = queryText => Duration.ofMillis(queryText.toLong),
+    helpMessage = "The max integrated cache staleness is the time window in milliseconds within which subsequent reads and queries are served from " +
+      "the integrated cache configured with the dedicated gateway. The request is served from the integrated cache itself provided the data " +
+      "has not been evicted from the cache or a new read is run with a lower MaxIntegratedCacheStaleness than the age of the current cached " +
+      "entry."
+  )
+
   def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
     val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
     val jsonSchemaConversionMode = CosmosConfigEntry.parse(cfg, JsonSchemaConversion)
     val customQuery = CosmosConfigEntry.parse(cfg, CustomQuery)
     val maxItemCount = CosmosConfigEntry.parse(cfg, MaxItemCount)
     val prefetchBufferSize = CosmosConfigEntry.parse(cfg, PrefetchBufferSize)
+    val maxIntegratedCacheStalenessInMilliseconds = CosmosConfigEntry.parse(cfg, MaxIntegratedCacheStalenessInMilliseconds)
+    val dedicatedGatewayRequestOptions = {
+      val result = new DedicatedGatewayRequestOptions
+      maxIntegratedCacheStalenessInMilliseconds match {
+        case Some(stalenessProvidedByUser) =>
+          result.setMaxIntegratedCacheStaleness(stalenessProvidedByUser)
+        case None =>
+      }
+      result
+    }
+
 
     CosmosReadConfig(
       forceEventualConsistency.get,
@@ -487,6 +512,7 @@ private object CosmosReadConfig {
         case None => 8
       }
     ),
+      dedicatedGatewayRequestOptions,
       customQuery)
   }
 }
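Stripped of the connector plumbing, the wiring above boils down to the following calls on the Java SDK's public `DedicatedGatewayRequestOptions` and `CosmosQueryRequestOptions` types (a standalone sketch; the 5000 ms value is arbitrary):

```scala
import com.azure.cosmos.models.{CosmosQueryRequestOptions, DedicatedGatewayRequestOptions}
import java.time.Duration

// Parse the user-supplied millisecond value into a Duration, wrap it in
// DedicatedGatewayRequestOptions, and attach those options to the query
// request options used for reads and schema inference.
val gatewayOptions = new DedicatedGatewayRequestOptions()
gatewayOptions.setMaxIntegratedCacheStaleness(Duration.ofMillis(5000))

val queryOptions = new CosmosQueryRequestOptions()
queryOptions.setDedicatedGatewayRequestOptions(gatewayOptions)
```

This mirrors what `ItemsPartitionReader` and `CosmosTableSchemaInferrer` do with the parsed `dedicatedGatewayRequestOptions` in the hunks below.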

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala

Lines changed: 2 additions & 0 deletions

@@ -7,6 +7,8 @@ import com.azure.core.util.CoreUtils
 import com.azure.cosmos.implementation.HttpConstants
 import reactor.util.concurrent.Queues
 
+import java.time.Duration
+
 // cosmos db related constants
 private object CosmosConstants {
   private[this] val propertiesFileName = "azure-cosmos-spark.properties"

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosTableSchemaInferrer.scala

Lines changed: 4 additions & 1 deletion

@@ -3,12 +3,13 @@
 package com.azure.cosmos.spark
 
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers
-import com.azure.cosmos.models.{CosmosQueryRequestOptions, FeedRange}
+import com.azure.cosmos.models.{CosmosQueryRequestOptions, DedicatedGatewayRequestOptions, FeedRange}
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import com.azure.cosmos.util.CosmosPagedIterable
 import com.fasterxml.jackson.databind.JsonNode
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 
+import java.time.Duration
 import java.util.stream.Collectors
 
 // scalastyle:off underscore.import
@@ -117,6 +118,8 @@ private object CosmosTableSchemaInferrer
     SparkUtils.safeOpenConnectionInitCaches(sourceContainer, (msg, e) => logWarning(msg, e))
     val queryOptions = new CosmosQueryRequestOptions()
     queryOptions.setMaxBufferedItemCount(cosmosInferenceConfig.inferSchemaSamplingSize)
+    queryOptions.setDedicatedGatewayRequestOptions(cosmosReadConfig.dedicatedGatewayRequestOptions)
+
     val queryText = cosmosInferenceConfig.inferSchemaQuery match {
       case None =>
         ImplementationBridgeHelpers

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala

Lines changed: 7 additions & 1 deletion

@@ -5,7 +5,7 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.implementation.spark.{OperationContextAndListenerTuple, OperationListener}
 import com.azure.cosmos.implementation.{ImplementationBridgeHelpers, SparkBridgeImplementationInternal, SparkRowItem, Strings}
-import com.azure.cosmos.models.{CosmosParameterizedQuery, CosmosQueryRequestOptions, ModelBridgeInternal}
+import com.azure.cosmos.models.{CosmosParameterizedQuery, CosmosQueryRequestOptions, DedicatedGatewayRequestOptions, ModelBridgeInternal}
 import com.azure.cosmos.spark.BulkWriter.getThreadInfo
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, DiagnosticsLoader, LoggerHelper, SparkTaskContext}
 import org.apache.spark.TaskContext
@@ -16,6 +16,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.types.StructType
 
+import java.time.Duration
+
 // per spark task there will be one CosmosPartitionReader.
 // This provides iterator to read from the assigned spark partition
 // For now we are creating only one spark partition
@@ -49,6 +51,8 @@ private case class ItemsPartitionReader
     s"query: ${cosmosQuery.toString}, Context: ${operationContext.toString} ${getThreadInfo}")
 
   private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
+
+
   private val clientCacheItem = CosmosClientCache(
     CosmosClientConfiguration(config, readConfig.forceEventualConsistency),
     Some(cosmosClientStateHandles.value.cosmosClientMetadataCaches),
@@ -138,6 +142,8 @@ private case class ItemsPartitionReader
       ).toInt
     )
 
+    queryOptions.setDedicatedGatewayRequestOptions(readConfig.dedicatedGatewayRequestOptions)
+
     ImplementationBridgeHelpers
       .CosmosQueryRequestOptionsHelper
       .getCosmosQueryRequestOptionsAccessor

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala

Lines changed: 5 additions & 2 deletions

@@ -7,7 +7,7 @@ import com.azure.cosmos.spark.utils.CosmosPatchTestHelper
 import org.apache.spark.sql.types.{NumericType, StructType}
 
 import java.text.SimpleDateFormat
-import java.time.Instant
+import java.time.{Duration, Instant}
 import java.util.UUID
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
@@ -245,11 +245,13 @@ class CosmosConfigSpec extends UnitSpec {
     config.customQuery shouldBe empty
     config.maxItemCount shouldBe 1000
     config.prefetchBufferSize shouldBe 8
+    config.dedicatedGatewayRequestOptions.getMaxIntegratedCacheStaleness shouldBe null
 
     userConfig = Map(
       "spark.cosmos.read.forceEventualConsistency" -> "false",
       "spark.cosmos.read.schemaConversionMode" -> "Strict",
-      "spark.cosmos.read.maxItemCount" -> "1000"
+      "spark.cosmos.read.maxItemCount" -> "1000",
+      "spark.cosmos.read.maxIntegratedCacheStalenessInMS" -> "1000"
     )
 
     config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
@@ -259,6 +261,7 @@ class CosmosConfigSpec extends UnitSpec {
     config.customQuery shouldBe empty
     config.maxItemCount shouldBe 1000
     config.prefetchBufferSize shouldBe 8
+    config.dedicatedGatewayRequestOptions.getMaxIntegratedCacheStaleness shouldBe Duration.ofMillis(1000)
 
     userConfig = Map(
      "spark.cosmos.read.forceEventualConsistency" -> "false",

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FilterAnalyzerSpec.scala

Lines changed: 6 additions & 2 deletions

@@ -2,10 +2,13 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
-import com.azure.cosmos.models.CosmosParameterizedQuery
+import com.azure.cosmos.models.{CosmosParameterizedQuery, DedicatedGatewayRequestOptions}
 import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, IsNotNull, IsNull, StringContains, StringEndsWith, StringStartsWith}
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.InstanceOfAssertFactories.DURATION
 import reactor.util.concurrent.Queues
+
+import java.time.Duration
 // scalastyle:off underscore.import
 import scala.collection.JavaConverters._
 // scalastyle:on underscore.import
@@ -16,7 +19,7 @@ class FilterAnalyzerSpec extends UnitSpec {
 
   private[this] val readConfigWithoutCustomQuery =
     new CosmosReadConfig(
-      true, SchemaConversionModes.Relaxed, 100, Queues.XS_BUFFER_SIZE, None)
+      true, SchemaConversionModes.Relaxed, 100, Queues.XS_BUFFER_SIZE, new DedicatedGatewayRequestOptions, None)
   private[this] val queryText = "SELECT * FROM c WHERE c.abc='Hello World'"
   private[this] val query = Some(CosmosParameterizedQuery(
     queryText,
@@ -27,6 +30,7 @@ class FilterAnalyzerSpec extends UnitSpec {
     SchemaConversionModes.Relaxed,
     100,
     Queues.XS_BUFFER_SIZE,
+    new DedicatedGatewayRequestOptions,
     query)
 
   "many filters" should "be translated to cosmos predicates with AND" in {
