Skip to content

Commit c3d77a4

Browse files
authored
[Event Hubs] Minor Fixes (#24590)
The focus of these changes is a set of minor fixes, including: - Fixing the flaky `LoadBalancingAppliesTheGreedyStrategy` test - Logging the selected event position when a partition is initialized - Removing an outdated and incorrect set of remarks.
1 parent 8b7faa7 commit c3d77a4

File tree

5 files changed

+81
-8
lines changed

5 files changed

+81
-8
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventPosition.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ public struct EventPosition : IEquatable<EventPosition>
4343
///
4444
/// <value>Expected to be <c>null</c> if the event position represents a sequence number or enqueue time.</value>
4545
///
46-
/// <remarks>
47-
/// The offset is the relative position for event in the context of the stream. The offset
48-
/// should not be considered a stable value, as the same offset may refer to a different event
49-
/// as events reach the age limit for retention and are no longer visible within the stream.
50-
/// </remarks>
51-
///
5246
internal string Offset { get; set; }
5347

5448
/// <summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,5 +1619,28 @@ public virtual void EventProcessorHighPartitionOwnershipWarning(string identifie
16191619
WriteEvent(104, identifier ?? string.Empty, eventHubName ?? string.Empty, totalPartitionCount, ownedPartitionCount, maximumAdvisedCount);
16201620
}
16211621
}
1622+
1623+
/// <summary>
1624+
/// Indicates that an <see cref="EventProcessor{TPartition}" /> instance has taken ownership of a partition and is actively processing it.
1625+
/// </summary>
1626+
///
1627+
/// <param name="partitionId">The identifier of the Event Hub partition whose processing is starting.</param>
1628+
/// <param name="identifier">A unique name used to identify the event processor.</param>
1629+
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
1630+
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
1631+
/// <param name="startingPosition">The position in the event stream that reading will start from.</param>
1632+
///
1633+
[Event(105, Level = EventLevel.Verbose, Message = "The processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3} is initializing partition '{0}' with starting position: [{4}]")]
1634+
public virtual void EventProcessorPartitionProcessingEventPositionDetermined(string partitionId,
1635+
string identifier,
1636+
string eventHubName,
1637+
string consumerGroup,
1638+
string startingPosition)
1639+
{
1640+
if (IsEnabled())
1641+
{
1642+
WriteEvent(105, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, startingPosition ?? string.Empty);
1643+
}
1644+
}
16221645
}
16231646
}

sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,8 @@ async Task performProcessing()
690690
_ => await InitializePartitionForProcessingAsync(partition, cancellationSource.Token).ConfigureAwait(false)
691691
};
692692

693+
Logger.EventProcessorPartitionProcessingEventPositionDetermined(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, startingPosition.ToString());
694+
693695
// Create the connection to be used for spawning consumers; if the creation
694696
// fails, then consider the processing task to be failed. The main processing
695697
// loop will take responsibility for attempting to restart or relinquishing ownership.

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,10 +1871,10 @@ public async Task LoadBalancingAppliesTheGreedyStrategy()
18711871
Assert.That(ownedPartitions.Contains(firstPartiton), Is.True, "The first partition should be owned.");
18721872
Assert.That(ownedPartitions.Contains(secondPartition), Is.True, "The second partition should be owned.");
18731873

1874-
// Due to non-determinism, the exact number of cycle is not known; allow for a small range.
1874+
// Due to non-determinism, the exact number of cycle is not known.
18751875

18761876
mockLoadBalancer
1877-
.Verify(lb => lb.RunLoadBalancingAsync(partitionIds, It.IsAny<CancellationToken>()), Times.Between(3, 6, Moq.Range.Inclusive), "The load balancer did not run the correct number of cycles.");
1877+
.Verify(lb => lb.RunLoadBalancingAsync(partitionIds, It.IsAny<CancellationToken>()), Times.AtLeast(3), "The load balancer did not run the correct number of cycles.");
18781878
}
18791879

18801880
/// <summary>

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,6 +1526,60 @@ public async Task CreatePartitionProcessorProcessingTaskStartsTheConsumerAtTheCo
15261526
Times.Once);
15271527
}
15281528

1529+
/// <summary>
1530+
/// Verifies functionality of the <see cref="EventProcessor{TPartition}.CreatePartitionProcessor" />
1531+
/// method.
1532+
/// </summary>
1533+
///
1534+
[Test]
1535+
public async Task CreatePartitionProcessorProcessingTaskLogsTheStartingPosition()
1536+
{
1537+
using var cancellationSource = new CancellationTokenSource();
1538+
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
1539+
1540+
var partition = new EventProcessorPartition { PartitionId = "4" };
1541+
var expectedEevntPosition = EventPosition.FromSequenceNumber(332);
1542+
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
1543+
var mockLogger = new Mock<EventHubsEventSource>();
1544+
var mockConnection = Mock.Of<EventHubConnection>();
1545+
var mockConsumer = new Mock<SettableTransportConsumer>();
1546+
var mockProcessor = new Mock<EventProcessor<EventProcessorPartition>>(5, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), new EventProcessorOptions()) { CallBase = true };
1547+
1548+
mockLogger
1549+
.Setup(log => log.EventProcessorPartitionProcessingEventPositionDetermined(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
1550+
.Callback(() => completionSource.TrySetResult(true));
1551+
1552+
mockConsumer
1553+
.Setup(consumer => consumer.ReceiveAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<CancellationToken>()))
1554+
.ReturnsAsync(EmptyBatch);
1555+
1556+
mockProcessor.Object.Logger = mockLogger.Object;
1557+
1558+
mockProcessor
1559+
.Setup(processor => processor.CreateConnection())
1560+
.Returns(mockConnection);
1561+
1562+
mockProcessor
1563+
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), mockConnection, It.IsAny<EventProcessorOptions>()))
1564+
.Returns(mockConsumer.Object);
1565+
1566+
var partitionProcessor = mockProcessor.Object.CreatePartitionProcessor(partition, cancellationSource, expectedEevntPosition);
1567+
1568+
await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
1569+
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
1570+
1571+
mockLogger
1572+
.Verify(log => log.EventProcessorPartitionProcessingEventPositionDetermined(
1573+
partition.PartitionId,
1574+
mockProcessor.Object.Identifier,
1575+
mockProcessor.Object.EventHubName,
1576+
mockProcessor.Object.ConsumerGroup,
1577+
expectedEevntPosition.ToString()),
1578+
Times.Once);
1579+
1580+
cancellationSource.Cancel();
1581+
}
1582+
15291583
/// <summary>
15301584
/// Verifies functionality of the <see cref="EventProcessor{TPartition}.CreatePartitionProcessor" />
15311585
/// method.

0 commit comments

Comments
 (0)