@@ -45,7 +45,8 @@ public sealed class JobWorker : IJobWorker
4545 private readonly int maxJobsActive ;
4646 private readonly TimeSpan pollInterval ;
4747
48- private readonly CancellationTokenSource source ;
48+ private readonly CancellationTokenSource localCts ;
49+ private CancellationTokenSource ? linkedCts ;
4950 private readonly double thresholdJobsActivation ;
5051
5152 private int currentJobsActive ;
@@ -54,7 +55,7 @@ public sealed class JobWorker : IJobWorker
5455 internal JobWorker ( JobWorkerBuilder builder )
5556 {
5657 jobWorkerBuilder = builder ;
57- source = new CancellationTokenSource ( ) ;
58+ localCts = new CancellationTokenSource ( ) ;
5859 logger = builder . LoggerFactory ? . CreateLogger < JobWorker > ( ) ;
5960 jobHandler = jobWorkerBuilder . Handler ( ) ;
6061 autoCompletion = builder . AutoCompletionEnabled ( ) ;
@@ -69,13 +70,15 @@ internal JobWorker(JobWorkerBuilder builder)
6970 /// <inheritdoc />
7071 public void Dispose ( )
7172 {
72- source . Cancel ( ) ;
73+ localCts . Cancel ( ) ;
7374 // delay disposing, since poll and handler take some time to close
7475 _ = Task . Delay ( TimeSpan . FromMilliseconds ( pollInterval . TotalMilliseconds * 2 ) )
7576 . ContinueWith ( t =>
7677 {
7778 logger ? . LogError ( "Dispose source" ) ;
78- source . Dispose ( ) ;
79+ localCts . Dispose ( ) ;
80+ linkedCts ? . Dispose ( ) ;
81+ linkedCts = null ;
7982 } ) ;
8083 isRunning = false ;
8184 }
@@ -96,10 +99,12 @@ public bool IsClosed()
9699 /// Opens the configured JobWorker to activate jobs in the given poll interval
97100 /// and handle with the given handler.
98101 /// </summary>
99- internal void Open ( )
102+ /// <param name="stoppingToken">The host cancellation token.</param>
103+ internal void Open ( CancellationToken stoppingToken )
100104 {
101105 isRunning = true ;
102- var cancellationToken = source . Token ;
106+ this . linkedCts = CancellationTokenSource . CreateLinkedTokenSource ( stoppingToken , localCts . Token ) ;
107+ var cancellationToken = linkedCts . Token ;
103108 var bufferOptions = CreateBufferOptions ( cancellationToken ) ;
104109 var executionOptions = CreateExecutionOptions ( cancellationToken ) ;
105110
@@ -135,7 +140,7 @@ internal void Open()
135140
136141 private async Task StreamJobs ( ITargetBlock < IJob > input , CancellationToken cancellationToken )
137142 {
138- while ( ! source . IsCancellationRequested )
143+ while ( ! cancellationToken . IsCancellationRequested )
139144 {
140145 try
141146 {
@@ -151,7 +156,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
151156 activateJobsRequest . Worker ,
152157 activateJobsRequest . Type ) ;
153158
154- while ( ! source . IsCancellationRequested )
159+ while ( ! cancellationToken . IsCancellationRequested )
155160 {
156161 var currentJobs = Thread . VolatileRead ( ref currentJobsActive ) ;
157162
@@ -178,7 +183,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
178183 grpcActivatedJob . Worker ,
179184 grpcActivatedJob . Key ) ;
180185
181- await HandleActivationResponse ( input , response , null ) ;
186+ await HandleActivationResponse ( input , response , null , cancellationToken ) ;
182187 }
183188 }
184189 catch ( RpcException rpcException )
@@ -210,7 +215,7 @@ private static DataflowBlockOptions CreateBufferOptions(CancellationToken cancel
210215
211216 private async Task PollJobs ( ITargetBlock < IJob > input , CancellationToken cancellationToken )
212217 {
213- while ( ! source . IsCancellationRequested )
218+ while ( ! cancellationToken . IsCancellationRequested )
214219 {
215220 var currentJobs = Thread . VolatileRead ( ref currentJobsActive ) ;
216221 if ( currentJobs < thresholdJobsActivation )
@@ -221,7 +226,7 @@ private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancella
221226 try
222227 {
223228 await jobActivator . SendActivateRequest ( activateJobsRequest ,
224- async jobsResponse => await HandleActivationResponse ( input , jobsResponse , jobCount ) ,
229+ async jobsResponse => await HandleActivationResponse ( input , jobsResponse , jobCount , cancellationToken ) ,
225230 null ,
226231 cancellationToken ) ;
227232 }
@@ -238,7 +243,11 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
238243 }
239244 }
240245
241- private async Task HandleActivationResponse ( ITargetBlock < IJob > input , IActivateJobsResponse response , int ? jobCount )
246+ private async Task HandleActivationResponse (
247+ ITargetBlock < IJob > input ,
248+ IActivateJobsResponse response ,
249+ int ? jobCount ,
250+ CancellationToken cancellationToken )
242251 {
243252 if ( jobCount . HasValue )
244253 {
@@ -258,7 +267,7 @@ private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJ
258267
259268 foreach ( var job in response . Jobs )
260269 {
261- _ = await input . SendAsync ( job ) ;
270+ _ = await input . SendAsync ( job , cancellationToken ) ;
262271 _ = Interlocked . Increment ( ref currentJobsActive ) ;
263272 }
264273 }
@@ -269,7 +278,7 @@ private async Task<IJob> HandleActivatedJob(IJob activatedJob, CancellationToken
269278
270279 try
271280 {
272- await jobHandler ( jobClient , activatedJob ) ;
281+ await jobHandler ( jobClient , activatedJob , cancellationToken ) ;
273282 await TryToAutoCompleteJob ( jobClient , activatedJob , cancellationToken ) ;
274283 }
275284 catch ( Exception exception )
0 commit comments