File tree Expand file tree Collapse file tree 7 files changed +118
-0
lines changed
main/kotlin/com/baeldung/redispubsub
test/kotlin/com/baeldung/redispubsub Expand file tree Collapse file tree 7 files changed +118
-0
lines changed Original file line number Diff line number Diff line change 9595 <version >${moshi.version} </version >
9696 <scope >test</scope >
9797 </dependency >
98+ <dependency >
99+ <groupId >com.github.codemonstur</groupId >
100+ <artifactId >embedded-redis</artifactId >
101+ <version >${embedded-redis.version} </version >
102+ </dependency >
103+ <dependency >
104+ <groupId >io.lettuce</groupId >
105+ <artifactId >lettuce-core</artifactId >
106+ <version >${lettuce-core.version} </version >
107+ </dependency >
98108 </dependencies >
99109
100110 <build >
161171 <retrofit .version>2.9.0</retrofit .version>
162172 <wiremock-jre8 .version>2.35.1</wiremock-jre8 .version>
163173 <moshi .version>1.15.1</moshi .version>
174+ <kotlinx-coroutines .version>1.7.1</kotlinx-coroutines .version>
175+ <embedded-redis .version>1.4.3</embedded-redis .version>
176+ <lettuce-core .version>6.5.0.RELEASE</lettuce-core .version>
164177 </properties >
165178
166179</project >
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ data class Message (val content : String )
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ import io.lettuce.core.pubsub.RedisPubSubAdapter
4+ import java.util.concurrent.CountDownLatch
5+
6+ class MessageListener : RedisPubSubAdapter <String , String >() {
7+
8+ var latch: CountDownLatch = CountDownLatch (1 )
9+
10+ var messagesReceived: List <String > = emptyList()
11+ override fun message (channel : String? , message : String? ) {
12+ println (" Received message: $message from channel: $channel " )
13+ messagesReceived = messagesReceived.plus(message!! )
14+ latch.countDown()
15+ }
16+
17+ }
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ import io.lettuce.core.RedisClient
4+ import io.lettuce.core.api.StatefulRedisConnection
5+ import io.lettuce.core.api.sync.RedisCommands
6+ import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands
7+
8+ object RedisConnectionManager: AutoCloseable {
9+ private val redisClient: RedisClient = RedisClient .create(" redis://localhost:6379" )
10+ private val connection: StatefulRedisConnection <String , String > = redisClient.connect()
11+
12+ override fun close () {
13+ connection.close()
14+ redisClient.shutdown()
15+ }
16+
17+ fun redisSyncCommands (): RedisCommands <String , String >? {
18+ return connection.sync()
19+ }
20+
21+ fun redisPubSubAsyncCommands (messageListener : MessageListener ): RedisPubSubAsyncCommands <String , String > {
22+ val pubSubConnection = redisClient.connectPubSub()
23+ pubSubConnection.addListener(messageListener)
24+ return pubSubConnection.async()
25+ }
26+ }
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ class RedisPublisher {
4+
5+ fun publishMessage (channel : String , message : Message ) {
6+ RedisConnectionManager .redisSyncCommands()?.publish(channel, message.content)
7+ println (" Message published: $message " )
8+ }
9+ }
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ class RedisSubscriber (private val messageListener : MessageListener ) {
4+
5+ fun subscribeToChannel (channel : String ) {
6+ RedisConnectionManager .redisPubSubAsyncCommands(messageListener).subscribe(channel)
7+ }
8+
9+ }
Original file line number Diff line number Diff line change 1+ package com.baeldung.redispubsub
2+
3+ import org.junit.jupiter.api.AfterAll
4+ import org.junit.jupiter.api.Assertions.*
5+ import org.junit.jupiter.api.BeforeAll
6+ import org.junit.jupiter.api.Test
7+ import org.junit.jupiter.api.TestInstance
8+ import redis.embedded.RedisServer
9+ import java.util.concurrent.TimeUnit
10+
11+ @TestInstance(TestInstance .Lifecycle .PER_CLASS )
12+ class RedisSubscriberUnitTest {
13+
14+ val messageListener = MessageListener ()
15+ val redisSubscriber = RedisSubscriber (messageListener)
16+ val redisPublisher = RedisPublisher ()
17+ val channel = " channel"
18+ val message = Message (" Hello, Redis!" )
19+
20+ val redisServer = RedisServer (6379 )
21+
22+ @BeforeAll
23+ fun setUp () {
24+ redisServer.start()
25+ }
26+
27+ @AfterAll
28+ fun tearDown () {
29+ RedisConnectionManager .close()
30+ redisServer.stop()
31+ }
32+
33+ @Test
34+ fun givenMessageListener_whenMessagePublished_thenMessageReceived () {
35+ redisSubscriber.subscribeToChannel(channel)
36+ redisPublisher.publishMessage(channel, message)
37+ messageListener.latch.await(500 , TimeUnit .MILLISECONDS )
38+ assertEquals(message.content, messageListener.messagesReceived.get(0 ))
39+ }
40+
41+ }
You can’t perform that action at this time.
0 commit comments