Skip to content

Commit 20964a6

Browse files
committed
feat: Add CancellationToken to job worker and handler
Allows passing a CancellationToken to JobWorker Open() method to inject a host cancellationToken and expose token to AsyncJobHandler delegate. Refs: #519
1 parent fc22de5 commit 20964a6

File tree

3 files changed

+32
-19
lines changed

3 files changed

+32
-19
lines changed

Client/Api/Worker/IJobWorkerBuilderStep1.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18+
using System.Threading;
1819
using System.Threading.Tasks;
1920
using Zeebe.Client.Api.Commands;
2021
using Zeebe.Client.Api.Responses;
@@ -43,8 +44,9 @@ public interface IJobWorkerBuilderStep1
4344
/// </summary>
4445
/// <param name="client">the job client to complete or fail the job.</param>
4546
/// <param name="activatedJob">the job, which was activated by the worker.</param>
47+
/// <param name="cancellationToken">the token to cancel the job handler.</param>
4648
/// <returns>A <see cref="Task" /> representing the asynchronous operation.</returns>
47-
public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob);
49+
public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob, CancellationToken cancellationToken = default);
4850

4951
public interface IJobWorkerBuilderStep2
5052
{
@@ -247,6 +249,7 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
247249
/// <summary>
248250
/// Open the worker and start to work on available tasks.
249251
/// </summary>
252+
/// <param name="cancellationToken">the token to cancel the job worker.</param>
250253
/// <returns>the worker.</returns>
251-
IJobWorker Open();
254+
IJobWorker Open(CancellationToken cancellationToken = default);
252255
}

Client/Impl/Worker/JobWorker.cs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public sealed class JobWorker : IJobWorker
4545
private readonly int maxJobsActive;
4646
private readonly TimeSpan pollInterval;
4747

48-
private readonly CancellationTokenSource source;
48+
private readonly CancellationTokenSource localCts;
49+
private CancellationTokenSource? linkedCts;
4950
private readonly double thresholdJobsActivation;
5051

5152
private int currentJobsActive;
@@ -54,7 +55,7 @@ public sealed class JobWorker : IJobWorker
5455
internal JobWorker(JobWorkerBuilder builder)
5556
{
5657
jobWorkerBuilder = builder;
57-
source = new CancellationTokenSource();
58+
localCts = new CancellationTokenSource();
5859
logger = builder.LoggerFactory?.CreateLogger<JobWorker>();
5960
jobHandler = jobWorkerBuilder.Handler();
6061
autoCompletion = builder.AutoCompletionEnabled();
@@ -69,13 +70,15 @@ internal JobWorker(JobWorkerBuilder builder)
6970
/// <inheritdoc />
7071
public void Dispose()
7172
{
72-
source.Cancel();
73+
localCts.Cancel();
7374
// delay disposing, since poll and handler take some time to close
7475
_ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2))
7576
.ContinueWith(t =>
7677
{
7778
logger?.LogError("Dispose source");
78-
source.Dispose();
79+
localCts.Dispose();
80+
linkedCts?.Dispose();
81+
linkedCts = null;
7982
});
8083
isRunning = false;
8184
}
@@ -96,10 +99,12 @@ public bool IsClosed()
9699
/// Opens the configured JobWorker to activate jobs in the given poll interval
97100
/// and handle with the given handler.
98101
/// </summary>
99-
internal void Open()
102+
/// <param name="stoppingToken">The host cancellation token.</param>
103+
internal void Open(CancellationToken stoppingToken)
100104
{
101105
isRunning = true;
102-
var cancellationToken = source.Token;
106+
this.linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, localCts.Token);
107+
var cancellationToken = linkedCts.Token;
103108
var bufferOptions = CreateBufferOptions(cancellationToken);
104109
var executionOptions = CreateExecutionOptions(cancellationToken);
105110

@@ -135,7 +140,7 @@ internal void Open()
135140

136141
private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancellationToken)
137142
{
138-
while (!source.IsCancellationRequested)
143+
while (!cancellationToken.IsCancellationRequested)
139144
{
140145
try
141146
{
@@ -151,7 +156,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
151156
activateJobsRequest.Worker,
152157
activateJobsRequest.Type);
153158

154-
while (!source.IsCancellationRequested)
159+
while (!cancellationToken.IsCancellationRequested)
155160
{
156161
var currentJobs = Thread.VolatileRead(ref currentJobsActive);
157162

@@ -178,7 +183,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
178183
grpcActivatedJob.Worker,
179184
grpcActivatedJob.Key);
180185

181-
await HandleActivationResponse(input, response, null);
186+
await HandleActivationResponse(input, response, null, cancellationToken);
182187
}
183188
}
184189
catch (RpcException rpcException)
@@ -210,7 +215,7 @@ private static DataflowBlockOptions CreateBufferOptions(CancellationToken cancel
210215

211216
private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancellationToken)
212217
{
213-
while (!source.IsCancellationRequested)
218+
while (!cancellationToken.IsCancellationRequested)
214219
{
215220
var currentJobs = Thread.VolatileRead(ref currentJobsActive);
216221
if (currentJobs < thresholdJobsActivation)
@@ -221,7 +226,7 @@ private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancella
221226
try
222227
{
223228
await jobActivator.SendActivateRequest(activateJobsRequest,
224-
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
229+
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount, cancellationToken),
225230
null,
226231
cancellationToken);
227232
}
@@ -238,7 +243,11 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
238243
}
239244
}
240245

241-
private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount)
246+
private async Task HandleActivationResponse(
247+
ITargetBlock<IJob> input,
248+
IActivateJobsResponse response,
249+
int? jobCount,
250+
CancellationToken cancellationToken)
242251
{
243252
if (jobCount.HasValue)
244253
{
@@ -258,7 +267,7 @@ private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJ
258267

259268
foreach (var job in response.Jobs)
260269
{
261-
_ = await input.SendAsync(job);
270+
_ = await input.SendAsync(job, cancellationToken);
262271
_ = Interlocked.Increment(ref currentJobsActive);
263272
}
264273
}
@@ -269,7 +278,7 @@ private async Task<IJob> HandleActivatedJob(IJob activatedJob, CancellationToken
269278

270279
try
271280
{
272-
await jobHandler(jobClient, activatedJob);
281+
await jobHandler(jobClient, activatedJob, cancellationToken);
273282
await TryToAutoCompleteJob(jobClient, activatedJob, cancellationToken);
274283
}
275284
catch (Exception exception)

Client/Impl/Worker/JobWorkerBuilder.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections.Generic;
1818
using System.Linq;
19+
using System.Threading;
1920
using System.Threading.Tasks;
2021
using GatewayProtocol;
2122
using Microsoft.Extensions.Logging;
@@ -50,7 +51,7 @@ public IJobWorkerBuilderStep2 JobType(string type)
5051

5152
public IJobWorkerBuilderStep3 Handler(JobHandler handler)
5253
{
53-
asyncJobHandler = (c, j) => Task.Run(() => handler.Invoke(c, j));
54+
asyncJobHandler = (c, j, cts) => Task.Run(() => handler.Invoke(c, j), cts);
5455
return this;
5556
}
5657

@@ -147,11 +148,11 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount)
147148
return this;
148149
}
149150

150-
public IJobWorker Open()
151+
public IJobWorker Open(CancellationToken cancellationToken = default)
151152
{
152153
var worker = new JobWorker(this);
153154

154-
worker.Open();
155+
worker.Open(cancellationToken);
155156

156157
return worker;
157158
}

0 commit comments

Comments
 (0)