Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions Client.UnitTests/ExponentialBackoffTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using System.Linq;
using NUnit.Framework;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Worker;

namespace Zeebe.Client;

[TestFixture]
public sealed class ExponentialBackoffTests
{
[Test]
public void ShouldReturnDelayWithinBounds_WhenNoJitter()
{
// given
var maxDelay = TimeSpan.FromMilliseconds(1_000);
var minDelay = TimeSpan.FromMilliseconds(50);
IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
.MaxDelay(maxDelay)
.MinDelay(minDelay)
.BackoffFactor(1.6)
.JitterFactor(0)
.Build();

var delays = new List<long>();
long current = supplier.SupplyRetryDelay(0);

// when
for (var i = 0; i < 100; i++)
{
delays.Add(current);
current = supplier.SupplyRetryDelay(current);
}

// then - with zero jitter, sequence should monotonically increase until it caps at max
Assert.That(delays, Is.Not.Empty);
Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds));
Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds));

long previous = -1;
foreach (var delay in delays)
{
if (delay != (long)maxDelay.TotalMilliseconds)
{
Assert.That(delay, Is.GreaterThan(previous));
}
previous = delay;
}
}

[Test]
public void ShouldBeRandomizedWithJitter()
{
// given
var maxDelay = TimeSpan.FromMilliseconds(1_000);
var minDelay = TimeSpan.FromMilliseconds(50);
const double jitterFactor = 0.2;

IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
.MaxDelay(maxDelay)
.MinDelay(minDelay)
.BackoffFactor(1.5)
.JitterFactor(jitterFactor)
.Build();

var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor));
var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor));

// when - always compute from max to test jitter around the cap
var delays = new List<long>();
for (var i = 0; i < 10; i++)
{
var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds);
delays.Add(value);
}

// then
Assert.That(delays, Is.Not.Empty);
foreach (var delay in delays)
{
Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound));
}
Assert.That(delays.Distinct().Count(), Is.GreaterThan(1));
}
}

17 changes: 17 additions & 0 deletions Client/Api/Worker/IBackoffSupplier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Zeebe.Client.Api.Worker;

/// <summary>
/// Supplies the delay for the next retry after a failed activate request.
/// Value is in milliseconds; return value may be zero to retry immediately.
/// </summary>
public interface IBackoffSupplier
{
/// <summary>
/// Returns the delay before the next retry.
/// </summary>
/// <param name="currentRetryDelay">The previously used delay in milliseconds.</param>
/// <returns>The new retry delay in milliseconds.</returns>
long SupplyRetryDelay(long currentRetryDelay);
}
36 changes: 36 additions & 0 deletions Client/Api/Worker/IExponentialBackoffBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Zeebe.Client.Api.Worker;

public interface IExponentialBackoffBuilder
{
/// <summary>
/// Sets the maximum retry delay. Default is 5000ms.
/// </summary>
IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay);

/// <summary>
/// Sets the minimum retry delay. Default is 50ms.
/// </summary>
IExponentialBackoffBuilder MinDelay(TimeSpan minDelay);

/// <summary>
/// Sets the multiplication factor (previous delay * factor). Default is 1.6.
/// </summary>
IExponentialBackoffBuilder BackoffFactor(double backoffFactor);

/// <summary>
/// Sets optional jitter factor (+/- factor of delay). Default is 0.1.
/// </summary>
IExponentialBackoffBuilder JitterFactor(double jitterFactor);

/// <summary>
/// Sets the random source used to compute jitter. Default is new Random().
/// </summary>
IExponentialBackoffBuilder Random(Random random);

/// <summary>
/// Builds the supplier with the provided configuration.
/// </summary>
IBackoffSupplier Build();
}
11 changes: 10 additions & 1 deletion Client/Api/Worker/IJobWorkerBuilderStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
/// <returns>the builder for this worker.</returns>
IJobWorkerBuilderStep3 HandlerThreads(byte threadCount);

/// <summary>
/// Configures a retry backoff supplier used when activate requests fail with
/// gRPC StatusCode.ResourceExhausted. Only applied to polling retries and reset on success.
/// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set.
/// </summary>
/// <param name="backoffSupplier">The supplier used to compute the next retry delay in ms.</param>
/// <returns>The builder for this worker.</returns>
IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼


/// <summary>
/// Enable or disable gRPC job streaming for this worker.
/// </summary>
Expand All @@ -249,4 +258,4 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
/// </summary>
/// <returns>the worker.</returns>
IJobWorker Open();
}
}
49 changes: 49 additions & 0 deletions Client/Impl/Worker/ExponentialBackoffBuilderImpl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using Zeebe.Client.Api.Worker;

namespace Zeebe.Client.Impl.Worker;

internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder
{
private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000);
private TimeSpan minDelay = TimeSpan.FromMilliseconds(50);
private double backoffFactor = 1.6;
private double jitterFactor = 0.1;
private Random random = new Random();

public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay)
{
this.maxDelay = maxDelay;
return this;
}

public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay)
{
this.minDelay = minDelay;
return this;
}

public IExponentialBackoffBuilder BackoffFactor(double backoffFactor)
{
this.backoffFactor = backoffFactor;
return this;
}

public IExponentialBackoffBuilder JitterFactor(double jitterFactor)
{
this.jitterFactor = jitterFactor;
return this;
}

public IExponentialBackoffBuilder Random(Random random)
{
this.random = random ?? new Random();
return this;
}

public IBackoffSupplier Build()
{
return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random);
}
}

65 changes: 65 additions & 0 deletions Client/Impl/Worker/ExponentialBackoffSupplier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using Zeebe.Client.Api.Worker;

namespace Zeebe.Client.Impl.Worker;

/// <summary>
/// An implementation of <see cref="IBackoffSupplier"/> which uses the **Exponential Backoff with Jitter** strategy
/// for calculating retry delays.
/// <para>
/// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier
/// and adds some jitter to avoid multiple clients polling at the same time.
/// </para>
/// </summary>
/// <remarks>
/// The core logic is copied from the Zeebe Java client's
/// <c>io.camunda.zeebe.client.impl.worker.ExponentialBackoff</c> implementation
/// (source: <a href="https://github.com/camunda/camunda/blob/5764a0a3e6c3d3253c5c9608bf4478f8e2281af7/clients/java-deprecated/src/main/java/io/camunda/zeebe/client/impl/worker/ExponentialBackoff.java#L31">GitHub</a>).
/// <para>
/// The next delay is calculated by clamping the multiplied delay between <c>minDelay</c> and <c>maxDelay</c>,
/// and then adding a random jitter:
/// </para>
/// <c>max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter</c>
/// <para>The final result is ensured to be non-negative and rounded.</para>
/// </remarks>
internal sealed class ExponentialBackoffSupplier : IBackoffSupplier
{
private readonly double maxDelayMs;
private readonly double minDelayMs;
private readonly double backoffFactor;
private readonly double jitterFactor;
private readonly Random random;

internal ExponentialBackoffSupplier(
TimeSpan minDelay,
TimeSpan maxDelay,
double backoffFactor,
double jitterFactor,
Random random)
{
maxDelayMs = maxDelay.TotalMilliseconds;
minDelayMs = minDelay.TotalMilliseconds;
this.backoffFactor = backoffFactor;
this.jitterFactor = jitterFactor;
this.random = random ?? new Random();
}

public long SupplyRetryDelay(long currentRetryDelay)
{
var previous = (double)currentRetryDelay;
var multiplied = previous * backoffFactor;
var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs);

var range = clamped * jitterFactor;
var jitter = (random.NextDouble() * (2 * range)) - range;

var next = clamped + jitter;
if (next < 0)
{
next = 0;
}

return (long)Math.Round(next);
}
}

37 changes: 33 additions & 4 deletions Client/Impl/Worker/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public sealed class JobWorker : IJobWorker
private readonly JobWorkerBuilder jobWorkerBuilder;
private readonly ILogger<JobWorker> logger;
private readonly int maxJobsActive;
private readonly TimeSpan pollInterval;
private TimeSpan pollInterval;
private readonly TimeSpan initialPollInterval;
private readonly IBackoffSupplier backoffSupplier;
private readonly IBackoffSupplier defaultBackoffSupplier;

private readonly CancellationTokenSource source;
private readonly double thresholdJobsActivation;
Expand All @@ -59,11 +62,14 @@ internal JobWorker(JobWorkerBuilder builder)
jobHandler = jobWorkerBuilder.Handler();
autoCompletion = builder.AutoCompletionEnabled();
pollInterval = jobWorkerBuilder.PollInterval();
initialPollInterval = pollInterval;
activateJobsRequest = jobWorkerBuilder.Request;
streamActivateJobsRequest = jobWorkerBuilder.StreamRequest;
jobActivator = jobWorkerBuilder.Activator;
maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate;
thresholdJobsActivation = maxJobsActive * 0.6;
defaultBackoffSupplier = new ExponentialBackoffBuilderImpl().Build();
backoffSupplier = jobWorkerBuilder.RetryBackoffSupplier ?? defaultBackoffSupplier;
}
Comment on lines 62 to 73
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The integration of the backoff supplier with the JobWorker class lacks test coverage. While ExponentialBackoffTests.cs tests the backoff calculation logic, there are no tests verifying:

  1. That the backoff is applied when RPC exceptions occur
  2. That the pollInterval is reset after successful activations
  3. That the custom backoff supplier is used when provided
  4. That the fallback to default backoff occurs when a custom supplier throws

Consider adding integration tests in JobWorkerTest.cs to cover these scenarios.

Copilot uses AI. Check for mistakes.

/// <inheritdoc />
Expand Down Expand Up @@ -184,7 +190,7 @@ private async Task StreamJobs(ITargetBlock<IJob> input, CancellationToken cancel
catch (RpcException rpcException)
{
LogRpcException(rpcException);
await Task.Delay(StreamErrorRetryDelayMs, cancellationToken);
await Backoff(cancellationToken);
}
}
}
Expand Down Expand Up @@ -224,11 +230,12 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
null,
cancellationToken);
pollInterval = initialPollInterval;
}
catch (RpcException rpcException)
{
LogRpcException(rpcException);
await Task.Delay(pollInterval, cancellationToken);
await Backoff(cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed in my commit to backoff now every RpcException in Stream and Poll Jobs

}
}
else
Expand All @@ -238,6 +245,28 @@ await jobActivator.SendActivateRequest(activateJobsRequest,
}
}

/// <summary>
/// Updates pollInterval using the configured backoff supplier and waits for that delay.
/// Falls back to a default exponential supplier if the custom supplier throws.
/// </summary>
private async Task Backoff(CancellationToken cancellationToken)
{
var previousMs = Math.Max(0, (long)pollInterval.TotalMilliseconds);
long nextMs;
try
{
nextMs = backoffSupplier.SupplyRetryDelay(previousMs);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Backoff supplier failed; falling back to default backoff.");
nextMs = defaultBackoffSupplier.SupplyRetryDelay(previousMs);
}

pollInterval = TimeSpan.FromMilliseconds(Math.Max(0, nextMs));
await Task.Delay(pollInterval, cancellationToken);
}

private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJobsResponse response, int? jobCount)
{
if (jobCount.HasValue)
Expand Down Expand Up @@ -332,4 +361,4 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can
}
}, cancellationToken);
}
}
}
8 changes: 8 additions & 0 deletions Client/Impl/Worker/JobWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Microsoft.Extensions.Logging;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;
using Zeebe.Client.Impl.Worker;

namespace Zeebe.Client.Impl.Worker;

Expand All @@ -40,6 +41,7 @@ public class JobWorkerBuilder(
internal byte ThreadCount { get; set; } = 1;
internal ILoggerFactory LoggerFactory { get; } = loggerFactory;
internal IJobClient JobClient { get; } = zeebeClient;
internal IBackoffSupplier RetryBackoffSupplier { get; private set; }

public IJobWorkerBuilderStep2 JobType(string type)
{
Expand Down Expand Up @@ -147,6 +149,12 @@ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount)
return this;
}

public IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier)
{
RetryBackoffSupplier = backoffSupplier;
return this;
}

public IJobWorker Open()
{
var worker = new JobWorker(this);
Expand Down
Loading