Skip to content

Commit 97b4831

Browse files
authored
Merge pull request #82 from viartemev/remove-consumer-tag
Remove consumer tag comparasion
2 parents 0000150 + 0a4c530 commit 97b4831

File tree

1 file changed

+4
-9
lines changed

1 file changed

+4
-9
lines changed

src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,17 @@ import kotlinx.coroutines.channels.Channel as KChannel
1313

1414
private val logger = KotlinLogging.logger {}
1515

16-
/**
17-
* @todo What will be if consumer gets huge amount of messages, but handling is very slow?
18-
*/
1916
class ConfirmConsumer internal constructor(private val AMQPChannel: Channel, AMQPQueue: String, prefetchSize: Int = 0) {
2017
private val continuations = KChannel<Delivery>()
21-
private lateinit var consTag: String
18+
private val consTag: String
2219

2320
init {
2421
AMQPChannel.basicQos(prefetchSize, false)
2522
consTag = AMQPChannel.basicConsume(AMQPQueue, false,
26-
{ consumerTag, message -> if (consumerTag == consTag) continuations.sendBlocking(message) },
23+
{ _, message -> continuations.sendBlocking(message) },
2724
{ consumerTag ->
28-
if (consumerTag == consTag) {
29-
logger.info { "Consumer $consumerTag has been cancelled" }
30-
continuations.cancel()
31-
}
25+
logger.info { "Consumer $consumerTag has been cancelled" }
26+
continuations.cancel()
3227
}
3328
)
3429
}

0 commit comments

Comments
 (0)