Skip to content

Commit 6c163d5

Browse files
authored
Merge pull request #142 from viartemev/fix-publisher
Fix publisher
2 parents 12f4e84 + 12bc5c1 commit 6c163d5

File tree

2 files changed

+18
-34
lines changed

2 files changed

+18
-34
lines changed
Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package com.viartemev.thewhiterabbit.publisher
22

33
import com.rabbitmq.client.Channel
4-
import com.viartemev.thewhiterabbit.exception.PublishException
54
import kotlinx.coroutines.Deferred
65
import kotlinx.coroutines.async
76
import kotlinx.coroutines.coroutineScope
87
import kotlinx.coroutines.suspendCancellableCoroutine
98
import mu.KotlinLogging
109
import java.io.IOException
10+
import java.util.concurrent.CancellationException
1111
import java.util.concurrent.ConcurrentHashMap
1212
import kotlin.coroutines.Continuation
13+
import kotlin.coroutines.CoroutineContext
14+
import kotlin.coroutines.EmptyCoroutineContext
1315

1416
private val logger = KotlinLogging.logger {}
1517

@@ -25,52 +27,34 @@ class ConfirmPublisher internal constructor(private val channel: Channel) {
2527
*
2628
* @see com.viartemev.thewhiterabbit.publisher.OutboundMessage
2729
* @return acknowledgement - represent messages handled successfully or lost by the broker.
28-
* @throws com.viartemev.thewhiterabbit.exception.PublishException if can't publish the message
30+
* @throws java.util.concurrent.CancellationException if can't publish the message
2931
*/
3032
suspend fun publishWithConfirm(message: OutboundMessage): Boolean {
3133
val messageSequenceNumber = channel.nextPublishSeqNo
3234
logger.debug { "The message Sequence Number: $messageSequenceNumber" }
33-
try {
34-
return suspendCancellableCoroutine { continuation ->
35-
continuations[messageSequenceNumber] = continuation
35+
return suspendCancellableCoroutine { continuation ->
36+
continuations[messageSequenceNumber] = continuation
37+
continuation.invokeOnCancellation { continuations.remove(messageSequenceNumber) }
38+
try {
3639
message.run { channel.basicPublish(exchange, routingKey, properties, msg.toByteArray()) }
40+
} catch (e: IOException) {
41+
val cancelled = continuation.cancel(e)
42+
if (!cancelled) throw CancellationException(e.message)
3743
}
38-
} catch (e: IOException) {
39-
continuations.remove(messageSequenceNumber)
40-
throw PublishException("Can't publish message: $message")
4144
}
4245
}
4346

44-
/**
45-
* Asynchronously publish a message with the waiting of confirmation.
46-
*
47-
* @see com.viartemev.thewhiterabbit.publisher.OutboundMessage
48-
* @return acknowledgement - represent messages handled successfully or lost by the broker.
49-
* @throws java.io.IOException if an error is encountered
50-
*/
51-
suspend fun asyncPublishWithConfirm(message: OutboundMessage): Deferred<Boolean> = coroutineScope {
52-
async { publishWithConfirm(message) }
53-
}
54-
55-
/**
56-
* Publish a list of messages with the waiting of confirmation.
57-
*
58-
* @see com.viartemev.thewhiterabbit.publisher.OutboundMessage
59-
* @return list of acknowledgements - represent messages handled successfully or lost by the broker.
60-
* @throws java.io.IOException if an error is encountered
61-
*/
62-
suspend fun publishWithConfirm(messages: List<OutboundMessage>): List<Boolean> {
63-
return messages.map { publishWithConfirm(it) }
64-
}
65-
6647
/**
6748
* Asynchronously publish a list of messages with the waiting of confirmation.
6849
*
6950
* @see com.viartemev.thewhiterabbit.publisher.OutboundMessage
7051
* @return list of acknowledgements - represent messages handled successfully or lost by the broker.
71-
* @throws java.io.IOException if an error is encountered
52+
* @throws java.util.concurrent.CancellationException if can't publish one of the messages
7253
*/
73-
suspend fun asyncPublishWithConfirm(messages: List<OutboundMessage>): List<Deferred<Boolean>> = coroutineScope {
74-
messages.map { async { publishWithConfirm(it) } }
54+
suspend fun publishWithConfirmAsync(
55+
coroutineContext: CoroutineContext = EmptyCoroutineContext,
56+
messages: List<OutboundMessage>
57+
): List<Deferred<Boolean>> = coroutineScope {
58+
messages.map { async(coroutineContext) { publishWithConfirm(it) } }
7559
}
7660
}

src/test/kotlin/com/viartemev/thewhiterabbit/publisher/PublisherTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class PublisherTest : AbstractTestContainersTest() {
6666
declareQueue(QueueSpecification(QUEUE_NAME))
6767
publish {
6868
val messages = (1..times).map { createMessage("Hello #$it") }
69-
val acks = asyncPublishWithConfirm(messages).awaitAll()
69+
val acks = publishWithConfirmAsync(messages = messages).awaitAll()
7070
assertTrue { acks.all { it } }
7171
}
7272
}

0 commit comments

Comments
 (0)