Skip to content

Commit 81add1a

Browse files
authored
[Event Hubs] Tweak processor startup (Azure#35113)
* [Event Hubs] Tweak processor startup The focus of these changes is to tweak the approach used to validate connection permissions when the processor is first starting up. Previously, it would attempt a read using the same explicit consumer used when running. That would potentially cause the processor working on that partition to temporarily be pushed off the partition. Though it would recover on the next load balancing cycle, this would cause a delay in processing. * Applying feedback to changelog phrasing.
1 parent 30dae40 commit 81add1a

File tree

8 files changed

+101
-68
lines changed

8 files changed

+101
-68
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Changed the approach that the event processor uses to validate permissions on startup to ensure that it does not interrupt other processors already running by temporarily asserting ownership of a partition.
12+
1113
### Other Changes
1214

1315
## 5.8.1 (2023-03-09)

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Changed the approach that the event processor uses to validate permissions on startup to ensure that it does not interrupt other processors already running by temporarily asserting ownership of a partition.
12+
1113
### Other Changes
1214

1315
## 5.8.1 (2023-03-09)

sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ public virtual void StopProcessing(CancellationToken cancellationToken = default
573573
/// <param name="eventPosition">The position in the event stream where the consumer should begin reading.</param>
574574
/// <param name="connection">The connection to use for the consumer.</param>
575575
/// <param name="options">The options to use for configuring the consumer.</param>
576+
/// <param name="exclusive"><c>true</c> if this should be an exclusive consumer; otherwise, <c>false</c>.</param>
576577
///
577578
/// <returns>An <see cref="TransportConsumer" /> with the requested configuration.</returns>
578579
///
@@ -581,8 +582,19 @@ internal virtual TransportConsumer CreateConsumer(string consumerGroup,
581582
string consumerIdentifier,
582583
EventPosition eventPosition,
583584
EventHubConnection connection,
584-
EventProcessorOptions options) =>
585-
connection.CreateTransportConsumer(consumerGroup, partitionId, consumerIdentifier, eventPosition, options.RetryOptions.ToRetryPolicy(), options.TrackLastEnqueuedEventProperties, InvalidateConsumerWhenPartitionIsStolen, prefetchCount: (uint?)options.PrefetchCount, prefetchSizeInBytes: options.PrefetchSizeInBytes, ownerLevel: 0);
585+
EventProcessorOptions options,
586+
bool exclusive) =>
587+
connection.CreateTransportConsumer(
588+
consumerGroup,
589+
partitionId,
590+
consumerIdentifier,
591+
eventPosition,
592+
options.RetryOptions.ToRetryPolicy(),
593+
options.TrackLastEnqueuedEventProperties,
594+
InvalidateConsumerWhenPartitionIsStolen,
595+
prefetchCount: (uint?)options.PrefetchCount,
596+
prefetchSizeInBytes: options.PrefetchSizeInBytes,
597+
ownerLevel: exclusive ? 0 : null);
586598

587599
/// <summary>
588600
/// Performs the tasks needed to process a batch of events.
@@ -767,7 +779,7 @@ async Task performProcessing()
767779
{
768780
try
769781
{
770-
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options);
782+
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options, true);
771783

772784
// Register for notification when the cancellation token is triggered. Attempt to close the consumer
773785
// in response to force-close the link and short-circuit any receive operation that is blocked and
@@ -2029,12 +2041,17 @@ private async Task ValidateEventHubsConnectionAsync(CancellationToken cancellati
20292041
// To ensure validity of the requested consumer group and that at least one partition exists,
20302042
// attempt to read from a partition.
20312043

2032-
var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], "validate", EventPosition.Earliest, connection, Options);
2044+
var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false);
20332045

20342046
try
20352047
{
20362048
await consumer.ReceiveAsync(1, TimeSpan.FromMilliseconds(5), cancellationToken).ConfigureAwait(false);
20372049
}
2050+
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
2051+
{
2052+
// This is expected when another processor is already running; no action is needed, as it
2053+
// validates that the reader was able to connect.
2054+
}
20382055
finally
20392056
{
20402057
await consumer.CloseAsync(cancellationToken).ConfigureAwait(false);

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.Infrastructure.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public async Task ReadLastEnqueuedEventPropertiesReadsPropertiesWhenThePartition
7575
.Returns(mockConnection.Object);
7676

7777
mockProcessor
78-
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>()))
78+
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>(), It.IsAny<bool>()))
7979
.Returns(Mock.Of<SettableTransportConsumer>());
8080

8181
mockProcessor
@@ -132,7 +132,7 @@ public async Task ReadLastEnqueuedEventPropertiesThrowsWhenThePartitionIsNotOwne
132132
.Returns(mockConnection.Object);
133133

134134
mockProcessor
135-
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>()))
135+
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>(), It.IsAny<bool>()))
136136
.Returns(Mock.Of<SettableTransportConsumer>());
137137

138138
mockProcessor

0 commit comments

Comments
 (0)