-
Notifications
You must be signed in to change notification settings - Fork 61
feat: Add CancellationToken to job worker and handler #829
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add CancellationToken to job worker and handler #829
Conversation
Allows passing a CancellationToken to JobWorker Open() method to inject a host cancellationToken and expose token to AsyncJobHandler delegate. Refs: camunda-community-hub#519
5f5e2c8 to
20964a6
Compare
ChrisKujawa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution!
Can you clarify how the usage and integration would look like then for the user? Like with an example? Then we can also add it to the docs/examples maybe :)
Client/Impl/Worker/JobWorker.cs
Outdated
| { | ||
| isRunning = true; | ||
| var cancellationToken = source.Token; | ||
| this.linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, localCts.Token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Just for my understanding: This links both token source together, right? When the above "source" is closed the others are closed as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The token created by linkedCts will be cancelled when stoppingToken OR localCts.Token is cancelled. Allowing to create a master token that can gracefully cancel all job worker tasks.
Let's imagine this scenario: You want to start 3 job workers in an ASP.NET BackgroundService. This service exposes a stoppingToken. By passing this token to the job workers, and liking it with the local token, all job workers will receive the signal to stop if the background service is stopped OR user can still dispose a single worker.
The important change in this PR is that the token is passed to the job handler delegate, so that users can pass it to underlying blocking tasks.
This PR will gain full power when the in-progress PR "AsyncDisposable" will be completed, because you will be able to gracefully await end of all worker tasks instead of the random "Task.Delay" in the Dispose method.
I created the small example below to showcase:
var masterSource = new CancellationTokenSource();
var masterCancellationToken = masterSource.Token;
var pool = new Dictionary<int, JobWorker>();
for (int i = 1; i < 4; i++)
{
var worker = new JobWorker(i);
pool.Add(i, worker);
worker.StartAsync(masterCancellationToken);
}
// Stops only worker 2 after 2s
await Task.Delay(2000);
Console.WriteLine("\nStop worker 2");
await pool[2].StopAsync();
// Stops all other workers
await Task.Delay(2000);
Console.WriteLine("\nStop all workers");
await masterSource.CancelAsync();
await Task.Delay(3000);
masterSource.Dispose();
public class JobWorker(int id)
{
private readonly CancellationTokenSource localCts = new();
private CancellationTokenSource? linkedCst;
public Task StartAsync(CancellationToken cancellationToken)
{
linkedCst = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCts.Token);
var linkedCancellationToken = linkedCst.Token;
return Task.Run(() => PollJobs(linkedCancellationToken));
}
private async Task PollJobs(CancellationToken cancellationToken)
{
Console.WriteLine($"JobWorker {id} started");
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(500);
}
Console.WriteLine($"JobWorker {id} cancelled");
}
public async Task StopAsync()
{
await localCts.CancelAsync();
localCts.Dispose();
linkedCst?.Dispose();
}
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request adds CancellationToken support to the job worker infrastructure, enabling host applications to inject their cancellation tokens and allowing job handlers to respond to cancellation requests. This addresses issue #519.
Key changes:
- Added
CancellationTokenparameter toAsyncJobHandlerdelegate signature - Modified
IJobWorker.Open()to accept an optionalCancellationTokenparameter - Implemented linked cancellation token source to combine host and local cancellation tokens
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| Client/Api/Worker/IJobWorkerBuilderStep1.cs | Updated AsyncJobHandler delegate to include CancellationToken parameter and added cancellationToken parameter to IJobWorker.Open() method |
| Client/Impl/Worker/JobWorkerBuilder.cs | Modified Handler() method to pass cancellation token to wrapped synchronous handlers and updated Open() to forward token to worker |
| Client/Impl/Worker/JobWorker.cs | Introduced linkedCts for combining host and local tokens, propagated token throughout job activation and handling pipeline, and updated disposal logic |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…cellation token is not provided. Introduces AsyncJobHandlerWithCancellationToken to prevent breaking change.
I added |
|
@ChrisKujawa did you squash commit during merge, or should I pre-squash a single commit before? |
| private readonly bool autoCompletion; | ||
| private readonly JobActivator jobActivator; | ||
| private readonly AsyncJobHandler jobHandler; | ||
| private readonly AsyncJobHandlerWithCancellationToken jobHandler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ChrisKujawa this change is to fix a problem that I was not able to predict. To prevent breaking change, my first idea was to just set the cancellationToken optional in AsyncJobHandler. But for a reason that I don't understand for now, all usages like:
ZeebeClient.NewWorker()
.JobType("foo")
.Handler(async (jobClient, job) =>
{
await SomethingAsync();
})were considered as a synchronous JobHandler, event if the Func return a Task. This was the root cause of test fail.

Introduces an optional
CancellationTokenargument toJobWorker.Open()method to inject a host cancellation token and expose token toAsyncJobHandlerdelegate.closes #519