From 20964a69020f11ce5099333b55545b725826770c Mon Sep 17 00:00:00 2001 From: Olivier Freund Date: Sat, 29 Nov 2025 13:53:41 +0100 Subject: [PATCH 1/5] feat: Add CancellationToken to job worker and handler Allows passing a CancellationToken to JobWorker Open() method to inject a host cancellationToken and expose token to AsyncJobHandler delegate. Refs: #519 --- Client/Api/Worker/IJobWorkerBuilderStep1.cs | 7 ++-- Client/Impl/Worker/JobWorker.cs | 37 +++++++++++++-------- Client/Impl/Worker/JobWorkerBuilder.cs | 7 ++-- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index 61607c68..a554b882 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -15,6 +15,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Zeebe.Client.Api.Commands; using Zeebe.Client.Api.Responses; @@ -43,8 +44,9 @@ public interface IJobWorkerBuilderStep1 /// /// the job client to complete or fail the job. /// the job, which was activated by the worker. +/// the token to cancel the job handler. /// A representing the asynchronous operation. -public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob); +public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob, CancellationToken cancellationToken = default); public interface IJobWorkerBuilderStep2 { @@ -247,6 +249,7 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep /// Open the worker and start to work on available tasks. /// + /// the token to cancel the job worker. /// the worker. - IJobWorker Open(); + IJobWorker Open(CancellationToken cancellationToken = default); } diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index ce5acf0f..812c4e94 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -45,7 +45,8 @@ public sealed class JobWorker : IJobWorker private readonly int maxJobsActive; private readonly TimeSpan pollInterval; - private readonly CancellationTokenSource source; + private readonly CancellationTokenSource localCts; + private CancellationTokenSource? linkedCts; private readonly double thresholdJobsActivation; private int currentJobsActive; @@ -54,7 +55,7 @@ public sealed class JobWorker : IJobWorker internal JobWorker(JobWorkerBuilder builder) { jobWorkerBuilder = builder; - source = new CancellationTokenSource(); + localCts = new CancellationTokenSource(); logger = builder.LoggerFactory?.CreateLogger(); jobHandler = jobWorkerBuilder.Handler(); autoCompletion = builder.AutoCompletionEnabled(); @@ -69,13 +70,15 @@ internal JobWorker(JobWorkerBuilder builder) /// public void Dispose() { - source.Cancel(); + localCts.Cancel(); // delay disposing, since poll and handler take some time to close _ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)) .ContinueWith(t => { logger?.LogError("Dispose source"); - source.Dispose(); + localCts.Dispose(); + linkedCts?.Dispose(); + linkedCts = null; }); isRunning = false; } @@ -96,10 +99,12 @@ public bool IsClosed() /// Opens the configured JobWorker to activate jobs in the given poll interval /// and handle with the given handler. /// - internal void Open() + /// The host cancellation token. + internal void Open(CancellationToken stoppingToken) { isRunning = true; - var cancellationToken = source.Token; + this.linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, localCts.Token); + var cancellationToken = linkedCts.Token; var bufferOptions = CreateBufferOptions(cancellationToken); var executionOptions = CreateExecutionOptions(cancellationToken); @@ -135,7 +140,7 @@ internal void Open() private async Task StreamJobs(ITargetBlock input, CancellationToken cancellationToken) { - while (!source.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { try { @@ -151,7 +156,7 @@ private async Task StreamJobs(ITargetBlock input, CancellationToken cancel activateJobsRequest.Worker, activateJobsRequest.Type); - while (!source.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { var currentJobs = Thread.VolatileRead(ref currentJobsActive); @@ -178,7 +183,7 @@ private async Task StreamJobs(ITargetBlock input, CancellationToken cancel grpcActivatedJob.Worker, grpcActivatedJob.Key); - await HandleActivationResponse(input, response, null); + await HandleActivationResponse(input, response, null, cancellationToken); } } catch (RpcException rpcException) @@ -210,7 +215,7 @@ private static DataflowBlockOptions CreateBufferOptions(CancellationToken cancel private async Task PollJobs(ITargetBlock input, CancellationToken cancellationToken) { - while (!source.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { var currentJobs = Thread.VolatileRead(ref currentJobsActive); if (currentJobs < thresholdJobsActivation) @@ -221,7 +226,7 @@ private async Task PollJobs(ITargetBlock input, CancellationToken cancella try { await jobActivator.SendActivateRequest(activateJobsRequest, - async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), + async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount, cancellationToken), null, cancellationToken); } @@ -238,7 +243,11 @@ await jobActivator.SendActivateRequest(activateJobsRequest, } } - private async Task HandleActivationResponse(ITargetBlock input, IActivateJobsResponse response, int? jobCount) + private async Task HandleActivationResponse( + ITargetBlock input, + IActivateJobsResponse response, + int? jobCount, + CancellationToken cancellationToken) { if (jobCount.HasValue) { @@ -258,7 +267,7 @@ private async Task HandleActivationResponse(ITargetBlock input, IActivateJ foreach (var job in response.Jobs) { - _ = await input.SendAsync(job); + _ = await input.SendAsync(job, cancellationToken); _ = Interlocked.Increment(ref currentJobsActive); } } @@ -269,7 +278,7 @@ private async Task HandleActivatedJob(IJob activatedJob, CancellationToken try { - await jobHandler(jobClient, activatedJob); + await jobHandler(jobClient, activatedJob, cancellationToken); await TryToAutoCompleteJob(jobClient, activatedJob, cancellationToken); } catch (Exception exception) diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index 831a4646..98209911 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using GatewayProtocol; using Microsoft.Extensions.Logging; @@ -50,7 +51,7 @@ public IJobWorkerBuilderStep2 JobType(string type) public IJobWorkerBuilderStep3 Handler(JobHandler handler) { - asyncJobHandler = (c, j) => Task.Run(() => handler.Invoke(c, j)); + asyncJobHandler = (c, j, cts) => Task.Run(() => handler.Invoke(c, j), cts); return this; } @@ -147,11 +148,11 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount) return this; } - public IJobWorker Open() + public IJobWorker Open(CancellationToken cancellationToken = default) { var worker = new JobWorker(this); - worker.Open(); + worker.Open(cancellationToken); return worker; } From 6f4988aa3c89f249f3fd848828cf8be9dce43d6d Mon Sep 17 00:00:00 2001 From: Olivier Freund Date: Fri, 5 Dec 2025 14:51:34 +0100 Subject: [PATCH 2/5] Fix issue where AsyncJobHandler are detected as JobHandler if the cancellation token is not provided. Introduces AsyncJobHandlerWithCancellationToken to prevent breaking change. --- Client/Api/Worker/IJobWorkerBuilderStep1.cs | 37 ++++++++++++++++++++- Client/Impl/Worker/JobWorker.cs | 2 +- Client/Impl/Worker/JobWorkerBuilder.cs | 10 ++++-- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index a554b882..e6859beb 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -39,6 +39,14 @@ public interface IJobWorkerBuilderStep1 /// the job, which was activated by the worker. public delegate void JobHandler(IJobClient client, IJob activatedJob); +/// +/// The asynchronous job handler which contains the business logic. +/// +/// the job client to complete or fail the job. +/// the job, which was activated by the worker. +/// A representing the asynchronous operation. +public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob); + /// /// The asynchronous job handler which contains the business logic. /// @@ -46,7 +54,7 @@ public interface IJobWorkerBuilderStep1 /// the job, which was activated by the worker. /// the token to cancel the job handler. /// A representing the asynchronous operation. -public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob, CancellationToken cancellationToken = default); +public delegate Task AsyncJobHandlerWithCancellationToken(IJobClient client, IJob activatedJob, CancellationToken cancellationToken); public interface IJobWorkerBuilderStep2 { @@ -101,6 +109,33 @@ public interface IJobWorkerBuilderStep2 /// the handle to process the jobs. /// the builder for this worker. IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler); + + /// + /// Set an async handler to process the jobs asynchronously. At the end of the processing, the handler can + /// complete the job or mark it as failed. + /// This version of the handler supports a that can be used in underliying tasks. + /// + /// + /// + /// Example JobHandler implementation: + /// + /// + /// var handler = async (client, job, cancellationToken) => + /// { + /// String json = job.Variables; + /// // modify variables + /// + /// await client + /// .CompleteCommand(job.Key) + /// .Variables(json) + /// .Send(cancellationToken); + /// }; + /// + /// + /// The handler must be thread-safe. + /// the handle to process the jobs. + /// the builder for this worker. + IJobWorkerBuilderStep3 Handler(AsyncJobHandlerWithCancellationToken handler); } public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 812c4e94..d7b6d55d 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -39,7 +39,7 @@ public sealed class JobWorker : IJobWorker private readonly StreamActivatedJobsRequest streamActivateJobsRequest; private readonly bool autoCompletion; private readonly JobActivator jobActivator; - private readonly AsyncJobHandler jobHandler; + private readonly AsyncJobHandlerWithCancellationToken jobHandler; private readonly JobWorkerBuilder jobWorkerBuilder; private readonly ILogger logger; private readonly int maxJobsActive; diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index 98209911..f09641d4 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -31,7 +31,7 @@ public class JobWorkerBuilder( ILoggerFactory loggerFactory = null) : IJobWorkerBuilderStep1, IJobWorkerBuilderStep2, IJobWorkerBuilderStep3 { - private AsyncJobHandler asyncJobHandler; + private AsyncJobHandlerWithCancellationToken asyncJobHandler; private bool autoCompletion; private TimeSpan pollInterval; public bool GrpcStreamEnabled { get; private set; } @@ -56,6 +56,12 @@ public IJobWorkerBuilderStep3 Handler(JobHandler handler) } public IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler) + { + asyncJobHandler = (c, j, cts) => handler(c, j); + return this; + } + + public IJobWorkerBuilderStep3 Handler(AsyncJobHandlerWithCancellationToken handler) { asyncJobHandler = handler; return this; @@ -157,7 +163,7 @@ public IJobWorker Open(CancellationToken cancellationToken = default) return worker; } - internal AsyncJobHandler Handler() + internal AsyncJobHandlerWithCancellationToken Handler() { return asyncJobHandler; } From 4b79c3cc1a66e86302da86dc28883d6355fc89a2 Mon Sep 17 00:00:00 2001 From: Olivier Freund Date: Fri, 5 Dec 2025 15:06:35 +0100 Subject: [PATCH 3/5] Add sample in Client.Examples --- .../BackgroundServiceExample.cs.md | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 Client.Examples/BackgroundServiceExample.cs.md diff --git a/Client.Examples/BackgroundServiceExample.cs.md b/Client.Examples/BackgroundServiceExample.cs.md new file mode 100644 index 00000000..904cae97 --- /dev/null +++ b/Client.Examples/BackgroundServiceExample.cs.md @@ -0,0 +1,51 @@ +## Using Job Worker In BackgroundService Example + +This example showcases how Zeebe job worker can be used in ASP.NET BackgroundService with asynchronous job handler method. + +It uses the job handler delegate variant that supports injection of the job worker cancellation token. + +It also passes the BackroundService stopping token to the job worker, so that cancellation of the background service is propagated to the worker. + +```csharp +using Microsoft.Extensions.Hosting; +using Zeebe.Client; +using Zeebe.Client.Api.Responses; +using Zeebe.Client.Api.Worker; + +namespace MyWebApplication; + +public class ZeebeBackgroundService : BackgroundService +{ + private readonly IZeebeClient client; + + public ZeebeBackgroundService(IZeebeClient client) + { + this.client = client; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + this.client.NewWorker() + .JobType("My-Job") + .Handler(HandleJobAsync) + .MaxJobsActive(5) + .Name(Environment.MachineName) + .AutoCompletion() + .PollInterval(TimeSpan.FromSeconds(1)) + .Timeout(TimeSpan.FromSeconds(10)) + .PollingTimeout(TimeSpan.FromSeconds(30)) + .Open(stoppingToken); // Passes the stopping token to the worker to gracefully cancel it in case of background service cancellation. + + return Task.CompletedTask; + } + + private static async Task HandleJobAsync(IJobClient jobClient, IJob job, CancellationToken cancellationToken) + { + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + + await jobClient + .NewCompleteJobCommand(job) + .Send(cancellationToken); + } +} +``` \ No newline at end of file From bbc94b1cbe8ffed9e50e21320fba5bfb27bb17ce Mon Sep 17 00:00:00 2001 From: Olivier Freund Date: Fri, 5 Dec 2025 17:31:56 +0100 Subject: [PATCH 4/5] Add unit test for cancellation --- Client.UnitTests/JobWorkerTest.cs | 64 +++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs index 4ef86ce3..bc5dbf76 100644 --- a/Client.UnitTests/JobWorkerTest.cs +++ b/Client.UnitTests/JobWorkerTest.cs @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +using GatewayProtocol; +using NLog; +using NUnit.Framework; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; -using GatewayProtocol; -using NLog; -using NUnit.Framework; +using System.Threading.Tasks; using Zeebe.Client.Api.Responses; namespace Zeebe.Client; @@ -706,6 +708,62 @@ public void ShouldUseAutoCompleteWithWorker() Assert.Contains(3, completeJobRequests); } + [Test] + public void ShouldCancelAllJobsWhenHostIsCancelled() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + using var stoppingCts = new CancellationTokenSource(); + + // when + var startedJobs = new ConcurrentBag(); + var cancelledJobs = new ConcurrentBag(); + var completedJobs = new ConcurrentBag(); + using var jobWorker = ZeebeClient.NewWorker() + .JobType("foo") + .Handler(async (jobClient, job, cancellationToken) => + { + startedJobs.Add(job); + try + { + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + catch (TaskCanceledException) + { + cancelledJobs.Add(job); + } + finally + { + completedJobs.Add(job); + } + }) + .AutoCompletion() + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123)) + .PollInterval(TimeSpan.FromSeconds(10)) + .PollingTimeout(TimeSpan.FromSeconds(10)) + .HandlerThreads(3) + .Open(stoppingCts.Token); + + Assert.True(jobWorker.IsOpen()); + + while (startedJobs.Count < 3) + { + } + + stoppingCts.Cancel(); + + while (completedJobs.Count < 3) + { + } + + // then + Assert.AreEqual(3, startedJobs.Count); + Assert.AreEqual(3, cancelledJobs.Count); + Assert.AreEqual(3, completedJobs.Count); + } + public static ActivateJobsResponse CreateExpectedResponse() { return new ActivateJobsResponse From 0f2fe6e0f68756a923fc96a81c3b0450661f0ca8 Mon Sep 17 00:00:00 2001 From: Olivier Freund Date: Fri, 5 Dec 2025 17:55:25 +0100 Subject: [PATCH 5/5] Fix copilot comments --- Client/Impl/Worker/JobWorker.cs | 29 ++++++++++++++------------ Client/Impl/Worker/JobWorkerBuilder.cs | 4 ++-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index d7b6d55d..a07a40cc 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -77,8 +77,8 @@ public void Dispose() { logger?.LogError("Dispose source"); localCts.Dispose(); - linkedCts?.Dispose(); - linkedCts = null; + var currentLinkedCts = Interlocked.Exchange(ref this.linkedCts, null); + currentLinkedCts?.Dispose(); }); isRunning = false; } @@ -99,18 +99,21 @@ public bool IsClosed() /// Opens the configured JobWorker to activate jobs in the given poll interval /// and handle with the given handler. /// - /// The host cancellation token. - internal void Open(CancellationToken stoppingToken) + /// The host cancellation token. + internal void Open(CancellationToken cancellationToken) { isRunning = true; - this.linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, localCts.Token); - var cancellationToken = linkedCts.Token; - var bufferOptions = CreateBufferOptions(cancellationToken); - var executionOptions = CreateExecutionOptions(cancellationToken); + + var newLinkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCts.Token); + var oldLinkedCts = Interlocked.Exchange(ref this.linkedCts, newLinkedCts); + oldLinkedCts?.Dispose(); + var linkedCancellationToken = newLinkedCts.Token; + var bufferOptions = CreateBufferOptions(linkedCancellationToken); + var executionOptions = CreateExecutionOptions(linkedCancellationToken); var input = new BufferBlock(bufferOptions); var transformer = new TransformBlock( - async activatedJob => await HandleActivatedJob(activatedJob, cancellationToken), + async activatedJob => await HandleActivatedJob(activatedJob, linkedCancellationToken), executionOptions); var output = new ActionBlock(activatedJob => { _ = Interlocked.Decrement(ref currentJobsActive); }, executionOptions); @@ -120,15 +123,15 @@ internal void Open(CancellationToken stoppingToken) if (jobWorkerBuilder.GrpcStreamEnabled) { - _ = Task.Run(async () => await StreamJobs(input, cancellationToken), - cancellationToken).ContinueWith( + _ = Task.Run(async () => await StreamJobs(input, linkedCancellationToken), + linkedCancellationToken).ContinueWith( t => logger?.LogError(t.Exception, "Job stream failed."), TaskContinuationOptions.OnlyOnFaulted); } // Start polling - _ = Task.Run(async () => await PollJobs(input, cancellationToken), - cancellationToken).ContinueWith( + _ = Task.Run(async () => await PollJobs(input, linkedCancellationToken), + linkedCancellationToken).ContinueWith( t => logger?.LogError(t.Exception, "Job polling failed."), TaskContinuationOptions.OnlyOnFaulted); diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index f09641d4..92bde0a0 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -51,13 +51,13 @@ public IJobWorkerBuilderStep2 JobType(string type) public IJobWorkerBuilderStep3 Handler(JobHandler handler) { - asyncJobHandler = (c, j, cts) => Task.Run(() => handler.Invoke(c, j), cts); + asyncJobHandler = (jobClient, job, cancellationToken) => Task.Run(() => handler.Invoke(jobClient, job), cancellationToken); return this; } public IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler) { - asyncJobHandler = (c, j, cts) => handler(c, j); + asyncJobHandler = (jobClient, job, cancellationToken) => handler(jobClient, job); return this; }