Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 78 additions & 39 deletions Client.UnitTests/JobWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using NLog;
using NUnit.Framework;
Expand All @@ -42,19 +43,19 @@ public void ShouldSendRequestReceiveResponseAsExpected()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
var receivedJobs = new List<IJob>();
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((jobClient, job) =>
.Handler((_, job) =>
{
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(3)
Expand All @@ -65,7 +66,7 @@ public void ShouldSendRequestReceiveResponseAsExpected()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand Down Expand Up @@ -118,7 +119,7 @@ public void ShouldSendRequestWithTenantIdsListReceiveResponseAsExpected()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand All @@ -139,13 +140,13 @@ public void ShouldFailWithZeroThreadCount()
var aggregateException = Assert.Throws<ArgumentOutOfRangeException>(
() =>
{
_ = ZeebeClient.NewWorker()
ZeebeClient.NewWorker()
.JobType("foo")
.Handler((jobClient, job) => { })
.Handler((_, _) => { })
.HandlerThreads(0);
});
StringAssert.Contains("Expected an handler thread count larger then zero, but got 0.",
aggregateException.Message);
aggregateException?.Message);
}

[Test]
Expand All @@ -161,7 +162,7 @@ public void ShouldSendAsyncCompleteInHandler()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -170,11 +171,11 @@ public void ShouldSendAsyncCompleteInHandler()
.JobType("foo")
.Handler(async (jobClient, job) =>
{
_ = await jobClient.NewCompleteJobCommand(job).Send();
await jobClient.NewCompleteJobCommand(job).Send();
completedJobs.Add(job);
if (completedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(3)
Expand All @@ -186,7 +187,7 @@ public void ShouldSendAsyncCompleteInHandler()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand Down Expand Up @@ -214,7 +215,7 @@ public void ShouldUseMultipleHandlerThreads()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -223,11 +224,11 @@ public void ShouldUseMultipleHandlerThreads()
.JobType("foo")
.Handler(async (jobClient, job) =>
{
_ = await jobClient.NewCompleteJobCommand(job).Send();
_ = completedJobs.TryAdd(job.Key, job);
await jobClient.NewCompleteJobCommand(job).Send();
completedJobs.TryAdd(job.Key, job);
if (completedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(3)
Expand All @@ -239,7 +240,7 @@ public void ShouldUseMultipleHandlerThreads()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand All @@ -265,7 +266,7 @@ public void ShouldSendCompleteInHandler()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -274,11 +275,11 @@ public void ShouldSendCompleteInHandler()
.JobType("foo")
.Handler((jobClient, job) =>
{
_ = jobClient.NewCompleteJobCommand(job).Send();
jobClient.NewCompleteJobCommand(job).Send();
completedJobs.Add(job);
if (completedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(3)
Expand All @@ -289,7 +290,7 @@ public void ShouldSendCompleteInHandler()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand Down Expand Up @@ -332,21 +333,21 @@ public void ShouldSendRequestsWithDifferentAmounts()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var receivedJobs = new List<IJob>();
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((jobClient, job) =>
.Handler((_, job) =>
{
// block job handling
receivedJobs.Add(job);
if (receivedJobs.Count == 2)
{
using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
{
_ = signal.WaitOne();
signal.WaitOne();
}
}
})
Expand Down Expand Up @@ -384,7 +385,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds()
RequestTimeout = 5_000L
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -396,7 +397,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds()
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(1)
Expand All @@ -407,7 +408,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand All @@ -434,7 +435,7 @@ public void ShouldSendRequestWithFetchVariables()
FetchVariable = { "foo", "bar", "test" }
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -446,7 +447,7 @@ public void ShouldSendRequestWithFetchVariables()
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(1)
Expand All @@ -457,7 +458,7 @@ public void ShouldSendRequestWithFetchVariables()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand All @@ -484,7 +485,7 @@ public void ShouldSendRequestWithFetchVariablesList()
FetchVariable = { "foo", "bar", "test" }
};
IList<string> variableNames = new List<string> { "foo", "bar", "test" };
TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
Expand All @@ -496,7 +497,7 @@ public void ShouldSendRequestWithFetchVariablesList()
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
_ = signal.Set();
signal.Set();
}
})
.MaxJobsActive(1)
Expand All @@ -507,7 +508,7 @@ public void ShouldSendRequestWithFetchVariablesList()
.Open())
{
Assert.True(jobWorker.IsOpen());
_ = signal.WaitOne();
signal.WaitOne();
}

// then
Expand Down Expand Up @@ -540,12 +541,12 @@ public void ShouldSendFailCommandOnExceptionInJobHandler()
Retries = 2
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((jobClient, job) =>
.Handler((_, job) =>
{
if (job.Key == 1)
{
Expand Down Expand Up @@ -577,7 +578,7 @@ public void ShouldSendFailCommandOnExceptionInJobHandler()
public void ShouldCompleteAfterSendFailCommandOnExceptionInJobHandler()
{
// given
TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
using (var jobWorker = ZeebeClient.NewWorker()
Expand All @@ -589,7 +590,7 @@ public void ShouldCompleteAfterSendFailCommandOnExceptionInJobHandler()
throw new Exception("Fail");
}

_ = await jobClient.NewCompleteJobCommand(job).Send();
await jobClient.NewCompleteJobCommand(job).Send();
})
.MaxJobsActive(3)
.Name("jobWorker")
Expand Down Expand Up @@ -626,12 +627,12 @@ public void ShouldUseAutoCompleteWithWorker()
};
TestService.AddRequestHandler(
typeof(ActivateJobsRequest),
request => CreateExpectedResponse());
_ => CreateExpectedResponse());

// when
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((jobClient, job) => { Logger.Info("Handler has seen job '{0}'", job); })
.Handler((_, job) => { Logger.Info("Handler has seen job '{Job}'", job); })
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
Expand Down Expand Up @@ -659,6 +660,44 @@ public void ShouldUseAutoCompleteWithWorker()
Assert.Contains(3, completeJobRequests);
}

[Test]
public async Task CanDisposeRunningWorker()
{
// given
TestService.AddRequestHandler(
typeof(ActivateJobsRequest),
_ => CreateExpectedResponse());
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);

// when
var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler(async (jobClient, job) =>
{
// trigger worker disposal with second job
signal.Set();

// this should not be completed
await jobClient.NewCompleteJobCommand(job).Send();
})
.AutoCompletion()
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(1))
.PollInterval(TimeSpan.FromSeconds(3)) // long
.PollingTimeout(TimeSpan.FromMilliseconds(100))
.Open();
Assert.True(jobWorker.IsOpen());

signal.WaitOne();
// disposal must be quick even though the polling interval is long
await jobWorker.DisposeAsync().ConfigureAwait(false);

// activated job was not awaited
Assert.AreEqual(0, TestService.Requests[typeof(CompleteJobRequest)].Count);
Assert.True(jobWorker.IsClosed());
}

public static ActivateJobsResponse CreateExpectedResponse()
{
return new ActivateJobsResponse
Expand Down
2 changes: 1 addition & 1 deletion Client/Api/Worker/IJobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Zeebe.Client.Api.Worker;
/// open, the worker continuously receives jobs from the broker and hands them to a registered
/// <see cref="JobHandler" />.
/// </summary>
public interface IJobWorker : IDisposable
public interface IJobWorker : IDisposable, IAsyncDisposable
{
/// <returns>true if this registration is currently active and work items are being received for it.</returns>
bool IsOpen();
Expand Down
Loading
Loading