
Commit 0301834

Cosmos Spark: Fixing ItemCountPerTriggerHint handling in Structured Streaming (Azure#27101)

* Cosmos Spark: Fixing ItemCountPerTriggerHint handling in Structured Streaming
* Updating changelog
* Update CosmosPartitionPlanner.scala
* Making endLsn calculation more robust
* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
* Update CosmosPartitionPlanner.scala
* Update PartitionMetadataCache.scala

Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com>
1 parent 32ed314 commit 0301834

13 files changed: +854 / -132 lines


sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md (2 additions, 0 deletions)

```diff
@@ -7,6 +7,8 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
+* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
 
 #### Other Changes
```
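For context, a minimal sketch of a structured-streaming change feed read that sets this hint is shown below. The `cosmos.oltp.changeFeed` format name and the `spark.cosmos.*` option keys follow the connector's public configuration; the endpoint, key, database, container, and checkpoint values are placeholders.

```scala
// Minimal sketch: read the Cosmos DB change feed as a structured stream with
// itemCountPerTriggerHint. Endpoint, key, database, container, and checkpoint
// values are placeholders - replace them with real ones.
import org.apache.spark.sql.SparkSession

object ChangeFeedHintSample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cosmos-changefeed-sample").getOrCreate()

    val changeFeed = spark.readStream
      .format("cosmos.oltp.changeFeed")
      .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
      .option("spark.cosmos.accountKey", "<account-key>")
      .option("spark.cosmos.database", "<database>")
      .option("spark.cosmos.container", "<container>")
      // Soft upper bound on the number of change feed items pulled per micro batch;
      // the fix in this commit spreads this budget across feed ranges without
      // throttling subsequent micro batches too aggressively.
      .option("spark.cosmos.changeFeed.itemCountPerTriggerHint", "100000")
      .load()

    changeFeed.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/cosmos-changefeed-checkpoint") // placeholder path
      .start()
      .awaitTermination()
  }
}
```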

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala (10 additions, 10 deletions)

```diff
@@ -74,15 +74,15 @@ private class ChangeFeedMicroBatchStream
     assert(startOffset.isInstanceOf[ChangeFeedOffset], "Argument 'startOffset' is not a change feed offset.")
     assert(endOffset.isInstanceOf[ChangeFeedOffset], "Argument 'endOffset' is not a change feed offset.")
 
-    log.logInfo(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
+    log.logDebug(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
     val start = startOffset.asInstanceOf[ChangeFeedOffset]
     val end = endOffset.asInstanceOf[ChangeFeedOffset]
 
     val startChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(start.changeFeedState))
-    log.logInfo(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")
+    log.logDebug(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")
 
     val endChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(end.changeFeedState))
-    log.logInfo(s"End-ChangeFeedState.$streamId: $endChangeFeedState")
+    log.logDebug(s"End-ChangeFeedState.$streamId: $endChangeFeedState")
 
     assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")
 
@@ -100,7 +100,7 @@ private class ChangeFeedMicroBatchStream
    * Returns a factory to create a `PartitionReader` for each `InputPartition`.
    */
   override def createReaderFactory(): PartitionReaderFactory = {
-    log.logInfo(s"--> createReaderFactory.$streamId")
+    log.logDebug(s"--> createReaderFactory.$streamId")
     ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle, diagnosticsConfig)
   }
 
@@ -121,7 +121,7 @@ private class ChangeFeedMicroBatchStream
   // serialize them in the end offset returned to avoid any IO calls for the actual partitioning
   override def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset = {
 
-    log.logInfo(s"--> latestOffset.$streamId")
+    log.logDebug(s"--> latestOffset.$streamId")
 
     val startChangeFeedOffset = startOffset.asInstanceOf[ChangeFeedOffset]
     val offset = CosmosPartitionPlanner.getLatestOffset(
@@ -138,11 +138,11 @@ private class ChangeFeedMicroBatchStream
     )
 
     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
-      log.logInfo(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
+      log.logDebug(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
       this.latestOffsetSnapshot = Some(offset)
       offset
     } else {
-      log.logInfo(s"<-- latestOffset.$streamId - Finished returning null")
+      log.logDebug(s"<-- latestOffset.$streamId - Finished returning null")
 
       this.latestOffsetSnapshot = None
 
@@ -173,7 +173,7 @@ private class ChangeFeedMicroBatchStream
       newOffsetJson
     }
 
-    log.logInfo(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
+    log.logDebug(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
     ChangeFeedOffset(offsetJson, None)
   }
 
@@ -210,15 +210,15 @@ private class ChangeFeedMicroBatchStream
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(offset: Offset): Unit = {
-    log.logInfo(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
+    log.logDebug(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
   }
 
   /**
    * Stop this source and free any resources it has allocated.
    */
   override def stop(): Unit = {
     clientCacheItem.close()
-    log.logInfo(s"MicroBatch stream $streamId: stopped.")
+    log.logDebug(s"MicroBatch stream $streamId: stopped.")
   }
 }
 // scalastyle:on multiple.string.literals
```
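These log statements all move from info to debug level, so the per-micro-batch offset details disappear from default logs. If they are still needed for troubleshooting, they can be re-enabled selectively; a sketch, assuming the log4j 1.x API bundled with the Spark 3.1/3.2 runtime and that the connector logs under its package name:

```scala
// Sketch: raise verbosity only for the Cosmos Spark connector package.
// Assumes log4j 1.x on the classpath (Spark 3.1/3.2) and the
// com.azure.cosmos.spark logger name.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("com.azure.cosmos.spark").setLevel(Level.DEBUG)
```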

sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md (2 additions, 0 deletions)

```diff
@@ -7,6 +7,8 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
+* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
 
 #### Other Changes
```

sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala (10 additions, 10 deletions)

```diff
@@ -74,15 +74,15 @@ private class ChangeFeedMicroBatchStream
     assert(startOffset.isInstanceOf[ChangeFeedOffset], "Argument 'startOffset' is not a change feed offset.")
     assert(endOffset.isInstanceOf[ChangeFeedOffset], "Argument 'endOffset' is not a change feed offset.")
 
-    log.logInfo(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
+    log.logDebug(s"--> planInputPartitions.$streamId, startOffset: ${startOffset.json()} - endOffset: ${endOffset.json()}")
     val start = startOffset.asInstanceOf[ChangeFeedOffset]
     val end = endOffset.asInstanceOf[ChangeFeedOffset]
 
     val startChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(start.changeFeedState))
-    log.logInfo(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")
+    log.logDebug(s"Start-ChangeFeedState.$streamId: $startChangeFeedState")
 
     val endChangeFeedState = new String(java.util.Base64.getUrlDecoder.decode(end.changeFeedState))
-    log.logInfo(s"End-ChangeFeedState.$streamId: $endChangeFeedState")
+    log.logDebug(s"End-ChangeFeedState.$streamId: $endChangeFeedState")
 
     assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")
 
@@ -100,7 +100,7 @@ private class ChangeFeedMicroBatchStream
    * Returns a factory to create a `PartitionReader` for each `InputPartition`.
    */
   override def createReaderFactory(): PartitionReaderFactory = {
-    log.logInfo(s"--> createReaderFactory.$streamId")
+    log.logDebug(s"--> createReaderFactory.$streamId")
     ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle, diagnosticsConfig)
   }
 
@@ -121,7 +121,7 @@ private class ChangeFeedMicroBatchStream
   // serialize them in the end offset returned to avoid any IO calls for the actual partitioning
   override def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset = {
 
-    log.logInfo(s"--> latestOffset.$streamId")
+    log.logDebug(s"--> latestOffset.$streamId")
 
     val startChangeFeedOffset = startOffset.asInstanceOf[ChangeFeedOffset]
     val offset = CosmosPartitionPlanner.getLatestOffset(
@@ -138,11 +138,11 @@ private class ChangeFeedMicroBatchStream
     )
 
     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
-      log.logInfo(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
+      log.logDebug(s"<-- latestOffset.$streamId - new offset ${offset.json()}")
       this.latestOffsetSnapshot = Some(offset)
       offset
     } else {
-      log.logInfo(s"<-- latestOffset.$streamId - Finished returning null")
+      log.logDebug(s"<-- latestOffset.$streamId - Finished returning null")
 
       this.latestOffsetSnapshot = None
 
@@ -173,7 +173,7 @@ private class ChangeFeedMicroBatchStream
       newOffsetJson
     }
 
-    log.logInfo(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
+    log.logDebug(s"MicroBatch stream $streamId: Initial offset '$offsetJson'.")
     ChangeFeedOffset(offsetJson, None)
   }
 
@@ -210,15 +210,15 @@ private class ChangeFeedMicroBatchStream
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(offset: Offset): Unit = {
-    log.logInfo(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
+    log.logDebug(s"MicroBatch stream $streamId: Committed offset '${offset.json()}'.")
   }
 
   /**
    * Stop this source and free any resources it has allocated.
    */
   override def stop(): Unit = {
     clientCacheItem.close()
-    log.logInfo(s"MicroBatch stream $streamId: stopped.")
+    log.logDebug(s"MicroBatch stream $streamId: stopped.")
   }
 }
 // scalastyle:on multiple.string.literals
```

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala (41 additions, 16 deletions)

```diff
@@ -93,15 +93,23 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
   }
 
   private[this] def getContinuationTokenLsnOfFirstItem(items: Iterable[ObjectNode]): Option[String] = {
+    getLsnOfFirstItem(items) match {
+      case Some(firstLsn) =>
+        Some(SparkBridgeImplementationInternal.toContinuationToken(firstLsn))
+      case None => None
+    }
+  }
+
+  private[spark] def getLsnOfFirstItem(items: Iterable[ObjectNode]): Option[Long] = {
     items
       .collectFirst({
         case item: ObjectNode if item != null =>
           val lsnNode = item.get(LsnAttributeName)
           if (lsnNode != null && lsnNode.isNumber) {
-            // when grabbing the LSN from the item we need to use the item's LSN -1
-            // to ensure we would retrieve this item again
             Some(
-              SparkBridgeImplementationInternal.toContinuationToken(lsnNode.asLong() - 1))
+              // when grabbing the LSN from the item we need to use the item's LSN -1
+              // to ensure we would retrieve this item again
+              lsnNode.asLong() - 1)
           } else {
             None
           }
@@ -432,7 +440,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
     val scaleFactor = if (storageSizeInMB == 0) {
       1
     } else {
-      progressWeightFactor * storageSizeInMB.toDouble
+      progressWeightFactor * storageSizeInMB
     }
 
     val planningInfo = PartitionPlanningInfo(
@@ -466,18 +474,35 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
       .map(metadata => {
         val endLsn = readLimit match {
           case _: ReadAllAvailable => metadata.latestLsn
-          case _: ReadMaxRows =>
-            val gap = math.max(0, metadata.latestLsn - metadata.startLsn)
-            val weightFactor = metadata.getWeightedLsnGap.toDouble / totalWeightedLsnGap.get
-            val allowedRate = (weightFactor * gap).toLong.max(1)
-            if (isDebugLogEnabled) {
-              val calculateDebugLine = s"calculateEndLsn - gap $gap weightFactor $weightFactor " +
-                s"documentCount ${metadata.documentCount} latestLsn ${metadata.latestLsn} " +
-                s"startLsn ${metadata.startLsn} allowedRate $allowedRate weightedGap ${metadata.getWeightedLsnGap}"
-              logDebug(calculateDebugLine)
-            }
+          case maxRowsLimit: ReadMaxRows =>
+            if (totalWeightedLsnGap.get <= maxRowsLimit.maxRows) {
+              if (isDebugLogEnabled) {
+                val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs " +
+                  s"per LSN: ${metadata.getAvgItemsPerLsn} documentCount ${metadata.documentCount} firstLsn " +
+                  s"${metadata.firstLsn} latestLsn ${metadata.latestLsn} startLsn ${metadata.startLsn} weightedGap " +
+                  s"${metadata.getWeightedLsnGap} effectiveEndLsn ${metadata.latestLsn} maxRows ${maxRowsLimit.maxRows}"
+                logDebug(calculateDebugLine)
+              }
+              metadata.latestLsn
+            } else {
+              // the weight of this feedRange compared to other feedRanges
+              val feedRangeWeightFactor = metadata.getWeightedLsnGap.toDouble / totalWeightedLsnGap.get
+
+              val allowedRate = (feedRangeWeightFactor * maxRowsLimit.maxRows() / metadata.getAvgItemsPerLsn)
+                .toLong
+                .max(1)
+              val effectiveEndLsn = math.min(metadata.latestLsn, metadata.startLsn + allowedRate)
+              if (isDebugLogEnabled) {
+                val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs/LSN: " +
+                  s"${metadata.getAvgItemsPerLsn} feedRangeWeightFactor $feedRangeWeightFactor documentCount " +
+                  s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} startLsn " +
+                  s"${metadata.startLsn} allowedRate $allowedRate weightedGap ${metadata.getWeightedLsnGap} " +
+                  s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows"
+                logDebug(calculateDebugLine)
+              }
 
-            math.min(metadata.latestLsn, metadata.startLsn + allowedRate)
+              effectiveEndLsn
+            }
           case _: ReadMaxFiles => throw new IllegalStateException("ReadLimitMaxFiles not supported by this source.")
         }
 
@@ -496,7 +521,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
         } else if (effectiveEndLsn <= metadata.startLsn) {
           // If progress has caught up with estimation already make sure we only use one Spark partition
           // for the physical partition in Cosmos
-          1 / storageSizeInMB.toDouble
+          1 / storageSizeInMB
         } else {
           // Use weight factor based on progress. This estimate assumes equal distribution of storage
           // size per LSN - which is a "good enough" simplification
```
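The reworked `ReadMaxRows` branch budgets the requested row count across feed ranges instead of scaling the raw LSN gap, which is what previously throttled subsequent micro batches too aggressively. A self-contained sketch of the same arithmetic, detached from the planner's types and with simplified names:

```scala
// Sketch of the endLsn budgeting above: maxRows comes from
// itemCountPerTriggerHint, weightedLsnGap approximates the number of pending
// documents in this feed range, avgItemsPerLsn the average documents per LSN.
def effectiveEndLsn(
    maxRows: Long,
    totalWeightedLsnGap: Long,
    weightedLsnGap: Long,
    avgItemsPerLsn: Double,
    startLsn: Long,
    latestLsn: Long): Long = {
  if (totalWeightedLsnGap <= maxRows) {
    // Everything pending fits into this micro batch - drain up to the latest LSN.
    latestLsn
  } else {
    // Give this feed range a share of maxRows proportional to its pending weight,
    // then convert the row budget back into an LSN distance (at least 1 to
    // guarantee forward progress).
    val feedRangeWeightFactor = weightedLsnGap.toDouble / totalWeightedLsnGap
    val allowedRate = (feedRangeWeightFactor * maxRows / avgItemsPerLsn).toLong.max(1)
    math.min(latestLsn, startLsn + allowedRate)
  }
}

// Example: a 10,000-row hint split over two equally weighted ranges gives each
// range a 5,000-row budget; at 2 docs per LSN that is 2,500 LSNs of progress.
val end = effectiveEndLsn(maxRows = 10000L, totalWeightedLsnGap = 50000L,
  weightedLsnGap = 25000L, avgItemsPerLsn = 2.0, startLsn = 1000L, latestLsn = 30000L)
// feedRangeWeightFactor = 0.5, allowedRate = (0.5 * 10000 / 2.0) = 2500
// => end = min(30000, 1000 + 2500) = 3500
```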

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala (23 additions, 8 deletions)

```diff
@@ -24,15 +24,16 @@ private object PartitionMetadata {
     feedRange: NormalizedRange,
     documentCount: Long,
     totalDocumentSizeInKB: Long,
-    continuationToken: String,
+    firstLsn: Option[Long],
+    fromNowContinuationToken: String,
     startLsn: Long = 0,
     endLsn: Option[Long] = None): PartitionMetadata = {
     // scalastyle:on parameter.number
 
     val nowEpochMs = Instant.now().toEpochMilli
 
     val latestLsn = SparkBridgeImplementationInternal.extractLsnFromChangeFeedContinuation(
-      continuationToken)
+      fromNowContinuationToken)
 
     PartitionMetadata(
       userConfig,
@@ -42,6 +43,7 @@ private object PartitionMetadata {
       feedRange,
       documentCount,
       totalDocumentSizeInKB,
+      firstLsn,
       latestLsn,
       startLsn,
       endLsn,
@@ -60,6 +62,7 @@ private[cosmos] case class PartitionMetadata
     feedRange: NormalizedRange,
     documentCount: Long,
     totalDocumentSizeInKB: Long,
+    firstLsn: Option[Long],
     latestLsn: Long,
     startLsn: Long,
     endLsn: Option[Long],
@@ -83,6 +86,7 @@ private[cosmos] case class PartitionMetadata
       subRange,
       this.documentCount,
       this.totalDocumentSizeInKB,
+      this.firstLsn,
       this.latestLsn,
       startLsn,
       this.endLsn,
@@ -100,6 +104,7 @@ private[cosmos] case class PartitionMetadata
       this.feedRange,
       this.documentCount,
       this.totalDocumentSizeInKB,
+      this.firstLsn,
       this.latestLsn,
       startLsn,
       Some(explicitEndLsn),
@@ -110,14 +115,24 @@ private[cosmos] case class PartitionMetadata
 
   def getWeightedLsnGap: Long = {
     val progressFactor = math.max(this.latestLsn - this.startLsn, 0)
-    val averageItemsPerLsn = if (this.documentCount == 0) {
-      1d
+    if (progressFactor == 0) {
+      0
     } else {
-      this.latestLsn / this.documentCount.toDouble
+      val averageItemsPerLsn = getAvgItemsPerLsn
+
+      val weightedGap: Double = progressFactor * averageItemsPerLsn
+      // Any double less than 1 gets rounded to 0 when toLong is invoked
+      weightedGap.toLong.max(1)
     }
+  }
 
-    val weightedGap: Double = progressFactor * averageItemsPerLsn
-    // Any double less than 1 gets rounded to 0 when toLong is invoked
-    weightedGap.toLong.max(1)
+  def getAvgItemsPerLsn: Double = {
+    if (this.firstLsn.isEmpty) {
+      math.max(1d, this.documentCount.toDouble / this.latestLsn)
+    } else if (this.documentCount == 0 || (this.latestLsn - this.firstLsn.get) <= 0) {
+      1d
+    } else {
+      this.documentCount.toDouble / (this.latestLsn - this.firstLsn.get)
+    }
   }
 }
```
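The new `getAvgItemsPerLsn` bases the density estimate on the first observed LSN when one is available, so it reflects the retained change feed window rather than the partition's entire LSN history, and `getWeightedLsnGap` now returns 0 when there is no gap at all. A standalone sketch of the same calculation with illustrative numbers:

```scala
// Sketch of the weighted-gap estimate above, outside the case class.
def avgItemsPerLsn(documentCount: Long, firstLsn: Option[Long], latestLsn: Long): Double =
  firstLsn match {
    case None =>
      // No first LSN observed yet - fall back to density over the full LSN range.
      math.max(1d, documentCount.toDouble / latestLsn)
    case Some(first) if documentCount == 0 || latestLsn - first <= 0 => 1d
    case Some(first) => documentCount.toDouble / (latestLsn - first)
  }

def weightedLsnGap(startLsn: Long, latestLsn: Long, avgItemsPerLsn: Double): Long = {
  val progressFactor = math.max(latestLsn - startLsn, 0)
  if (progressFactor == 0) 0
  // Any double below 1 would truncate to 0 via toLong, so clamp to at least 1.
  else (progressFactor * avgItemsPerLsn).toLong.max(1)
}

// Example: 12,000 documents between LSN 2,000 and 10,000 => 1.5 docs/LSN;
// with 4,000 LSNs still to read, the weighted gap is ~6,000 pending documents.
val avg = avgItemsPerLsn(documentCount = 12000L, firstLsn = Some(2000L), latestLsn = 10000L) // 1.5
val gap = weightedLsnGap(startLsn = 6000L, latestLsn = 10000L, avgItemsPerLsn = avg)         // 6000
```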
