Skip to content

Commit c4e0611

Browse files
authored
[Event Hubs] Processor Test Scenario (Azure#34924)
The focus of these changes is to add a scenario to the event processor tests to validate that a checkpoint can be created from cached `ProcessEventArgs` after the processor has stopped.
1 parent 905a3da commit c4e0611

File tree

1 file changed

+71
-0
lines changed

1 file changed

+71
-0
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -927,6 +927,77 @@ public async Task ProcessorClientCeasesProcessingWhenStopping()
927927
Assert.That(eventsProcessedAfterStop, Is.False, "Events should not have been dispatched for processing after the processor has stopped.");
928928
}
929929

930+
/// <summary>
931+
/// Verifies that the <see cref="EventProcessorClient" /> can read a set of published events.
932+
/// </summary>
933+
///
934+
[Test]
935+
public async Task ProcessorClientCanCheckpointAfterStoppping()
936+
{
937+
// Setup the environment.
938+
939+
await using EventHubScope scope = await EventHubScope.CreateAsync(1);
940+
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
941+
942+
using var cancellationSource = new CancellationTokenSource();
943+
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
944+
945+
// Send a set of events.
946+
947+
var partitions = new HashSet<string>();
948+
var segmentEventCount = 25;
949+
var beforeCheckpointEvents = EventGenerator.CreateEvents(segmentEventCount).ToList();
950+
var afterCheckpointEvents = EventGenerator.CreateEvents(segmentEventCount).ToList();
951+
var sourceEvents = Enumerable.Concat(beforeCheckpointEvents, afterCheckpointEvents).ToList();
952+
var checkpointEvent = beforeCheckpointEvents.Last();
953+
var checkpointArgs = default(ProcessEventArgs);
954+
var sentCount = await SendEvents(connectionString, sourceEvents, cancellationSource.Token);
955+
956+
Assert.That(sentCount, Is.EqualTo(sourceEvents.Count), "Not all of the source events were sent.");
957+
958+
// Attempt to read back the first half of the events and checkpoint.
959+
960+
Func<ProcessEventArgs, Task> processedEventCallback = args =>
961+
{
962+
if (args.Data.IsEquivalentTo(checkpointEvent))
963+
{
964+
partitions.Add(args.Partition.PartitionId);
965+
checkpointArgs = args;
966+
}
967+
968+
return Task.CompletedTask;
969+
};
970+
971+
var processedEvents = new ConcurrentDictionary<string, EventData>();
972+
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
973+
var beforeCheckpointProcessHandler = CreateEventTrackingHandler(segmentEventCount, processedEvents, completionSource, cancellationSource.Token, processedEventCallback);
974+
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
975+
var checkpointStore = new InMemoryCheckpointStore(_ => { });
976+
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, checkpointStore, options);
977+
978+
processor.ProcessErrorAsync += CreateAssertingErrorHandler();
979+
processor.ProcessEventAsync += beforeCheckpointProcessHandler;
980+
981+
await processor.StartProcessingAsync(cancellationSource.Token);
982+
983+
await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
984+
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
985+
986+
await processor.StopProcessingAsync(cancellationSource.Token);
987+
988+
// Validate that a single partition was processed and a checkpoint can be written.
989+
990+
Assert.That(partitions.Count, Is.EqualTo(1), "All events should have been processed from a single partition.");
991+
Assert.That(checkpointArgs, Is.Not.Null, "The checkpoint arguments should have been captured.");
992+
Assert.That(async () => await checkpointArgs.UpdateCheckpointAsync(cancellationSource.Token), Throws.Nothing, "Checkpointing should be safe after stopping.");
993+
994+
// Validate a checkpoint was created and that events were processed.
995+
996+
var checkpoint = await checkpointStore.GetCheckpointAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, partitions.First(), cancellationSource.Token);
997+
Assert.That(checkpoint, Is.Not.Null, "A checkpoint should have been created.");
998+
Assert.That(processedEvents.Count, Is.AtLeast(beforeCheckpointEvents.Count), "All events before the checkpoint should have been processed.");
999+
}
1000+
9301001
/// <summary>
9311002
/// Creates an <see cref="EventProcessorClient" /> that uses mock storage and
9321003
/// a connection based on a connection string.

0 commit comments

Comments
 (0)