-
Notifications
You must be signed in to change notification settings - Fork 61
feat: added backoff for job worker #814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ed9aa7c
319736c
3d69643
4725aef
ce52641
177e327
712fb0a
bf15517
15064e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using NUnit.Framework; | ||
| using Zeebe.Client.Api.Worker; | ||
| using Zeebe.Client.Impl.Worker; | ||
|
|
||
| namespace Zeebe.Client; | ||
|
|
||
| [TestFixture] | ||
| public sealed class ExponentialBackoffTests | ||
| { | ||
| [Test] | ||
| public void ShouldReturnDelayWithinBounds_WhenNoJitter() | ||
| { | ||
| // given | ||
| var maxDelay = TimeSpan.FromMilliseconds(1_000); | ||
| var minDelay = TimeSpan.FromMilliseconds(50); | ||
| IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() | ||
| .MaxDelay(maxDelay) | ||
| .MinDelay(minDelay) | ||
| .BackoffFactor(1.6) | ||
| .JitterFactor(0) | ||
| .Build(); | ||
|
|
||
| var delays = new List<long>(); | ||
| long current = supplier.SupplyRetryDelay(0); | ||
|
|
||
| // when | ||
| for (var i = 0; i < 100; i++) | ||
| { | ||
| delays.Add(current); | ||
| current = supplier.SupplyRetryDelay(current); | ||
| } | ||
|
|
||
| // then - with zero jitter, sequence should monotonically increase until it caps at max | ||
| Assert.That(delays, Is.Not.Empty); | ||
| Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds)); | ||
| Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds)); | ||
|
|
||
| long previous = -1; | ||
| foreach (var delay in delays) | ||
| { | ||
| if (delay != (long)maxDelay.TotalMilliseconds) | ||
| { | ||
| Assert.That(delay, Is.GreaterThan(previous)); | ||
| } | ||
| previous = delay; | ||
| } | ||
| } | ||
|
|
||
| [Test] | ||
| public void ShouldBeRandomizedWithJitter() | ||
| { | ||
| // given | ||
| var maxDelay = TimeSpan.FromMilliseconds(1_000); | ||
| var minDelay = TimeSpan.FromMilliseconds(50); | ||
| const double jitterFactor = 0.2; | ||
|
|
||
| IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() | ||
| .MaxDelay(maxDelay) | ||
| .MinDelay(minDelay) | ||
| .BackoffFactor(1.5) | ||
| .JitterFactor(jitterFactor) | ||
| .Build(); | ||
|
|
||
| var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor)); | ||
| var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor)); | ||
|
|
||
| // when - always compute from max to test jitter around the cap | ||
| var delays = new List<long>(); | ||
| for (var i = 0; i < 10; i++) | ||
| { | ||
| var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds); | ||
| delays.Add(value); | ||
| } | ||
|
|
||
| // then | ||
| Assert.That(delays, Is.Not.Empty); | ||
| foreach (var delay in delays) | ||
| { | ||
| Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound)); | ||
| } | ||
| Assert.That(delays.Distinct().Count(), Is.GreaterThan(1)); | ||
| } | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| using System; | ||
|
|
||
| namespace Zeebe.Client.Api.Worker; | ||
|
|
||
| /// <summary> | ||
| /// Supplies the delay for the next retry after a failed activate request. | ||
| /// Value is in milliseconds; return value may be zero to retry immediately. | ||
| /// </summary> | ||
| public interface IBackoffSupplier | ||
| { | ||
| /// <summary> | ||
| /// Returns the delay before the next retry. | ||
| /// </summary> | ||
| /// <param name="currentRetryDelay">The previously used delay in milliseconds.</param> | ||
| /// <returns>The new retry delay in milliseconds.</returns> | ||
| long SupplyRetryDelay(long currentRetryDelay); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| using System; | ||
|
|
||
| namespace Zeebe.Client.Api.Worker; | ||
|
|
||
| public interface IExponentialBackoffBuilder | ||
| { | ||
| /// <summary> | ||
| /// Sets the maximum retry delay. Default is 5000ms. | ||
| /// </summary> | ||
| IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay); | ||
|
|
||
| /// <summary> | ||
| /// Sets the minimum retry delay. Default is 50ms. | ||
| /// </summary> | ||
| IExponentialBackoffBuilder MinDelay(TimeSpan minDelay); | ||
|
|
||
| /// <summary> | ||
| /// Sets the multiplication factor (previous delay * factor). Default is 1.6. | ||
| /// </summary> | ||
| IExponentialBackoffBuilder BackoffFactor(double backoffFactor); | ||
|
|
||
| /// <summary> | ||
| /// Sets optional jitter factor (+/- factor of delay). Default is 0.1. | ||
| /// </summary> | ||
| IExponentialBackoffBuilder JitterFactor(double jitterFactor); | ||
|
|
||
| /// <summary> | ||
| /// Sets the random source used to compute jitter. Default is new Random(). | ||
| /// </summary> | ||
| IExponentialBackoffBuilder Random(Random random); | ||
|
|
||
| /// <summary> | ||
| /// Builds the supplier with the provided configuration. | ||
| /// </summary> | ||
| IBackoffSupplier Build(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| using System; | ||
| using Zeebe.Client.Api.Worker; | ||
|
|
||
| namespace Zeebe.Client.Impl.Worker; | ||
|
|
||
| internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder | ||
| { | ||
| private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000); | ||
| private TimeSpan minDelay = TimeSpan.FromMilliseconds(50); | ||
| private double backoffFactor = 1.6; | ||
| private double jitterFactor = 0.1; | ||
| private Random random = new Random(); | ||
|
|
||
| public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay) | ||
| { | ||
| this.maxDelay = maxDelay; | ||
| return this; | ||
| } | ||
|
|
||
| public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay) | ||
| { | ||
| this.minDelay = minDelay; | ||
| return this; | ||
| } | ||
|
|
||
| public IExponentialBackoffBuilder BackoffFactor(double backoffFactor) | ||
| { | ||
| this.backoffFactor = backoffFactor; | ||
| return this; | ||
| } | ||
|
|
||
| public IExponentialBackoffBuilder JitterFactor(double jitterFactor) | ||
| { | ||
| this.jitterFactor = jitterFactor; | ||
| return this; | ||
| } | ||
|
|
||
| public IExponentialBackoffBuilder Random(Random random) | ||
| { | ||
| this.random = random ?? new Random(); | ||
| return this; | ||
| } | ||
|
|
||
| public IBackoffSupplier Build() | ||
| { | ||
| return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random); | ||
| } | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| using System; | ||
| using Zeebe.Client.Api.Worker; | ||
|
|
||
| namespace Zeebe.Client.Impl.Worker; | ||
|
|
||
| /// <summary> | ||
| /// An implementation of <see cref="IBackoffSupplier"/> which uses the **Exponential Backoff with Jitter** strategy | ||
| /// for calculating retry delays. | ||
| /// <para> | ||
| /// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier | ||
| /// and adds some jitter to avoid multiple clients polling at the same time. | ||
| /// </para> | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// The core logic is copied from the Zeebe Java client's | ||
| /// <c>io.camunda.zeebe.client.impl.worker.ExponentialBackoff</c> implementation | ||
| /// (source: <a href="https://github.com/camunda/camunda/blob/5764a0a3e6c3d3253c5c9608bf4478f8e2281af7/clients/java-deprecated/src/main/java/io/camunda/zeebe/client/impl/worker/ExponentialBackoff.java#L31">GitHub</a>). | ||
| /// <para> | ||
| /// The next delay is calculated by clamping the multiplied delay between <c>minDelay</c> and <c>maxDelay</c>, | ||
| /// and then adding a random jitter: | ||
| /// </para> | ||
| /// <c>max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter</c> | ||
| /// <para>The final result is ensured to be non-negative and rounded.</para> | ||
| /// </remarks> | ||
| internal sealed class ExponentialBackoffSupplier : IBackoffSupplier | ||
| { | ||
| private readonly double maxDelayMs; | ||
| private readonly double minDelayMs; | ||
| private readonly double backoffFactor; | ||
| private readonly double jitterFactor; | ||
| private readonly Random random; | ||
|
|
||
| internal ExponentialBackoffSupplier( | ||
| TimeSpan minDelay, | ||
| TimeSpan maxDelay, | ||
| double backoffFactor, | ||
| double jitterFactor, | ||
| Random random) | ||
| { | ||
| maxDelayMs = maxDelay.TotalMilliseconds; | ||
| minDelayMs = minDelay.TotalMilliseconds; | ||
| this.backoffFactor = backoffFactor; | ||
| this.jitterFactor = jitterFactor; | ||
| this.random = random ?? new Random(); | ||
| } | ||
|
|
||
| public long SupplyRetryDelay(long currentRetryDelay) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔧 Could make a note that this implementation is based on the Java client version
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added summary and remarks to the ExponentialBackoffSupplier.cs |
||
| { | ||
| var previous = (double)currentRetryDelay; | ||
| var multiplied = previous * backoffFactor; | ||
| var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs); | ||
|
|
||
| var range = clamped * jitterFactor; | ||
| var jitter = (random.NextDouble() * (2 * range)) - range; | ||
|
|
||
| var next = clamped + jitter; | ||
| if (next < 0) | ||
| { | ||
| next = 0; | ||
| } | ||
|
|
||
| return (long)Math.Round(next); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,10 @@ public sealed class JobWorker : IJobWorker | |
| private readonly JobWorkerBuilder jobWorkerBuilder; | ||
| private readonly ILogger<JobWorker> logger; | ||
| private readonly int maxJobsActive; | ||
| private readonly TimeSpan pollInterval; | ||
| private TimeSpan pollInterval; | ||
| private readonly TimeSpan initialPollInterval; | ||
| private readonly IBackoffSupplier backoffSupplier; | ||
| private readonly IBackoffSupplier defaultBackoffSupplier; | ||
|
|
||
| private readonly CancellationTokenSource source; | ||
| private readonly double thresholdJobsActivation; | ||
|
|
@@ -59,11 +62,14 @@ internal JobWorker(JobWorkerBuilder builder) | |
| jobHandler = jobWorkerBuilder.Handler(); | ||
| autoCompletion = builder.AutoCompletionEnabled(); | ||
| pollInterval = jobWorkerBuilder.PollInterval(); | ||
| initialPollInterval = pollInterval; | ||
| activateJobsRequest = jobWorkerBuilder.Request; | ||
| streamActivateJobsRequest = jobWorkerBuilder.StreamRequest; | ||
| jobActivator = jobWorkerBuilder.Activator; | ||
| maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate; | ||
| thresholdJobsActivation = maxJobsActive * 0.6; | ||
| defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build(); | ||
| backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
|
|
@@ -184,7 +190,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel | |
| catch (RpcException rpcException) | ||
| { | ||
| LogRpcException(rpcException); | ||
| await Task.Delay(StreamErrorRetryDelayMs, cancellationToken); | ||
| await Backoff(cancellationToken); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -224,11 +230,12 @@ await jobActivator.SendActivateRequest(activateJobsRequest, | |
| async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), | ||
| null, | ||
| cancellationToken); | ||
| pollInterval = initialPollInterval; | ||
| } | ||
| catch (RpcException rpcException) | ||
| { | ||
| LogRpcException(rpcException); | ||
| await Task.Delay(pollInterval, cancellationToken); | ||
| await Backoff(cancellationToken); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed in my commit to backoff now every RpcException in Stream and Poll Jobs |
||
| } | ||
| } | ||
| else | ||
|
|
@@ -238,6 +245,28 @@ await jobActivator.SendActivateRequest(activateJobsRequest, | |
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Updates pollInterval using the configured backoff supplier and waits for that delay. | ||
| /// Falls back to a default exponential supplier if the custom supplier throws. | ||
| /// </summary> | ||
| private async Task Backoff(CancellationToken cancellationToken) | ||
| { | ||
| var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds); | ||
| long nextMs; | ||
| try | ||
| { | ||
| nextMs = backoffSupplier.SupplyRetryDelay(previousMs); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff."); | ||
| nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs); | ||
| } | ||
|
|
||
| pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs)); | ||
| await Task.Delay(pollInterval, cancellationToken); | ||
| } | ||
|
|
||
| private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount) | ||
| { | ||
| if (jobCount.HasValue) | ||
|
|
@@ -332,4 +361,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can | |
| } | ||
| }, cancellationToken); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏼