From ed9aa7c22670189f9fbaa4134f8a0215ae9a29d1 Mon Sep 17 00:00:00 2001 From: Lennart Kleymann Date: Tue, 23 Sep 2025 14:27:12 +0200 Subject: [PATCH 1/8] feat: add backoff abstractions and builder APIs --- Client/Api/Worker/IBackoffSupplier.cs | 17 +++++++++ .../Api/Worker/IExponentialBackoffBuilder.cs | 36 +++++++++++++++++++ Client/Api/Worker/IJobWorkerBuilderStep1.cs | 9 +++++ 3 files changed, 62 insertions(+) create mode 100644 Client/Api/Worker/IBackoffSupplier.cs create mode 100644 Client/Api/Worker/IExponentialBackoffBuilder.cs diff --git a/Client/Api/Worker/IBackoffSupplier.cs b/Client/Api/Worker/IBackoffSupplier.cs new file mode 100644 index 00000000..7b1f5263 --- /dev/null +++ b/Client/Api/Worker/IBackoffSupplier.cs @@ -0,0 +1,17 @@ +using System; + +namespace Zeebe.Client.Api.Worker; + +/// +/// Supplies the delay for the next retry after a failed activate request. +/// Value is in milliseconds; return value may be zero to retry immediately. +/// +public interface IBackoffSupplier +{ + /// + /// Returns the delay before the next retry. + /// + /// The previously used delay in milliseconds. + /// The new retry delay in milliseconds. + long SupplyRetryDelay(long currentRetryDelay); +} diff --git a/Client/Api/Worker/IExponentialBackoffBuilder.cs b/Client/Api/Worker/IExponentialBackoffBuilder.cs new file mode 100644 index 00000000..ec4c96eb --- /dev/null +++ b/Client/Api/Worker/IExponentialBackoffBuilder.cs @@ -0,0 +1,36 @@ +using System; + +namespace Zeebe.Client.Api.Worker; + +public interface IExponentialBackoffBuilder +{ + /// + /// Sets the maximum retry delay. Default is 5000ms. + /// + IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay); + + /// + /// Sets the minimum retry delay. Default is 50ms. + /// + IExponentialBackoffBuilder MinDelay(TimeSpan minDelay); + + /// + /// Sets the multiplication factor (previous delay * factor). Default is 1.6. + /// + IExponentialBackoffBuilder BackoffFactor(double backoffFactor); + + /// + /// Sets optional jitter factor (+/- factor of delay). Default is 0.1. + /// + IExponentialBackoffBuilder JitterFactor(double jitterFactor); + + /// + /// Sets the random source used to compute jitter. Default is new Random(). + /// + IExponentialBackoffBuilder Random(Random random); + + /// + /// Builds the supplier with the provided configuration. + /// + IBackoffSupplier Build(); +} diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index 44f364ae..8c7e1c69 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -230,6 +230,15 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStepthe builder for this worker. IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); + /// + /// Configures a retry backoff supplier used when activate requests fail with + /// gRPC StatusCode.ResourceExhausted. Only applied to polling retries and reset on success. + /// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set. + /// + /// The supplier used to compute the next retry delay in ms. + /// The builder for this worker. + IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier); + /// /// Open the worker and start to work on available tasks. /// From 319736c2616f3b68a702b8d62c027d8f51846876 Mon Sep 17 00:00:00 2001 From: Lennart Kleymann Date: Tue, 23 Sep 2025 14:27:47 +0200 Subject: [PATCH 2/8] feat: implement exponential backoff supplier and builder --- .../Worker/ExponentialBackoffBuilderImpl.cs | 49 +++++++++++++++++++ .../Impl/Worker/ExponentialBackoffSupplier.cs | 46 +++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs create mode 100644 Client/Impl/Worker/ExponentialBackoffSupplier.cs diff --git a/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs b/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs new file mode 100644 index 00000000..e05c8764 --- /dev/null +++ b/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs @@ -0,0 +1,49 @@ +using System; +using Zeebe.Client.Api.Worker; + +namespace Zeebe.Client.Impl.Worker; + +internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder +{ + private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000); + private TimeSpan minDelay = TimeSpan.FromMilliseconds(50); + private double backoffFactor = 1.6; + private double jitterFactor = 0.1; + private Random random = new Random(); + + public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay) + { + this.maxDelay = maxDelay; + return this; + } + + public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay) + { + this.minDelay = minDelay; + return this; + } + + public IExponentialBackoffBuilder BackoffFactor(double backoffFactor) + { + this.backoffFactor = backoffFactor; + return this; + } + + public IExponentialBackoffBuilder JitterFactor(double jitterFactor) + { + this.jitterFactor = jitterFactor; + return this; + } + + public IExponentialBackoffBuilder Random(Random random) + { + this.random = random ?? new Random(); + return this; + } + + public IBackoffSupplier Build() + { + return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random); + } +} + diff --git a/Client/Impl/Worker/ExponentialBackoffSupplier.cs b/Client/Impl/Worker/ExponentialBackoffSupplier.cs new file mode 100644 index 00000000..12267eb1 --- /dev/null +++ b/Client/Impl/Worker/ExponentialBackoffSupplier.cs @@ -0,0 +1,46 @@ +using System; +using Zeebe.Client.Api.Worker; + +namespace Zeebe.Client.Impl.Worker; + +internal sealed class ExponentialBackoffSupplier : IBackoffSupplier +{ + private readonly double maxDelayMs; + private readonly double minDelayMs; + private readonly double backoffFactor; + private readonly double jitterFactor; + private readonly Random random; + + internal ExponentialBackoffSupplier( + TimeSpan minDelay, + TimeSpan maxDelay, + double backoffFactor, + double jitterFactor, + Random random) + { + maxDelayMs = maxDelay.TotalMilliseconds; + minDelayMs = minDelay.TotalMilliseconds; + this.backoffFactor = backoffFactor; + this.jitterFactor = jitterFactor; + this.random = random ?? new Random(); + } + + public long SupplyRetryDelay(long currentRetryDelay) + { + var previous = (double)currentRetryDelay; + var multiplied = previous * backoffFactor; + var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs); + + var range = clamped * jitterFactor; + var jitter = (random.NextDouble() * (2 * range)) - range; + + var next = clamped + jitter; + if (next < 0) + { + next = 0; + } + + return (long)Math.Round(next); + } +} + From 3d69643bc9b5d36f858dcd0ac8c61247c93e81d9 Mon Sep 17 00:00:00 2001 From: Lennart Kleymann Date: Tue, 23 Sep 2025 14:28:41 +0200 Subject: [PATCH 3/8] feat: wire backoff configuration into builder --- Client/Impl/Worker/JobWorkerBuilder.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index b1d90a21..568ade03 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -21,6 +21,7 @@ using Microsoft.Extensions.Logging; using Zeebe.Client.Api.Worker; using Zeebe.Client.Impl.Commands; +using Zeebe.Client.Impl.Worker; namespace Zeebe.Client.Impl.Worker; @@ -38,6 +39,7 @@ public class JobWorkerBuilder( internal byte ThreadCount { get; set; } = 1; internal ILoggerFactory LoggerFactory { get; } = loggerFactory; internal IJobClient JobClient { get; } = zeebeClient; + internal IBackoffSupplier RetryBackoffSupplier { get; private set; } public IJobWorkerBuilderStep2 JobType(string type) { @@ -128,6 +130,12 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount) return this; } + public IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier) + { + RetryBackoffSupplier = backoffSupplier; + return this; + } + public IJobWorker Open() { var worker = new JobWorker(this); @@ -151,4 +159,4 @@ internal bool AutoCompletionEnabled() { return autoCompletion; } -} \ No newline at end of file +} From 4725aefb8960351d541d8b3528b4a4e741dc23d6 Mon Sep 17 00:00:00 2001 From: Lennart Kleymann Date: Tue, 23 Sep 2025 14:29:27 +0200 Subject: [PATCH 4/8] feat: apply backoff on RESOURCE_EXHAUSTED and reset on success --- Client/Impl/Worker/JobWorker.cs | 42 ++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 3d7a3ad3..9d9394a9 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -38,7 +38,9 @@ public sealed class JobWorker : IJobWorker private readonly JobWorkerBuilder jobWorkerBuilder; private readonly ILogger logger; private readonly int maxJobsActive; - private readonly TimeSpan pollInterval; + private TimeSpan pollInterval; + private readonly TimeSpan initialPollInterval; + private readonly IBackoffSupplier backoffSupplier; private readonly CancellationTokenSource source; private readonly double thresholdJobsActivation; @@ -54,10 +56,13 @@ internal JobWorker(JobWorkerBuilder builder) jobHandler = jobWorkerBuilder.Handler(); autoCompletion = builder.AutoCompletionEnabled(); pollInterval = jobWorkerBuilder.PollInterval(); + initialPollInterval = pollInterval; activateJobsRequest = jobWorkerBuilder.Request; jobActivator = jobWorkerBuilder.Activator; maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate; thresholdJobsActivation = maxJobsActive * 0.6; + backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? + new ExponentialBackoffBuilderImpl().Build(); } /// @@ -154,11 +159,19 @@ await jobActivator.SendActivateRequest(activateJobsRequest, async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), null, cancellationToken); + pollInterval = initialPollInterval; } catch (RpcException rpcException) { LogRpcException(rpcException); - await Task.Delay(pollInterval, cancellationToken); + if (rpcException.StatusCode == StatusCode.ResourceExhausted) + { + await Backoff(cancellationToken); + } + else + { + await Task.Delay(pollInterval, cancellationToken); + } } } else @@ -168,6 +181,29 @@ await jobActivator.SendActivateRequest(activateJobsRequest, } } + /// + /// Updates pollInterval using the configured backoff supplier and waits for that delay. + /// Falls back to a default exponential supplier if the custom supplier throws. + /// + private async Task Backoff(CancellationToken cancellationToken) + { + var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds); + long nextMs; + try + { + nextMs = backoffSupplier.SupplyRetryDelay(previousMs); + } + catch (Exception ex) + { + logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff."); + var defaultSupplier = new ExponentialBackoffBuilderImpl().Build(); + nextMs = defaultSupplier.SupplyRetryDelay(previousMs); + } + + pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs)); + await Task.Delay(pollInterval, cancellationToken); + } + private async Task HandleActivationResponse(ITargetBlock input, IActivateJobsResponse response, int jobCount) { logger?.LogDebug( @@ -252,4 +288,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can } }, cancellationToken); } -} \ No newline at end of file +} From ce526411d37247ba4dc3c6d2d7e41443ae5e03c2 Mon Sep 17 00:00:00 2001 From: Lennart Kleymann Date: Tue, 23 Sep 2025 14:41:04 +0200 Subject: [PATCH 5/8] test: add tests for exponential backoff --- Client.UnitTests/ExponentialBackoffTests.cs | 87 +++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 Client.UnitTests/ExponentialBackoffTests.cs diff --git a/Client.UnitTests/ExponentialBackoffTests.cs b/Client.UnitTests/ExponentialBackoffTests.cs new file mode 100644 index 00000000..1e1c35c2 --- /dev/null +++ b/Client.UnitTests/ExponentialBackoffTests.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using NUnit.Framework; +using Zeebe.Client.Api.Worker; +using Zeebe.Client.Impl.Worker; + +namespace Zeebe.Client; + +[TestFixture] +public sealed class ExponentialBackoffTests +{ + [Test] + public void ShouldReturnDelayWithinBounds_WhenNoJitter() + { + // given + var maxDelay = TimeSpan.FromMilliseconds(1_000); + var minDelay = TimeSpan.FromMilliseconds(50); + IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() + .MaxDelay(maxDelay) + .MinDelay(minDelay) + .BackoffFactor(1.6) + .JitterFactor(0) + .Build(); + + var delays = new List(); + long current = supplier.SupplyRetryDelay(0); + + // when + for (var i = 0; i < 100; i++) + { + delays.Add(current); + current = supplier.SupplyRetryDelay(current); + } + + // then - with zero jitter, sequence should monotonically increase until it caps at max + Assert.That(delays, Is.Not.Empty); + Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds)); + Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds)); + + long previous = -1; + foreach (var delay in delays) + { + if (delay != (long)maxDelay.TotalMilliseconds) + { + Assert.That(delay, Is.GreaterThan(previous)); + } + previous = delay; + } + } + + [Test] + public void ShouldBeRandomizedWithJitter() + { + // given + var maxDelay = TimeSpan.FromMilliseconds(1_000); + var minDelay = TimeSpan.FromMilliseconds(50); + const double jitterFactor = 0.2; + + IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() + .MaxDelay(maxDelay) + .MinDelay(minDelay) + .BackoffFactor(1.5) + .JitterFactor(jitterFactor) + .Build(); + + var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor)); + var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor)); + + // when - always compute from max to test jitter around the cap + var delays = new List(); + for (var i = 0; i < 10; i++) + { + var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds); + delays.Add(value); + } + + // then + Assert.That(delays, Is.Not.Empty); + foreach (var delay in delays) + { + Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound)); + } + Assert.That(delays.Distinct().Count(), Is.GreaterThan(1)); + } +} + From 712fb0ac7d2fdfa4bf7ec2ed9d86175a708cc031 Mon Sep 17 00:00:00 2001 From: lkleymann Date: Fri, 28 Nov 2025 12:29:12 +0100 Subject: [PATCH 6/8] docs: add XML documentation for ExponentialBackoffSupplier implementation --- .../Impl/Worker/ExponentialBackoffSupplier.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Client/Impl/Worker/ExponentialBackoffSupplier.cs b/Client/Impl/Worker/ExponentialBackoffSupplier.cs index 12267eb1..f14896e1 100644 --- a/Client/Impl/Worker/ExponentialBackoffSupplier.cs +++ b/Client/Impl/Worker/ExponentialBackoffSupplier.cs @@ -3,6 +3,25 @@ namespace Zeebe.Client.Impl.Worker; +/// +/// An implementation of which uses the **Exponential Backoff with Jitter** strategy +/// for calculating retry delays. +/// +/// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier +/// and adds some jitter to avoid multiple clients polling at the same time. +/// +/// +/// +/// The core logic is copied from the Zeebe Java client's +/// io.camunda.zeebe.client.impl.worker.ExponentialBackoff implementation +/// (source: GitHub). +/// +/// The next delay is calculated by clamping the multiplied delay between minDelay and maxDelay, +/// and then adding a random jitter: +/// +/// max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter +/// The final result is ensured to be non-negative and rounded. +/// internal sealed class ExponentialBackoffSupplier : IBackoffSupplier { private readonly double maxDelayMs; From bf155170b46001d38c05d51abc23dbcbddffe5d2 Mon Sep 17 00:00:00 2001 From: lkleymann Date: Fri, 28 Nov 2025 12:30:47 +0100 Subject: [PATCH 7/8] refactor: add default backoff supplier in JobWorker to reuse instances --- Client/Impl/Worker/JobWorker.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index b059774e..0616db63 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -46,6 +46,7 @@ public sealed class JobWorker : IJobWorker private TimeSpan pollInterval; private readonly TimeSpan initialPollInterval; private readonly IBackoffSupplier backoffSupplier; + private readonly IBackoffSupplier defaultBackoffSupplier; private readonly CancellationTokenSource source; private readonly double thresholdJobsActivation; @@ -67,8 +68,8 @@ internal JobWorker(JobWorkerBuilder builder) jobActivator = jobWorkerBuilder.Activator; maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate; thresholdJobsActivation = maxJobsActive * 0.6; - backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? - new ExponentialBackoffBuilderImpl().Build(); + defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build(); + backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier; } /// @@ -266,8 +267,7 @@ private async Task Backoff(CancellationToken cancellationToken) catch (Exception ex) { logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff."); - var defaultSupplier = new ExponentialBackoffBuilderImpl().Build(); - nextMs = defaultSupplier.SupplyRetryDelay(previousMs); + nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs); } pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs)); From 15064e28ae832c3224f33ee67933b6c11b2b29bd Mon Sep 17 00:00:00 2001 From: lkleymann Date: Fri, 28 Nov 2025 12:43:13 +0100 Subject: [PATCH 8/8] refactor: unify backoff handling in JobWorker for RPC exceptions --- Client/Impl/Worker/JobWorker.cs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 0616db63..4a6e65e2 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -190,7 +190,7 @@ private async Task StreamJobs(ITargetBlock input, CancellationToken cancel catch (RpcException rpcException) { LogRpcException(rpcException); - await Task.Delay(StreamErrorRetryDelayMs, cancellationToken); + await Backoff(cancellationToken); } } } @@ -235,14 +235,7 @@ await jobActivator.SendActivateRequest(activateJobsRequest, catch (RpcException rpcException) { LogRpcException(rpcException); - if (rpcException.StatusCode == StatusCode.ResourceExhausted) - { - await Backoff(cancellationToken); - } - else - { - await Task.Delay(pollInterval, cancellationToken); - } + await Backoff(cancellationToken); } } else