Skip to content

Conversation

@foliv57
Copy link

@foliv57 foliv57 commented Nov 29, 2025

Introduces an optional CancellationToken argument to JobWorker.Open() method to inject a host cancellation token and expose token to AsyncJobHandler delegate.

closes #519

@foliv57 foliv57 requested a review from ChrisKujawa as a code owner November 29, 2025 12:57
Allows passing a CancellationToken to JobWorker Open() method to inject a host cancellationToken and expose token to AsyncJobHandler delegate.

Refs: camunda-community-hub#519
@foliv57 foliv57 force-pushed the 519-pass-cancellationtoken-to-worker branch from 5f5e2c8 to 20964a6 Compare November 29, 2025 13:07
@foliv57 foliv57 changed the title fix: Add CancellationToken to job worker and handler feat: Add CancellationToken to job worker and handler Nov 29, 2025
Copy link
Collaborator

@ChrisKujawa ChrisKujawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution!

Can you clarify how the usage and integration would look like then for the user? Like with an example? Then we can also add it to the docs/examples maybe :)

{
isRunning = true;
var cancellationToken = source.Token;
this.linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, localCts.Token);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Just for my understanding: This links both token source together, right? When the above "source" is closed the others are closed as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The token created by linkedCts will be cancelled when stoppingToken OR localCts.Token is cancelled. Allowing to create a master token that can gracefully cancel all job worker tasks.

Let's imagine this scenario: You want to start 3 job workers in an ASP.NET BackgroundService. This service exposes a stoppingToken. By passing this token to the job workers, and liking it with the local token, all job workers will receive the signal to stop if the background service is stopped OR user can still dispose a single worker.

The important change in this PR is that the token is passed to the job handler delegate, so that users can pass it to underlying blocking tasks.

This PR will gain full power when the in-progress PR "AsyncDisposable" will be completed, because you will be able to gracefully await end of all worker tasks instead of the random "Task.Delay" in the Dispose method.

I created the small example below to showcase:

var masterSource = new CancellationTokenSource();
var masterCancellationToken = masterSource.Token;
var pool = new Dictionary<int, JobWorker>();

for (int i = 1; i < 4; i++)
{
    var worker = new JobWorker(i);
    pool.Add(i, worker);
    worker.StartAsync(masterCancellationToken);
}

// Stops only worker 2 after 2s
await Task.Delay(2000);
Console.WriteLine("\nStop worker 2");
await pool[2].StopAsync();

// Stops all other workers
await Task.Delay(2000);
Console.WriteLine("\nStop all workers");
await masterSource.CancelAsync();

await Task.Delay(3000);
masterSource.Dispose();

public class JobWorker(int id)
{
    private readonly CancellationTokenSource localCts = new();
    private CancellationTokenSource? linkedCst;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        linkedCst = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCts.Token);
        var linkedCancellationToken = linkedCst.Token;

        return Task.Run(() => PollJobs(linkedCancellationToken));
    }

    private async Task PollJobs(CancellationToken cancellationToken)
    {
        Console.WriteLine($"JobWorker {id} started");
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(500);
        }
        Console.WriteLine($"JobWorker {id} cancelled");
    }

    public async Task StopAsync()
    {
        await localCts.CancelAsync();
        localCts.Dispose();
        linkedCst?.Dispose();
    }
}

If you run it, you will have the result:
image

@ChrisKujawa ChrisKujawa requested a review from Copilot December 4, 2025 08:51
Copilot finished reviewing on behalf of ChrisKujawa December 4, 2025 08:53
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request adds CancellationToken support to the job worker infrastructure, enabling host applications to inject their cancellation tokens and allowing job handlers to respond to cancellation requests. This addresses issue #519.

Key changes:

  • Added CancellationToken parameter to AsyncJobHandler delegate signature
  • Modified IJobWorker.Open() to accept an optional CancellationToken parameter
  • Implemented linked cancellation token source to combine host and local cancellation tokens

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
Client/Api/Worker/IJobWorkerBuilderStep1.cs Updated AsyncJobHandler delegate to include CancellationToken parameter and added cancellationToken parameter to IJobWorker.Open() method
Client/Impl/Worker/JobWorkerBuilder.cs Modified Handler() method to pass cancellation token to wrapped synchronous handlers and updated Open() to forward token to worker
Client/Impl/Worker/JobWorker.cs Introduced linkedCts for combining host and local tokens, propagated token throughout job activation and handling pipeline, and updated disposal logic

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@foliv57
Copy link
Author

foliv57 commented Dec 5, 2025

Thanks for your contribution!

Can you clarify how the usage and integration would look like then for the user? Like with an example? Then we can also add it to the docs/examples maybe :)

I added BackgroundServiceExample.cs.md in Client.Examples. Is it fine for you, or you were thinking about something better?

@foliv57
Copy link
Author

foliv57 commented Dec 5, 2025

@ChrisKujawa did you squash commit during merge, or should I pre-squash a single commit before?

private readonly bool autoCompletion;
private readonly JobActivator jobActivator;
private readonly AsyncJobHandler jobHandler;
private readonly AsyncJobHandlerWithCancellationToken jobHandler;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChrisKujawa this change is to fix a problem that I was not able to predict. To prevent breaking change, my first idea was to just set the cancellationToken optional in AsyncJobHandler. But for a reason that I don't understand for now, all usages like:

ZeebeClient.NewWorker()
     .JobType("foo")
     .Handler(async (jobClient, job) =>
     {
        await SomethingAsync();
     })

were considered as a synchronous JobHandler, event if the Func return a Task. This was the root cause of test fail.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

I can pass a custom cancellationToken to the JobWorkerBuilder

3 participants