
Commit b8b45be

Deprecate some of the older/unsafe APIs
1 parent fb3e56a commit b8b45be

File tree

5 files changed: +193 −13 lines changed

src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt

Lines changed: 64 additions & 10 deletions
@@ -3,6 +3,7 @@
 package io.github.nomisRev.kafka
 
 import io.github.nomisRev.kafka.internal.chunked
+import io.github.nomisRev.kafka.receiver.ReceiverSettings
 import kotlinx.coroutines.CancellationException
 import kotlinx.coroutines.CoroutineDispatcher
 import java.time.Duration
@@ -73,6 +74,13 @@ import kotlin.coroutines.suspendCoroutine
 import kotlin.time.Duration.Companion.milliseconds
 import kotlin.time.toJavaDuration
 
+@Deprecated(
+  "Use KafkaReceiver instead. This function will be removed in 0.4.0",
+  ReplaceWith(
+    "KafkaReceiver(settings.toReceiverSettings())",
+    "io.github.nomisRev.kafka.receiver.KafkaReceiver"
+  )
+)
 public fun <K, V> KafkaConsumer(settings: ConsumerSettings<K, V>): KafkaConsumer<K, V> =
   KafkaConsumer(settings.properties(), settings.keyDeserializer, settings.valueDeserializer)
 
@@ -102,6 +110,13 @@ public fun <K, V> KafkaConsumer(settings: ConsumerSettings<K, V>): KafkaConsumer
  * ```
  * <!--- KNIT example-consumer-01.kt -->
  */
+@Deprecated(
+  "Use KafkaReceiver#receive instead. This function will be removed in 0.4.0",
+  ReplaceWith(
+    "KafkaReceiver(settings.toReceiverSettings()).receive()",
+    "io.github.nomisRev.kafka.receiver.KafkaReceiver"
+  )
+)
 public fun <K, V> kafkaConsumer(settings: ConsumerSettings<K, V>): Flow<KafkaConsumer<K, V>> =
   flow {
     KafkaConsumer(settings).use {
@@ -115,6 +130,7 @@ public fun <K, V> kafkaConsumer(settings: ConsumerSettings<K, V>): Flow<KafkaCon
 @OptIn(FlowPreview::class)
 @ExperimentalCoroutinesApi
 @JvmName("commitBatchesWithin")
+@Deprecated("Use KafkaReceiver instead. It comes with strong guarantees about commits")
 public fun <K, V> Flow<ConsumerRecords<K, V>>.commitBatchWithin(
   settings: ConsumerSettings<K, V>,
   count: Int,
@@ -129,6 +145,7 @@ public fun <K, V> Flow<ConsumerRecords<K, V>>.commitBatchWithin(
 
 @OptIn(FlowPreview::class)
 @ExperimentalCoroutinesApi
+@Deprecated("Use KafkaReceiver instead. It comes with strong guarantees about commits")
 public fun <K, V> Flow<ConsumerRecord<K, V>>.commitBatchWithin(
   settings: ConsumerSettings<K, V>,
   count: Int,
@@ -143,6 +160,10 @@ public fun <K, V> Flow<ConsumerRecord<K, V>>.commitBatchWithin(
   }
 }
 
+@Deprecated(
+  "Use KafkaReceiver instead. It comes with strong guarantees about commits." +
+    "You can only commit while polling, which is done automatically for you with KafkaReceiver"
+)
 public suspend fun <K, V> KafkaConsumer<K, V>.commitAwait(
   offsets: Map<TopicPartition, OffsetAndMetadata>,
 ): Map<TopicPartition, OffsetAndMetadata> =
@@ -158,15 +179,16 @@ public operator fun <K, V> ConsumerRecord<K, V>.component2(): V = value()
 
 public fun <K, V> Iterable<ConsumerRecord<K, V>>.offsets(
   metadata: ((record: ConsumerRecord<K, V>) -> String)? = null,
-): Map<TopicPartition, OffsetAndMetadata> = mutableMapOf<TopicPartition, OffsetAndMetadata>().apply {
-  this@offsets.forEach { record ->
-    val key = TopicPartition(record.topic(), record.partition())
-    val value = metadata?.let {
-      OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), metadata(record))
-    } ?: OffsetAndMetadata(record.offset() + 1)
-    put(key, value)
+): Map<TopicPartition, OffsetAndMetadata> =
+  mutableMapOf<TopicPartition, OffsetAndMetadata>().apply {
+    this@offsets.forEach { record ->
+      val key = TopicPartition(record.topic(), record.partition())
+      val value = metadata?.let {
+        OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), metadata(record))
+      } ?: OffsetAndMetadata(record.offset() + 1)
+      put(key, value)
+    }
   }
-}
 
 public fun <K, V> ConsumerRecord<K, V>.offsets(
   metadata: ((record: ConsumerRecord<K, V>) -> String)? = null,
@@ -187,6 +209,13 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
   }
 }
 
+@Deprecated(
+  "Use KafkaReceiver#receive instead. This function will be removed in 0.4.0",
+  ReplaceWith(
+    "KafkaReceiver(settings.toReceiverSettings()).receive()",
+    "io.github.nomisRev.kafka.receiver.KafkaReceiver"
+  )
+)
 @OptIn(FlowPreview::class)
 public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
   name: String,
@@ -197,6 +226,13 @@ public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
   consumer.subscribeTo(name, dispatcher, listener, timeout)
 }
 
+@Deprecated(
+  "Use KafkaReceiver#receive instead. This function will be removed in 0.4.0",
+  ReplaceWith(
+    "KafkaReceiver(settings.toReceiverSettings()).receive()",
+    "io.github.nomisRev.kafka.receiver.KafkaReceiver"
+  )
+)
 /** Subscribes to the [KafkaConsumer] and polls for events in an interruptible way. */
 public fun <K, V> KafkaConsumer<K, V>.subscribeTo(
   name: String,
@@ -220,13 +256,22 @@ public fun <K, V> KafkaConsumer<K, V>.subscribeTo(
   }
 }
 
+@Deprecated(
+  "Use io.github.nomisRev.kafka.receiver.AutoOffsetReset instead",
+  ReplaceWith(
+    "this",
+    "io.github.nomisRev.kafka.receiver.AutoOffsetReset"
+  )
+)
 public enum class AutoOffsetReset(public val value: String) {
   Earliest("earliest"), Latest("latest"), None("none")
 }
 
-// TODO Compare with reactor-kafka
-// TODO should be easier to use `null`/`Nothing` Key
 /** Default values taken from [org.apache.kafka.clients.consumer.ConsumerConfig] */
+@Deprecated(
+  "Use ReceiverSettings with KafkaReceiver instead.",
+  ReplaceWith("toReceiverSettings()")
+)
 public data class ConsumerSettings<K, V>(
   // BOOTSTRAP_SERVERS_CONFIG
   val bootstrapServers: String,
@@ -299,6 +344,15 @@ public data class ConsumerSettings<K, V>(
   // Optional parameter that allows for setting properties not defined here
   private val properties: Properties? = null,
 ) {
+  public fun toReceiverSettings(): ReceiverSettings<K, V> =
+    ReceiverSettings(
+      bootstrapServers,
+      keyDeserializer,
+      valueDeserializer,
+      groupId,
+      properties = properties()
+    )
+
   public fun properties(): Properties = Properties().apply {
     properties?.let { putAll(it) }
     put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
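For callers migrating off the deprecated entry points, here is a minimal sketch of the replacement path the `ReplaceWith` suggestions point at. The broker address, group id, topic name, and `StringDeserializer` choices are placeholders, and it assumes the received records expose the usual `ConsumerRecord` accessors:

```kotlin
import io.github.nomisRev.kafka.ConsumerSettings
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import kotlinx.coroutines.flow.collect
import org.apache.kafka.common.serialization.StringDeserializer

suspend fun migrateToReceiver() {
  // Old-style settings, kept only to show the bridge added in this commit.
  val settings = ConsumerSettings(
    bootstrapServers = "localhost:9092",      // placeholder broker
    keyDeserializer = StringDeserializer(),
    valueDeserializer = StringDeserializer(),
    groupId = "example-group",                // placeholder group id
  )

  // Instead of kafkaConsumer(settings).subscribeTo("example-topic"):
  KafkaReceiver(settings.toReceiverSettings())
    .receive("example-topic")
    .collect { record ->
      println("${record.key()} -> ${record.value()}")
      record.offset.acknowledge() // committed later, per the receiver's CommitStrategy
    }
}
```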

src/main/kotlin/io/github/nomisRev/kafka/receiver/ReceiverSettings.kt

Lines changed: 12 additions & 3 deletions
@@ -1,7 +1,5 @@
 package io.github.nomisRev.kafka.receiver
 
-import io.github.nomisRev.kafka.AutoOffsetReset
-import io.github.nomisRev.kafka.NothingDeserializer
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.Deserializer
 import java.util.Properties
@@ -15,6 +13,9 @@ private const val DEFAULT_MAX_COMMIT_ATTEMPTS = 100
 private val DEFAULT_COMMIT_RETRY_INTERVAL = 500.milliseconds
 private val DEFAULT_COMMIT_INTERVAL = 5.seconds
 
+public typealias AutoOffsetReset =
+  io.github.nomisRev.kafka.AutoOffsetReset
+
 /**
  * A data class that exposes configuration for [KafkaReceiver],
  * and the underlying [org.apache.kafka.clients.consumer.KafkaConsumer].
@@ -45,7 +46,9 @@ public data class ReceiverSettings<K, V>(
 
   internal fun toProperties() = Properties().apply {
     put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer::class.qualifiedName)
+    if (keyDeserializer !== NothingDeserializer) {
+      put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer::class.qualifiedName)
+    }
     put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer::class.qualifiedName)
     put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
@@ -82,3 +85,9 @@ public fun <V> ReceiverSettings(
   closeTimeout,
   properties
 )
+
+private object NothingDeserializer : Deserializer<Nothing> {
+  override fun close(): Unit = Unit
+  override fun configure(configs: MutableMap<String, *>?, isKey: Boolean): Unit = Unit
+  override fun deserialize(topic: String?, data: ByteArray?): Nothing = TODO("Impossible")
+}
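The `NothingDeserializer` guard above pairs with the value-only `ReceiverSettings` factory: when the key type is `Nothing`, the placeholder deserializer is detected by identity and no `key.deserializer` entry is written to the consumer properties. A small sketch of the value-only factory in use; the parameter names are assumed to mirror the data class fields, and the broker address and group id are placeholders:

```kotlin
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import org.apache.kafka.common.serialization.StringDeserializer

// Value-only settings: the key type is Nothing, so toProperties() now skips the
// key.deserializer entry instead of registering a class that can never produce a key.
val valueOnly: ReceiverSettings<Nothing, String> = ReceiverSettings(
  bootstrapServers = "localhost:9092",      // placeholder broker
  valueDeserializer = StringDeserializer(),
  groupId = "value-only-group",             // placeholder group id
)
```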

src/main/kotlin/io/github/nomisRev/kafka/utils/Closeable.kt

Lines changed: 7 additions & 0 deletions
@@ -4,5 +4,12 @@ import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.flow
 import java.io.Closeable
 
+@Deprecated(
+  "Will be removed in Kotlin-Kafka 0.4.x",
+  ReplaceWith(
+    "flow { use { emit(it) } }",
+    "kotlinx.coroutines.flow.flow"
+  )
+)
 public fun <A : Closeable> A.asFlow(): Flow<A> =
   flow { use { emit(it) } }
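`asFlow` is being removed without a replacement helper; the `ReplaceWith` expression is meant to be inlined at call sites. A minimal sketch with a hypothetical `Closeable`:

```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical resource, used only to illustrate the inlined replacement.
class Resource : Closeable {
  fun read(): String = "data"
  override fun close(): Unit = println("closed")
}

fun main(): Unit = runBlocking {
  // Instead of Resource().asFlow(), inline the suggested expression:
  flow { Resource().use { emit(it) } }
    .collect { resource -> println(resource.read()) }
  // `use` closes the resource when collection finishes, so "closed" prints after "data".
}
```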
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package io.github.nomisrev.kafka
+
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.map
+
+inline fun <A, B> Flow<A>.mapIndexed(
+  crossinline transform: suspend (index: Int, value: A) -> B,
+): Flow<B> {
+  var index = 0
+  return map { value ->
+    transform(index++, value)
+  }
+}
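This new test helper mirrors `Iterable.mapIndexed` for `Flow`, threading a mutable counter through `map`; the counter is shared across collections of the returned flow, so it is meant for single-collection use as in the tests below. A quick usage sketch:

```kotlin
import io.github.nomisrev.kafka.mapIndexed
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
  val labeled = flowOf("a", "b", "c")
    .mapIndexed { index, value -> "$index:$value" }
    .toList()
  println(labeled) // [0:a, 1:b, 2:c]
}
```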

src/test/kotlin/io/github/nomisrev/kafka/consumer/KafakReceiverSpec.kt

Lines changed: 97 additions & 0 deletions
@@ -1,16 +1,22 @@
 package io.github.nomisrev.kafka.consumer
 
+import io.github.nomisRev.kafka.receiver.CommitStrategy
 import io.github.nomisRev.kafka.receiver.KafkaReceiver
 import io.github.nomisrev.kafka.KafkaSpec
+import io.github.nomisrev.kafka.mapIndexed
 import io.kotest.assertions.assertSoftly
 import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
 import io.kotest.matchers.shouldBe
+import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.awaitCancellation
+import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.collectIndexed
 import kotlinx.coroutines.flow.flatMapConcat
 import kotlinx.coroutines.flow.flattenMerge
 import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.flow.launchIn
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.flow.toList
@@ -21,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 class KafakReceiverSpec : KafkaSpec({
 
   val depth = 100
+  val lastIndex = depth - 1
   fun produced(
     startIndex: Int = 0,
     lastIndex: Int = depth,
@@ -83,6 +90,96 @@ class KafakReceiverSpec : KafkaSpec({
     }
   }
 
+  "All acknowledged messages are committed on flow completion" {
+    withTopic(partitions = 3) { topic ->
+      publishToKafka(topic, produced())
+      val receiver = KafkaReceiver(
+        receiverSetting().copy(
+          commitStrategy = CommitStrategy.BySize(2 * depth)
+        )
+      )
+      receiver.receive(topic.name())
+        .take(depth)
+        .collectIndexed { index, value ->
+          if (index == lastIndex) {
+            value.offset.acknowledge()
+            receiver.committedCount(topic.name()) shouldBe 0
+          } else value.offset.acknowledge()
+        }
+
+      receiver.committedCount(topic.name()) shouldBe 100
+    }
+  }
+
+  "All acknowledged messages are committed on flow failure" {
+    withTopic(partitions = 3) { topic ->
+      publishToKafka(topic, produced())
+      val receiver = KafkaReceiver(
+        receiverSetting().copy(
+          commitStrategy = CommitStrategy.BySize(2 * depth)
+        )
+      )
+      val failure = RuntimeException("Flow terminates")
+      runCatching {
+        receiver.receive(topic.name())
+          .collectIndexed { index, value ->
+            if (index == lastIndex) {
+              value.offset.acknowledge()
+              receiver.committedCount(topic.name()) shouldBe 0
+              throw failure
+            } else value.offset.acknowledge()
+          }
+      }.exceptionOrNull() shouldBe failure
+
+      receiver.committedCount(topic.name()) shouldBe 100
+    }
+  }
+
+  "All acknowledged messages are committed on flow cancellation" {
+    val scope = this
+    withTopic(partitions = 3) { topic ->
+      publishToKafka(topic, produced())
+      val receiver = KafkaReceiver(
+        receiverSetting().copy(
+          commitStrategy = CommitStrategy.BySize(2 * depth)
+        )
+      )
+      val latch = CompletableDeferred<Unit>()
+      val job = receiver.receive(topic.name())
+        .mapIndexed { index, value ->
+          if (index == lastIndex) {
+            value.offset.acknowledge()
+            receiver.committedCount(topic.name()) shouldBe 0
+            require(latch.complete(Unit)) { "Latch completed twice" }
+          } else value.offset.acknowledge()
+        }.launchIn(scope)
+
+      latch.await()
+      job.cancelAndJoin()
+
+      receiver.committedCount(topic.name()) shouldBe 100
+    }
+  }
+
+  "Manual commit also commits all acknowledged offsets" {
+    withTopic(partitions = 3) { topic ->
+      publishToKafka(topic, produced())
+      val receiver = KafkaReceiver(
+        receiverSetting().copy(
+          commitStrategy = CommitStrategy.BySize(2 * depth)
+        )
+      )
+      receiver.receive(topic.name())
+        .take(depth)
+        .collectIndexed { index, value ->
+          if (index == lastIndex) {
+            value.offset.commit()
+            receiver.committedCount(topic.name()) shouldBe 100
+          } else value.offset.acknowledge()
+        }
+    }
+  }
+
   "receiveAutoAck" {
     withTopic(partitions = 3) { topic ->
       publishToKafka(topic, produced())
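The four new tests pin down one guarantee from different angles: `CommitStrategy.BySize(2 * depth)` sets the commit threshold above the number of records, so any committed offsets observed afterwards must come from the receiver's terminal flush (completion, failure, cancellation) or from an explicit `offset.commit()`. A reduced sketch of the pattern, with placeholder broker, group id, and topic, and parameter names assumed to mirror the `ReceiverSettings` fields:

```kotlin
import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.take
import org.apache.kafka.common.serialization.StringDeserializer

suspend fun commitOnTermination() {
  val settings = ReceiverSettings(
    bootstrapServers = "localhost:9092",      // placeholder broker
    keyDeserializer = StringDeserializer(),
    valueDeserializer = StringDeserializer(),
    groupId = "commit-demo",                  // placeholder group id
  ).copy(commitStrategy = CommitStrategy.BySize(1_000)) // threshold never reached below

  KafkaReceiver(settings)
    .receive("commit-demo-topic")
    .take(100)
    .collect { record ->
      record.offset.acknowledge() // acknowledged, but BySize(1_000) never triggers for 100 records
    }
  // When the flow terminates (here via take), the receiver flushes all acknowledged
  // offsets; the tests assert the same flush on failure and cancellation, and that
  // offset.commit() commits eagerly.
}
```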
