Commit 807ecd0

[Event Hubs Client] Capture Unobserved AMQP Link Close Errors (Azure#21426)
The AMQP transport library used by Event Hubs manages aspects of the transport in the background, notably the flow of data into the prefetch queue. In doing so, it controls attempts to read data and detects connection/link failures to provide resiliency for callers. Access to the AMQP link is managed through the `FaultTolerantAmqpObject<T>`, which uses a `GetOrCreate` pattern to handle faults and provide a resilient experience for callers.

When a link was discarded due to a fault, the fault was not observable to clients, and a new link would be created on the next `GetOrCreate` call. In the majority of scenarios, this behavior is desirable. However, in some scenarios, such as a partition being stolen by another event consumer asserting exclusive access, the failure must be surfaced to callers so that the correct remediation can be taken.

The focus of these changes is to ensure that errors causing link termination are surfaced via logging and that a stolen partition is treated as a special case and is always surfaced to callers.

Because this behavior directly impacted the load balancing aspects of the event processor, testing exposed some additional load balancing gaps that were also addressed. Lost partitions will now be detected more reliably, and the processor will proactively relinquish ownership and allow load balancing to make recovery decisions.
1 parent 554d35b commit 807ecd0
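
To make the commit message's `GetOrCreate` discussion concrete, here is a minimal sketch of the pattern, not the transport library's implementation. `SketchLink`, `PartitionStolenException`, and `ResilientLinkHolder` are hypothetical names invented for illustration, and the `invalidateWhenStolen` flag is only an assumption about how an "invalidate when stolen" option might behave. Ordinary faults are logged and hidden behind a fresh link; a stolen partition is thrown to the caller.

```csharp
using System;

// Hypothetical stand-in for an AMQP link; not a type from the transport library.
public sealed class SketchLink
{
    public Exception TerminalException { get; private set; }
    public bool IsClosed => TerminalException != null;

    // Simulates the service or transport closing the link in the background.
    public void Fault(Exception reason) => TerminalException = reason;
}

// Hypothetical marker for "another consumer asserted exclusive access".
public sealed class PartitionStolenException : Exception
{
    public PartitionStolenException(string message) : base(message) { }
}

// Not thread-safe; a sketch of the control flow only.
public sealed class ResilientLinkHolder
{
    private readonly Func<SketchLink> _factory;
    private readonly Action<string> _log;
    private readonly bool _invalidateWhenStolen;
    private SketchLink _current;
    private Exception _stolen;

    public ResilientLinkHolder(Func<SketchLink> factory, Action<string> log, bool invalidateWhenStolen)
    {
        _factory = factory;
        _log = log;
        _invalidateWhenStolen = invalidateWhenStolen;
    }

    public SketchLink GetOrCreate()
    {
        // Once invalidated, every call surfaces the original failure.
        if (_stolen != null)
        {
            throw _stolen;
        }

        if (_current != null && _current.IsClosed)
        {
            var reason = _current.TerminalException;

            // Every link termination is logged, even when it is recovered silently.
            _log($"Link closed: {reason.Message}");
            _current = null;

            // A stolen partition is the special case: surface it instead of recovering.
            if (reason is PartitionStolenException)
            {
                if (_invalidateWhenStolen)
                {
                    _stolen = reason;
                }

                throw reason;
            }
        }

        // Any other fault is hidden by creating a replacement link.
        return _current ??= _factory();
    }
}
```

With `invalidateWhenStolen` set to `false`, this sketch throws once for the stolen link and then creates a new link on the following call, which is the "reassert ownership" behavior the commit message warns about for processors.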

25 files changed: +1559 -123 lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md

Lines changed: 9 additions & 3 deletions
@@ -2,8 +2,6 @@
 
 ## 5.5.0-beta.1 (Unreleased)
 
-## 5.5.0-beta.1 (Unreleased)
-
 ### Acknowledgments
 
 Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:
@@ -14,7 +12,15 @@ Thank you to our developer community members who helped to make the Event Hubs c
 
 #### New Features
 
-- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
+- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
+
+- When the `EventProcessorClient` detects a partition being stolen outside of a load balancing cycle, it will immediately surrender ownership rather than waiting for a load balancing cycle to confirm the ownership change. This will help reduce event duplication from overlapping ownership of processors.
+
+#### Key Bug Fixes
+
+- The `EventProcessorClient` will now properly respect another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link, which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
+- The `EventProcessorClient` will now be less aggressive when considering whether or not to steal a partition, doing so only when it will correct an imbalance and preferring the status quo when the overall distribution would not change. This will help reduce event duplication due to partitions moving between owners.
 
 ## 5.4.1 (2021-05-11)
 

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs

Lines changed: 17 additions & 5 deletions
@@ -2,6 +2,7 @@
 // Licensed under the MIT License.
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
@@ -99,7 +100,7 @@ internal class PartitionLoadBalancer
         /// The set of partition ownership the associated event processor owns. Partition ids are used as keys.
         /// </summary>
         ///
-        private Dictionary<string, EventProcessorPartitionOwnership> InstanceOwnership { get; set; } = new Dictionary<string, EventProcessorPartitionOwnership>();
+        private ConcurrentDictionary<string, EventProcessorPartitionOwnership> InstanceOwnership { get; set; } = new ConcurrentDictionary<string, EventProcessorPartitionOwnership>();
 
         /// <summary>
         /// Initializes a new instance of the <see cref="PartitionLoadBalancer" /> class.
@@ -297,6 +298,16 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
             InstanceOwnership.Clear();
         }
 
+        /// <summary>
+        /// Allows reporting that a partition was stolen by another event consumer causing ownership
+        /// to be considered relinquished until the next load balancing cycle reconciles with persisted
+        /// state.
+        /// </summary>
+        ///
+        /// <param name="partitionId">The identifier of the partition that was stolen.</param>
+        ///
+        public virtual void ReportPartitionStolen(string partitionId) => InstanceOwnership.TryRemove(partitionId, out _);
+
         /// <summary>
         /// Finds and tries to claim an ownership if this processor instance is eligible to increase its ownership list.
         /// </summary>
@@ -389,9 +400,11 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
                 }
             }
 
-            // If this processor has less than the minimum or any other processor has more than the maximum, then we need to steal a partition.
+            // If this processor has less than the minimum or it has less than the maximum at the same time another processor has more than the
+            // maximum, then we need to steal a partition.
 
-            if ((ownedPartitionsCount < minimumOwnedPartitionsCount) || (partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
+            if ((ownedPartitionsCount < minimumOwnedPartitionsCount)
+                || (ownedPartitionsCount < maximumOwnedPartitionsCount && partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
             {
                 Logger.ShouldStealPartition(OwnerIdentifier);
 
@@ -476,7 +489,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
 
             foreach (var oldOwnership in ownershipToRenew)
             {
-                InstanceOwnership.Remove(oldOwnership.PartitionId);
+                InstanceOwnership.TryRemove(oldOwnership.PartitionId, out _);
             }
 
             foreach (var newOwnership in newOwnerships)
@@ -520,7 +533,6 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
             CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
-
             Logger.ClaimOwnershipStart(partitionId);
 
             // We need the eTag from the most recent ownership of this partition, even if it's expired. We want to keep the offset and
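
The effect of the revised steal condition in `PartitionLoadBalancer.cs` is easiest to see side by side. The sketch below restates the two `if` expressions from the hunk above as pure functions. `StealDecision` and its parameter names are hypothetical, and the minimum and maximum are passed in as plain numbers because this diff does not show how the load balancer derives them.

```csharp
// Hypothetical helper restating the old and new conditions from the diff.
public static class StealDecision
{
    // Old rule: steal when under the minimum, or whenever any processor is over the maximum.
    public static bool ShouldStealBefore(int owned, int minimum, int overloadedPartitionCount) =>
        (owned < minimum) || (overloadedPartitionCount > 0);

    // New rule: an overloaded peer only justifies stealing if this processor
    // still has room below the maximum.
    public static bool ShouldStealAfter(int owned, int minimum, int maximum, int overloadedPartitionCount) =>
        (owned < minimum)
        || (owned < maximum && overloadedPartitionCount > 0);
}
```

As an assumed scenario, take a minimum of 2 and a maximum of 3, with processor A owning 3 partitions, B owning 4, and C owning none. Under the old rule, A would steal from B even though A is already at the maximum, which only moves the imbalance around; `ShouldStealAfter(3, 2, 3, 4)` is `false`, while C still qualifies through `owned < minimum`.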

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs

Lines changed: 106 additions & 0 deletions
@@ -914,6 +914,112 @@ public async Task RunLoadBalancingAsyncDoesNotRenewFreshPartitions()
             Assert.That(storageManager.TotalRenewals, Is.EqualTo(8), "There should be 4 initial claims and 4 renew claims");
         }
 
+        /// <summary>
+        /// Verifies functionality of the <see cref="PartitionLoadBalancer" /> when a partition is
+        /// reported stolen.
+        /// </summary>
+        ///
+        [Test]
+        public async Task ReportPartitionStolenAbandonsOwnership()
+        {
+            const int NumberOfPartitions = 3;
+
+            var partitionIds = Enumerable.Range(1, NumberOfPartitions).Select(p => p.ToString()).ToArray();
+            var storageManager = new InMemoryStorageManager();
+            var loadBalancer = new PartitionLoadBalancer(storageManager, Guid.NewGuid().ToString(), ConsumerGroup, FullyQualifiedNamespace, EventHubName, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(10));
+
+            // Assume ownership of all partitions
+
+            await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership(partitionIds, loadBalancer.OwnerIdentifier));
+            await loadBalancer.RunLoadBalancingAsync(partitionIds, CancellationToken.None);
+
+            Assert.That(loadBalancer.OwnedPartitionIds, Is.EquivalentTo(partitionIds), "The load balancer should own all partitions.");
+
+            // Report the first partition stolen and validate that it is immediately abandoned.
+
+            var firstPartition = partitionIds.First();
+            partitionIds = partitionIds.Skip(1).ToArray();
+            Assert.That(partitionIds, Does.Not.Contain(firstPartition), "The first partition should no longer exist in the set of ids.");
+
+            loadBalancer.ReportPartitionStolen(firstPartition);
+            Assert.That(loadBalancer.OwnedPartitionIds, Does.Not.Contain(firstPartition), "The load balancer should not own the first partition after it was stolen.");
+            Assert.That(loadBalancer.OwnedPartitionIds, Is.EquivalentTo(partitionIds), "The load balancer should own all but the first partition.");
+        }
+
+        /// <summary>
+        /// Verifies functionality of the <see cref="PartitionLoadBalancer" /> when a partition is
+        /// reported stolen.
+        /// </summary>
+        ///
+        [Test]
+        public async Task LoadBalancerDoesNotReclaimStolenPartitionIfStorageAgrees()
+        {
+            const int NumberOfPartitions = 3;
+
+            var partitionIds = Enumerable.Range(1, NumberOfPartitions).Select(p => p.ToString()).ToArray();
+            var storageManager = new InMemoryStorageManager();
+            var loadBalancer = new PartitionLoadBalancer(storageManager, Guid.NewGuid().ToString(), ConsumerGroup, FullyQualifiedNamespace, EventHubName, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(10));
+
+            // Assume ownership of all partitions
+
+            await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership(partitionIds, loadBalancer.OwnerIdentifier));
+            await loadBalancer.RunLoadBalancingAsync(partitionIds, CancellationToken.None);
+
+            Assert.That(loadBalancer.OwnedPartitionIds, Is.EquivalentTo(partitionIds), "The load balancer should own all partitions.");
+
+            // Report the first partition stolen and validate that it is immediately abandoned.
+
+            var firstPartition = partitionIds.First();
+
+            loadBalancer.ReportPartitionStolen(firstPartition);
+            Assert.That(loadBalancer.OwnedPartitionIds, Does.Not.Contain(firstPartition), "The load balancer should not own the first partition after it was stolen.");
+
+            // Update storage to reflect that the first partition is not owned.
+
+            var ownership = (await storageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup)).Single(item => item.PartitionId == firstPartition);
+            ownership.OwnerIdentifier = "another-processor";
+
+            await storageManager.ClaimOwnershipAsync(new[] { ownership });
+            await loadBalancer.RunLoadBalancingAsync(partitionIds, CancellationToken.None);
+
+            Assert.That(loadBalancer.OwnedPartitionIds, Does.Not.Contain(firstPartition), "The load balancer should not own the first partition after load balancing.");
+        }
+
+        /// <summary>
+        /// Verifies functionality of the <see cref="PartitionLoadBalancer" /> when a partition is
+        /// reported stolen.
+        /// </summary>
+        ///
+        [Test]
+        public async Task LoadBalancerReclaimsStolenPartitionIfStorageDisagrees()
+        {
+            const int NumberOfPartitions = 3;
+
+            var partitionIds = Enumerable.Range(1, NumberOfPartitions).Select(p => p.ToString()).ToArray();
+            var storageManager = new InMemoryStorageManager();
+            var loadBalancer = new PartitionLoadBalancer(storageManager, Guid.NewGuid().ToString(), ConsumerGroup, FullyQualifiedNamespace, EventHubName, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(10));
+
+            // Assume ownership of all partitions
+
+            await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership(partitionIds, loadBalancer.OwnerIdentifier));
+            await loadBalancer.RunLoadBalancingAsync(partitionIds, CancellationToken.None);
+
+            Assert.That(loadBalancer.OwnedPartitionIds, Is.EquivalentTo(partitionIds), "The load balancer should own all partitions.");
+
+            // Report the first partition stolen and validate that it is immediately abandoned.
+
+            var firstPartition = partitionIds.First();
+
+            loadBalancer.ReportPartitionStolen(firstPartition);
+            Assert.That(loadBalancer.OwnedPartitionIds, Does.Not.Contain(firstPartition), "The load balancer should not own the first partition after it was stolen.");
+
+            // Storage still reflects ownership of the stolen partition; running load balancing should
+            // reclaim it.
+
+            await loadBalancer.RunLoadBalancingAsync(partitionIds, CancellationToken.None);
+            Assert.That(loadBalancer.OwnedPartitionIds, Is.EquivalentTo(partitionIds), "The load balancer should own all partitions after considering storage.");
+        }
+
         /// <summary>
         /// Creates a collection of <see cref="PartitionOwnership" /> based on the specified arguments.
         /// </summary>
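
The three tests above pin down one contract: a reported steal drops local ownership immediately, and the next load balancing cycle lets persisted state have the final say. The following is a heavily simplified sketch of that contract, assuming a plain owner-per-partition map in place of the real storage manager. `OwnershipTracker` and `Reconcile` are hypothetical names; the actual load balancer also handles claims, renewals, and expiration, none of which appear here.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified model of the ownership bookkeeping exercised by the tests.
public sealed class OwnershipTracker
{
    // A concurrent map, since a steal may be reported from outside the load balancing cycle.
    private readonly ConcurrentDictionary<string, string> _owned =
        new ConcurrentDictionary<string, string>();

    public OwnershipTracker(string ownerId) => OwnerId = ownerId;

    public string OwnerId { get; }

    public ICollection<string> OwnedPartitionIds => _owned.Keys;

    // Mirrors the diff: removal is a no-op if the partition is already gone.
    public void ReportPartitionStolen(string partitionId) => _owned.TryRemove(partitionId, out _);

    // Stand-in for a load balancing cycle: persisted state decides ownership.
    public void Reconcile(IReadOnlyDictionary<string, string> storedOwners)
    {
        foreach (var pair in storedOwners)
        {
            if (pair.Value == OwnerId)
            {
                _owned[pair.Key] = pair.Value;       // storage says it is ours: reclaim it
            }
            else
            {
                _owned.TryRemove(pair.Key, out _);   // storage agrees it was taken
            }
        }
    }
}
```

The use of `TryRemove` matters for the same reason the diff swaps `Dictionary` for `ConcurrentDictionary`: the steal report and the reconciliation can race, and a second removal of the same key should be harmless rather than an error.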

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
@@ -14,6 +14,20 @@ Thank you to our developer community members who helped to make the Event Hubs c
 
 - When stopping, the `EventProcessor<TPartition>` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
 
+- When the `EventProcessor<TPartition>` detects a partition being stolen outside of a load balancing cycle, it will immediately surrender ownership rather than waiting for a load balancing cycle to confirm the ownership change. This will help reduce event duplication from overlapping ownership of processors.
+
+- The `EventProcessor<TPartition>` now exposes the `ListPartitionIdsAsync` method, allowing custom processors to control the set of partitions known to the processor. This can be used to reduce complexity when a custom processor is directly assigned a set of partitions to process rather than using load balancing to control ownership.
+
+- Additional verbose logging has been added to allow monitoring of lower-level AMQP operations such as creating links, terminal exceptions that fault a link without an active operation, and when the service force-closes links.
+
+#### Key Bug Fixes
+
+- The `EventProcessor<TPartition>` will now properly respect another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link, which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
+- The `EventProcessor<TPartition>` will now be less aggressive when considering whether or not to steal a partition, doing so only when it will correct an imbalance and preferring the status quo when the overall distribution would not change. This will help reduce event duplication due to partitions moving between owners.
+
+- The `EventHubConsumerClient` and `PartitionReceiver` will now properly surface an exception when another consumer steals ownership of a partition and the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and did not make callers aware of it; it attempted to recover the faulted link, which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
 ## 5.4.1 (2021-05-11)
 
 ### Changes
sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 1 addition & 0 deletions
@@ -367,6 +367,7 @@ protected EventProcessor(int eventBatchMaximumCount, string consumerGroup, strin
         public override int GetHashCode() { throw null; }
         protected abstract System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint>> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken);
         protected abstract System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(System.Threading.CancellationToken cancellationToken);
+        protected virtual System.Threading.Tasks.Task<string[]> ListPartitionIdsAsync(Azure.Messaging.EventHubs.EventHubConnection connection, System.Threading.CancellationToken cancellationToken) { throw null; }
         protected virtual System.Threading.Tasks.Task OnInitializingPartitionAsync(TPartition partition, System.Threading.CancellationToken cancellationToken) { throw null; }
         protected virtual System.Threading.Tasks.Task OnPartitionProcessingStoppedAsync(TPartition partition, Azure.Messaging.EventHubs.Processor.ProcessingStoppedReason reason, System.Threading.CancellationToken cancellationToken) { throw null; }
         protected abstract System.Threading.Tasks.Task OnProcessingErrorAsync(System.Exception exception, TPartition partition, string operationDescription, System.Threading.CancellationToken cancellationToken);
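
The new `ListPartitionIdsAsync` hook listed above is `protected virtual`, so a custom processor can replace the set of partitions it knows about. Deriving from the real `EventProcessor<TPartition>` requires implementing several abstract members, so the sketch below uses a hypothetical stand-in base class to show only the override shape. `SketchProcessorBase`, `GetKnownPartitionsAsync`, and `StaticAssignmentProcessor` are invented for illustration, and the stand-in omits the `EventHubConnection` parameter that the real signature takes.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the processor base class; NOT the Azure SDK type.
public abstract class SketchProcessorBase
{
    // Default behavior: pretend the service was queried for its partitions.
    protected virtual Task<string[]> ListPartitionIdsAsync(CancellationToken cancellationToken) =>
        Task.FromResult(new[] { "0", "1", "2", "3" });

    // Hypothetical public entry point so the sketch can be exercised.
    public Task<string[]> GetKnownPartitionsAsync(CancellationToken cancellationToken = default) =>
        ListPartitionIdsAsync(cancellationToken);
}

// A processor that is directly assigned its partitions instead of discovering them.
public sealed class StaticAssignmentProcessor : SketchProcessorBase
{
    private readonly string[] _assigned;

    public StaticAssignmentProcessor(params string[] assigned) => _assigned = assigned;

    protected override Task<string[]> ListPartitionIdsAsync(CancellationToken cancellationToken) =>
        Task.FromResult(_assigned);
}
```

For example, `new StaticAssignmentProcessor("1", "3")` would report only partitions 1 and 3, which is the "directly assigned a set of partitions" case the CHANGELOG entry describes.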

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

Lines changed: 4 additions & 0 deletions
@@ -182,6 +182,7 @@ protected AmqpClient(string host,
                     {
                         link.Session?.SafeClose();
                         link.SafeClose();
+                        EventHubsEventSource.Log.FaultTolerantAmqpObjectClose(nameof(RequestResponseAmqpLink), "", EventHubName, "", "", link.TerminalException?.Message);
                     });
             }
             finally
@@ -438,6 +439,7 @@ public override TransportProducer CreateProducer(string partitionId,
         /// <param name="eventPosition">The position within the partition where the consumer should begin reading events.</param>
         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
         /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
+        /// <param name="invalidateConsumerWhenPartitionStolen">Indicates whether or not the consumer should consider itself invalid when a partition is stolen.</param>
         /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
         /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested. If <c>null</c> a default will be used.</param>
         /// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
@@ -449,6 +451,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
                                                          EventPosition eventPosition,
                                                          EventHubsRetryPolicy retryPolicy,
                                                          bool trackLastEnqueuedEventProperties,
+                                                         bool invalidateConsumerWhenPartitionStolen,
                                                          long? ownerLevel,
                                                          uint? prefetchCount,
                                                          long? prefetchSizeInBytes)
@@ -462,6 +465,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
                 partitionId,
                 eventPosition,
                 trackLastEnqueuedEventProperties,
+                invalidateConsumerWhenPartitionStolen,
                 ownerLevel,
                 prefetchCount,
                 prefetchSizeInBytes,
