-
Notifications
You must be signed in to change notification settings - Fork 61
feat: added backoff for job worker #814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
ed9aa7c
319736c
3d69643
4725aef
ce52641
177e327
712fb0a
bf15517
15064e2
2df9dd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using NUnit.Framework; | ||
| using Zeebe.Client.Api.Worker; | ||
| using Zeebe.Client.Impl.Worker; | ||
|
|
||
| namespace Zeebe.Client; | ||
|
|
||
/// <summary>
/// Unit tests for the exponential backoff supplier created by <see cref="ExponentialBackoffBuilderImpl"/>.
/// </summary>
[TestFixture]
public sealed class ExponentialBackoffTests
{
    /// <summary>
    /// With jitter disabled the supplier must start at the minimum delay, grow strictly
    /// until it reaches the maximum delay, and never leave the [min, max] range.
    /// </summary>
    [Test]
    public void ShouldReturnDelayWithinBounds_WhenNoJitter()
    {
        // given
        var maxDelay = TimeSpan.FromMilliseconds(1_000);
        var minDelay = TimeSpan.FromMilliseconds(50);
        var maxDelayMs = (long)maxDelay.TotalMilliseconds;
        var minDelayMs = (long)minDelay.TotalMilliseconds;
        IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
            .MaxDelay(maxDelay)
            .MinDelay(minDelay)
            .BackoffFactor(1.6)
            .JitterFactor(0)
            .Build();

        var delays = new List<long>();
        long current = supplier.SupplyRetryDelay(0);

        // when
        for (var i = 0; i < 100; i++)
        {
            delays.Add(current);
            current = supplier.SupplyRetryDelay(current);
        }

        // then - with zero jitter, sequence should monotonically increase until it caps at max
        Assert.That(delays, Is.Not.Empty);
        Assert.That(delays.First(), Is.EqualTo(minDelayMs));
        Assert.That(delays.Last(), Is.EqualTo(maxDelayMs));

        long previous = -1;
        foreach (var delay in delays)
        {
            // Every delay has to respect the configured bounds, as the test name promises.
            Assert.That(delay, Is.InRange(minDelayMs, maxDelayMs));

            // Below the cap each delay must be strictly larger than its predecessor.
            if (delay != maxDelayMs)
            {
                Assert.That(delay, Is.GreaterThan(previous));
            }

            previous = delay;
        }
    }

    /// <summary>
    /// With jitter enabled, delays computed at the cap must stay within +/- jitterFactor
    /// of the maximum delay and must not all be identical.
    /// </summary>
    [Test]
    public void ShouldBeRandomizedWithJitter()
    {
        // given
        var maxDelay = TimeSpan.FromMilliseconds(1_000);
        var minDelay = TimeSpan.FromMilliseconds(50);
        const double jitterFactor = 0.2;

        // A fixed seed keeps the test deterministic; an unseeded Random would make the
        // "more than one distinct value" assertion depend on chance.
        const int seed = 42;

        IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
            .MaxDelay(maxDelay)
            .MinDelay(minDelay)
            .BackoffFactor(1.5)
            .JitterFactor(jitterFactor)
            .Random(new Random(seed))
            .Build();

        var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor));
        var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor));

        // when - always compute from max to test jitter around the cap
        var delays = new List<long>();
        for (var i = 0; i < 10; i++)
        {
            var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds);
            delays.Add(value);
        }

        // then
        Assert.That(delays, Is.Not.Empty);
        foreach (var delay in delays)
        {
            Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound));
        }

        Assert.That(delays.Distinct().Count(), Is.GreaterThan(1));
    }
}
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| using System; | ||
|
|
||
| namespace Zeebe.Client.Api.Worker; | ||
|
|
||
/// <summary>
/// Strategy that decides how long to wait before retrying a failed activate request.
/// All delays are expressed in milliseconds; a result of zero means "retry immediately".
/// </summary>
public interface IBackoffSupplier
{
    /// <summary>
    /// Computes the delay to wait before the next retry.
    /// </summary>
    /// <param name="currentRetryDelay">The delay, in milliseconds, that was used for the previous retry.</param>
    /// <returns>The delay, in milliseconds, to use for the next retry.</returns>
    long SupplyRetryDelay(long currentRetryDelay);
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| using System; | ||
|
|
||
| namespace Zeebe.Client.Api.Worker; | ||
|
|
||
/// <summary>
/// Fluent builder for an exponential <see cref="IBackoffSupplier"/>.
/// Every setter returns the builder so that calls can be chained.
/// </summary>
public interface IExponentialBackoffBuilder
{
    /// <summary>
    /// Configures the upper bound of the retry delay. Defaults to 5000ms.
    /// </summary>
    IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay);

    /// <summary>
    /// Configures the lower bound of the retry delay. Defaults to 50ms.
    /// </summary>
    IExponentialBackoffBuilder MinDelay(TimeSpan minDelay);

    /// <summary>
    /// Configures the factor the previous delay is multiplied with (previous delay * factor).
    /// Defaults to 1.6.
    /// </summary>
    IExponentialBackoffBuilder BackoffFactor(double backoffFactor);

    /// <summary>
    /// Configures the optional jitter as a fraction of the delay (+/- factor of delay).
    /// Defaults to 0.1.
    /// </summary>
    IExponentialBackoffBuilder JitterFactor(double jitterFactor);

    /// <summary>
    /// Configures the random source the jitter is drawn from. Defaults to new Random().
    /// </summary>
    IExponentialBackoffBuilder Random(Random random);

    /// <summary>
    /// Creates the supplier from the configuration collected so far.
    /// </summary>
    IBackoffSupplier Build();
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -237,6 +237,15 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde | |
| /// <returns>the builder for this worker.</returns> | ||
| IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); | ||
|
|
||
/// <summary>
/// Configures the backoff supplier that determines how long the worker waits before retrying
/// after an activate request (polling or streaming) fails with a gRPC <c>RpcException</c>.
/// The delay is reset to the configured poll interval after a successful poll.
/// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set.
/// </summary>
/// <param name="backoffSupplier">The supplier used to compute the next retry delay in ms.</param>
/// <returns>The builder for this worker.</returns>
IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier);
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍🏼 |
||
|
|
||
| /// <summary> | ||
| /// Enable or disable gRPC job streaming for this worker. | ||
| /// </summary> | ||
|
|
@@ -249,4 +258,4 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde | |
| /// </summary> | ||
| /// <returns>the worker.</returns> | ||
| IJobWorker Open(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| using System; | ||
| using Zeebe.Client.Api.Worker; | ||
|
|
||
| namespace Zeebe.Client.Impl.Worker; | ||
|
|
||
/// <summary>
/// Default <see cref="IExponentialBackoffBuilder"/> implementation. It collects the settings
/// and hands them to a new <see cref="ExponentialBackoffSupplier"/> when <see cref="Build"/> is called.
/// </summary>
internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder
{
    // Defaults used for every setting that is never configured explicitly.
    private TimeSpan upperBound = TimeSpan.FromMilliseconds(5000);
    private TimeSpan lowerBound = TimeSpan.FromMilliseconds(50);
    private double multiplier = 1.6;
    private double jitterRatio = 0.1;
    private Random randomSource = new Random();

    /// <inheritdoc />
    public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay)
    {
        upperBound = maxDelay;
        return this;
    }

    /// <inheritdoc />
    public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay)
    {
        lowerBound = minDelay;
        return this;
    }

    /// <inheritdoc />
    public IExponentialBackoffBuilder BackoffFactor(double backoffFactor)
    {
        multiplier = backoffFactor;
        return this;
    }

    /// <inheritdoc />
    public IExponentialBackoffBuilder JitterFactor(double jitterFactor)
    {
        jitterRatio = jitterFactor;
        return this;
    }

    /// <inheritdoc />
    public IExponentialBackoffBuilder Random(Random random)
    {
        // A null argument falls back to a fresh Random instance.
        randomSource = random ?? new Random();
        return this;
    }

    /// <inheritdoc />
    public IBackoffSupplier Build() =>
        new ExponentialBackoffSupplier(lowerBound, upperBound, multiplier, jitterRatio, randomSource);
}
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| using System; | ||
| using Zeebe.Client.Api.Worker; | ||
|
|
||
| namespace Zeebe.Client.Impl.Worker; | ||
|
|
||
/// <summary>
/// <see cref="IBackoffSupplier"/> implementation that calculates retry delays with an
/// exponential backoff plus jitter.
/// <para>
/// Each call multiplies the previous delay with the backoff factor, clamps the result between the
/// minimum and maximum delay, and then adds a random jitter so that multiple clients do not poll
/// at the same time.
/// </para>
/// </summary>
/// <remarks>
/// The core logic is copied from the Zeebe Java client's
/// <c>io.camunda.zeebe.client.impl.worker.ExponentialBackoff</c> implementation
/// (source: <a href="https://github.com/camunda/camunda/blob/5764a0a3e6c3d3253c5c9608bf4478f8e2281af7/clients/java-deprecated/src/main/java/io/camunda/zeebe/client/impl/worker/ExponentialBackoff.java#L31">GitHub</a>).
/// <para>Formula:</para>
/// <c>max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter</c>
/// <para>
/// The jitter is drawn from +/- (clamped delay * jitterFactor). The result is floored at zero
/// and rounded to whole milliseconds.
/// </para>
/// </remarks>
internal sealed class ExponentialBackoffSupplier : IBackoffSupplier
{
    private readonly double lowerBoundMs;
    private readonly double upperBoundMs;
    private readonly double multiplier;
    private readonly double jitterRatio;
    private readonly Random rng;

    /// <summary>
    /// Creates a supplier from the given bounds and factors.
    /// </summary>
    /// <param name="minDelay">Lower bound applied to the multiplied delay.</param>
    /// <param name="maxDelay">Upper bound applied to the multiplied delay.</param>
    /// <param name="backoffFactor">Factor the previous delay is multiplied with.</param>
    /// <param name="jitterFactor">Fraction of the clamped delay used as the jitter range.</param>
    /// <param name="random">Random source for the jitter; a new <see cref="Random"/> is used when null.</param>
    internal ExponentialBackoffSupplier(
        TimeSpan minDelay,
        TimeSpan maxDelay,
        double backoffFactor,
        double jitterFactor,
        Random random)
    {
        lowerBoundMs = minDelay.TotalMilliseconds;
        upperBoundMs = maxDelay.TotalMilliseconds;
        multiplier = backoffFactor;
        jitterRatio = jitterFactor;
        rng = random ?? new Random();
    }

    /// <inheritdoc />
    public long SupplyRetryDelay(long currentRetryDelay)
    {
        // Grow the previous delay, then keep it inside [min, max].
        var scaled = currentRetryDelay * multiplier;
        var bounded = Math.Max(Math.Min(upperBoundMs, scaled), lowerBoundMs);

        // Uniformly distributed offset in [-spread, +spread).
        var spread = bounded * jitterRatio;
        var offset = (rng.NextDouble() * (2 * spread)) - spread;

        // Never report a negative delay.
        var candidate = bounded + offset;
        return (long)Math.Round(candidate < 0 ? 0 : candidate);
    }
}
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,10 @@ public sealed class JobWorker : IJobWorker | |
| private readonly JobWorkerBuilder jobWorkerBuilder; | ||
| private readonly ILogger<JobWorker> logger; | ||
| private readonly int maxJobsActive; | ||
| private readonly TimeSpan pollInterval; | ||
| private TimeSpan pollInterval; | ||
ChrisKujawa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private readonly TimeSpan initialPollInterval; | ||
| private readonly IBackoffSupplier backoffSupplier; | ||
| private readonly IBackoffSupplier defaultBackoffSupplier; | ||
|
|
||
| private readonly CancellationTokenSource source; | ||
| private readonly double thresholdJobsActivation; | ||
|
|
@@ -59,11 +62,14 @@ internal JobWorker(JobWorkerBuilder builder) | |
| jobHandler = jobWorkerBuilder.Handler(); | ||
| autoCompletion = builder.AutoCompletionEnabled(); | ||
| pollInterval = jobWorkerBuilder.PollInterval(); | ||
| initialPollInterval = pollInterval; | ||
| activateJobsRequest = jobWorkerBuilder.Request; | ||
| streamActivateJobsRequest = jobWorkerBuilder.StreamRequest; | ||
| jobActivator = jobWorkerBuilder.Activator; | ||
| maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate; | ||
| thresholdJobsActivation = maxJobsActive * 0.6; | ||
| defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build(); | ||
| backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier; | ||
| } | ||
|
Comment on lines
62
to
73
|
||
|
|
||
| /// <inheritdoc /> | ||
|
|
@@ -184,7 +190,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel | |
| catch (RpcException rpcException) | ||
| { | ||
| LogRpcException(rpcException); | ||
| await Task.Delay(StreamErrorRetryDelayMs, cancellationToken); | ||
| await Backoff(cancellationToken); | ||
ChrisKujawa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| } | ||
|
|
@@ -224,11 +230,12 @@ await jobActivator.SendActivateRequest(activateJobsRequest, | |
| async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), | ||
| null, | ||
| cancellationToken); | ||
| pollInterval = initialPollInterval; | ||
| } | ||
| catch (RpcException rpcException) | ||
| { | ||
| LogRpcException(rpcException); | ||
| await Task.Delay(pollInterval, cancellationToken); | ||
| await Backoff(cancellationToken); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In my commit I changed this so that every RpcException in both StreamJobs and PollJobs now triggers the backoff.
ChrisKujawa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| else | ||
|
|
@@ -238,6 +245,28 @@ await jobActivator.SendActivateRequest(activateJobsRequest, | |
| } | ||
| } | ||
|
|
||
/// <summary>
/// Updates pollInterval using the configured backoff supplier and waits for that delay.
/// Falls back to the default exponential supplier if the configured supplier throws.
/// The resulting delay is clamped to the range [0, int.MaxValue] milliseconds, so an extreme
/// value returned by a supplier cannot make the delay itself throw.
/// </summary>
/// <param name="cancellationToken">Token that cancels the wait.</param>
/// <returns>A task that completes once the backoff delay has elapsed.</returns>
private async Task Backoff(CancellationToken cancellationToken)
{
    // The last used delay is the input for the next one; negative values are treated as zero.
    var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds);
    long nextMs;
    try
    {
        nextMs = backoffSupplier.SupplyRetryDelay(previousMs);
    }
    catch (Exception ex)
    {
        // A faulty supplier must not stop the retry loop; use the built-in default instead.
        logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff.");
        nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs);
    }

    // Task.Delay rejects negative delays and delays above its documented maximum, and
    // TimeSpan.FromMilliseconds overflows for extreme values, so bound the supplier result first.
    var boundedMs = Math.Min(Math.Max(0L, nextMs), int.MaxValue);

    pollInterval = TimeSpan.FromMilliseconds(boundedMs);
    await Task.Delay(pollInterval, cancellationToken);
}
|
|
||
| private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount) | ||
| { | ||
| if (jobCount.HasValue) | ||
|
|
@@ -332,4 +361,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can | |
| } | ||
| }, cancellationToken); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.