@@ -2,7 +2,6 @@ Module kotlin-kafka

[![Maven Central](https://img.shields.io/maven-central/v/io.github.nomisrev/kotlin-kafka?color=4caf50&label=latest%20release)](https://maven-badges.herokuapp.com/maven-central/io.github.nomisrev/kotlin-kafka)

- <!--- TEST_NAME ReadmeTest -->
<!--- TOC -->

* [Rationale](#rationale)
@@ -12,22 +11,31 @@ Module kotlin-kafka

<!--- END -->

- This project is still under development, andd started as a playground where I was playing around with Kafka in Kotlin
- and the Kafka SDK whilst reading the Kafka book Definite Guide from Confluent.
- https://www.confluent.io/resources/kafka-the-definitive-guide-v2/
-

## Rationale

- At the time of starting this repository I didn't find any bindings between Kafka SDK and Kotlin suspension. These
- operators should be implemented low-level, so they can guarantee correct cancellation support, and high optimised
- runtimes.
+ At the time of starting this repository I couldn't find any bindings between the Kafka SDK and Kotlin suspension, or KotlinX Coroutines Flow.
+ These operators should be implemented low-level, so they can guarantee correct cancellation support and a highly optimised runtime.
+
+ Some important aspects of Kafka are tricky to implement with the "low-level" Kafka API,
+ especially streaming records from Kafka and committing their offsets correctly.
+ This process involves quite some additional complexity, as the sketch below illustrates; more details can be found [here](https://tuleism.github.io/blog/2021/parallel-backpressured-kafka-consumer/).
+
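+ To illustrate, a minimal sketch of a hand-rolled consume-and-commit loop with the plain Apache Kafka client might look as follows (the topic name, group id, and properties are made up for the example):
+
+ ```kotlin
+ import java.time.Duration
+ import java.util.Properties
+ import org.apache.kafka.clients.consumer.KafkaConsumer
+
+ fun rawConsumerLoop() {
+   val props = Properties().apply {
+     put("bootstrap.servers", "localhost:9092")
+     put("group.id", "example-group")
+     put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+     put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+     put("enable.auto.commit", "false")
+   }
+   KafkaConsumer<String, String>(props).use { consumer ->
+     consumer.subscribe(listOf("example-topic"))
+     while (true) {
+       // poll blocks the calling thread, so it composes badly with suspension,
+       // and back-pressure has to be managed by hand
+       val records = consumer.poll(Duration.ofMillis(500))
+       records.forEach { record -> println("${record.key()} -> ${record.value()}") }
+       // commits *all* offsets returned by the last poll: if processing above
+       // failed halfway, the failed records are still marked as consumed
+       consumer.commitSync()
+     }
+   }
+ }
+ ```
+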
+ To solve these problems, a couple of projects already exist on the JVM:
+ - [Alpakka Kafka](https://github.com/akka/alpakka-kafka)
+ - [reactor-kafka](https://github.com/reactor/reactor-kafka)
+
+ There was no implementation for KotlinX Coroutines Flow.
+ You can, however, quite easily use reactor-kafka through the [KotlinX Coroutines Reactor bindings](https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactor/README.md), as sketched below.
+
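+ A rough sketch of that bridge (assuming the reactor-kafka and kotlinx-coroutines-reactive dependencies, and an already configured `ReceiverOptions`; the function name and topic are illustrative):
+
+ ```kotlin
+ import kotlinx.coroutines.reactive.asFlow
+ import kotlinx.coroutines.runBlocking
+ import reactor.kafka.receiver.KafkaReceiver
+ import reactor.kafka.receiver.ReceiverOptions
+
+ fun consumeViaReactor(options: ReceiverOptions<String, String>): Unit = runBlocking {
+   KafkaReceiver.create(options.subscription(listOf("example-topic")))
+     .receive()  // Flux<ReceiverRecord<String, String>>
+     .asFlow()   // bridge Reactor's Flux into a KotlinX Coroutines Flow
+     .collect { record ->
+       println("${record.key()} -> ${record.value()}")
+       record.receiverOffset().acknowledge() // mark the record as processed
+     }
+ }
+ ```
+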
+ This project implements the same strategies as [reactor-kafka](https://github.com/reactor/reactor-kafka) directly on top of KotlinX Coroutines,
+ both to benefit from **all** the advantages that KotlinX Coroutines offer, and to open the door to potentially becoming a Kotlin MPP library in the future.

## Goals

- - Lean Core library built on top of Kotlin Std & KotlinX Coroutines (possible extensions with Arrow in additional
-   module)
+ - Lean Core library built on top of Kotlin Std & KotlinX Coroutines
- Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and `suspend`.
- Flow based operators, so you can easily compose KotlinX Flow based Kafka programs
+ - Strong guarantees about committing record offsets, and performance optimisations around re-balancing/partitioning (see the sketch after this list).
- Example of testing Kafka with Test Containers in Kotlin.

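+ As a sketch of what the offset guarantee looks like in practice (this assumes the `KafkaReceiver`/`ReceiverSettings` API used in the example below; the topic and settings values are placeholders), each received record carries its offset, which you acknowledge once the record has been processed:
+
+ ```kotlin
+ import io.github.nomisRev.kafka.receiver.KafkaReceiver
+ import io.github.nomisRev.kafka.receiver.ReceiverSettings
+ import kotlinx.coroutines.runBlocking
+
+ fun processAndAcknowledge(settings: ReceiverSettings<String, String>): Unit = runBlocking {
+   KafkaReceiver(settings)
+     .receive("example-topic")
+     .collect { record ->
+       println("${record.key()} -> ${record.value()}")
+       // acknowledged offsets are batched and committed by the library,
+       // according to the commit strategy configured in the receiver settings
+       record.offset.acknowledge()
+     }
+ }
+ ```
+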
## Adding Dependency
@@ -43,20 +51,24 @@ dependencies {
## Example

<!--- INCLUDE
- import java.util.UUID
- import kotlinx.coroutines.Dispatchers.Default
+ import io.github.nomisRev.kafka.receiver.KafkaReceiver
+ import io.github.nomisRev.kafka.receiver.ReceiverSettings
+ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
+ import kotlinx.coroutines.delay
+ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
- import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
+ import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
+ import java.util.UUID
-->

```kotlin
@@ -66,49 +78,46 @@ value class Key(val index: Int)
@JvmInline
value class Message(val content: String)

- fun main(): Unit =
-   runBlocking(Default) {
-     val topicName = "test-topic"
-     val msgCount = 10
-     val kafka = Kafka.container
+ fun main(): Unit = runBlocking(Dispatchers.Default) {
+   val topicName = "test-topic"
+   val msgCount = 10
+   val kafka = Kafka.container
+
+   Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
+     client.createTopic(NewTopic(topicName, 1, 1))
+   }

-     Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
-       client.createTopic(NewTopic(topicName, 1, 1))
+   coroutineScope { // Run producer and consumer in a single scope
+     launch(Dispatchers.IO) { // Send 10 messages, and then close the producer
+       val settings: ProducerSettings<Key, Message> = ProducerSettings(
+         kafka.bootstrapServers,
+         IntegerSerializer().imap { key: Key -> key.index },
+         StringSerializer().imap { msg: Message -> msg.content },
+         Acks.All
+       )
+       (1..msgCount)
+         .asFlow()
+         .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
+         .produce(settings)
+         .collect(::println)
    }

-     coroutineScope { // Run produces and consumer in a single scope
-       launch { // Send 20 messages, and then close the producer
-         val settings: ProducerSettings<Key, Message> =
-           ProducerSettings(
-             kafka.bootstrapServers,
-             IntegerSerializer().imap { key: Key -> key.index },
-             StringSerializer().imap { msg: Message -> msg.content },
-             Acks.All
-           )
-         (1..msgCount)
-           .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
-           .asFlow()
-           .produce(settings)
-           .collect(::println)
-       }
-
-       launch { // Consume 20 messages as a stream, and then close the consumer
-         val settings: ConsumerSettings<Key, Message> =
-           ConsumerSettings(
-             kafka.bootstrapServers,
-             IntegerDeserializer().map(::Key),
-             StringDeserializer().map(::Message),
-             groupId = UUID.randomUUID().toString(),
-             autoOffsetReset = AutoOffsetReset.Earliest
-           )
-         kafkaConsumer(settings)
-           .subscribeTo(topicName)
-           .take(msgCount)
-           .map { "${it.key()} -> ${it.value()}" }
-           .collect(::println)
-       }
+     launch(Dispatchers.IO) { // Consume 10 messages as a stream, and then close the consumer
+       val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
+         kafka.bootstrapServers,
+         IntegerDeserializer().map(::Key),
+         StringDeserializer().map(::Message),
+         groupId = UUID.randomUUID().toString(),
+         autoOffsetReset = AutoOffsetReset.Earliest
+       )
+       KafkaReceiver(settings)
+         .receive(topicName)
+         .take(msgCount)
+         .map { "${it.key()} -> ${it.value()}" }
+         .collect(::println)
    }
  }
+ }
```

> You can get the full code [here](guide/example/example-readme-01.kt).
@@ -135,5 +144,3 @@ Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)
```
-
- <!--- TEST -->