Skip to content

Commit 044c5b6

Browse files
authored
[Event Hubs] Enrich processor log (Azure#35169)
* [Event Hubs] Enrich processor log The focus of these changes is to add the source of the event position chosen for a partition reader to the log, to help troubleshooting to identify whether a checkpoint was found or a default was used. * Resolving reference issue and applying feedback
1 parent 9fad17a commit 044c5b6

File tree

6 files changed

+23
-16
lines changed

6 files changed

+23
-16
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
### Other Changes
1414

15+
- Enhanced the log emitted when an event processor begins reading from a partition to report whether the offset chosen was based on a checkpoint or default value.
16+
1517
## 5.8.1 (2023-03-09)
1618

1719
### Bugs Fixed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
### Other Changes
1616

17+
- Enhanced the log emitted when an event processor begins reading from a partition to report whether the offset chosen was based on a checkpoint or default value.
18+
1719
## 5.8.1 (2023-03-09)
1820

1921
### Other Changes

sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2144,17 +2144,20 @@ public virtual void EventProcessorHighPartitionOwnershipWarning(string identifie
21442144
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
21452145
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
21462146
/// <param name="startingPosition">The position in the event stream that reading will start from.</param>
2147+
/// <param name="checkpointUsed"><c>true</c> if a checkpoint was used for the position; otherwise, <c>false</c>.</param>
21472148
///
2148-
[Event(105, Level = EventLevel.Verbose, Message = "The processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3} is initializing partition '{0}' with starting position: [{4}]")]
2149+
[Event(105, Level = EventLevel.Verbose, Message = "The processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3} is initializing partition '{0}' with starting position: [{4}]. Position chosen by {5}.")]
21492150
public virtual void EventProcessorPartitionProcessingEventPositionDetermined(string partitionId,
21502151
string identifier,
21512152
string eventHubName,
21522153
string consumerGroup,
2153-
string startingPosition)
2154+
string startingPosition,
2155+
bool checkpointUsed)
21542156
{
21552157
if (IsEnabled())
21562158
{
2157-
WriteEvent(105, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, startingPosition ?? string.Empty);
2159+
var selectionBasedOn = checkpointUsed ? "checkpoint" : "default";
2160+
WriteEvent(105, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, startingPosition ?? string.Empty, selectionBasedOn);
21582161
}
21592162
}
21602163

sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -743,13 +743,13 @@ async Task performProcessing()
743743
// was passed. In the event that initialization is run and encounters an
744744
// exception, it takes responsibility for firing the error handler.
745745

746-
var startingPosition = startingPositionOverride switch
746+
var (startingPosition, checkpointUsed) = startingPositionOverride switch
747747
{
748-
_ when startingPositionOverride.HasValue => startingPositionOverride.Value,
748+
_ when startingPositionOverride.HasValue => (startingPositionOverride.Value, false),
749749
_ => await InitializePartitionForProcessingAsync(partition, cancellationSource.Token).ConfigureAwait(false)
750750
};
751751

752-
Logger.EventProcessorPartitionProcessingEventPositionDetermined(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, startingPosition.ToString());
752+
Logger.EventProcessorPartitionProcessingEventPositionDetermined(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, startingPosition.ToString(), checkpointUsed);
753753

754754
// Create the connection to be used for spawning consumers; if the creation
755755
// fails, then consider the processing task to be failed. The main processing
@@ -1795,15 +1795,15 @@ await Task.WhenAll(LoadBalancer.OwnedPartitionIds
17951795
/// <param name="partition">The partition to initialize.</param>
17961796
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
17971797
///
1798-
/// <returns>The <see cref="EventPosition" /> to start processing from.</returns>
1798+
/// <returns>A tuple containing the <see cref="EventPosition" /> to start processing from and whether the position was based on a checkpoint or not.</returns>
17991799
///
18001800
/// <remarks>
18011801
/// This method will invoke the error handler should an exception be encountered; the
18021802
/// exception will then be bubbled to callers.
18031803
/// </remarks>
18041804
///
1805-
private async Task<EventPosition> InitializePartitionForProcessingAsync(TPartition partition,
1806-
CancellationToken cancellationToken)
1805+
private async Task<(EventPosition Position, bool CheckpointUsed)> InitializePartitionForProcessingAsync(TPartition partition,
1806+
CancellationToken cancellationToken)
18071807
{
18081808
var operationDescription = Resources.OperationClaimOwnership;
18091809

@@ -1824,10 +1824,10 @@ private async Task<EventPosition> InitializePartitionForProcessingAsync(TPartiti
18241824

18251825
if (checkpoint != null)
18261826
{
1827-
return checkpoint.StartingPosition;
1827+
return (checkpoint.StartingPosition, true);
18281828
}
18291829

1830-
return Options.DefaultStartingPosition;
1830+
return (Options.DefaultStartingPosition, false);
18311831
}
18321832
catch (Exception ex)
18331833
{

sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Azure.Messaging.EventHubs.Stress.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
<ItemGroup>
1010
<PackageReference Include="System.Memory.Data" />
11-
<PackageReference Include="Azure.Storage.Blobs" />
1211
<PackageReference Include="Microsoft.ApplicationInsights" />
1312
<PackageReference Include="CommandLineParser" />
1413
</ItemGroup>

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,15 +1628,15 @@ public async Task CreatePartitionProcessorProcessingTaskLogsTheStartingPosition(
16281628
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
16291629

16301630
var partition = new EventProcessorPartition { PartitionId = "4" };
1631-
var expectedEevntPosition = EventPosition.FromSequenceNumber(332);
1631+
var expectedEventPosition = EventPosition.FromSequenceNumber(332);
16321632
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
16331633
var mockLogger = new Mock<EventHubsEventSource>();
16341634
var mockConnection = Mock.Of<EventHubConnection>();
16351635
var mockConsumer = new Mock<SettableTransportConsumer>();
16361636
var mockProcessor = new Mock<EventProcessor<EventProcessorPartition>>(5, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), new EventProcessorOptions()) { CallBase = true };
16371637

16381638
mockLogger
1639-
.Setup(log => log.EventProcessorPartitionProcessingEventPositionDetermined(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
1639+
.Setup(log => log.EventProcessorPartitionProcessingEventPositionDetermined(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()))
16401640
.Callback(() => completionSource.TrySetResult(true));
16411641

16421642
mockConsumer
@@ -1653,7 +1653,7 @@ public async Task CreatePartitionProcessorProcessingTaskLogsTheStartingPosition(
16531653
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), mockConnection, It.IsAny<EventProcessorOptions>(), It.IsAny<bool>()))
16541654
.Returns(mockConsumer.Object);
16551655

1656-
var partitionProcessor = mockProcessor.Object.CreatePartitionProcessor(partition, cancellationSource, expectedEevntPosition);
1656+
var partitionProcessor = mockProcessor.Object.CreatePartitionProcessor(partition, cancellationSource, expectedEventPosition);
16571657

16581658
await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
16591659
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
@@ -1664,7 +1664,8 @@ public async Task CreatePartitionProcessorProcessingTaskLogsTheStartingPosition(
16641664
mockProcessor.Object.Identifier,
16651665
mockProcessor.Object.EventHubName,
16661666
mockProcessor.Object.ConsumerGroup,
1667-
expectedEevntPosition.ToString()),
1667+
expectedEventPosition.ToString(),
1668+
false),
16681669
Times.Once);
16691670

16701671
cancellationSource.Cancel();

0 commit comments

Comments
 (0)