File tree Expand file tree Collapse file tree 1 file changed +18
-0
lines changed
src/main/kotlin/io/github/viartemev/rabbitmq/consumer Expand file tree Collapse file tree 1 file changed +18
-0
lines changed Original file line number Diff line number Diff line change @@ -41,6 +41,19 @@ class ConfirmConsumer internal constructor(
4141 deliveries.close()
4242 }
4343
44+ /* *
45+ * Consume a message from the channel with manual confirmation.
46+ *
47+ * This method suspends the current coroutine, receives a message from the channel, and passes it to the provided
48+ * handler function. After the handler completes without throwing an exception, the method sends an acknowledgement
49+ * (basicAck) to the AMQP channel to confirm the message processing. If the handler throws an exception, the method
50+ * cancels the coroutine and logs an error.
51+ *
52+ * @param handler The suspend function that will be called with the received [Delivery] object.
53+ * The function should handle the processing of the message and optionally throw an exception if an error occurs.
54+ *
55+ * @throws [CancellationException] if the coroutine is canceled due to an error in the handler function.
56+ */
4457 suspend fun consumeMessageWithConfirm (handler : suspend (Delivery ) -> Unit ) = coroutineScope {
4558 logger.debug { " Trying to receive a message from the channel" }
4659 val delivery = deliveries.receive()
@@ -56,6 +69,11 @@ class ConfirmConsumer internal constructor(
5669 }
5770 }
5871
72+ /* *
73+ * Suspend function to consume messages with confirm.
74+ *
75+ * @param handler The handler function that processes the delivery.
76+ */
5977 suspend fun consumeMessagesWithConfirm (handler : suspend (Delivery ) -> Unit ) = coroutineScope {
6078 val semaphore = Semaphore (prefetchSize)
6179 while (isActive) {
You can’t perform that action at this time.
0 commit comments