Skip to content

Commit fb21b03

Browse files
committed
Add async consumer method
1 parent 0c392c8 commit fb21b03

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import com.rabbitmq.client.Delivery
55
import com.viartemev.thewhiterabbit.exception.AcknowledgeException
66
import kotlinx.coroutines.CoroutineDispatcher
77
import kotlinx.coroutines.Dispatchers
8+
import kotlinx.coroutines.async
89
import kotlinx.coroutines.channels.sendBlocking
10+
import kotlinx.coroutines.coroutineScope
911
import kotlinx.coroutines.withContext
1012
import mu.KotlinLogging
1113
import java.io.IOException
@@ -35,8 +37,16 @@ class ConfirmConsumer internal constructor(private val amqpChannel: Channel, amq
3537
}
3638

3739
/**
38-
* Asynchronously consume one message.
39-
* @throws IOException if an error is encountered
40+
* Consume a message.
41+
* @throws com.viartemev.thewhiterabbit.exception.AcknowledgeException if can't send ack
42+
*/
43+
suspend fun asyncConsumeWithConfirm(handler: suspend (Delivery) -> Unit, handlerDispatcher: CoroutineDispatcher = Dispatchers.Default) = coroutineScope {
44+
async { consumeWithConfirm(handler, handlerDispatcher) }
45+
}
46+
47+
/**
48+
* Consume a message.
49+
* @throws com.viartemev.thewhiterabbit.exception.AcknowledgeException if can't send ack
4050
*/
4151
suspend fun consumeWithConfirm(handler: suspend (Delivery) -> Unit, handlerDispatcher: CoroutineDispatcher = Dispatchers.Default) {
4252
val delivery = deliveries.receive()

0 commit comments

Comments
 (0)