Skip to content

Commit 425358a

Browse files
Cosmos Spark: Adding more robust retry policy for transient errors (Azure#26029)
* Cosmos Spark: Adding a more robust retry policy for transient errors
* Fixing StyleCop violations
* Reacting to CR feedback
1 parent eb14385 commit 425358a

File tree

9 files changed

+316
-12
lines changed

9 files changed

+316
-12
lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/ContainerFeedRangesCache.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ private[spark] object ContainerFeedRangesCache {
4040
}
4141

4242
/**
 * Retrieves the container's current feed ranges, caches them under `key`
 * (together with the retrieval timestamp), and returns them as an `SMono`.
 *
 * NOTE(review): `executeWithRetry` wraps the *assembly* of the reactive pipeline
 * (`getFeedRanges.map(...).asScala`), not its subscription — presumably transient
 * failures raised when the returned SMono is actually subscribed are not retried
 * by this policy. Confirm this is the intended semantics.
 */
private[this] def refreshFeedRanges(key: String, container: CosmosAsyncContainer): SMono[List[FeedRange]] = {
  TransientErrorsRetryPolicy.executeWithRetry(() =>
    container
      .getFeedRanges
      .map[List[FeedRange]](javaList => {
        val scalaList = javaList.asScala.toList
        // Cache the freshly retrieved ranges with the time of retrieval so
        // staleness can be evaluated on later lookups.
        cache.put(key, CachedFeedRanges(scalaList, Instant.now))
        scalaList
      })
      .asScala)
}
5254

5355
private case class CachedFeedRanges(feedRanges: List[FeedRange], retrievedAt: Instant)

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosCatalog.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ class CosmosCatalog
112112
override def listNamespaces(): Array[Array[String]] = {
113113
logDebug("catalog:listNamespaces")
114114

115+
TransientErrorsRetryPolicy.executeWithRetry(() => listNamespacesImpl())
116+
}
117+
118+
private[this] def listNamespacesImpl(): Array[Array[String]] = {
119+
logDebug("catalog:listNamespaces")
120+
115121
Loan(CosmosClientCache(
116122
CosmosClientConfiguration(config, readConfig.forceEventualConsistency),
117123
None,
@@ -153,6 +159,12 @@ class CosmosCatalog
153159
override def loadNamespaceMetadata(
154160
namespace: Array[String]): util.Map[String, String] = {
155161

162+
TransientErrorsRetryPolicy.executeWithRetry(() => loadNamespaceMetadataImpl(namespace))
163+
}
164+
165+
private[this] def loadNamespaceMetadataImpl(
166+
namespace: Array[String]): util.Map[String, String] = {
167+
156168
checkNamespace(namespace)
157169

158170
Loan(CosmosClientCache(
@@ -187,6 +199,12 @@ class CosmosCatalog
187199
@throws(classOf[NamespaceAlreadyExistsException])
188200
override def createNamespace(namespace: Array[String],
189201
metadata: util.Map[String, String]): Unit = {
202+
TransientErrorsRetryPolicy.executeWithRetry(() => createNamespaceImpl(namespace, metadata))
203+
}
204+
205+
@throws(classOf[NamespaceAlreadyExistsException])
206+
private[this] def createNamespaceImpl(namespace: Array[String],
207+
metadata: util.Map[String, String]): Unit = {
190208
checkNamespace(namespace)
191209
val throughputPropertiesOpt =
192210
CosmosThroughputProperties.tryGetThroughputProperties(
@@ -234,6 +252,11 @@ class CosmosCatalog
234252
*/
235253
@throws(classOf[NoSuchNamespaceException])
236254
override def dropNamespace(namespace: Array[String]): Boolean = {
255+
TransientErrorsRetryPolicy.executeWithRetry(() => dropNamespaceImpl(namespace))
256+
}
257+
258+
@throws(classOf[NoSuchNamespaceException])
259+
private[this] def dropNamespaceImpl(namespace: Array[String]): Boolean = {
237260
checkNamespace(namespace)
238261
try {
239262
Loan(CosmosClientCache(
@@ -256,6 +279,10 @@ class CosmosCatalog
256279
}
257280

258281
override def listTables(namespace: Array[String]): Array[Identifier] = {
282+
TransientErrorsRetryPolicy.executeWithRetry(() => listTablesImpl(namespace))
283+
}
284+
285+
private[this] def listTablesImpl(namespace: Array[String]): Array[Identifier] = {
259286
checkNamespace(namespace)
260287
val databaseName = toCosmosDatabaseName(namespace.head)
261288

@@ -290,6 +317,10 @@ class CosmosCatalog
290317
}
291318

292319
override def loadTable(ident: Identifier): Table = {
320+
TransientErrorsRetryPolicy.executeWithRetry(() => loadTableImpl(ident))
321+
}
322+
323+
private[this] def loadTableImpl(ident: Identifier): Table = {
293324
checkNamespace(ident.namespace())
294325
val databaseName = toCosmosDatabaseName(ident.namespace().head)
295326
val containerName = toCosmosContainerName(ident.name())
@@ -328,6 +359,15 @@ class CosmosCatalog
328359
schema: StructType,
329360
partitions: Array[Transform],
330361
properties: util.Map[String, String]): Table = {
362+
363+
TransientErrorsRetryPolicy.executeWithRetry(() =>
364+
createTableImpl(ident, schema, partitions, properties))
365+
}
366+
367+
private[this] def createTableImpl(ident: Identifier,
368+
schema: StructType,
369+
partitions: Array[Transform],
370+
properties: util.Map[String, String]): Table = {
331371
checkNamespace(ident.namespace())
332372

333373
val databaseName = toCosmosDatabaseName(ident.namespace().head)
@@ -348,6 +388,10 @@ class CosmosCatalog
348388
}
349389

350390
override def dropTable(ident: Identifier): Boolean = {
391+
TransientErrorsRetryPolicy.executeWithRetry(() => dropTableImpl(ident))
392+
}
393+
394+
private[this] def dropTableImpl(ident: Identifier): Boolean = {
351395
checkNamespace(ident.namespace())
352396

353397
val databaseName = toCosmosDatabaseName(ident.namespace().head)

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ private object CosmosConstants {
1414
val currentName: String =
1515
CoreUtils.getProperties(propertiesFileName).get("name")
1616
val userAgentSuffix = s"SparkConnector/$currentName/$currentVersion"
17+
val initialMaxRetryIntervalForTransientFailuresInMs = 100
1718
val maxRetryIntervalForTransientFailuresInMs = 5000
1819
val maxRetryCountForTransientFailures = 100
1920
val defaultDirectRequestTimeoutInSeconds = 10L

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,26 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
3030
val DefaultPartitionSizeInMB: Int = 5 * 1024 // 10 GB
3131

3232
def createInputPartitions
33+
(
34+
cosmosPartitioningConfig: CosmosPartitioningConfig,
35+
container: CosmosAsyncContainer,
36+
partitionMetadata: Array[PartitionMetadata],
37+
defaultMinimalPartitionCount: Int,
38+
defaultMaxPartitionSizeInMB: Int,
39+
readLimit: ReadLimit
40+
): Array[CosmosInputPartition] = {
41+
42+
TransientErrorsRetryPolicy.executeWithRetry(() =>
43+
createInputPartitionsImpl(
44+
cosmosPartitioningConfig,
45+
container,
46+
partitionMetadata,
47+
defaultMinimalPartitionCount,
48+
defaultMaxPartitionSizeInMB,
49+
readLimit))
50+
}
51+
52+
private[this] def createInputPartitionsImpl
3353
(
3454
cosmosPartitioningConfig: CosmosPartitioningConfig,
3555
container: CosmosAsyncContainer,
@@ -89,8 +109,20 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
89109
.flatten
90110
}
91111

92-
// scalastyle:off method.length
93112
def createInitialOffset
113+
(
114+
container: CosmosAsyncContainer,
115+
changeFeedConfig: CosmosChangeFeedConfig,
116+
streamId: Option[String]
117+
): String = {
118+
119+
TransientErrorsRetryPolicy.executeWithRetry(() =>
120+
createInitialOffsetImpl(container, changeFeedConfig, streamId)
121+
)
122+
}
123+
124+
// scalastyle:off method.length
125+
private[this] def createInitialOffsetImpl
94126
(
95127
container: CosmosAsyncContainer,
96128
changeFeedConfig: CosmosChangeFeedConfig,
@@ -150,10 +182,40 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
150182
}
151183
// scalastyle:on method.length
152184

185+
// scalastyle:off parameter.number
186+
def getLatestOffset
187+
(
188+
userConfig: Map[String, String],
189+
startOffset: ChangeFeedOffset,
190+
readLimit: ReadLimit,
191+
maxStaleness: Duration,
192+
clientConfiguration: CosmosClientConfiguration,
193+
cosmosClientStateHandle: Broadcast[CosmosClientMetadataCachesSnapshot],
194+
containerConfig: CosmosContainerConfig,
195+
partitioningConfig: CosmosPartitioningConfig,
196+
defaultParallelism: Int,
197+
container: CosmosAsyncContainer
198+
): ChangeFeedOffset = {
199+
200+
TransientErrorsRetryPolicy.executeWithRetry(() =>
201+
getLatestOffsetImpl(
202+
userConfig,
203+
startOffset,
204+
readLimit,
205+
maxStaleness,
206+
clientConfiguration,
207+
cosmosClientStateHandle,
208+
containerConfig,
209+
partitioningConfig,
210+
defaultParallelism,
211+
container))
212+
}
213+
// scalastyle:on parameter.number
214+
153215
// scalastyle:off method.length
154216
// scalastyle:off parameter.number
155217
// Based on a start offset, calculate which is the next end offset
156-
def getLatestOffset
218+
private[this] def getLatestOffsetImpl
157219
(
158220
userConfig: Map[String, String],
159221
startOffset: ChangeFeedOffset,
@@ -471,6 +533,19 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
471533
}
472534

473535
def getPartitionMetadata(
536+
userConfig: Map[String, String],
537+
cosmosClientConfig: CosmosClientConfiguration,
538+
cosmosClientStateHandle: Option[Broadcast[CosmosClientMetadataCachesSnapshot]],
539+
cosmosContainerConfig: CosmosContainerConfig,
540+
maxStaleness: Option[Duration] = None
541+
): Array[PartitionMetadata] = {
542+
543+
TransientErrorsRetryPolicy.executeWithRetry(() =>
544+
getPartitionMetadataImpl(
545+
userConfig, cosmosClientConfig, cosmosClientStateHandle, cosmosContainerConfig, maxStaleness))
546+
}
547+
548+
private[this] def getPartitionMetadataImpl(
474549
userConfig: Map[String, String],
475550
cosmosClientConfig: CosmosClientConfiguration,
476551
cosmosClientStateHandle: Option[Broadcast[CosmosClientMetadataCachesSnapshot]],

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosTableSchemaInferrer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ private object CosmosTableSchemaInferrer
8989
private[spark] def inferSchema(client: CosmosAsyncClient,
9090
userConfig: Map[String, String],
9191
defaultSchema: StructType): StructType = {
92+
93+
TransientErrorsRetryPolicy.executeWithRetry(() =>
94+
inferSchemaImpl(client, userConfig, defaultSchema))
95+
}
96+
97+
private[this] def inferSchemaImpl(client: CosmosAsyncClient,
98+
userConfig: Map[String, String],
99+
defaultSchema: StructType): StructType = {
92100
val cosmosInferenceConfig = CosmosSchemaInferenceConfig.parseCosmosInferenceConfig(userConfig)
93101
val cosmosReadConfig = CosmosReadConfig.parseCosmosReadConfig(userConfig)
94102
if (cosmosInferenceConfig.inferSchemaEnabled) {

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadataCache.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,28 @@ private object PartitionMetadataCache extends BasicLoggingTrait {
137137

138138
this.startRefreshTimer()
139139

140-
//scalastyle:off method.length
141140
private def readPartitionMetadata
141+
(
142+
userConfig: Map[String, String],
143+
cosmosClientConfiguration: CosmosClientConfiguration,
144+
cosmosClientStateHandle: Option[Broadcast[CosmosClientMetadataCachesSnapshot]],
145+
cosmosContainerConfig: CosmosContainerConfig,
146+
feedRange: NormalizedRange,
147+
tolerateNotFound: Boolean
148+
): SMono[Option[PartitionMetadata]] = {
149+
150+
TransientErrorsRetryPolicy.executeWithRetry(() =>
151+
readPartitionMetadataImpl(
152+
userConfig,
153+
cosmosClientConfiguration,
154+
cosmosClientStateHandle,
155+
cosmosContainerConfig,
156+
feedRange,
157+
tolerateNotFound))
158+
}
159+
160+
//scalastyle:off method.length
161+
private def readPartitionMetadataImpl
142162
(
143163
userConfig: Map[String, String],
144164
cosmosClientConfiguration: CosmosClientConfiguration,
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.CosmosException
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait

import java.util.concurrent.atomic.AtomicLong
import scala.util.Random
import scala.util.control.Breaks

// Synchronous retry policy: re-executes a function on the calling thread when it
// fails with a CosmosException classified as transient, sleeping a randomized,
// exponentially growing interval between attempts.
private[spark] object TransientErrorsRetryPolicy extends BasicLoggingTrait {
  private val rnd = Random

  /**
   * Executes `func`, retrying with jittered exponential backoff whenever it throws
   * a `CosmosException` that `Exceptions.canBeTransientFailure` classifies as
   * transient. Any other exception — including non-transient `CosmosException`s —
   * propagates to the caller immediately.
   *
   * NOTE(review): the default `maxRetryCount` is `Int.MaxValue` (effectively
   * unbounded) even though `CosmosConstants.maxRetryCountForTransientFailures`
   * (100) exists — confirm the unbounded default is intentional.
   *
   * @param func                        the operation to execute; invoked synchronously,
   *                                    so `Thread.sleep` blocks the calling thread between attempts
   * @param initialMaxRetryIntervalInMs exclusive upper bound of the first random backoff interval
   * @param maxRetryIntervalInMs        cap applied to the exponentially doubling backoff bound
   * @param maxRetryCount               maximum number of retry attempts; once exceeded the last
   *                                    transient exception is logged and rethrown
   * @return the value produced by the first successful invocation of `func`
   */
  def executeWithRetry[T]
  (
    func: () => T,
    initialMaxRetryIntervalInMs: Int = CosmosConstants.initialMaxRetryIntervalForTransientFailuresInMs,
    maxRetryIntervalInMs: Int = CosmosConstants.maxRetryIntervalForTransientFailuresInMs,
    maxRetryCount: Int = Int.MaxValue
  ): T = {
    val loop = new Breaks()
    val retryCount = new AtomicLong(0)
    var returnValue: Option[T] = None

    loop.breakable {
      // The backoff upper bound doubles after each failed attempt, capped at
      // maxRetryIntervalInMs; the actual sleep is a random value below the bound (jitter).
      var currentMaxRetryIntervalInMs = Math.min(initialMaxRetryIntervalInMs, maxRetryIntervalInMs)
      while (true) {
        // Math.max(1, ...) guards Random.nextInt, which requires a strictly positive
        // bound — a caller passing an interval of 0 would otherwise get an
        // IllegalArgumentException instead of a retry.
        val retryIntervalInMs = rnd.nextInt(Math.max(1, currentMaxRetryIntervalInMs))

        try {
          returnValue = Some(func())
          loop.break
        }
        catch {
          case cosmosException: CosmosException =>
            if (Exceptions.canBeTransientFailure(cosmosException)) {
              val retryCountSnapshot = retryCount.incrementAndGet()
              if (retryCountSnapshot > maxRetryCount) {
                // Retry budget exhausted - surface the last transient failure.
                logError(
                  s"Too many transient failure retry attempts ($retryCountSnapshot) in " +
                    s"TransientErrorsRetryPolicy.executeWithRetry",
                  cosmosException)
                throw cosmosException
              } else {
                logWarning(
                  s"Transient failure handled in TransientErrorsRetryPolicy.executeWithRetry -" +
                    s" will be retried (attempt#$retryCountSnapshot) in ${retryIntervalInMs}ms",
                  cosmosException)
              }
            } else {
              // Non-transient service errors are not retried.
              throw cosmosException
            }
          // No catch-all for other Throwables: anything that is not a
          // CosmosException simply propagates out of the try block unchanged.
        }

        Thread.sleep(retryIntervalInMs)
        currentMaxRetryIntervalInMs = Math.min(2 * currentMaxRetryIntervalInMs, maxRetryIntervalInMs)
      }
    }

    // Only reachable via loop.break, which is taken after returnValue was set.
    returnValue.get
  }
}

0 commit comments

Comments
 (0)