diff --git a/util-rabbitmq/src/main/java/com/github/workerframework/util/rabbitmq/RabbitUtil.java b/util-rabbitmq/src/main/java/com/github/workerframework/util/rabbitmq/RabbitUtil.java
index ca5f28455..85783b589 100644
--- a/util-rabbitmq/src/main/java/com/github/workerframework/util/rabbitmq/RabbitUtil.java
+++ b/util-rabbitmq/src/main/java/com/github/workerframework/util/rabbitmq/RabbitUtil.java
@@ -220,6 +220,8 @@ public static void declareQueue(Channel channel, String queueName, Durability du
         Objects.requireNonNull(act);
         Objects.requireNonNull(queueProps);
         try {
+            LOG.info("Declaring queue: {} with durability: {}, exclusivity: {}, auto-remove: {}, properties: {}",
+                queueName, dur, excl, act, queueProps);
             channel.queueDeclare(queueName, dur == Durability.DURABLE, excl == Exclusivity.EXCLUSIVE, act == EmptyAction.AUTO_REMOVE, queueProps);
         } catch (IOException e) {
             LOG.warn("IO Exception encountered during queueDeclare. Will try do declare passively.", e);
diff --git a/worker-api/src/main/java/com/github/workerframework/api/BulkWorker.java b/worker-api/src/main/java/com/github/workerframework/api/BulkWorker.java
index 8e4352eeb..df7b530e9 100644
--- a/worker-api/src/main/java/com/github/workerframework/api/BulkWorker.java
+++ b/worker-api/src/main/java/com/github/workerframework/api/BulkWorker.java
@@ -15,6 +15,8 @@
  */
 package com.github.workerframework.api;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * This interface should be implemented by CAF Workers which are able to process multiple tasks together.
  *
@@ -31,4 +33,26 @@ public interface BulkWorker
      */
     void processTasks(BulkWorkerRuntime runtime) throws InterruptedException;
+
+    /**
+     * If a message has been identified as a poison message, prepare a WorkerResponse that includes the friendly name
+     * of the worker.
+     * For compatibility with existing Worker implementations, a default implementation has been provided.
+     *
+     * @param workerFriendlyName the worker's friendly name
+     * @param workerTask the worker task whose message has been identified as poison
+     * @return a response containing details of the worker that encountered a poison message
+     */
+    default WorkerResponse getPoisonMessageResult(String workerFriendlyName, final WorkerTask workerTask) {
+        final String strData = workerFriendlyName + " could not process the item.";
+        final byte[] byteArrayData = strData.getBytes(StandardCharsets.UTF_8);
+
+        // TODO
+        // In Abstract Worker we have:
+        // return new WorkerResponse(getResultQueue(), TaskStatus.RESULT_EXCEPTION, getExceptionData(t), getWorkerIdentifier(), getWorkerApiVersion(), null);
+        //
+        // I don't know what the equivalent of getResultQueue(), getWorkerIdentifier() and getWorkerApiVersion() are here,
+        // or where we could get them from in the BulkWorker?
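+        // For now the response is routed back to the task's own "to" queue, with an empty worker identifier and
+        // the task's API version, as stand-ins for the AbstractWorker values whose bulk-worker equivalents are unclear.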
+        return new WorkerResponse(
+            workerTask.getTo(), TaskStatus.RESULT_EXCEPTION, byteArrayData, "", workerTask.getVersion(), null);
+    }
 }
diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerTaskProvider.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerTaskProvider.java
index 79faad8d4..ac88dc0c3 100644
--- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerTaskProvider.java
+++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerTaskProvider.java
@@ -15,34 +15,74 @@
  */
 package com.github.workerframework.core;
 
+import com.github.workerframework.api.BulkWorker;
 import com.github.workerframework.api.BulkWorkerRuntime;
+import com.github.workerframework.api.TaskMessage;
+import com.github.workerframework.api.TaskStatus;
 import com.github.workerframework.api.WorkerTask;
+import com.google.common.base.MoreObjects;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 final class BulkWorkerTaskProvider implements BulkWorkerRuntime
 {
+    private static final Logger LOG = LoggerFactory.getLogger(BulkWorkerTaskProvider.class);
     private WorkerTaskImpl firstTask;
     private final BlockingQueue<WorkerTaskImpl> workQueue;
     private final ArrayList<WorkerTaskImpl> consumedTasks;
+    private final BulkWorker bulkWorker;
+    private final String bulkWorkerFriendlyName;
 
     public BulkWorkerTaskProvider(
         final WorkerTaskImpl firstTask,
-        final BlockingQueue<WorkerTaskImpl> workQueue
-    )
+        final BlockingQueue<WorkerTaskImpl> workQueue,
+        final BulkWorker bulkWorker,
+        final String bulkWorkerFriendlyName)
     {
         this.firstTask = Objects.requireNonNull(firstTask);
         this.workQueue = Objects.requireNonNull(workQueue);
+        this.bulkWorker = Objects.requireNonNull(bulkWorker);
+        this.bulkWorkerFriendlyName = Objects.requireNonNull(bulkWorkerFriendlyName);
         this.consumedTasks = new ArrayList<>();
     }
 
     @Override
     public WorkerTask getNextWorkerTask()
     {
-        return registerTaskConsumed(getNextWorkerTaskImpl());
+        try {
+            return getNextWorkerTaskInternal(null);
+        } catch (final InterruptedException e) {
+            // Should never happen - no-arg version doesn't block
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public WorkerTask getNextWorkerTask(long millis) throws InterruptedException
+    {
+        return getNextWorkerTaskInternal(millis);
+    }
+
+    private WorkerTask getNextWorkerTaskInternal(final Long millis) throws InterruptedException
+    {
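+        // A null timeout means the caller does not want to block: the no-argument variant of
+        // getNextWorkerTaskImpl() is used; otherwise we wait up to the supplied number of milliseconds.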
+        final WorkerTaskImpl workerTask = registerTaskConsumed(
+            millis == null ? getNextWorkerTaskImpl() : getNextWorkerTaskImpl(millis)
+        );
+
+        if (workerTask != null && workerTask.isPoison()) {
+            LOG.info("Received poison message, generating poison response for worker: {}", bulkWorkerFriendlyName);
+            workerTask.setResponse(bulkWorker.getPoisonMessageResult(bulkWorkerFriendlyName, workerTask));
+            sendCopyToReject(workerTask);
+            return getNextWorkerTaskInternal(millis);
+        }
+
+        return workerTask;
     }
 
     private WorkerTaskImpl getNextWorkerTaskImpl()
@@ -56,12 +96,6 @@ private WorkerTaskImpl getNextWorkerTaskImpl()
         }
     }
 
-    @Override
-    public WorkerTask getNextWorkerTask(long millis) throws InterruptedException
-    {
-        return registerTaskConsumed(getNextWorkerTaskImpl(millis));
-    }
-
     private WorkerTaskImpl getNextWorkerTaskImpl(long millis) throws InterruptedException
     {
@@ -91,4 +125,22 @@ private WorkerTaskImpl registerTaskConsumed(WorkerTaskImpl workerTask)
         }
         return workerTask;
     }
+
+    private void sendCopyToReject(final WorkerTaskImpl workerTask) {
+        final TaskMessage poisonMessage = new TaskMessage(
+            UUID.randomUUID().toString(),
+            MoreObjects.firstNonNull(workerTask.getClassifier(), ""),
+            workerTask.getVersion(),
+            workerTask.getData(),
+            TaskStatus.RESULT_EXCEPTION,
+            Collections.emptyMap(),
+            workerTask.getRejectQueue(),
+            workerTask.getTrackingInfo(),
+            workerTask.getSourceInfo(),
+            workerTask.getCorrelationId());
+
+        LOG.info("Sending poison message to: {}", workerTask.getRejectQueue());
+
+        workerTask.sendMessage(poisonMessage);
+    }
 }
diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java
index f6fbc502e..19efefbeb 100644
--- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java
+++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java
@@ -28,8 +28,10 @@ final class BulkWorkerThreadPool implements WorkerThreadPool
 {
     private static final Logger LOG = LoggerFactory.getLogger(BulkWorkerThreadPool.class);
+    private static final String CAF_WORKER_FRIENDLY_NAME = System.getenv("CAF_WORKER_FRIENDLY_NAME");
 
     private final BulkWorker bulkWorker;
+    private final String bulkWorkerFriendlyName;
     private final BlockingQueue<WorkerTaskImpl> workQueue;
     private final BulkWorkerThread[] bulkWorkerThreads;
     private final Runnable throwableHandler;
@@ -45,6 +47,8 @@ public BulkWorkerThreadPool(
         final int nThreads = workerFactory.getWorkerThreads();
 
         this.bulkWorker = (BulkWorker) workerFactory;
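+        // The friendly name is taken from the CAF_WORKER_FRIENDLY_NAME environment variable when it is set;
+        // otherwise the worker implementation's simple class name is used as a fallback.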
+        this.bulkWorkerFriendlyName = CAF_WORKER_FRIENDLY_NAME != null
+            ? CAF_WORKER_FRIENDLY_NAME : bulkWorker.getClass().getSimpleName();
         this.workQueue = new LinkedBlockingQueue<>();
         this.bulkWorkerThreads = new BulkWorkerThread[nThreads];
         this.throwableHandler = handler;
@@ -83,7 +87,7 @@ private void execute()
     {
         final WorkerTaskImpl task = workQueue.take();
         final BulkWorkerTaskProvider taskProvider
-            = new BulkWorkerTaskProvider(task, workQueue);
+            = new BulkWorkerTaskProvider(task, workQueue, bulkWorker, bulkWorkerFriendlyName);
 
         try {
             bulkWorker.processTasks(taskProvider);
diff --git a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerWrapper.java b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerWrapper.java
index a3372267a..7a6cd314b 100644
--- a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerWrapper.java
+++ b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerWrapper.java
@@ -66,10 +66,12 @@ public void run()
         final WorkerResponse response;
 
         if(workerTask.isPoison()) {
+            LOG.info("Received poison message, generating poison response for worker: {}", workerFriendlyName);
             response = worker.getPoisonMessageResult(workerFriendlyName);
             sendCopyToReject();
         } else {
+            LOG.info("Task is not a poison message; processing it normally");
             Timer.Context t = TIMER.time();
             MDC.put(CORRELATION_ID, workerTask.getCorrelationId());
             response = worker.doWork();
@@ -112,6 +114,9 @@ private void sendCopyToReject() {
             workerTask.getTrackingInfo(),
             workerTask.getSourceInfo(),
             workerTask.getCorrelationId());
+
+        LOG.info("Sending poison message to: {}", workerTask.getRejectQueue());
+
         workerTask.sendMessage(poisonMessage);
     }
 
diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerCore.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerCore.java
index f379e2b21..99ccc4fbb 100644
--- a/worker-core/src/main/java/com/github/workerframework/core/WorkerCore.java
+++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerCore.java
@@ -170,7 +170,7 @@ private void registerNewTaskImpl(final TaskInformation taskInformation, final Ta
         throws InvalidTaskException, TaskRejectedException
     {
         try {
-            LOG.debug("Received task {} (message id: {})", tm.getTaskId(), taskInformation.getInboundMessageId());
+            LOG.info("Received task {} (message id: {})", tm.getTaskId(), taskInformation.getInboundMessageId());
             validateTaskMessage(tm);
             final JobStatus jobStatus;
             try {
@@ -446,7 +446,7 @@ public void send(final TaskInformation taskInformation, final TaskMessage respon
         {
             Objects.requireNonNull(taskInformation);
             Objects.requireNonNull(responseMessage);
-            LOG.debug("Sending task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
+            LOG.info("Sending task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
             final String queue = responseMessage.getTo();
             checkForTrackingTermination(taskInformation, queue, responseMessage);
 
@@ -470,8 +470,8 @@ public void complete(final TaskInformation taskInformation, final String queue,
             Objects.requireNonNull(taskInformation);
             Objects.requireNonNull(responseMessage);
             // queue can be null for a dead end worker
-            LOG.debug("Task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
-            LOG.debug("Setting destination {} in task {} (message id: {})", queue, responseMessage.getTaskId(), taskInformation.getInboundMessageId());
+            LOG.info("Task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
+            LOG.info("Setting destination {} in task {} (message id: {})", queue, responseMessage.getTaskId(), taskInformation.getInboundMessageId());
             responseMessage.setTo(queue);
             checkForTrackingTermination(taskInformation, queue, responseMessage);
             try {
@@ -509,7 +509,7 @@ public void complete(final TaskInformation taskInformation, final String queue,
         @Override
         public void abandon(final TaskInformation taskInformation, final Exception e)
         {
-            LOG.debug("Rejecting message id {}", taskInformation.getInboundMessageId());
+            LOG.info("Rejecting message id {}", taskInformation.getInboundMessageId());
             workerQueue.rejectTask(taskInformation);
             stats.incrementTasksRejected();
             workerQueue.disconnectIncoming();
diff --git a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitTaskInformation.java b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitTaskInformation.java
index 3999f9b80..1760532bd 100644
--- a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitTaskInformation.java
+++ b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitTaskInformation.java
@@ -35,6 +35,7 @@ public class RabbitTaskInformation implements TaskInformation {
     public RabbitTaskInformation(final String inboundMessageId)
     {
         this(inboundMessageId, false);
+        LOG.info("RabbitTaskInformation created via single-argument constructor; defaulting isPoison to false");
     }
 
     public RabbitTaskInformation(final String inboundMessageId, final boolean isPoison) {
@@ -47,6 +48,7 @@ public RabbitTaskInformation(
         final Optional<String> trackingJobTaskId
     )
     {
+        LOG.info("RabbitTaskInformation called with isPoison={}", isPoison);
         this.inboundMessageId = inboundMessageId;
         this.responseCount = new AtomicInteger(0);
         this.isResponseCountFinal = new AtomicBoolean(false);
diff --git a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java
index 8eb866bff..f4b39b380 100644
--- a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java
+++ b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java
@@ -102,7 +102,7 @@ public RabbitWorkerQueue(
         this.invalidQueue = getInvalidQueueName(config, invalidQueue);
         this.dataStore = Objects.requireNonNull(dataStore);
         this.codec = Objects.requireNonNull(codec);
-        LOG.debug("Initialised");
+        LOG.info("Initialised");
     }
 
     private static String getInvalidQueueName(final RabbitWorkerQueueConfiguration config, final String invalidQueue)
@@ -199,7 +199,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
         final byte[] serializedTaskMessage;
         try {
             if (config.getIsPayloadOffloadingEnabled() && config.getPayloadOffloadingThreshold() < taskMessage.getTaskData().length) {
-                LOG.debug("Offloading TaskMessage's TaskData to DataStore for message id '{}'", rabbitTaskInformation.getInboundMessageId());
+                LOG.info("Offloading TaskMessage's TaskData to DataStore for message id '{}'", rabbitTaskInformation.getInboundMessageId());
                 final byte[] taskData = taskMessage.getTaskData();
                 taskMessage.setTaskData(null);
                 serializedTaskMessage = codec.serialise(taskMessage);
@@ -209,7 +209,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
                 final String taskDataStorageRef = dataStore.store(taskData, partialReference);
                 publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF, taskDataStorageRef);
             } else {
-                LOG.debug("Not offloading task message for task {}", rabbitTaskInformation.getInboundMessageId());
+                LOG.info("Not offloading task message for task {}", rabbitTaskInformation.getInboundMessageId());
                 serializedTaskMessage = codec.serialise(taskMessage);
             }
         }
@@ -254,7 +254,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
     public void rejectTask(TaskInformation taskInformation)
     {
         Objects.requireNonNull(taskInformation);
-        LOG.debug("Generating reject event for task {}", taskInformation.getInboundMessageId());
+        LOG.info("Generating reject event for task {}", taskInformation.getInboundMessageId());
         consumerQueue.add(new ConsumerRejectEvent(Long.parseLong(taskInformation.getInboundMessageId())));
     }
 
@@ -267,7 +267,7 @@ public void rejectTask(TaskInformation taskInformation)
     public void discardTask(TaskInformation taskInformation)
     {
         Objects.requireNonNull(taskInformation);
-        LOG.debug("Generating drop event for task {}", taskInformation.getInboundMessageId());
+        LOG.info("Generating drop event for task {}", taskInformation.getInboundMessageId());
         consumerQueue.add(new ConsumerDropEvent(Long.parseLong(taskInformation.getInboundMessageId())));
     }
 
@@ -280,7 +280,7 @@ public void discardTask(TaskInformation taskInformation)
    public void acknowledgeTask(TaskInformation taskInformation)
    {
        Objects.requireNonNull(taskInformation);
-       LOG.debug("Generating acknowledge event for task {}", taskInformation.getInboundMessageId());
+       LOG.info("Generating acknowledge event for task {}", taskInformation.getInboundMessageId());
        consumerQueue.add(new ConsumerAckEvent(Long.parseLong(taskInformation.getInboundMessageId())));
    }
 
@@ -314,7 +314,7 @@ public String getPausedQueue()
     @Override
     public void shutdownIncoming()
     {
-        LOG.debug("Closing incoming queues");
+        LOG.info("Closing incoming queues");
         synchronized (consumerLock) {
             if (consumerTag != null) {
                 try {
@@ -343,7 +343,7 @@ public void shutdownIncoming()
     @Override
     public void disconnectIncoming()
     {
-        LOG.debug("Disconnecting incoming queues");
+        LOG.info("Disconnecting incoming queues");
         synchronized (consumerLock) {
             if (consumerTag != null && incomingChannel.isOpen()) {
                 try {
@@ -372,7 +372,7 @@ public void disconnectIncoming()
     @Override
     public void reconnectIncoming()
     {
-        LOG.debug("Reconnecting incoming queues");
+        LOG.info("Reconnecting incoming queues");
         synchronized (consumerLock) {
             if (consumerTag == null && incomingChannel.isOpen()) {
                 try {
@@ -387,7 +387,7 @@ public void reconnectIncoming()
     @Override
     public void shutdown()
     {
-        LOG.debug("Shutting down");
+        LOG.info("Shutting down");
         try {
             if (consumer != null) {
                 consumer.shutdown();
diff --git a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/WorkerQueueConsumerImpl.java b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/WorkerQueueConsumerImpl.java
index 8bb715a3d..a2bbbd1e4 100644
--- a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/WorkerQueueConsumerImpl.java
+++ b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/WorkerQueueConsumerImpl.java
@@ -127,61 +127,121 @@ public void processDelivery(Delivery delivery)
         final String routingKey = delivery.getEnvelope().getRoutingKey();
         final Map<String, Object> deliveryHeaders = delivery.getHeaders();
         final boolean isRedelivered = delivery.getEnvelope().isRedeliver();
-        final int retries = deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT)
-            ? Integer.parseInt(String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT, "0")))
-            : Integer.parseInt(String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, "0")));
+
+        // Determine retry count - log which header is being used
+        final boolean hasDeliveryCountHeader = deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT);
+        LOG.info("hasDeliveryCountHeader={}", hasDeliveryCountHeader);
+        final boolean hasWorkerRetryHeader = deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY);
+        LOG.info("hasWorkerRetryHeader={}", hasWorkerRetryHeader);
+        final Object deliveryCountValue = deliveryHeaders.get(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT);
+        final Object workerRetryValue = deliveryHeaders.get(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY);
+
+        LOG.info("RETRY CALCULATION START - messageId={}, hasDeliveryCountHeader={}, hasWorkerRetryHeader={}, "
+            + "deliveryCountValue={} (type={}), workerRetryValue={} (type={})",
+            inboundMessageId, hasDeliveryCountHeader, hasWorkerRetryHeader,
+            deliveryCountValue, (deliveryCountValue != null ? deliveryCountValue.getClass().getSimpleName() : "null"),
+            workerRetryValue, (workerRetryValue != null ? workerRetryValue.getClass().getSimpleName() : "null"));
+
+        final int retries;
+        if (hasDeliveryCountHeader) {
+            final String deliveryCountStr = String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT, "0"));
+            retries = Integer.parseInt(deliveryCountStr);
+            LOG.info("RETRY CALCULATION - Using DELIVERY_COUNT header: messageId={}, rawValue={}, stringValue='{}', parsedRetries={}, "
+                + "reason='Quorum queue - delivery count header present'",
+                inboundMessageId, deliveryCountValue, deliveryCountStr, retries);
+        } else {
+            final String workerRetryStr = String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, "0"));
+            retries = Integer.parseInt(workerRetryStr);
+            LOG.info("RETRY CALCULATION - Using WORKER_RETRY header: messageId={}, rawValue={}, stringValue='{}', parsedRetries={}, "
+                + "reason='Classic queue - no delivery count header, using worker retry header'",
+                inboundMessageId, workerRetryValue, workerRetryStr, retries);
+        }
+
+        final Optional<String> taskMessageStorageRefOpt = Optional.ofNullable(deliveryHeaders.get(RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF)).map(Object::toString);
+
+        LOG.info("Processing delivery: messageId={}, routingKey={}, isRedelivered={}, retries={}, retryLimit={}, "
+            + "hasDeliveryCountHeader={}, hasRetryHeader={}, deliveryCountValue={}, retryHeaderValue={}, "
+            + "hasStorageRef={}, storageRef={}, allHeaders={}",
+            inboundMessageId, routingKey, isRedelivered, retries, retryLimit,
+            deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT),
+            deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY),
+            deliveryHeaders.get(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT),
+            deliveryHeaders.get(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY),
+            taskMessageStorageRefOpt.isPresent(), taskMessageStorageRefOpt.orElse("none"),
+            deliveryHeaders);
+
         metrics.incrementReceived();
         final byte[] deliveryMessageData = delivery.getMessageData();
         final TaskMessage taskMessage;
         try {
             try {
+                LOG.info("Attempting to deserialize TaskMessage for messageId={}", inboundMessageId);
                 taskMessage = codec.deserialise(deliveryMessageData, TaskMessage.class, DecodeMethod.LENIENT);
+                LOG.info("Successfully deserialized TaskMessage for messageId={}, taskId={}, taskClassifier={}",
+                    inboundMessageId, taskMessage.getTaskId(), taskMessage.getTaskClassifier());
             } catch (final CodecException e) {
+                LOG.error("Failed to deserialize TaskMessage for messageId={}", inboundMessageId, e);
                 handleInvalidDelivery(inboundMessageId, deliveryMessageData, deliveryHeaders,
-                    "Cannot deserialize delivery messageData to TaskMessage");
+                        "Cannot deserialize delivery messageData to TaskMessage");
                 return;
             }
 
             try {
+                LOG.info("Handling taskData injection for messageId={}, hasStorageRef={}",
+                    inboundMessageId, taskMessageStorageRefOpt.isPresent());
                 handleTaskDataInjection(taskMessage, inboundMessageId, taskMessageStorageRefOpt);
+                LOG.info("Successfully handled taskData injection for messageId={}", inboundMessageId);
             } catch (final InvalidDeliveryException ex) {
+                LOG.error("Invalid delivery detected for messageId={}: {}", inboundMessageId, ex.getMessage(), ex);
                 handleMisingOffloadedPayload(inboundMessageId, taskMessage, deliveryMessageData, deliveryHeaders,
-                    ex.getMessage());
+                        ex.getMessage());
                 return;
             }
 
             final PoisonMessageStatus poisonMessageStatus = getPoisonMessageStatus(
                 isRedelivered, deliveryHeaders, retries);
 
+            LOG.info("Determined poison message status for messageId={}: status={}, isRedelivered={}, retries={}, retryLimit={}",
+                inboundMessageId, poisonMessageStatus, isRedelivered, retries, retryLimit);
+
             if (poisonMessageStatus == PoisonMessageStatus.CLASSIC_POSSIBLY_POISON) {
+                LOG.info("Message is CLASSIC_POSSIBLY_POISON, republishing to retry queue: messageId={}, "
+                    + "routingKey={}, retries={}, retryLimit={}",
+                    inboundMessageId, delivery.getEnvelope().getRoutingKey(), retries, retryLimit);
                 republishClassicRedelivery(
-                    delivery.getEnvelope().getRoutingKey(),
-                    inboundMessageId,
-                    deliveryMessageData,
-                    taskMessage.getTaskData(),
-                    deliveryHeaders,
-                    retries,
-                    taskMessage.getTracking(),
-                    taskMessageStorageRefOpt
+                        delivery.getEnvelope().getRoutingKey(),
+                        inboundMessageId,
+                        deliveryMessageData,
+                        taskMessage.getTaskData(),
+                        deliveryHeaders,
+                        retries,
+                        taskMessage.getTracking(),
+                        taskMessageStorageRefOpt
                 );
                 return;
             }
 
+            LOG.info("Processing delivery normally: messageId={}, isPoison={}",
+                inboundMessageId, poisonMessageStatus == PoisonMessageStatus.POISON);
             processDelivery(
-                inboundMessageId,
-                routingKey,
-                deliveryHeaders,
-                taskMessage,
-                deliveryMessageData,
-                poisonMessageStatus == PoisonMessageStatus.POISON
+                    inboundMessageId,
+                    routingKey,
+                    deliveryHeaders,
+                    taskMessage,
+                    deliveryMessageData,
+                    poisonMessageStatus == PoisonMessageStatus.POISON
             );
         } catch (final TransientDeliveryException e) {
             LOG.warn("Transient error processing message id {}, disconnecting.", inboundMessageId, e);
+            LOG.info("Removing offloaded payload from deletion list: messageId={}, hadPayload={}",
+                inboundMessageId, offloadedPayloadsToDelete.containsKey(inboundMessageId));
             offloadedPayloadsToDelete.remove(inboundMessageId);
             //Disconnect the channel to allow for a reconnect when the HealthCheck passes.
+            LOG.info("Triggering disconnect callback due to transient error: messageId={}", inboundMessageId);
             disconnectCallback.run();
         }
     }
 
@@ -192,43 +252,65 @@ public void processDelivery(Delivery delivery)
      * If invalid, handles as poison message (publishes to retry queue) and returns false.
      */
     private void handleTaskDataInjection(final TaskMessage taskMessage, final long inboundMessageId,
-        final Optional<String> taskMessageStorageRefOpt)
+            final Optional<String> taskMessageStorageRefOpt)
         throws InvalidDeliveryException, TransientDeliveryException
     {
         final byte[] currentTaskData = taskMessage.getTaskData();
         final boolean hasStorageRef = taskMessageStorageRefOpt.isPresent();
         final boolean hasTaskData = currentTaskData != null;
 
+        LOG.info("Checking taskData injection requirements: messageId={}, hasTaskData={}, hasStorageRef={}, storageRef={}",
+            inboundMessageId, hasTaskData, hasStorageRef, taskMessageStorageRefOpt.orElse("none"));
+
         if (hasTaskData && hasStorageRef) {
+            LOG.error("INVALID: Message has both taskData and storage reference: messageId={}, storageRef={}",
+                inboundMessageId, taskMessageStorageRefOpt.get());
             throw new InvalidDeliveryException(
-                "TaskMessage contains both taskData and a storage reference. This is invalid.", inboundMessageId);
+                    "TaskMessage contains both taskData and a storage reference. This is invalid.", inboundMessageId);
         }
 
         if (!hasTaskData && !hasStorageRef) {
+            LOG.error("INVALID: Message has neither taskData nor storage reference: messageId={}", inboundMessageId);
             throw new InvalidDeliveryException(
-                "TaskMessage contains neither taskData nor a storage reference. This is invalid.", inboundMessageId);
+                    "TaskMessage contains neither taskData nor a storage reference. This is invalid.", inboundMessageId);
         }
 
         if (hasStorageRef) {
+            LOG.info("Retrieving offloaded taskData from store: messageId={}, storageRef={}",
+                inboundMessageId, taskMessageStorageRefOpt.get());
             final byte[] offloadedTaskData;
             try {
                 offloadedTaskData = retrieveTaskDataFromStore(taskMessageStorageRefOpt.get(), inboundMessageId);
+                LOG.info("Successfully retrieved offloaded taskData: messageId={}, dataSize={} bytes",
+                    inboundMessageId, offloadedTaskData.length);
             } catch (final ReferenceNotFoundException e) {
+                LOG.error("Offloaded payload not found: messageId={}, storageRef={}",
+                    inboundMessageId, taskMessageStorageRefOpt.get(), e);
                 throw new InvalidDeliveryException(e.getMessage(), inboundMessageId);
             }
             taskMessage.setTaskData(offloadedTaskData);
+            LOG.info("Injected offloaded taskData into TaskMessage: messageId={}", inboundMessageId);
+        } else {
+            LOG.info("Using inline taskData (no offloading): messageId={}, dataSize={} bytes",
+                inboundMessageId, currentTaskData.length);
         }
         // If hasTaskData and !hasStorageRef, nothing to do
     }
 
-    private byte[] retrieveTaskDataFromStore(final String taskMessageStorageRef, final long inboundMessageId)
-        throws ReferenceNotFoundException, TransientDeliveryException
+    private byte[] retrieveTaskDataFromStore(final String taskMessageStorageRef, final long inboundMessageId)
+            throws ReferenceNotFoundException, TransientDeliveryException
     {
+        LOG.info("Retrieving taskData from datastore: messageId={}, storageRef={}", inboundMessageId, taskMessageStorageRef);
         try (final var inputStream = dataStore.retrieve(taskMessageStorageRef)) {
             final var taskData = inputStream.readAllBytes();
             offloadedPayloadsToDelete.put(inboundMessageId, taskMessageStorageRef);
+            LOG.info("Successfully retrieved and scheduled for deletion: messageId={}, storageRef={}, dataSize={} bytes",
+                inboundMessageId, taskMessageStorageRef, taskData.length);
             return taskData;
         } catch (final ReferenceNotFoundException ex) {
+            LOG.error("Storage reference not found: messageId={}, storageRef={}", inboundMessageId, taskMessageStorageRef, ex);
             throw ex;
         } catch (final IOException | DataStoreException ex) {
LOG.error("Transient error retrieving from datastore: messageId={}, storageRef={}", + inboundMessageId, taskMessageStorageRef, ex); throw new TransientDeliveryException( "TaskMessage's TaskData could not be retrieved from DataStore", inboundMessageId, ex); } @@ -241,32 +323,47 @@ private void handleInvalidDelivery( final String exceptionMesssage ) { + LOG.info("Handling invalid delivery: messageId={}, reason={}, invalidRoutingKey={}", + inboundMessageId, exceptionMesssage, invalidRoutingKey); + final RabbitTaskInformation taskInformation = new RabbitTaskInformation(String.valueOf(inboundMessageId), true); taskInformation.incrementResponseCount(true); final var publishHeaders = new HashMap<>(deliveryHeaders); publishHeaders.put(RABBIT_HEADER_CAF_WORKER_INVALID, exceptionMesssage); + LOG.info("Publishing invalid message: messageId={}, routingKey={}, headerAdded={}", + inboundMessageId, invalidRoutingKey, RABBIT_HEADER_CAF_WORKER_INVALID); publisherEventQueue.add(new WorkerPublishQueueEvent(deliveryMessageData, invalidRoutingKey, taskInformation, publishHeaders)); } private void handleMisingOffloadedPayload( - final long inboundMessageId, - final TaskMessage taskMessage, - final byte[] deliveryMessageData, - final Map deliveryHeaders, - final String exceptionMessage + final long inboundMessageId, + final TaskMessage taskMessage, + final byte[] deliveryMessageData, + final Map deliveryHeaders, + final String exceptionMessage ) { + LOG.info("Handling missing offloaded payload: messageId={}, reason={}, queue={}", + inboundMessageId, exceptionMessage, missingOffloadedPayloadQueue); + try { final RabbitTaskInformation taskInformation = new RabbitTaskInformation(String.valueOf(inboundMessageId), true); taskInformation.incrementResponseCount(true); final var publishHeaders = new HashMap<>(deliveryHeaders); publishHeaders.put(RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_MISSING, exceptionMessage); + LOG.info("Publishing to missing offloaded payload queue: messageId={}, queue={}, headerAdded={}", + inboundMessageId, missingOffloadedPayloadQueue, RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_MISSING); publisherEventQueue.add(new WorkerPublishQueueEvent(deliveryMessageData, missingOffloadedPayloadQueue, taskInformation, publishHeaders)); if(taskMessage.getTracking() != null) { + LOG.info("Sending failure tracking report: messageId={}, trackingPipe={}, jobTaskId={}", + inboundMessageId, taskMessage.getTracking().getTrackingPipe(), + taskMessage.getTracking().getJobTaskId()); sendFailureTrackingReport(taskMessage, exceptionMessage, taskInformation); + } else { + LOG.info("No tracking info, skipping failure report: messageId={}", inboundMessageId); } } catch (final CodecException e) { LOG.error("Failed to serialise report update task data."); @@ -309,7 +406,7 @@ private void sendFailureTrackingReport( taskMessage.getCorrelationId()); publisherEventQueue.add(new WorkerPublishQueueEvent(codec.serialise(failureReportTaskMessage), - trackingInfo.getTrackingPipe(), rabbitTaskInformation, Collections.emptyMap())); + trackingInfo.getTrackingPipe(), rabbitTaskInformation, Collections.emptyMap())); } private PoisonMessageStatus getPoisonMessageStatus( @@ -317,20 +414,37 @@ private PoisonMessageStatus getPoisonMessageStatus( final Map deliveryHeaders, final int retries ) { + LOG.info("Evaluating poison message status: isRedelivered={}, retries={}, retryLimit={}, " + + "hasDeliveryCountHeader={}, hasRetryHeader={}", + isRedelivered, retries, retryLimit, + deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT), + 
+            deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY));
+
         // If the message is being redelivered it is potentially a poison message.
         if (isRedelivered) {
+            LOG.info("Message IS redelivered, checking if classic queue...");
             // If the headers do not contain the delivery count, then it is a classic queue.
             if (!deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT)) {
+                LOG.info("Classic queue detected (no delivery count header). Checking retry limit...");
                 // If the retries have not been exceeded, then republish the message
                 // with a header recording the retry count
                 if (retries < retryLimit) {
+                    LOG.info("Retries ({}) < retryLimit ({}), returning CLASSIC_POSSIBLY_POISON", retries, retryLimit);
                     return PoisonMessageStatus.CLASSIC_POSSIBLY_POISON;
                 }
+                LOG.info("Retries ({}) >= retryLimit ({}), will check final poison status", retries, retryLimit);
+            } else {
+                LOG.info("Quorum queue detected (has delivery count header)");
             }
-            return (retries >= retryLimit)
+
+            final PoisonMessageStatus status = (retries >= retryLimit)
                 ? PoisonMessageStatus.POISON
                 : PoisonMessageStatus.NOT_POISON;
+            LOG.info("Final redelivered message status: {} (retries={}, retryLimit={})", status, retries, retryLimit);
+            return status;
         }
+
+        LOG.info("Message is NOT redelivered, returning NOT_POISON");
         return PoisonMessageStatus.NOT_POISON;
     }
 
@@ -344,20 +458,29 @@ private void processDelivery(
     ) {
         final TrackingInfo trackingInfo = taskMessage.getTracking();
         final String trackingJobTaskId = trackingInfo != null ? trackingInfo.getJobTaskId() : "untracked";
+
+        LOG.info("Processing delivery with poison status: messageId={}, routingKey={}, isPoison={}, "
+            + "trackingJobTaskId={}, taskId={}",
+            inboundMessageId, routingKey, isPoison, trackingJobTaskId, taskMessage.getTaskId());
+
         final RabbitTaskInformation taskInformation = new RabbitTaskInformation(
             String.valueOf(inboundMessageId), isPoison, Optional.of(trackingJobTaskId)
         );
         try {
             LOG.debug("Registering new message {}", inboundMessageId);
+            LOG.info("Calling callback.registerNewTask for messageId={}", inboundMessageId);
             callback.registerNewTask(taskInformation, taskMessage, deliveryHeaders);
+            LOG.info("Successfully registered new task for messageId={}", inboundMessageId);
         } catch (final InvalidTaskException e) {
             LOG.error("Cannot register new message, rejecting {}", inboundMessageId, e);
+            LOG.info("Publishing to invalid queue: messageId={}, routingKey={}", inboundMessageId, invalidRoutingKey);
             taskInformation.incrementResponseCount(true);
             final var publishHeaders = new HashMap<String, Object>();
             publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_INVALID, e);
             publisherEventQueue.add(new WorkerPublishQueueEvent(taskMessageByteArray, invalidRoutingKey, taskInformation, publishHeaders));
         } catch (final TaskRejectedException e) {
             LOG.warn("Message {} rejected as a task at this time, returning to queue", inboundMessageId, e);
+            LOG.info("Republishing rejected task: messageId={}, routingKey={}", inboundMessageId, routingKey);
             taskInformation.incrementResponseCount(true);
             publisherEventQueue.add(new WorkerPublishQueueEvent(taskMessageByteArray, routingKey, taskInformation, deliveryHeaders));
         }
@@ -367,12 +490,16 @@ private void processDelivery(
 
     public void processAck(long tag)
     {
         if (tag == -1) {
+            LOG.info("Skipping ack for invalid tag: -1");
             return;
         }
 
+        LOG.info("Processing ACK: messageId={}", tag);
+
         try {
             LOG.debug("Acknowledging message {}", tag);
             channel.basicAck(tag, false);
+            LOG.info("Successfully acknowledged message: messageId={}", tag);
         } catch (IOException e) {
             LOG.warn("Couldn't ack message {}, will retry", tag, e);
             metrics.incremementErrors();
@@ -382,12 +509,17 @@ public void processAck(long tag)
 
         final String datastorePayloadReference = offloadedPayloadsToDelete.remove(tag);
         if (datastorePayloadReference != null) {
+            LOG.info("Deleting offloaded payload: messageId={}, storageRef={}", tag, datastorePayloadReference);
             try {
                 dataStore.delete(datastorePayloadReference, true);
+                LOG.info("Successfully deleted offloaded payload: messageId={}, storageRef={}",
+                    tag, datastorePayloadReference);
             } catch (final DataStoreException e) {
                 LOG.warn("Couldn't delete offloaded payload '{}' for delivery tag '{}' from datastore message.",
                     datastorePayloadReference, tag, e);
             }
+        } else {
+            LOG.info("No offloaded payload to delete for messageId={}", tag);
         }
     }
 
@@ -416,13 +548,17 @@ private void processReject(long id, boolean requeue)
             return;
         }
 
+        LOG.info("Processing REJECT: messageId={}, requeue={}", id, requeue);
+
         try {
             channel.basicReject(id, requeue);
             if (requeue) {
                 LOG.debug("Rejecting message {}", id);
+                LOG.info("Message rejected and requeued: messageId={}", id);
                 metrics.incrementRejected();
             } else {
                 LOG.warn("Dropping message {}", id);
+                LOG.info("Message dropped (sent to DLX): messageId={}", id);
                 metrics.incrementDropped();
             }
         } catch (IOException e) {
@@ -446,31 +582,59 @@ private void republishClassicRedelivery(
         final String trackingJobTaskId = tracking != null ? tracking.getJobTaskId() : "untracked";
         final RabbitTaskInformation taskInformation = new RabbitTaskInformation(
             String.valueOf(inboundMessageId), false, Optional.of(trackingJobTaskId));
+
+        LOG.info("Republishing classic redelivery: messageId={}, deliveryQueue={}, retryRoutingKey={}, "
+            + "currentRetries={}, newRetries={}, retryLimit={}, hasStorageRef={}, trackingJobTaskId={}",
+            inboundMessageId, deliveryQueue, retryRoutingKey, retries, retries + 1,
+            retryLimit, taskMessageStorageRefOpt.isPresent(), trackingJobTaskId);
+
         LOG.debug("Received redelivered message with id {}, retry count {}, retry limit {}, republishing to retry queue",
             inboundMessageId, retryLimit, retries + 1);
+
         final Map<String, Object> publishHeaders = new HashMap<>(deliveryHeaders);
         publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, String.valueOf(retries + 1));
+
+        LOG.info("Updated retry header: messageId={}, {}={}",
+            inboundMessageId, RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, retries + 1);
+
         taskInformation.incrementResponseCount(true);
+
+        if(taskMessageStorageRefOpt.isPresent()) {
+            LOG.info("Message has offloaded payload: messageId={}, storageRef={}",
+                inboundMessageId, taskMessageStorageRefOpt.get());
+            if (!retryRoutingKey.equals(deliveryQueue)) {
+                LOG.info("Different routing key detected, relocating payload: messageId={}, from={}, to={}",
+                    inboundMessageId, deliveryQueue, retryRoutingKey);
                 try {
-                    final String newStorageReference =
-                        dataStore.store(serializedTaskData,
-                            taskMessageStorageRefOpt.get().replace(deliveryQueue, retryRoutingKey));
+                    final String newStorageReference =
+                            dataStore.store(serializedTaskData,
+                                taskMessageStorageRefOpt.get().replace(deliveryQueue, retryRoutingKey));
                     publishHeaders.put(RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF, newStorageReference);
-                }
+                    LOG.info("Successfully relocated payload: messageId={}, oldRef={}, newRef={}",
+                        inboundMessageId, taskMessageStorageRefOpt.get(), newStorageReference);
+                }
                 catch (final DataStoreException e) {
                     LOG.error("Failed to relocate offloaded payload for message id {} from {} to {}",
-                        inboundMessageId, deliveryQueue, retryRoutingKey, e);
+                        inboundMessageId, deliveryQueue, retryRoutingKey, e);
                     //Disconnect the channel to allow for a reconnect when the HealthCheck passes.
                     disconnectCallback.run();
+                    return;
                 }
             } else {
+                LOG.info("Same routing key, reusing existing payload: messageId={}, routingKey={}, storageRef={}",
+                    inboundMessageId, retryRoutingKey, taskMessageStorageRefOpt.get());
                 //We are reusing the same routing key, so we do not need to relocate the payload.
                 offloadedPayloadsToDelete.remove(inboundMessageId);
             }
+        } else {
+            LOG.info("Message has inline payload (no offloaded storage): messageId={}", inboundMessageId);
         }
+
+        LOG.info("Adding message to publisher queue: messageId={}, routingKey={}, updatedHeaders={}",
+            inboundMessageId, retryRoutingKey, publishHeaders);
         publisherEventQueue.add(new WorkerPublishQueueEvent(serializedTaskMessage, retryRoutingKey, taskInformation, publishHeaders));
+        LOG.info("Successfully queued classic redelivery for republishing: messageId={}", inboundMessageId);
     }
-}
+}
\ No newline at end of file