Commit 88b2b2f

use configurable logger instead of slf4j logger everywhere. (Azure#21076)
- Use the configurable logger instead of the slf4j logger everywhere in the Spark connector layer (this doesn't touch the SDK).
- Wired up per-request operation/response diagnostics (for query and write operations) against the Spark diagnostics.

Please start the review with these classes: DiagnosticsProvider.scala, SparkTaskContext.scala, PointWriter.

For point writes, when diagnostics are enabled we will get something like the following:

```
21/06/01 11:18:21 INFO PointWriter: SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=), response: response{statusCode:200, headers{activityId:befce693-c305-11eb-a373-b51dba521584}}
21/06/01 11:18:21 INFO PointWriter: UpsertOperation(SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=PointWriter),CosmosItemIdentifier(bat,["bat"])) completed
21/06/01 11:18:21 INFO PointWriter: SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=), response: response{statusCode:200, headers{activityId:befd82d4-c305-11eb-a373-b51dba521584}}
21/06/01 11:18:21 INFO PointWriter: UpsertOperation(SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=PointWriter),CosmosItemIdentifier(mouse,["mouse"])) completed
21/06/01 11:18:21 INFO PointWriter: SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=), response: response{statusCode:200, headers{activityId:befda9e5-c305-11eb-a373-b51dba521584}}
21/06/01 11:18:21 INFO PointWriter: UpsertOperation(SparkTaskContext(correlationActivityId=abc7c1bc-1d3d-456b-bd43-27f63419f3c8,stageId=0,partitionId=0,details=PointWriter),CosmosItemIdentifier(horse,["horse"])) completed
```

For bulk writes, when diagnostics are enabled we get something like the following:

```
21/06/01 11:13:59 INFO BulkWriter: SparkTaskContext(correlationActivityId=1fe6232b-809c-4e2b-98eb-c10708656300,stageId=0,partitionId=0,details=), request: request{operationType:Batch, resourceType:Document, partitionKey:null, resourceAddress:dbs/testDB/colls/testContainer, headers{activityId:22f471f7-c305-11eb-98b7-f93f10f7d1fe, items:3}}
21/06/01 11:13:59 INFO BulkWriter: SparkTaskContext(correlationActivityId=1fe6232b-809c-4e2b-98eb-c10708656300,stageId=0,partitionId=0,details=), response: response{statusCode:200, headers{activityId:22f471f7-c305-11eb-98b7-f93f10f7d1fe, itemCount:3, continuationToken:null}}
```

Similar to point write, if we agree on the design I can add the per-item details to bulk as
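To illustrate the pattern applied across the connector in this change, here is a minimal sketch of the before/after shape of a connector class. `SampleScan` and its body are hypothetical and exist only for illustration; `LoggerHelper`, `DiagnosticsConfig`, and the `log.logInfo`/`log.logDebug` calls mirror the usage visible in the diffs below.

```scala
import com.azure.cosmos.spark.diagnostics.LoggerHelper

// Before: every class mixed in the slf4j-backed CosmosLoggingTrait and called logInfo(...) directly.
// After: the logger is resolved from the diagnostics configuration, so the diagnostics mode chosen
// by the user decides which logger implementation is used.
private class SampleScan(config: Map[String, String],
                         diagnosticsConfig: DiagnosticsConfig) {

  // Spark can serialize these classes and ship them to executors; the diffs mark the logger
  // @transient lazy in executor-facing classes for exactly that reason.
  @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

  log.logInfo("SampleScan instantiated")

  def doWork(): Unit = {
    log.logDebug(s"doing work in ${this.getClass.getSimpleName}")
  }
}
```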
1 parent c4905e7 commit 88b2b2f

61 files changed: +1,027 additions, −352 deletions. (Only a subset of the changed files is shown below.)


sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala

Lines changed: 2 additions & 2 deletions
@@ -10,13 +10,13 @@ import com.azure.cosmos.implementation.feedranges.{FeedRangeContinuation, FeedRa
 import com.azure.cosmos.implementation.query.CompositeContinuationToken
 import com.azure.cosmos.implementation.routing.Range
 import com.azure.cosmos.models.FeedRange
-import com.azure.cosmos.spark.{CosmosLoggingTrait, NormalizedRange}
+import com.azure.cosmos.spark.NormalizedRange

 // scalastyle:off underscore.import
 import scala.collection.JavaConverters._
 // scalastyle:on underscore.import

-private[cosmos] object SparkBridgeImplementationInternal extends CosmosLoggingTrait {
+private[cosmos] object SparkBridgeImplementationInternal {
   def setMetadataCacheSnapshot(cosmosClientBuilder: CosmosClientBuilder,
                                metadataCache: CosmosClientMetadataCachesSnapshot): Unit = {

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 59 additions & 27 deletions
@@ -2,35 +2,42 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark

+import com.azure.cosmos.implementation.ImplementationBridgeHelpers
 import com.azure.cosmos.implementation.guava25.base.Preconditions
+import com.azure.cosmos.implementation.spark.{OperationContextAndListenerTuple, OperationListener}
 import com.azure.cosmos.models.PartitionKey
-import com.azure.cosmos.spark.BulkWriter.DefaultMaxPendingOperationPerCore
-import com.azure.cosmos.{BulkItemRequestOptions, BulkOperations, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation}
+import com.azure.cosmos.spark.BulkWriter.{DefaultMaxPendingOperationPerCore, emitFailureHandler}
+import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, DiagnosticsLoader, LoggerHelper, SparkTaskContext}
+import com.azure.cosmos.{
+  BulkItemRequestOptions,
+  BulkOperations, BulkProcessingOptions, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation
+}
 import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.spark.TaskContext
 import reactor.core.Disposable
 import reactor.core.publisher.Sinks
+import reactor.core.publisher.Sinks.{EmitFailureHandler, EmitResult}
 import reactor.core.scala.publisher.SMono.PimpJFlux
 import reactor.core.scala.publisher.{SFlux, SMono}
 import reactor.core.scheduler.Schedulers

+import java.util.UUID
 import java.util.concurrent.Semaphore
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
 import java.util.concurrent.locks.ReentrantLock
-import com.azure.cosmos.spark.BulkWriter.emitFailureHandler
-import reactor.core.publisher.Sinks.EmitFailureHandler
-import reactor.core.publisher.Sinks.EmitResult
-
 import scala.collection.concurrent.TrieMap

 //scalastyle:off null
 //scalastyle:off multiple.string.literals
 class BulkWriter(container: CosmosAsyncContainer,
-                 writeConfig: CosmosWriteConfig)
-  extends AsyncItemWriter
-    with CosmosLoggingTrait {
+                 writeConfig: CosmosWriteConfig,
+                 diagnosticsConfig: DiagnosticsConfig)
+  extends AsyncItemWriter {
+
+  private val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

   // TODO: moderakh add a mocking unit test for Bulk where CosmosClient is mocked to simulator failure/retry scenario
-  logInfo("BulkWriter instantiated ....")
+  log.logInfo("BulkWriter instantiated ....")

   // TODO: moderakh this requires tuning.
   // TODO: moderakh should we do a max on the max memory to ensure we don't run out of memory?
@@ -62,9 +69,34 @@ class BulkWriter(container: CosmosAsyncContainer,
   private val totalScheduledMetrics = new AtomicLong(0)
   private val totalSuccessfulIngestionMetrics = new AtomicLong(0)

+  private val bulkOptions = new BulkProcessingOptions[Object]()
+  initializeDiagnosticsIfConfigured()
+
+  private def initializeDiagnosticsIfConfigured(): Unit = {
+    if (diagnosticsConfig.mode.isDefined) {
+      val taskContext = TaskContext.get
+      assert(taskContext != null)
+
+      val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID().toString, "BulkWriter")
+
+      val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,
+        taskContext.stageId(),
+        taskContext.partitionId(),
+        "")
+
+      val listener: OperationListener =
+        DiagnosticsLoader.getDiagnosticsProvider(diagnosticsConfig).getLogger(this.getClass)
+
+      val operationContextAndListenerTuple = new OperationContextAndListenerTuple(taskDiagnosticsContext, listener)
+      ImplementationBridgeHelpers.CosmosBulkProcessingOptionsHelper
+        .getCosmosBulkProcessingOptionAccessor()
+        .setOperationContext(bulkOptions, operationContextAndListenerTuple)
+    }
+  }
+
   private val subscriptionDisposable: Disposable = {
     val bulkOperationResponseFlux: SFlux[CosmosBulkOperationResponse[Object]] =
-      container.processBulkOperations[Object](bulkInputEmitter.asFlux()).asScala
+      container.processBulkOperations[Object](bulkInputEmitter.asFlux(), bulkOptions).asScala

     bulkOperationResponseFlux.subscribe(
       resp => {
@@ -78,15 +110,15 @@ class BulkWriter(container: CosmosAsyncContainer,
         if (resp.getException != null) {
           Option(resp.getException) match {
             case Some(cosmosException: CosmosException) => {
-              logDebug(s"encountered ${cosmosException.getStatusCode}")
+              log.logDebug(s"encountered ${cosmosException.getStatusCode}")
               if (shouldIgnore(cosmosException)) {
-                logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+                log.logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
                   s"ignored encountered ${cosmosException.getStatusCode}")
                 totalSuccessfulIngestionMetrics.getAndIncrement()
                 // work done
               } else if (shouldRetry(cosmosException, contextOpt.get)) {
                 // requeue
-                logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+                log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
                   s"encountered ${cosmosException.getStatusCode}, will retry! " +
                   s"attemptNumber=${context.attemptNumber}, exceptionMessage=${cosmosException.getMessage}")

@@ -102,15 +134,15 @@ class BulkWriter(container: CosmosAsyncContainer,

                 isGettingRetried = true
               } else {
-                logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+                log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
                   s"encountered ${cosmosException.getStatusCode}, all retries exhausted! " +
                   s"attemptNumber=${context.attemptNumber}, exceptionMessage=${cosmosException.getMessage}")
                 captureIfFirstFailure(cosmosException)
                 cancelWork()
               }
             }
             case _ =>
-              logWarning(s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+              log.logWarning(s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
                 s"encountered , attemptNumber=${context.attemptNumber}, exceptionMessage=${resp.getException.getMessage}", resp.getException)
              captureIfFirstFailure(resp.getException)
              cancelWork()
@@ -131,7 +163,7 @@ class BulkWriter(container: CosmosAsyncContainer,
       },
       errorConsumer = Option.apply(
         ex => {
-          logError("Unexpected failure code path in Bulk ingestion", ex)
+          log.logError("Unexpected failure code path in Bulk ingestion", ex)
           // if there is any failure this closes the bulk.
           // at this point bulk api doesn't allow any retrying
           // we don't know the list of failed item-operations
@@ -150,21 +182,21 @@ class BulkWriter(container: CosmosAsyncContainer,
   override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit = {
     Preconditions.checkState(!closed.get())
     if (errorCaptureFirstException.get() != null) {
-      logWarning("encountered failure earlier, rejecting new work")
+      log.logWarning("encountered failure earlier, rejecting new work")
       throw errorCaptureFirstException.get()
     }

     semaphore.acquire()
     val cnt = totalScheduledMetrics.getAndIncrement()
-    logDebug(s"total scheduled ${cnt}")
+    log.logDebug(s"total scheduled ${cnt}")

     scheduleWriteInternal(partitionKeyValue, objectNode, OperationContext(getId(objectNode), partitionKeyValue, getETag(objectNode), 1))
   }

   private def scheduleWriteInternal(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationContext: OperationContext): Unit = {
     activeTasks.incrementAndGet()
     if (operationContext.attemptNumber > 1) {
-      logInfo(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}")
+      log.logInfo(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}")
     }

     val bulkItemOperation = writeConfig.itemWriteStrategy match {
@@ -202,9 +234,9 @@ class BulkWriter(container: CosmosAsyncContainer,
       // scalastyle:on return
     }

-    logInfo("flushAndClose invoked")
+    log.logInfo("flushAndClose invoked")

-    logInfo(s"completed so far ${totalSuccessfulIngestionMetrics.get()}, pending tasks ${activeOperations.size}")
+    log.logInfo(s"completed so far ${totalSuccessfulIngestionMetrics.get()}, pending tasks ${activeOperations.size}")

     // error handling, if there is any error and the subscription is cancelled
     // the remaining tasks will not be processed hence we never reach activeTasks = 0
@@ -218,21 +250,21 @@ class BulkWriter(container: CosmosAsyncContainer,
       lock.unlock()
     }

-    logInfo("invoking bulkInputEmitter.onComplete()")
+    log.logInfo("invoking bulkInputEmitter.onComplete()")
     semaphore.release(activeTasks.get())
     bulkInputEmitter.tryEmitComplete()

     // which error to report?
     if (errorCaptureFirstException.get() != null) {
-      logError(s"flushAndClose throw captured error ${errorCaptureFirstException.get().getMessage}")
+      log.logError(s"flushAndClose throw captured error ${errorCaptureFirstException.get().getMessage}")
       throw errorCaptureFirstException.get()
     }

     assume(activeTasks.get() == 0)
     assume(activeOperations.isEmpty)
     assume(semaphore.availablePermits() == maxPendingOperations)

-    logInfo(s"flushAndClose completed with no error. " +
+    log.logInfo(s"flushAndClose completed with no error. " +
       s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=${totalScheduledMetrics}")
     assume(totalScheduledMetrics.get() == totalSuccessfulIngestionMetrics.get)
   } finally {
@@ -253,7 +285,7 @@ class BulkWriter(container: CosmosAsyncContainer,
   }

   private def captureIfFirstFailure(throwable: Throwable) = {
-    logError("capture failure", throwable)
+    log.logError("capture failure", throwable)
     lock.lock()
     try {
       errorCaptureFirstException.compareAndSet(null, throwable)
@@ -264,7 +296,7 @@ class BulkWriter(container: CosmosAsyncContainer,
   }

   private def cancelWork(): Unit = {
-    logInfo(s"cancelling remaining un process tasks ${activeTasks.get}")
+    log.logInfo(s"cancelling remaining un process tasks ${activeTasks.get}")
     subscriptionDisposable.dispose()
   }

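For reviewers who want to see the new BulkWriter wiring end to end, here is a minimal, hypothetical Spark write that would exercise the bulk path above with diagnostics turned on. The option keys (`spark.cosmos.*`, and in particular the diagnostics switch and bulk toggle) are assumptions for illustration only and should be checked against CosmosConfig/DiagnosticsConfig in this PR; they are not taken from this diff.

```scala
// Hypothetical end-to-end usage sketch; option names are assumptions, not part of this commit.
import org.apache.spark.sql.SparkSession

object BulkWriteDiagnosticsSample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cosmos-bulk-write-sample").getOrCreate()
    import spark.implicits._

    val cfg = Map(
      "spark.cosmos.accountEndpoint" -> "<endpoint>",      // assumed option name
      "spark.cosmos.accountKey" -> "<key>",                // assumed option name
      "spark.cosmos.database" -> "testDB",
      "spark.cosmos.container" -> "testContainer",
      "spark.cosmos.write.bulk.enabled" -> "true",         // assumed: routes writes through BulkWriter
      "spark.cosmos.diagnostics" -> "simple"               // assumed diagnostics switch; enables the request/response logging shown in the commit message
    )

    // Each row becomes an upsert; with diagnostics enabled the BulkWriter logs the batch
    // request/response lines tagged with the SparkTaskContext correlationActivityId.
    Seq(("bat", "bat"), ("mouse", "mouse"), ("horse", "horse"))
      .toDF("id", "pk")
      .write
      .format("cosmos.oltp")
      .options(cfg)
      .mode("append")
      .save()

    spark.stop()
  }
}
```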
sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala

Lines changed: 10 additions & 7 deletions
@@ -3,6 +3,7 @@
 package com.azure.cosmos.spark

 import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, SparkBridgeImplementationInternal}
+import com.azure.cosmos.spark.diagnostics.LoggerHelper
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
@@ -16,17 +17,19 @@ private class ChangeFeedBatch
   session: SparkSession,
   schema: StructType,
   config: Map[String, String],
-  cosmosClientStateHandle: Broadcast[CosmosClientMetadataCachesSnapshot]
-) extends Batch
-  with CosmosLoggingTrait {
+  cosmosClientStateHandle: Broadcast[CosmosClientMetadataCachesSnapshot],
+  diagnosticsConfig: DiagnosticsConfig
+) extends Batch {
+
+  @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

   val batchId = UUID.randomUUID().toString()
-  logTrace(s"Instantiated ${this.getClass.getSimpleName}")
+  log.logTrace(s"Instantiated ${this.getClass.getSimpleName}")
   val defaultParallelism = session.sparkContext.defaultParallelism

   override def planInputPartitions(): Array[InputPartition] = {

-    logInfo(s"--> planInputPartitions $batchId")
+    log.logInfo(s"--> planInputPartitions $batchId")
     val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
     val clientConfiguration = CosmosClientConfiguration.apply(config, readConfig.forceEventualConsistency)
     val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
@@ -66,11 +69,11 @@ private class ChangeFeedBatch
           .extractChangeFeedStateForRange(initialOffsetJson, partition.feedRange),
         clearEndLsn = true))

-    logInfo(s"<-- planInputPartitions $batchId (creating ${inputPartitions.length} partitions)" )
+    log.logInfo(s"<-- planInputPartitions $batchId (creating ${inputPartitions.length} partitions)" )
     inputPartitions
   }

   override def createReaderFactory(): PartitionReaderFactory = {
-    ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle)
+    ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle, diagnosticsConfig)
   }
 }

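The ChangeFeedBatch changes above assume a DiagnosticsConfig has already been threaded in from the user-supplied options. A minimal sketch of how a caller might do that threading is shown below; `DiagnosticsConfig.parseDiagnosticsConfig` and the hypothetical `SampleChangeFeedTable` wrapper are assumptions modeled on the `parseCosmos*Config` helpers visible in this diff, not code shown in this commit excerpt.

```scala
// Sketch only: mirrors the parseCosmosReadConfig/parseCosmosContainerConfig pattern used in
// ChangeFeedBatch.planInputPartitions; the parse method name and wrapper class are assumptions.
import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

private class SampleChangeFeedTable(
  session: SparkSession,
  schema: StructType,
  config: Map[String, String],
  cosmosClientStateHandle: Broadcast[CosmosClientMetadataCachesSnapshot]) {

  // Parse the diagnostics settings once from the user-facing options ...
  private val diagnosticsConfig = DiagnosticsConfig.parseDiagnosticsConfig(config)

  // ... and pass them to every component that needs a configurable logger,
  // matching the new ChangeFeedBatch constructor parameter above.
  def toBatch(): ChangeFeedBatch =
    new ChangeFeedBatch(session, schema, config, cosmosClientStateHandle, diagnosticsConfig)
}
```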
sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 20 additions & 17 deletions
@@ -4,10 +4,11 @@ package com.azure.cosmos.spark

 import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, SparkBridgeImplementationInternal}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
+import com.azure.cosmos.spark.diagnostics.LoggerHelper
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadLimit, SupportsAdmissionControl}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType

 import java.time.Duration
@@ -21,13 +22,15 @@ private class ChangeFeedMicroBatchStream
   val schema: StructType,
   val config: Map[String, String],
   val cosmosClientStateHandle: Broadcast[CosmosClientMetadataCachesSnapshot],
-  val checkpointLocation: String
+  val checkpointLocation: String,
+  diagnosticsConfig: DiagnosticsConfig
 ) extends MicroBatchStream
-  with SupportsAdmissionControl
-  with CosmosLoggingTrait {
+  with SupportsAdmissionControl {
+
+  @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

   private val streamId = UUID.randomUUID().toString
-  logTrace(s"Instantiated ${this.getClass.getSimpleName}.$streamId")
+  log.logTrace(s"Instantiated ${this.getClass.getSimpleName}.$streamId")

   private val defaultParallelism = session.sparkContext.defaultParallelism
   private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
@@ -66,15 +69,15 @@ private class ChangeFeedMicroBatchStream
     assert(startOffset.isInstanceOf[ChangeFeedOffset], "Argument 'startOffset' is not a change feed offset.")
     assert(endOffset.isInstanceOf[ChangeFeedOffset], "Argument 'endOffset' is not a change feed offset.")

-    logInfo(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
+    log.logInfo(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
     val start = startOffset.asInstanceOf[ChangeFeedOffset]
     val end = endOffset.asInstanceOf[ChangeFeedOffset]

     val startChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(start.changeFeedState))
-    logInfo(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")
+    log.logInfo(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")

     val endChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(end.changeFeedState))
-    logInfo(s"End-ChangeFeedState.$streamId: $endChangeFeedState")
+    log.logInfo(s"End-ChangeFeedState.$streamId: $endChangeFeedState")

     assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")

@@ -92,8 +95,8 @@ private class ChangeFeedMicroBatchStream
    * Returns a factory to create a `PartitionReader` for each `InputPartition`.
    */
   override def createReaderFactory(): PartitionReaderFactory = {
-    logInfo(s"--> createReaderFactory.$streamId")
-    ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle)
+    log.logInfo(s"--> createReaderFactory.$streamId")
+    ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle, diagnosticsConfig)
   }

   /**
@@ -113,7 +116,7 @@ private class ChangeFeedMicroBatchStream
   // serialize them in the end offset returned to avoid any IO calls for the actual partitioning
   override def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset = {

-    logInfo(s"--> latestOffset.$streamId")
+    log.logInfo(s"--> latestOffset.$streamId")

     val startChangeFeedOffset = startOffset.asInstanceOf[ChangeFeedOffset]
     val offset = CosmosPartitionPlanner.getLatestOffset(
@@ -130,11 +133,11 @@ private class ChangeFeedMicroBatchStream
     )

     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
-      logInfo(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
+      log.logInfo(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
       this.latestOffsetSnapshot = Some(offset)
       offset
     } else {
-      logInfo(s"<-- latestOffset.$streamId - Finished returning null")
+      log.logInfo(s"<-- latestOffset.$streamId - Finished returning null")

       this.latestOffsetSnapshot = None

@@ -165,7 +168,7 @@ private class ChangeFeedMicroBatchStream
       newOffsetJson
     }

-    logInfo(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
+    log.logInfo(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
     ChangeFeedOffset(offsetJson, None)
   }

@@ -194,7 +197,7 @@ private class ChangeFeedMicroBatchStream
   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
   */
   override def deserializeOffset(s: String): Offset = {
-    logDebug(s"MicroBatch stream $streamId: Deserialized offset '$s'.")
+    log.logDebug(s"MicroBatch stream $streamId: Deserialized offset '$s'.")
     ChangeFeedOffset.fromJson(s)
   }

@@ -203,14 +206,14 @@ private class ChangeFeedMicroBatchStream
   * equal to `end` and will only request offsets greater than `end` in the future.
   */
   override def commit(offset: Offset): Unit = {
-    logInfo(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
+    log.logInfo(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
   }

   /**
   * Stop this source and free any resources it has allocated.
   */
   override def stop(): Unit = {
-    logInfo(s"MicroBatch stream $streamId: stopped.")
+    log.logInfo(s"MicroBatch stream $streamId: stopped.")
   }
 }
 // scalastyle:on multiple.string.literals
