Skip to content
Open
87 changes: 87 additions & 0 deletions Client.UnitTests/ExponentialBackoffTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using System.Linq;
using NUnit.Framework;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Worker;

namespace Zeebe.Client;

[TestFixture]
public sealed class ExponentialBackoffTests
{
[Test]
public void ShouldReturnDelayWithinBounds_WhenNoJitter()
{
// given
var maxDelay = TimeSpan.FromMilliseconds(1_000);
var minDelay = TimeSpan.FromMilliseconds(50);
IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
.MaxDelay(maxDelay)
.MinDelay(minDelay)
.BackoffFactor(1.6)
.JitterFactor(0)
.Build();

var delays = new List<long>();
long current = supplier.SupplyRetryDelay(0);

// when
for (var i = 0; i < 100; i++)
{
delays.Add(current);
current = supplier.SupplyRetryDelay(current);
}

// then - with zero jitter, sequence should monotonically increase until it caps at max
Assert.That(delays, Is.Not.Empty);
Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds));
Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds));

long previous = -1;
foreach (var delay in delays)
{
if (delay != (long)maxDelay.TotalMilliseconds)
{
Assert.That(delay, Is.GreaterThan(previous));
}
previous = delay;
}
}

[Test]
public void ShouldBeRandomizedWithJitter()
{
// given
var maxDelay = TimeSpan.FromMilliseconds(1_000);
var minDelay = TimeSpan.FromMilliseconds(50);
const double jitterFactor = 0.2;

IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
.MaxDelay(maxDelay)
.MinDelay(minDelay)
.BackoffFactor(1.5)
.JitterFactor(jitterFactor)
.Build();

var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor));
var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor));

// when - always compute from max to test jitter around the cap
var delays = new List<long>();
for (var i = 0; i < 10; i++)
{
var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds);
delays.Add(value);
}

// then
Assert.That(delays, Is.Not.Empty);
foreach (var delay in delays)
{
Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound));
}
Assert.That(delays.Distinct().Count(), Is.GreaterThan(1));
}
}

17 changes: 17 additions & 0 deletions Client/Api/Worker/IBackoffSupplier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Zeebe.Client.Api.Worker;

/// <summary>
/// Supplies the delay for the next retry after a failed activate request.
/// Value is in milliseconds; return value may be zero to retry immediately.
/// </summary>
public interface IBackoffSupplier
{
/// <summary>
/// Returns the delay before the next retry.
/// </summary>
/// <param name="currentRetryDelay">The previously used delay in milliseconds.</param>
/// <returns>The new retry delay in milliseconds.</returns>
long SupplyRetryDelay(long currentRetryDelay);
}
36 changes: 36 additions & 0 deletions Client/Api/Worker/IExponentialBackoffBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Zeebe.Client.Api.Worker;

public interface IExponentialBackoffBuilder
{
/// <summary>
/// Sets the maximum retry delay. Default is 5000ms.
/// </summary>
IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay);

/// <summary>
/// Sets the minimum retry delay. Default is 50ms.
/// </summary>
IExponentialBackoffBuilder MinDelay(TimeSpan minDelay);

/// <summary>
/// Sets the multiplication factor (previous delay * factor). Default is 1.6.
/// </summary>
IExponentialBackoffBuilder BackoffFactor(double backoffFactor);

/// <summary>
/// Sets optional jitter factor (+/- factor of delay). Default is 0.1.
/// </summary>
IExponentialBackoffBuilder JitterFactor(double jitterFactor);

/// <summary>
/// Sets the random source used to compute jitter. Default is new Random().
/// </summary>
IExponentialBackoffBuilder Random(Random random);

/// <summary>
/// Builds the supplier with the provided configuration.
/// </summary>
IBackoffSupplier Build();
}
11 changes: 10 additions & 1 deletion Client/Api/Worker/IJobWorkerBuilderStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
/// <returns>the builder for this worker.</returns>
IJobWorkerBuilderStep3 HandlerThreads(byte threadCount);

/// <summary>
/// Configures a retry backoff supplier used when activate requests fail with
/// gRPC StatusCode.ResourceExhausted. Only applied to polling retries and reset on success.
/// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set.
/// </summary>
/// <param name="backoffSupplier">The supplier used to compute the next retry delay in ms.</param>
/// <returns>The builder for this worker.</returns>
IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼


/// <summary>
/// Enable or disable gRPC job streaming for this worker.
/// </summary>
Expand All @@ -249,4 +258,4 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
/// </summary>
/// <returns>the worker.</returns>
IJobWorker Open();
}
}
49 changes: 49 additions & 0 deletions Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using Zeebe.Client.Api.Worker;

namespace Zeebe.Client.Impl.Worker;

internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder
{
private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000);
private TimeSpan minDelay = TimeSpan.FromMilliseconds(50);
private double backoffFactor = 1.6;
private double jitterFactor = 0.1;
private Random random = new Random();

public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay)
{
this.maxDelay = maxDelay;
return this;
}

public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay)
{
this.minDelay = minDelay;
return this;
}

public IExponentialBackoffBuilder BackoffFactor(double backoffFactor)
{
this.backoffFactor = backoffFactor;
return this;
}

public IExponentialBackoffBuilder JitterFactor(double jitterFactor)
{
this.jitterFactor = jitterFactor;
return this;
}

public IExponentialBackoffBuilder Random(Random random)
{
this.random = random ?? new Random();
return this;
}

public IBackoffSupplier Build()
{
return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random);
}
}

65 changes: 65 additions & 0 deletions Client/Impl/Worker/ExponentialBackoffSupplier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using Zeebe.Client.Api.Worker;

namespace Zeebe.Client.Impl.Worker;

/// <summary>
/// An implementation of <see cref="IBackoffSupplier"/> which uses the **Exponential Backoff with Jitter** strategy
/// for calculating retry delays.
/// <para>
/// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier
/// and adds some jitter to avoid multiple clients polling at the same time.
/// </para>
/// </summary>
/// <remarks>
/// The core logic is copied from the Zeebe Java client's
/// <c>io.camunda.zeebe.client.impl.worker.ExponentialBackoff</c> implementation
/// (source: <a href="https://github.com/camunda/camunda/blob/5764a0a3e6c3d3253c5c9608bf4478f8e2281af7/clients/java-deprecated/src/main/java/io/camunda/zeebe/client/impl/worker/ExponentialBackoff.java#L31">GitHub</a>).
/// <para>
/// The next delay is calculated by clamping the multiplied delay between <c>minDelay</c> and <c>maxDelay</c>,
/// and then adding a random jitter:
/// </para>
/// <c>max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter</c>
/// <para>The final result is ensured to be non-negative and rounded.</para>
/// </remarks>
internal sealed class ExponentialBackoffSupplier : IBackoffSupplier
{
private readonly double maxDelayMs;
private readonly double minDelayMs;
private readonly double backoffFactor;
private readonly double jitterFactor;
private readonly Random random;

internal ExponentialBackoffSupplier(
TimeSpan minDelay,
TimeSpan maxDelay,
double backoffFactor,
double jitterFactor,
Random random)
{
maxDelayMs = maxDelay.TotalMilliseconds;
minDelayMs = minDelay.TotalMilliseconds;
this.backoffFactor = backoffFactor;
this.jitterFactor = jitterFactor;
this.random = random ?? new Random();
}

public long SupplyRetryDelay(long currentRetryDelay)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Could make a note that this implementation is based on the Java client version

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added summary and remarks to the ExponentialBackoffSupplier.cs

{
var previous = (double)currentRetryDelay;
var multiplied = previous * backoffFactor;
var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs);

var range = clamped * jitterFactor;
var jitter = (random.NextDouble() * (2 * range)) - range;

var next = clamped + jitter;
if (next < 0)
{
next = 0;
}

return (long)Math.Round(next);
}
}

37 changes: 33 additions & 4 deletions Client/Impl/Worker/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public sealed class JobWorker : IJobWorker
private readonly JobWorkerBuilder jobWorkerBuilder;
private readonly ILogger<JobWorker> logger;
private readonly int maxJobsActive;
private readonly TimeSpan pollInterval;
private TimeSpan pollInterval;
private readonly TimeSpan initialPollInterval;
private readonly IBackoffSupplier backoffSupplier;
private readonly IBackoffSupplier defaultBackoffSupplier;

private readonly CancellationTokenSource source;
private readonly double thresholdJobsActivation;
Expand All @@ -59,11 +62,14 @@ internal JobWorker(JobWorkerBuilder builder)
jobHandler = jobWorkerBuilder.Handler();
autoCompletion = builder.AutoCompletionEnabled();
pollInterval = jobWorkerBuilder.PollInterval();
initialPollInterval = pollInterval;
activateJobsRequest = jobWorkerBuilder.Request;
streamActivateJobsRequest = jobWorkerBuilder.StreamRequest;
jobActivator = jobWorkerBuilder.Activator;
maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate;
thresholdJobsActivation = maxJobsActive * 0.6;
defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build();
backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier;
}

/// <inheritdoc />
Expand Down Expand Up @@ -184,7 +190,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
catch (RpcException rpcException)
{
LogRpcException(rpcException);
await Task.Delay(StreamErrorRetryDelayMs, cancellationToken);
await Backoff(cancellationToken);
}
}
}
Expand Down Expand Up @@ -224,11 +230,12 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
null,
cancellationToken);
pollInterval = initialPollInterval;
}
catch (RpcException rpcException)
{
LogRpcException(rpcException);
await Task.Delay(pollInterval, cancellationToken);
await Backoff(cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed in my commit to backoff now every RpcException in Stream and Poll Jobs

}
}
else
Expand All @@ -238,6 +245,28 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
}
}

/// <summary>
/// Updates pollInterval using the configured backoff supplier and waits for that delay.
/// Falls back to a default exponential supplier if the custom supplier throws.
/// </summary>
private async Task Backoff(CancellationToken cancellationToken)
{
var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds);
long nextMs;
try
{
nextMs = backoffSupplier.SupplyRetryDelay(previousMs);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff.");
nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs);
}

pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs));
await Task.Delay(pollInterval, cancellationToken);
}

private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount)
{
if (jobCount.HasValue)
Expand Down Expand Up @@ -332,4 +361,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can
}
}, cancellationToken);
}
}
}
8 changes: 8 additions & 0 deletions Client/Impl/Worker/JobWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Microsoft.Extensions.Logging;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;
using Zeebe.Client.Impl.Worker;

namespace Zeebe.Client.Impl.Worker;

Expand All @@ -40,6 +41,7 @@ public class JobWorkerBuilder(
internal byte ThreadCount { get; set; } = 1;
internal ILoggerFactory LoggerFactory { get; } = loggerFactory;
internal IJobClient JobClient { get; } = zeebeClient;
internal IBackoffSupplier RetryBackoffSupplier { get; private set; }

public IJobWorkerBuilderStep2 JobType(string type)
{
Expand Down Expand Up @@ -147,6 +149,12 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount)
return this;
}

public IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier)
{
RetryBackoffSupplier = backoffSupplier;
return this;
}

public IJobWorker Open()
{
var worker = new JobWorker(this);
Expand Down
Loading