Skip to content

Commit 4cf0edf

Browse files
authored
Merge pull request #84 from viartemev/add-proper-connection-closing
Add proper connection closing
2 parents 631f4af + 65e6915 commit 4cf0edf

File tree

5 files changed

+80
-36
lines changed

5 files changed

+80
-36
lines changed
Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
11
package com.viartemev.thewhiterabbit.channel
22

33
import com.rabbitmq.client.Channel
4+
import com.rabbitmq.client.Connection
45
import com.viartemev.thewhiterabbit.consumer.ConfirmConsumer
56

6-
fun Channel.consumer(queue: String) = ConfirmConsumer(this, queue)
7+
fun Channel.consumer(queue: String) = ConfirmConsumer(this, queue)
8+
9+
suspend fun Connection.channel(block: suspend Channel.() -> Unit): Channel {
10+
val channel = this.createChannel()
11+
channel.use { block(it) }
12+
return channel
13+
}
14+
15+
suspend fun Channel.consume(queue: String, block: suspend ConfirmConsumer.() -> Unit) {
16+
val consumer = this.consumer(queue)
17+
try {
18+
block(consumer)
19+
} finally {
20+
consumer.cancel()
21+
}
22+
}

src/main/kotlin/com/viartemev/thewhiterabbit/channel/ConfirmChannel.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,14 @@ class ConfirmChannel internal constructor(private val channel: Channel) : Channe
1717
* @see com.rabbitmq.client.Channel.confirmSelect()
1818
*/
1919
fun Connection.createConfirmChannel(): ConfirmChannel = ConfirmChannel(this.createChannel())
20+
21+
suspend fun Connection.confirmChannel(block: suspend ConfirmChannel.() -> Unit): ConfirmChannel {
22+
val confirmChannel = this.createConfirmChannel()
23+
confirmChannel.use { block(it) }
24+
return confirmChannel
25+
}
26+
27+
suspend fun ConfirmChannel.publish(block: suspend ConfirmPublisher.() -> Unit) {
28+
val publisher = this.publisher()
29+
block(publisher)
30+
}

src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ class ConfirmConsumer internal constructor(private val AMQPChannel: Channel, AMQ
2020
init {
2121
AMQPChannel.basicQos(prefetchSize, false)
2222
consTag = AMQPChannel.basicConsume(AMQPQueue, false,
23-
{ _, message -> continuations.sendBlocking(message) },
23+
{ consumerTag, message ->
24+
try {
25+
continuations.sendBlocking(message)
26+
} catch (e: Exception) {
27+
logger.info { "Consumer $consumerTag has been cancelled" }
28+
}
29+
},
2430
{ consumerTag ->
2531
logger.info { "Consumer $consumerTag has been cancelled" }
2632
continuations.cancel()
@@ -44,4 +50,9 @@ class ConfirmConsumer internal constructor(private val AMQPChannel: Channel, AMQ
4450
throw AcknowledgeException(errorMessage)
4551
}
4652
}
53+
54+
fun cancel() {
55+
AMQPChannel.basicCancel(consTag)
56+
continuations.cancel()
57+
}
4758
}

src/test/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumerTest.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package com.viartemev.thewhiterabbit.consumer
22

33
import com.rabbitmq.client.ConnectionFactory
44
import com.rabbitmq.client.Delivery
5-
import com.viartemev.thewhiterabbit.channel.consumer
5+
import com.viartemev.thewhiterabbit.channel.channel
6+
import com.viartemev.thewhiterabbit.channel.consume
67
import com.viartemev.thewhiterabbit.queue.QueueSpecification
78
import com.viartemev.thewhiterabbit.queue.declareQueue
89
import kotlinx.coroutines.delay
@@ -29,11 +30,12 @@ class ConfirmConsumerTest {
2930
@Test
3031
fun `test message consuming`() {
3132
factory.newConnection().use { connection ->
32-
connection.createChannel().use { channel ->
33-
runBlocking {
34-
channel.declareQueue(QueueSpecification(QUEUE_NAME))
35-
val consumer = channel.consumer(QUEUE_NAME)
36-
for (i in 1..3) consumer.consumeWithConfirm({ handleDelivery(it) })
33+
runBlocking {
34+
connection.channel {
35+
declareQueue(QueueSpecification(QUEUE_NAME))
36+
consume(QUEUE_NAME) {
37+
for (i in 1..3) consumeWithConfirm({ handleDelivery(it) })
38+
}
3739
}
3840
}
3941
}

src/test/kotlin/com/viartemev/thewhiterabbit/publisher/PublisherTest.kt

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package com.viartemev.thewhiterabbit.publisher
22

33
import com.rabbitmq.client.ConnectionFactory
44
import com.rabbitmq.client.MessageProperties
5-
import com.viartemev.thewhiterabbit.channel.createConfirmChannel
5+
import com.viartemev.thewhiterabbit.channel.confirmChannel
6+
import com.viartemev.thewhiterabbit.channel.publish
67
import com.viartemev.thewhiterabbit.queue.QueueSpecification
78
import com.viartemev.thewhiterabbit.queue.declareQueue
89
import kotlinx.coroutines.async
@@ -33,14 +34,16 @@ class PublisherTest {
3334

3435
@Test
3536
fun `test one message publishing`() {
36-
factory.newConnection().use { connection ->
37-
connection.createConfirmChannel().use { channel ->
38-
val publisher = channel.publisher()
39-
runBlocking {
40-
channel.declareQueue(QueueSpecification(QUEUE_NAME))
41-
val message = createMessage("Hello")
42-
val ack = publisher.publishWithConfirm(message)
43-
assertTrue { ack }
37+
factory.newConnection().use {
38+
val connection = it
39+
runBlocking {
40+
connection.confirmChannel {
41+
declareQueue(QueueSpecification(QUEUE_NAME))
42+
publish {
43+
val message = createMessage("Hello")
44+
val ack = publishWithConfirm(message)
45+
assertTrue { ack }
46+
}
4447
}
4548
}
4649
}
@@ -50,19 +53,19 @@ class PublisherTest {
5053
fun `test n-messages publishing manually`() {
5154
val times = 10
5255
factory.newConnection().use { connection ->
53-
connection.createConfirmChannel().use { channel ->
54-
val publisher = channel.publisher()
55-
runBlocking {
56-
channel.declareQueue(QueueSpecification(QUEUE_NAME))
57-
val acks = coroutineScope {
58-
59-
(1..times).map {
60-
async {
61-
publisher.publishWithConfirm(createMessage("Hello #$it"))
62-
}
63-
}.awaitAll()
56+
runBlocking {
57+
connection.confirmChannel {
58+
declareQueue(QueueSpecification(QUEUE_NAME))
59+
publish {
60+
val acks = coroutineScope {
61+
(1..times).map {
62+
async {
63+
publishWithConfirm(createMessage("Hello #$it"))
64+
}
65+
}.awaitAll()
66+
}
67+
assertTrue { acks.all { true } }
6468
}
65-
assertTrue { acks.all { true } }
6669
}
6770
}
6871
}
@@ -72,13 +75,14 @@ class PublisherTest {
7275
fun `test n-messages publishing`() {
7376
val times = 10
7477
factory.newConnection().use { connection ->
75-
connection.createConfirmChannel().use { channel ->
76-
val publisher = channel.publisher()
77-
runBlocking {
78-
channel.declareQueue(QueueSpecification(QUEUE_NAME))
79-
val messages = (1..times).map { createMessage("Hello #$it") }
80-
val acks = publisher.publishWithConfirm(messages).awaitAll()
81-
assertTrue { acks.all { true } }
78+
runBlocking {
79+
connection.confirmChannel {
80+
declareQueue(QueueSpecification(QUEUE_NAME))
81+
publish {
82+
val messages = (1..times).map { createMessage("Hello #$it") }
83+
val acks = publishWithConfirm(messages).awaitAll()
84+
assertTrue { acks.all { true } }
85+
}
8286
}
8387
}
8488
}

0 commit comments

Comments
 (0)