Skip to content

Commit 40d270b

Browse files
Artemyev VyacheslavArtemyev Vyacheslav
authored andcommitted
Update examples
1 parent 911cbf2 commit 40d270b

File tree

3 files changed

+19
-27
lines changed

3 files changed

+19
-27
lines changed

rabbitmq-kotlin-example/src/main/kotlin/io/github/viartemev/rabbitmq/example/ConsumerExample.kt

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,28 @@ import com.rabbitmq.client.ConnectionFactory
44
import com.rabbitmq.client.Delivery
55
import io.github.viartemev.rabbitmq.channel.channel
66
import io.github.viartemev.rabbitmq.channel.consume
7-
import kotlinx.coroutines.Dispatchers
8-
import kotlinx.coroutines.async
9-
import kotlinx.coroutines.awaitAll
10-
import kotlinx.coroutines.coroutineScope
11-
import kotlinx.coroutines.delay
12-
import kotlinx.coroutines.runBlocking
13-
import kotlinx.coroutines.withContext
7+
import kotlinx.coroutines.*
148

159
const val CONSUMER_QUEUE_NAME = "test_queue"
1610
const val CONSUME_TIMES = 5
1711
val ioIntensiveFunction: suspend (Delivery) -> Unit = {
1812
delay(2000)
1913
println(message = "Message: ${String(it.body)}")
2014
}
21-
val handler: suspend (Delivery) -> Unit = { delivery: Delivery -> withContext(Dispatchers.IO) { ioIntensiveFunction(delivery) } }
15+
val handler: suspend (Delivery) -> Unit =
16+
{ delivery: Delivery -> withContext(Dispatchers.IO) { ioIntensiveFunction(delivery) } }
2217

23-
fun main() {
18+
fun main(): Unit = runBlocking {
2419
val connectionFactory = ConnectionFactory().apply { useNio() }
25-
val connection = connectionFactory.newConnection()
26-
runBlocking {
27-
connection.channel {
20+
connectionFactory.newConnection().use { connction ->
21+
connction.channel {
2822
try {
2923
consume(CONSUMER_QUEUE_NAME, 1) {
30-
coroutineScope {
31-
(1..CONSUME_TIMES).map { async { consumeMessageWithConfirm(handler) } }.awaitAll()
32-
}
24+
(1..CONSUME_TIMES).map { async(Dispatchers.IO) { consumeMessageWithConfirm(handler) } }.awaitAll()
3325
}
3426
} catch (e: RuntimeException) {
3527
println("Error is here...let's rollback handler actions")
3628
}
3729
}
3830
}
39-
connection.close()
4031
}

rabbitmq-kotlin-example/src/main/kotlin/io/github/viartemev/rabbitmq/example/PublisherExample.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,26 @@ import io.github.viartemev.rabbitmq.channel.publish
77
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
88
import io.github.viartemev.rabbitmq.queue.QueueSpecification
99
import io.github.viartemev.rabbitmq.queue.declareQueue
10-
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.async
12+
import kotlinx.coroutines.awaitAll
13+
import kotlinx.coroutines.runBlocking
1114

1215
const val PUBLISHER_EXCHANGE_NAME = ""
1316
const val PUBLISHER_QUEUE_NAME = "test_queue"
1417
const val TIMES = 1_000
1518

16-
fun main() = runBlocking {
19+
fun main(): Unit = runBlocking {
1720
val connectionFactory = ConnectionFactory().apply { useNio() }
18-
val connection = connectionFactory.newConnection()
19-
connection.confirmChannel {
20-
declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
21-
publish {
22-
coroutineScope {
23-
val messages = (1..TIMES).map { createMessage("") }.map { publishWithConfirm(it) }
21+
connectionFactory.newConnection().use { connection ->
22+
connection.confirmChannel {
23+
declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
24+
publish {
25+
(1..TIMES).map { createMessage("") }.map { async(Dispatchers.IO) { publishWithConfirm(it) } }.awaitAll()
26+
.forEach { println(it) }
2427
}
2528
}
2629
}
27-
connection.close()
2830
}
2931

3032
private fun createMessage(body: String) =

rabbitmq-kotlin-example/src/main/kotlin/io/github/viartemev/rabbitmq/example/rpc/RpcClient.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ import mu.KotlinLogging
1414
private val logger = KotlinLogging.logger {}
1515

1616
fun main(): Unit = runBlocking {
17-
val connection = ConnectionFactory().apply { useNio() }.newConnection()
1817
val message = OutboundMessage("", "rpc_request", MessageProperties.PERSISTENT_BASIC, "Slava")
19-
connection.use { conn ->
18+
ConnectionFactory().apply { useNio() }.newConnection().use { conn ->
2019
conn.channel {
2120
logger.info { "Asking for greeting request..." }
2221
val response = withTimeoutOrNull(1000) {

0 commit comments

Comments
 (0)