Skip to content

Commit 0d749b0

Browse files
Allowing CorrelationActivityId to be sent to BE and retrieved from FeedResponse (Azure#26908)
* Allowing CorrelationActivityId to be sent to BE and retrieved from FeedResponse * Fixing build issue * Adding an end-to-end unit test from Spark to verify that only one correlationActivityId is used * Fix typo in comment * Adding more error details * Adding additional check for query diagnostics assertion * Fixed UUID encoding (only used for ActivityId and CorrelatedActivityId - so safe) * Fixing test issue * Small logging changes * Reverting new public ctor in CosmosBulkExecutionOptions
1 parent a08de4c commit 0d749b0

File tree

27 files changed

+363
-43
lines changed

27 files changed

+363
-43
lines changed

eng/versioning/version_client.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ com.azure:azure-cosmos;4.25.0;4.26.0-beta.2
8282
com.azure:azure-cosmos-benchmark;4.0.1-beta.1;4.0.1-beta.1
8383
com.azure:azure-cosmos-dotnet-benchmark;4.0.1-beta.1;4.0.1-beta.1
8484
com.azure.cosmos.spark:azure-cosmos-spark_3_2-12;1.0.0-beta.1;1.0.0-beta.1
85-
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.6.0;4.7.0-beta.1
86-
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.6.0;4.7.0-beta.1
85+
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.6.0;4.6.1-beta.1
86+
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.6.0;4.6.1-beta.1
8787
com.azure:azure-cosmos-encryption;1.0.0-beta.9;1.0.0-beta.10
8888
com.azure:azure-data-appconfiguration;1.2.5;1.3.0-beta.1
8989
com.azure:azure-data-appconfiguration-perf;1.0.0-beta.1;1.0.0-beta.1

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
## Release History
2-
### 4.7.0-beta.1 (Unreleased)
2+
### 4.6.1-beta.1 (Unreleased)
33

44
### 4.6.0 (2022-01-25)
55
#### Key Bug Fixes

sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
</parent>
1212
<groupId>com.azure.cosmos.spark</groupId>
1313
<artifactId>azure-cosmos-spark_3-1_2-12</artifactId>
14-
<version>4.7.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
14+
<version>4.6.1-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
1515
<packaging>jar</packaging>
1616
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12</url>
1717
<name>OLTP Spark 3.1 Connector for Azure Cosmos DB SQL API</name>

sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
## Release History
2-
### 4.7.0-beta.1 (Unreleased)
2+
### 4.6.1-beta.1 (Unreleased)
33

44
### 4.6.0 (2022-01-25)
55
#### Key Bug Fixes

sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
</parent>
1212
<groupId>com.azure.cosmos.spark</groupId>
1313
<artifactId>azure-cosmos-spark_3-2_2-12</artifactId>
14-
<version>4.7.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
14+
<version>4.6.1-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
1515
<packaging>jar</packaging>
1616
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-2_2-12</url>
1717
<name>OLTP Spark 3.2 Connector for Azure Cosmos DB SQL API</name>

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class BulkWriter(container: CosmosAsyncContainer,
9595
private def initializeOperationContext(): SparkTaskContext = {
9696
val taskContext = TaskContext.get
9797

98-
val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID().toString, "BulkWriter")
98+
val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID(), "BulkWriter")
9999

100100
if (taskContext != null) {
101101
val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ private case class ItemsPartitionReader
3636

3737
private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
3838
log.logInfo(s"Reading from feed range $feedRange of " +
39-
s"container ${containerTargetConfig.database}.${containerTargetConfig.container}")
39+
s"container ${containerTargetConfig.database}.${containerTargetConfig.container} - " +
40+
s"correlationActivityId ${diagnosticsContext.correlationActivityId}, " +
41+
s"query: ${cosmosQuery.toString}")
42+
4043
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
4144
private val clientCacheItem = CosmosClientCache(
4245
CosmosClientConfiguration(config, readConfig.forceEventualConsistency),
@@ -63,11 +66,12 @@ private case class ItemsPartitionReader
6366
val taskContext = TaskContext.get
6467
assert(taskContext != null)
6568

66-
val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,
69+
val taskDiagnosticsContext = SparkTaskContext(
70+
diagnosticsContext.correlationActivityId,
6771
taskContext.stageId(),
6872
taskContext.partitionId(),
6973
taskContext.taskAttemptId(),
70-
feedRange.toString + " " + cosmosQuery.toSqlQuerySpec.getQueryText)
74+
feedRange.toString + " " + cosmosQuery.toString)
7175

7276
val listener: OperationListener =
7377
DiagnosticsLoader.getDiagnosticsProvider(diagnosticsConfig).getLogger(this.getClass)
@@ -91,6 +95,14 @@ private case class ItemsPartitionReader
9195
queryOptions, null, readConfig.maxItemCount)
9296
// scalastyle:on null
9397
}
98+
99+
ImplementationBridgeHelpers
100+
.CosmosQueryRequestOptionsHelper
101+
.getCosmosQueryRequestOptionsAccessor
102+
.setCorrelationActivityId(
103+
queryOptions,
104+
diagnosticsContext.correlationActivityId)
105+
94106
cosmosAsyncContainer.queryItems(cosmosQuery.toSqlQuerySpec, queryOptions, classOf[ObjectNode])
95107
},
96108
readConfig.maxItemCount

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScan.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,12 @@ private case class ItemsScan(session: SparkSession,
103103
}
104104

105105
override def createReaderFactory(): PartitionReaderFactory = {
106+
val correlationActivityId = UUID.randomUUID()
107+
log.logInfo(s"Creating ItemsScan with CorrelationActivityId '${correlationActivityId.toString}' for query '${cosmosQuery.queryText}'")
106108
ItemsScanPartitionReaderFactory(config,
107109
schema,
108110
cosmosQuery,
109-
DiagnosticsContext(UUID.randomUUID().toString, cosmosQuery.queryText),
111+
DiagnosticsContext(correlationActivityId, cosmosQuery.queryText),
110112
cosmosClientStateHandle,
111113
DiagnosticsConfig.parseDiagnosticsConfig(config))
112114
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class PointWriter(container: CosmosAsyncContainer, cosmosWriteConfig: CosmosWrit
5353
private val pendingPointWrites = new TrieMap[Future[Unit], Boolean]()
5454
private val closed = new AtomicBoolean(false)
5555

56-
private val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID().toString, "PointWriter")
56+
private val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID(), "PointWriter")
5757

5858
private val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,
5959
taskContext.stageId(),

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/diagnostics/DiagnosticsContext.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@
33

44
package com.azure.cosmos.spark.diagnostics
55

6-
private[spark] case class DiagnosticsContext(correlationActivityId: String, details: String)
6+
import java.util.UUID
7+
8+
private[spark] case class DiagnosticsContext(correlationActivityId: UUID, details: String)

0 commit comments

Comments
 (0)