Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Client.Examples/BackgroundServiceExample.cs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
## Using Job Worker In BackgroundService Example

This example showcases how Zeebe job worker can be used in ASP.NET BackgroundService with asynchronous job handler method.

It uses the job handler delegate variant that supports injection of the job worker cancellation token.

It also passes the BackroundService stopping token to the job worker, so that cancellation of the background service is propagated to the worker.

```csharp
using Microsoft.Extensions.Hosting;
using Zeebe.Client;
using Zeebe.Client.Api.Responses;
using Zeebe.Client.Api.Worker;

namespace MyWebApplication;

public class ZeebeBackgroundService : BackgroundService
{
private readonly IZeebeClient client;

public ZeebeBackgroundService(IZeebeClient client)
{
this.client = client;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
this.client.NewWorker()
.JobType("My-Job")
.Handler(HandleJobAsync)
.MaxJobsActive(5)
.Name(Environment.MachineName)
.AutoCompletion()
.PollInterval(TimeSpan.FromSeconds(1))
.Timeout(TimeSpan.FromSeconds(10))
.PollingTimeout(TimeSpan.FromSeconds(30))
.Open(stoppingToken); // Passes the stopping token to the worker to gracefully cancel it in case of background service cancellation.

return Task.CompletedTask;
}

private static async Task HandleJobAsync(IJobClient jobClient, IJob job, CancellationToken cancellationToken)
{
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);

await jobClient
.NewCompleteJobCommand(job)
.Send(cancellationToken);
}
}
```
64 changes: 61 additions & 3 deletions Client.UnitTests/JobWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using GatewayProtocol;
using NLog;
using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using GatewayProtocol;
using NLog;
using NUnit.Framework;
using System.Threading.Tasks;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client;
Expand Down Expand Up @@ -706,6 +708,62 @@ public void ShouldUseAutoCompleteWithWorker()
Assert.Contains(3, completeJobRequests);
}

[Test]
public void ShouldCancelAllJobsWhenHostIsCancelled()
{
// given
TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
using var stoppingCts = new CancellationTokenSource();

// when
var startedJobs = new ConcurrentBag<IJob>();
var cancelledJobs = new ConcurrentBag<IJob>();
var completedJobs = new ConcurrentBag<IJob>();
using var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler(async (jobClient, job, cancellationToken) =>
{
startedJobs.Add(job);
try
{
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
catch (TaskCanceledException)
{
cancelledJobs.Add(job);
}
finally
{
completedJobs.Add(job);
}
})
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(123))
.PollInterval(TimeSpan.FromSeconds(10))
.PollingTimeout(TimeSpan.FromSeconds(10))
.HandlerThreads(3)
.Open(stoppingCts.Token);

Assert.True(jobWorker.IsOpen());

while (startedJobs.Count < 3)
{
}

stoppingCts.Cancel();

while (completedJobs.Count < 3)
{
}

// then
Assert.AreEqual(3, startedJobs.Count);
Assert.AreEqual(3, cancelledJobs.Count);
Assert.AreEqual(3, completedJobs.Count);
}

public static ActivateJobsResponse CreateExpectedResponse()
{
return new ActivateJobsResponse
Expand Down
40 changes: 39 additions & 1 deletion Client/Api/Worker/IJobWorkerBuilderStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Responses;
Expand Down Expand Up @@ -46,6 +47,15 @@ public interface IJobWorkerBuilderStep1
/// <returns>A <see cref="Task" /> representing the asynchronous operation.</returns>
public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob);

/// <summary>
/// The asynchronous job handler which contains the business logic.
/// </summary>
/// <param name="client">the job client to complete or fail the job.</param>
/// <param name="activatedJob">the job, which was activated by the worker.</param>
/// <param name="cancellationToken">the token to cancel the job handler.</param>
/// <returns>A <see cref="Task" /> representing the asynchronous operation.</returns>
public delegate Task AsyncJobHandlerWithCancellationToken(IJobClient client, IJob activatedJob, CancellationToken cancellationToken);

public interface IJobWorkerBuilderStep2
{
/// <summary>
Expand Down Expand Up @@ -99,6 +109,33 @@ public interface IJobWorkerBuilderStep2
/// <param name="handler">the handle to process the jobs.</param>
/// <returns>the builder for this worker.</returns>
IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler);

/// <summary>
/// Set an async handler to process the jobs asynchronously. At the end of the processing, the handler can
/// complete the job or mark it as failed.
/// This version of the handler supports a <see cref="CancellationToken"/> that can be used in underliying tasks.
/// </summary>
/// <example>
/// <para>
/// Example JobHandler implementation:
/// </para>
/// <code>
/// var handler = async (client, job, cancellationToken) =>
/// {
/// String json = job.Variables;
/// // modify variables
///
/// await client
/// .CompleteCommand(job.Key)
/// .Variables(json)
/// .Send(cancellationToken);
/// };
/// </code>
/// </example>
/// The handler must be thread-safe.
/// <param name="handler">the handle to process the jobs.</param>
/// <returns>the builder for this worker.</returns>
IJobWorkerBuilderStep3 Handler(AsyncJobHandlerWithCancellationToken handler);
}

public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilderStep3>
Expand Down Expand Up @@ -247,6 +284,7 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
/// <summary>
/// Open the worker and start to work on available tasks.
/// </summary>
/// <param name="cancellationToken">the token to cancel the job worker.</param>
/// <returns>the worker.</returns>
IJobWorker Open();
IJobWorker Open(CancellationToken cancellationToken = default);
}
56 changes: 34 additions & 22 deletions Client/Impl/Worker/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ public sealed class JobWorker : IJobWorker
private readonly StreamActivatedJobsRequest streamActivateJobsRequest;
private readonly bool autoCompletion;
private readonly JobActivator jobActivator;
private readonly AsyncJobHandler jobHandler;
private readonly AsyncJobHandlerWithCancellationToken jobHandler;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChrisKujawa this change is to fix a problem that I was not able to predict. To prevent breaking change, my first idea was to just set the cancellationToken optional in AsyncJobHandler. But for a reason that I don't understand for now, all usages like:

ZeebeClient.NewWorker()
     .JobType("foo")
     .Handler(async (jobClient, job) =>
     {
        await SomethingAsync();
     })

were considered as a synchronous JobHandler, event if the Func return a Task. This was the root cause of test fail.

private readonly JobWorkerBuilder jobWorkerBuilder;
private readonly ILogger<JobWorker> logger;
private readonly int maxJobsActive;
private readonly TimeSpan pollInterval;

private readonly CancellationTokenSource source;
private readonly CancellationTokenSource localCts;
private CancellationTokenSource? linkedCts;
private readonly double thresholdJobsActivation;

private int currentJobsActive;
Expand All @@ -54,7 +55,7 @@ public sealed class JobWorker : IJobWorker
internal JobWorker(JobWorkerBuilder builder)
{
jobWorkerBuilder = builder;
source = new CancellationTokenSource();
localCts = new CancellationTokenSource();
logger = builder.LoggerFactory?.CreateLogger<JobWorker>();
jobHandler = jobWorkerBuilder.Handler();
autoCompletion = builder.AutoCompletionEnabled();
Expand All @@ -69,13 +70,15 @@ internal JobWorker(JobWorkerBuilder builder)
/// <inheritdoc />
public void Dispose()
{
source.Cancel();
localCts.Cancel();
// delay disposing, since poll and handler take some time to close
_ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2))
.ContinueWith(t =>
{
logger?.LogError("Dispose source");
source.Dispose();
localCts.Dispose();
var currentLinkedCts = Interlocked.Exchange(ref this.linkedCts, null);
currentLinkedCts?.Dispose();
});
isRunning = false;
}
Expand All @@ -96,16 +99,21 @@ public bool IsClosed()
/// Opens the configured JobWorker to activate jobs in the given poll interval
/// and handle with the given handler.
/// </summary>
internal void Open()
/// <param name="cancellationToken">The host cancellation token.</param>
internal void Open(CancellationToken cancellationToken)
{
isRunning = true;
var cancellationToken = source.Token;
var bufferOptions = CreateBufferOptions(cancellationToken);
var executionOptions = CreateExecutionOptions(cancellationToken);

var newLinkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCts.Token);
var oldLinkedCts = Interlocked.Exchange(ref this.linkedCts, newLinkedCts);
oldLinkedCts?.Dispose();
var linkedCancellationToken = newLinkedCts.Token;
var bufferOptions = CreateBufferOptions(linkedCancellationToken);
var executionOptions = CreateExecutionOptions(linkedCancellationToken);

var input = new BufferBlock<IJob>(bufferOptions);
var transformer = new TransformBlock<IJob, IJob>(
async activatedJob => await HandleActivatedJob(activatedJob, cancellationToken),
async activatedJob => await HandleActivatedJob(activatedJob, linkedCancellationToken),
executionOptions);
var output = new ActionBlock<IJob>(activatedJob => { _ = Interlocked.Decrement(ref currentJobsActive); },
executionOptions);
Expand All @@ -115,15 +123,15 @@ internal void Open()

if (jobWorkerBuilder.GrpcStreamEnabled)
{
_ = Task.Run(async () => await StreamJobs(input, cancellationToken),
cancellationToken).ContinueWith(
_ = Task.Run(async () => await StreamJobs(input, linkedCancellationToken),
linkedCancellationToken).ContinueWith(
t => logger?.LogError(t.Exception, "Job stream failed."),
TaskContinuationOptions.OnlyOnFaulted);
}

// Start polling
_ = Task.Run(async () => await PollJobs(input, cancellationToken),
cancellationToken).ContinueWith(
_ = Task.Run(async () => await PollJobs(input, linkedCancellationToken),
linkedCancellationToken).ContinueWith(
t => logger?.LogError(t.Exception, "Job polling failed."),
TaskContinuationOptions.OnlyOnFaulted);

Expand All @@ -135,7 +143,7 @@ internal void Open()

private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancellationToken)
{
while (!source.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
try
{
Expand All @@ -151,7 +159,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
activateJobsRequest.Worker,
activateJobsRequest.Type);

while (!source.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
var currentJobs = Thread.VolatileRead(ref currentJobsActive);

Expand All @@ -178,7 +186,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
grpcActivatedJob.Worker,
grpcActivatedJob.Key);

await HandleActivationResponse(input, response, null);
await HandleActivationResponse(input, response, null, cancellationToken);
}
}
catch (RpcException rpcException)
Expand Down Expand Up @@ -210,7 +218,7 @@ private static DataflowBlockOptions CreateBufferOptions(CancellationToken cancel

private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancellationToken)
{
while (!source.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
var currentJobs = Thread.VolatileRead(ref currentJobsActive);
if (currentJobs < thresholdJobsActivation)
Expand All @@ -221,7 +229,7 @@ private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancella
try
{
await jobActivator.SendActivateRequest(activateJobsRequest,
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount, cancellationToken),
null,
cancellationToken);
}
Expand All @@ -238,7 +246,11 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
}
}

private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount)
private async Task HandleActivationResponse(
ITargetBlock<IJob> input,
IActivateJobsResponse response,
int? jobCount,
CancellationToken cancellationToken)
{
if (jobCount.HasValue)
{
Expand All @@ -258,7 +270,7 @@ private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJ

foreach (var job in response.Jobs)
{
_ = await input.SendAsync(job);
_ = await input.SendAsync(job, cancellationToken);
_ = Interlocked.Increment(ref currentJobsActive);
}
}
Expand All @@ -269,7 +281,7 @@ private async Task<IJob> HandleActivatedJob(IJob activatedJob, CancellationToken

try
{
await jobHandler(jobClient, activatedJob);
await jobHandler(jobClient, activatedJob, cancellationToken);
await TryToAutoCompleteJob(jobClient, activatedJob, cancellationToken);
}
catch (Exception exception)
Expand Down
Loading