diff --git a/Client.UnitTests/ExponentialBackoffTests.cs b/Client.UnitTests/ExponentialBackoffTests.cs new file mode 100644 index 00000000..1e1c35c2 --- /dev/null +++ b/Client.UnitTests/ExponentialBackoffTests.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using NUnit.Framework; +using Zeebe.Client.Api.Worker; +using Zeebe.Client.Impl.Worker; + +namespace Zeebe.Client; + +[TestFixture] +public sealed class ExponentialBackoffTests +{ + [Test] + public void ShouldReturnDelayWithinBounds_WhenNoJitter() + { + // given + var maxDelay = TimeSpan.FromMilliseconds(1_000); + var minDelay = TimeSpan.FromMilliseconds(50); + IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() + .MaxDelay(maxDelay) + .MinDelay(minDelay) + .BackoffFactor(1.6) + .JitterFactor(0) + .Build(); + + var delays = new List(); + long current = supplier.SupplyRetryDelay(0); + + // when + for (var i = 0; i < 100; i++) + { + delays.Add(current); + current = supplier.SupplyRetryDelay(current); + } + + // then - with zero jitter, sequence should monotonically increase until it caps at max + Assert.That(delays, Is.Not.Empty); + Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds)); + Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds)); + + long previous = -1; + foreach (var delay in delays) + { + if (delay != (long)maxDelay.TotalMilliseconds) + { + Assert.That(delay, Is.GreaterThan(previous)); + } + previous = delay; + } + } + + [Test] + public void ShouldBeRandomizedWithJitter() + { + // given + var maxDelay = TimeSpan.FromMilliseconds(1_000); + var minDelay = TimeSpan.FromMilliseconds(50); + const double jitterFactor = 0.2; + + IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl() + .MaxDelay(maxDelay) + .MinDelay(minDelay) + .BackoffFactor(1.5) + .JitterFactor(jitterFactor) + .Build(); + + var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor)); + var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor)); + + // when - always compute from max to test jitter around the cap + var delays = new List(); + for (var i = 0; i < 10; i++) + { + var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds); + delays.Add(value); + } + + // then + Assert.That(delays, Is.Not.Empty); + foreach (var delay in delays) + { + Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound)); + } + Assert.That(delays.Distinct().Count(), Is.GreaterThan(1)); + } +} + diff --git a/Client/Api/Worker/IBackoffSupplier.cs b/Client/Api/Worker/IBackoffSupplier.cs new file mode 100644 index 00000000..7b1f5263 --- /dev/null +++ b/Client/Api/Worker/IBackoffSupplier.cs @@ -0,0 +1,17 @@ +using System; + +namespace Zeebe.Client.Api.Worker; + +/// +/// Supplies the delay for the next retry after a failed activate request. +/// Value is in milliseconds; return value may be zero to retry immediately. +/// +public interface IBackoffSupplier +{ + /// + /// Returns the delay before the next retry. + /// + /// The previously used delay in milliseconds. + /// The new retry delay in milliseconds. + long SupplyRetryDelay(long currentRetryDelay); +} diff --git a/Client/Api/Worker/IExponentialBackoffBuilder.cs b/Client/Api/Worker/IExponentialBackoffBuilder.cs new file mode 100644 index 00000000..ec4c96eb --- /dev/null +++ b/Client/Api/Worker/IExponentialBackoffBuilder.cs @@ -0,0 +1,36 @@ +using System; + +namespace Zeebe.Client.Api.Worker; + +public interface IExponentialBackoffBuilder +{ + /// + /// Sets the maximum retry delay. Default is 5000ms. + /// + IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay); + + /// + /// Sets the minimum retry delay. Default is 50ms. + /// + IExponentialBackoffBuilder MinDelay(TimeSpan minDelay); + + /// + /// Sets the multiplication factor (previous delay * factor). Default is 1.6. + /// + IExponentialBackoffBuilder BackoffFactor(double backoffFactor); + + /// + /// Sets optional jitter factor (+/- factor of delay). Default is 0.1. + /// + IExponentialBackoffBuilder JitterFactor(double jitterFactor); + + /// + /// Sets the random source used to compute jitter. Default is new Random(). + /// + IExponentialBackoffBuilder Random(Random random); + + /// + /// Builds the supplier with the provided configuration. + /// + IBackoffSupplier Build(); +} diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index 61607c68..ab1b8712 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -237,6 +237,15 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStepthe builder for this worker. IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); + /// + /// Configures a retry backoff supplier used when activate requests fail with + /// gRPC StatusCode.ResourceExhausted. Only applied to polling retries and reset on success. + /// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set. + /// + /// The supplier used to compute the next retry delay in ms. + /// The builder for this worker. + IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier); + /// /// Enable or disable gRPC job streaming for this worker. /// @@ -249,4 +258,4 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep /// the worker. IJobWorker Open(); -} +} \ No newline at end of file diff --git a/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs b/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs new file mode 100644 index 00000000..2530dea5 --- /dev/null +++ b/Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs @@ -0,0 +1,51 @@ +using System; +using Zeebe.Client.Api.Worker; + +namespace Zeebe.Client.Impl.Worker; + +internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder +{ + private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000); + private TimeSpan minDelay = TimeSpan.FromMilliseconds(50); + private double backoffFactor = 1.6; + private double jitterFactor = 0.1; + private Random random = new Random(); + + public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay) + { + this.maxDelay = maxDelay; + return this; + } + + public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay) + { + this.minDelay = minDelay; + return this; + } + + public IExponentialBackoffBuilder BackoffFactor(double backoffFactor) + { + this.backoffFactor = backoffFactor; + return this; + } + + public IExponentialBackoffBuilder JitterFactor(double jitterFactor) + { + this.jitterFactor = jitterFactor; + return this; + } + + public IExponentialBackoffBuilder Random(Random random) + { + this.random = random ?? new Random(); + return this; + } + + public IBackoffSupplier Build() + { + if (minDelay > maxDelay) + throw new ArgumentException($"MinDelay ({minDelay}) cannot be greater than MaxDelay ({maxDelay})"); + return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random); + } +} + diff --git a/Client/Impl/Worker/ExponentialBackoffSupplier.cs b/Client/Impl/Worker/ExponentialBackoffSupplier.cs new file mode 100644 index 00000000..f14896e1 --- /dev/null +++ b/Client/Impl/Worker/ExponentialBackoffSupplier.cs @@ -0,0 +1,65 @@ +using System; +using Zeebe.Client.Api.Worker; + +namespace Zeebe.Client.Impl.Worker; + +/// +/// An implementation of which uses the **Exponential Backoff with Jitter** strategy +/// for calculating retry delays. +/// +/// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier +/// and adds some jitter to avoid multiple clients polling at the same time. +/// +/// +/// +/// The core logic is copied from the Zeebe Java client's +/// io.camunda.zeebe.client.impl.worker.ExponentialBackoff implementation +/// (source: GitHub). +/// +/// The next delay is calculated by clamping the multiplied delay between minDelay and maxDelay, +/// and then adding a random jitter: +/// +/// max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter +/// The final result is ensured to be non-negative and rounded. +/// +internal sealed class ExponentialBackoffSupplier : IBackoffSupplier +{ + private readonly double maxDelayMs; + private readonly double minDelayMs; + private readonly double backoffFactor; + private readonly double jitterFactor; + private readonly Random random; + + internal ExponentialBackoffSupplier( + TimeSpan minDelay, + TimeSpan maxDelay, + double backoffFactor, + double jitterFactor, + Random random) + { + maxDelayMs = maxDelay.TotalMilliseconds; + minDelayMs = minDelay.TotalMilliseconds; + this.backoffFactor = backoffFactor; + this.jitterFactor = jitterFactor; + this.random = random ?? new Random(); + } + + public long SupplyRetryDelay(long currentRetryDelay) + { + var previous = (double)currentRetryDelay; + var multiplied = previous * backoffFactor; + var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs); + + var range = clamped * jitterFactor; + var jitter = (random.NextDouble() * (2 * range)) - range; + + var next = clamped + jitter; + if (next < 0) + { + next = 0; + } + + return (long)Math.Round(next); + } +} + diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index ce5acf0f..4a6e65e2 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -43,7 +43,10 @@ public sealed class JobWorker : IJobWorker private readonly JobWorkerBuilder jobWorkerBuilder; private readonly ILogger logger; private readonly int maxJobsActive; - private readonly TimeSpan pollInterval; + private TimeSpan pollInterval; + private readonly TimeSpan initialPollInterval; + private readonly IBackoffSupplier backoffSupplier; + private readonly IBackoffSupplier defaultBackoffSupplier; private readonly CancellationTokenSource source; private readonly double thresholdJobsActivation; @@ -59,11 +62,14 @@ internal JobWorker(JobWorkerBuilder builder) jobHandler = jobWorkerBuilder.Handler(); autoCompletion = builder.AutoCompletionEnabled(); pollInterval = jobWorkerBuilder.PollInterval(); + initialPollInterval = pollInterval; activateJobsRequest = jobWorkerBuilder.Request; streamActivateJobsRequest = jobWorkerBuilder.StreamRequest; jobActivator = jobWorkerBuilder.Activator; maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate; thresholdJobsActivation = maxJobsActive * 0.6; + defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build(); + backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier; } /// @@ -184,7 +190,7 @@ private async Task StreamJobs(ITargetBlock input, CancellationToken cancel catch (RpcException rpcException) { LogRpcException(rpcException); - await Task.Delay(StreamErrorRetryDelayMs, cancellationToken); + await Backoff(cancellationToken); } } } @@ -224,11 +230,12 @@ await jobActivator.SendActivateRequest(activateJobsRequest, async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), null, cancellationToken); + pollInterval = initialPollInterval; } catch (RpcException rpcException) { LogRpcException(rpcException); - await Task.Delay(pollInterval, cancellationToken); + await Backoff(cancellationToken); } } else @@ -238,6 +245,28 @@ await jobActivator.SendActivateRequest(activateJobsRequest, } } + /// + /// Updates pollInterval using the configured backoff supplier and waits for that delay. + /// Falls back to a default exponential supplier if the custom supplier throws. + /// + private async Task Backoff(CancellationToken cancellationToken) + { + var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds); + long nextMs; + try + { + nextMs = backoffSupplier.SupplyRetryDelay(previousMs); + } + catch (Exception ex) + { + logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff."); + nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs); + } + + pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs)); + await Task.Delay(pollInterval, cancellationToken); + } + private async Task HandleActivationResponse(ITargetBlock input, IActivateJobsResponse response, int? jobCount) { if (jobCount.HasValue) @@ -332,4 +361,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can } }, cancellationToken); } -} +} \ No newline at end of file diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index 831a4646..06c0b6e3 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -21,6 +21,7 @@ using Microsoft.Extensions.Logging; using Zeebe.Client.Api.Worker; using Zeebe.Client.Impl.Commands; +using Zeebe.Client.Impl.Worker; namespace Zeebe.Client.Impl.Worker; @@ -40,6 +41,7 @@ public class JobWorkerBuilder( internal byte ThreadCount { get; set; } = 1; internal ILoggerFactory LoggerFactory { get; } = loggerFactory; internal IJobClient JobClient { get; } = zeebeClient; + internal IBackoffSupplier RetryBackoffSupplier { get; private set; } public IJobWorkerBuilderStep2 JobType(string type) { @@ -147,6 +149,12 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount) return this; } + public IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier) + { + RetryBackoffSupplier = backoffSupplier; + return this; + } + public IJobWorker Open() { var worker = new JobWorker(this);