Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ public static void declareQueue(Channel channel, String queueName, Durability du
Objects.requireNonNull(act);
Objects.requireNonNull(queueProps);
try {
LOG.info("Declaring queue: {} with durability: {}, exclusivity: {}, auto-remove: {}, properties: {}",
queueName, dur, excl, act, queueProps);
channel.queueDeclare(queueName, dur == Durability.DURABLE, excl == Exclusivity.EXCLUSIVE, act == EmptyAction.AUTO_REMOVE, queueProps);
} catch (IOException e) {
LOG.warn("IO Exception encountered during queueDeclare. Will try do declare passively.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.github.workerframework.api;

import java.nio.charset.StandardCharsets;

/**
* This interface should be implemented by CAF Workers which are able to process multiple tasks together.
*
Expand All @@ -31,4 +33,26 @@ public interface BulkWorker
*/
void processTasks(BulkWorkerRuntime runtime)
throws InterruptedException;

/**
* If a message has been identified as a poison message, prepare a WorkerResponse that includes the friendly name
* of the worker.
* For compatibility with existing Worker implementations a default implementation has been provided.
*
* @param workerFriendlyName the worker's friendly name
* @return a response containing details of the worker that encountered a poison message
*/
default WorkerResponse getPoisonMessageResult(String workerFriendlyName, final WorkerTask workerTask) {
final String strData = workerFriendlyName + " could not process the item.";
final byte[] byteArrayData = strData.getBytes(StandardCharsets.UTF_8);

// TODO
// In Abstract Worker we have:
// return new WorkerResponse(getResultQueue(), TaskStatus.RESULT_EXCEPTION, getExceptionData(t), getWorkerIdentifier(), getWorkerApiVersion(), null);
//
// I don't know what the equivalent of getResultQueue(), getWorkerIdentifier() and getWorkerApiVersion() are here,
// or where we could ge them from in the BulkWorker?
return new WorkerResponse(
workerTask.getTo(), TaskStatus.RESULT_EXCEPTION, byteArrayData, "", workerTask.getVersion(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,74 @@
*/
package com.github.workerframework.core;

import com.github.workerframework.api.BulkWorker;
import com.github.workerframework.api.BulkWorkerRuntime;
import com.github.workerframework.api.TaskMessage;
import com.github.workerframework.api.TaskStatus;
import com.github.workerframework.api.WorkerTask;
import com.google.common.base.MoreObjects;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class BulkWorkerTaskProvider implements BulkWorkerRuntime
{
private static final Logger LOG = LoggerFactory.getLogger(BulkWorkerTaskProvider.class);
private WorkerTaskImpl firstTask;
private final BlockingQueue<WorkerTaskImpl> workQueue;
private final ArrayList<WorkerTaskImpl> consumedTasks;
private final BulkWorker bulkWorker;
private final String bulkWorkerFriendlyName;

public BulkWorkerTaskProvider(
final WorkerTaskImpl firstTask,
final BlockingQueue<WorkerTaskImpl> workQueue
)
final BlockingQueue<WorkerTaskImpl> workQueue,
final BulkWorker bulkWorker,
final String bulkWorkerFriendlyName)
{
this.firstTask = Objects.requireNonNull(firstTask);
this.workQueue = Objects.requireNonNull(workQueue);
this.bulkWorker = Objects.requireNonNull(bulkWorker);
this.bulkWorkerFriendlyName = Objects.requireNonNull(bulkWorkerFriendlyName);
this.consumedTasks = new ArrayList<>();
}

@Override
public WorkerTask getNextWorkerTask()
{
return registerTaskConsumed(getNextWorkerTaskImpl());
try {
return getNextWorkerTaskInternal(null);
} catch (final InterruptedException e) {
// Should never happen - no-arg version doesn't block
throw new RuntimeException(e);
}
}

@Override
public WorkerTask getNextWorkerTask(long millis) throws InterruptedException
{
return getNextWorkerTaskInternal(millis);
}

private WorkerTask getNextWorkerTaskInternal(final Long millis) throws InterruptedException
{
final WorkerTaskImpl workerTask = registerTaskConsumed(
millis == null ? getNextWorkerTaskImpl() : getNextWorkerTaskImpl(millis)
);

if (workerTask != null && workerTask.isPoison()) {
LOG.info("Received poison message, generating poison response for worker: {}", bulkWorkerFriendlyName);
workerTask.setResponse(bulkWorker.getPoisonMessageResult(bulkWorkerFriendlyName, workerTask));
sendCopyToReject(workerTask);
return getNextWorkerTaskInternal(millis);
}

return workerTask;
}

private WorkerTaskImpl getNextWorkerTaskImpl()
Expand All @@ -56,12 +96,6 @@ private WorkerTaskImpl getNextWorkerTaskImpl()
}
}

@Override
public WorkerTask getNextWorkerTask(long millis) throws InterruptedException
{
return registerTaskConsumed(getNextWorkerTaskImpl(millis));
}

private WorkerTaskImpl getNextWorkerTaskImpl(long millis)
throws InterruptedException
{
Expand Down Expand Up @@ -91,4 +125,22 @@ private WorkerTaskImpl registerTaskConsumed(WorkerTaskImpl workerTask)
}
return workerTask;
}

private void sendCopyToReject(final WorkerTaskImpl workerTask) {
final TaskMessage poisonMessage = new TaskMessage(
UUID.randomUUID().toString(),
MoreObjects.firstNonNull(workerTask.getClassifier(), ""),
workerTask.getVersion(),
workerTask.getData(),
TaskStatus.RESULT_EXCEPTION,
Collections.emptyMap(),
workerTask.getRejectQueue(),
workerTask.getTrackingInfo(),
workerTask.getSourceInfo(),
workerTask.getCorrelationId());

LOG.info("Sending poison message to: {}", workerTask.getRejectQueue());

workerTask.sendMessage(poisonMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
final class BulkWorkerThreadPool implements WorkerThreadPool
{
private static final Logger LOG = LoggerFactory.getLogger(BulkWorkerThreadPool.class);
private static final String CAF_WORKER_FRIENDLY_NAME = System.getenv("CAF_WORKER_FRIENDLY_NAME");

private final BulkWorker bulkWorker;
private final String bulkWorkerFriendlyName;
private final BlockingQueue<WorkerTaskImpl> workQueue;
private final BulkWorkerThread[] bulkWorkerThreads;
private final Runnable throwableHandler;
Expand All @@ -45,6 +47,8 @@ public BulkWorkerThreadPool(
final int nThreads = workerFactory.getWorkerThreads();

this.bulkWorker = (BulkWorker) workerFactory;
this.bulkWorkerFriendlyName = CAF_WORKER_FRIENDLY_NAME != null
? CAF_WORKER_FRIENDLY_NAME : bulkWorker.getClass().getSimpleName();
this.workQueue = new LinkedBlockingQueue<>();
this.bulkWorkerThreads = new BulkWorkerThread[nThreads];
this.throwableHandler = handler;
Expand Down Expand Up @@ -83,7 +87,7 @@ private void execute()
{
final WorkerTaskImpl task = workQueue.take();
final BulkWorkerTaskProvider taskProvider
= new BulkWorkerTaskProvider(task, workQueue);
= new BulkWorkerTaskProvider(task, workQueue, bulkWorker, bulkWorkerFriendlyName);

try {
bulkWorker.processTasks(taskProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ public void run()

final WorkerResponse response;
if(workerTask.isPoison()) {
LOG.info("Received poison message, generating poison response for worker: {}", workerFriendlyName);
response = worker.getPoisonMessageResult(workerFriendlyName);
sendCopyToReject();
}
else {
LOG.info("workerTask.isPoison() == false");
Timer.Context t = TIMER.time();
MDC.put(CORRELATION_ID, workerTask.getCorrelationId());
response = worker.doWork();
Expand Down Expand Up @@ -112,6 +114,9 @@ private void sendCopyToReject() {
workerTask.getTrackingInfo(),
workerTask.getSourceInfo(),
workerTask.getCorrelationId());

LOG.info("Sending poison message to: {}", workerTask.getRejectQueue());

workerTask.sendMessage(poisonMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private void registerNewTaskImpl(final TaskInformation taskInformation, final Ta
throws InvalidTaskException, TaskRejectedException
{
try {
LOG.debug("Received task {} (message id: {})", tm.getTaskId(), taskInformation.getInboundMessageId());
LOG.info("Received task {} (message id: {})", tm.getTaskId(), taskInformation.getInboundMessageId());
validateTaskMessage(tm);
final JobStatus jobStatus;
try {
Expand Down Expand Up @@ -446,7 +446,7 @@ public void send(final TaskInformation taskInformation, final TaskMessage respon
{
Objects.requireNonNull(taskInformation);
Objects.requireNonNull(responseMessage);
LOG.debug("Sending task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
LOG.info("Sending task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());

final String queue = responseMessage.getTo();
checkForTrackingTermination(taskInformation, queue, responseMessage);
Expand All @@ -470,8 +470,8 @@ public void complete(final TaskInformation taskInformation, final String queue,
Objects.requireNonNull(taskInformation);
Objects.requireNonNull(responseMessage);
// queue can be null for a dead end worker
LOG.debug("Task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
LOG.debug("Setting destination {} in task {} (message id: {})", queue, responseMessage.getTaskId(), taskInformation.getInboundMessageId());
LOG.info("Task {} complete (message id: {})", responseMessage.getTaskId(), taskInformation.getInboundMessageId());
LOG.info("Setting destination {} in task {} (message id: {})", queue, responseMessage.getTaskId(), taskInformation.getInboundMessageId());
responseMessage.setTo(queue);
checkForTrackingTermination(taskInformation, queue, responseMessage);
try {
Expand Down Expand Up @@ -509,7 +509,7 @@ public void complete(final TaskInformation taskInformation, final String queue,
@Override
public void abandon(final TaskInformation taskInformation, final Exception e)
{
LOG.debug("Rejecting message id {}", taskInformation.getInboundMessageId());
LOG.info("Rejecting message id {}", taskInformation.getInboundMessageId());
workerQueue.rejectTask(taskInformation);
stats.incrementTasksRejected();
workerQueue.disconnectIncoming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class RabbitTaskInformation implements TaskInformation {

public RabbitTaskInformation(final String inboundMessageId) {
this(inboundMessageId, false);
LOG.info("RabbitTaskInformation(final String inboundMessageId) = isPoison=false");
}

public RabbitTaskInformation(final String inboundMessageId, final boolean isPoison) {
Expand All @@ -47,6 +48,7 @@ public RabbitTaskInformation(
final Optional<String> trackingJobTaskId
)
{
LOG.info("RabbitTaskInformation called with isPoison={}", isPoison);
this.inboundMessageId = inboundMessageId;
this.responseCount = new AtomicInteger(0);
this.isResponseCountFinal = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public RabbitWorkerQueue(
this.invalidQueue = getInvalidQueueName(config, invalidQueue);
this.dataStore = Objects.requireNonNull(dataStore);
this.codec = Objects.requireNonNull(codec);
LOG.debug("Initialised");
LOG.info("Initialised");
}

private static String getInvalidQueueName(final RabbitWorkerQueueConfiguration config, final String invalidQueue)
Expand Down Expand Up @@ -199,7 +199,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
final byte[] serializedTaskMessage;
try {
if (config.getIsPayloadOffloadingEnabled() && config.getPayloadOffloadingThreshold() < taskMessage.getTaskData().length) {
LOG.debug("Offloading TaskMessage's TaskData to DataStore for message id '{}'", rabbitTaskInformation.getInboundMessageId());
LOG.info("Offloading TaskMessage's TaskData to DataStore for message id '{}'", rabbitTaskInformation.getInboundMessageId());
final byte[] taskData = taskMessage.getTaskData();
taskMessage.setTaskData(null);
serializedTaskMessage = codec.serialise(taskMessage);
Expand All @@ -209,7 +209,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
final String taskDataStorageRef = dataStore.store(taskData, partialReference);
publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF, taskDataStorageRef);
} else {
LOG.debug("Not offloading task message for task {}", rabbitTaskInformation.getInboundMessageId());
LOG.info("Not offloading task message for task {}", rabbitTaskInformation.getInboundMessageId());
serializedTaskMessage = codec.serialise(taskMessage);
}
}
Expand Down Expand Up @@ -254,7 +254,7 @@ public void publish(TaskInformation taskInformation, TaskMessage taskMessage, St
public void rejectTask(TaskInformation taskInformation)
{
Objects.requireNonNull(taskInformation);
LOG.debug("Generating reject event for task {}", taskInformation.getInboundMessageId());
LOG.info("Generating reject event for task {}", taskInformation.getInboundMessageId());
consumerQueue.add(new ConsumerRejectEvent(Long.parseLong(taskInformation.getInboundMessageId())));
}

Expand All @@ -267,7 +267,7 @@ public void rejectTask(TaskInformation taskInformation)
public void discardTask(TaskInformation taskInformation)
{
Objects.requireNonNull(taskInformation);
LOG.debug("Generating drop event for task {}", taskInformation.getInboundMessageId());
LOG.info("Generating drop event for task {}", taskInformation.getInboundMessageId());
consumerQueue.add(new ConsumerDropEvent(Long.parseLong(taskInformation.getInboundMessageId())));
}

Expand All @@ -280,7 +280,7 @@ public void discardTask(TaskInformation taskInformation)
public void acknowledgeTask(TaskInformation taskInformation)
{
Objects.requireNonNull(taskInformation);
LOG.debug("Generating acknowledge event for task {}", taskInformation.getInboundMessageId());
LOG.info("Generating acknowledge event for task {}", taskInformation.getInboundMessageId());
consumerQueue.add(new ConsumerAckEvent(Long.parseLong(taskInformation.getInboundMessageId())));
}

Expand Down Expand Up @@ -314,7 +314,7 @@ public String getPausedQueue()
@Override
public void shutdownIncoming()
{
LOG.debug("Closing incoming queues");
LOG.info("Closing incoming queues");
synchronized (consumerLock) {
if (consumerTag != null) {
try {
Expand Down Expand Up @@ -343,7 +343,7 @@ public void shutdownIncoming()
@Override
public void disconnectIncoming()
{
LOG.debug("Disconnecting incoming queues");
LOG.info("Disconnecting incoming queues");
synchronized (consumerLock) {
if (consumerTag != null && incomingChannel.isOpen()) {
try {
Expand Down Expand Up @@ -372,7 +372,7 @@ public void disconnectIncoming()
@Override
public void reconnectIncoming()
{
LOG.debug("Reconnecting incoming queues");
LOG.info("Reconnecting incoming queues");
synchronized (consumerLock) {
if (consumerTag == null && incomingChannel.isOpen()) {
try {
Expand All @@ -387,7 +387,7 @@ public void reconnectIncoming()
@Override
public void shutdown()
{
LOG.debug("Shutting down");
LOG.info("Shutting down");
try {
if (consumer != null) {
consumer.shutdown();
Expand Down
Loading