Skip to content

Commit f8dbe83

Browse files
authored
[Event Hubs] Log improvements + Amqp version bump (Azure#38319)
* [Event Hubs] Log improvements + Amqp version bump The focus of these changes is to improve logs for the event processor types and AMQP consumer. The most notable change is the addition of the first and last sequence number of the batch when reading from Event Hubs and dispatching events to be processed. The version of the AMQP library has been bumped to the latest stable (2.6.3), which includes a fix for timer duration calculations during link creation and a small handful of efficiency improvements.
1 parent 172738a commit f8dbe83

File tree

13 files changed

+341
-41
lines changed

13 files changed

+341
-41
lines changed

eng/Packages.Data.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
<PackageReference Update="Azure.ResourceManager" Version="1.7.0" />
106106

107107
<!-- Other approved packages -->
108-
<PackageReference Update="Microsoft.Azure.Amqp" Version="2.6.2" />
108+
<PackageReference Update="Microsoft.Azure.Amqp" Version="2.6.3" />
109109
<PackageReference Update="Microsoft.Azure.WebPubSub.Common" Version="1.2.0" />
110110
<PackageReference Update="Microsoft.Identity.Client" Version="4.54.1" />
111111
<PackageReference Update="Microsoft.Identity.Client.Extensions.Msal" Version="2.31.0" />

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/EventProcessorClientEventSource.cs

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,18 @@ protected EventProcessorClientEventSource() : base(EventSourceName)
4848
/// <param name="identifier">A unique name used to identify the event processor.</param>
4949
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
5050
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
51+
/// <param name="operationId">An identifier for the processing operation, allowing its activities to be correlated.</param>
5152
///
52-
[Event(20, Level = EventLevel.Verbose, Message = "Starting to process a batch of events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}.")]
53+
[Event(20, Level = EventLevel.Verbose, Message = "Starting to process a batch of events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Operation Id: '{4}'")]
5354
public virtual void EventBatchProcessingStart(string partitionId,
5455
string identifier,
5556
string eventHubName,
56-
string consumerGroup)
57+
string consumerGroup,
58+
string operationId)
5759
{
5860
if (IsEnabled())
5961
{
60-
WriteEvent(20, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
62+
WriteEvent(20, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, operationId ?? string.Empty);
6163
}
6264
}
6365

@@ -69,16 +71,18 @@ public virtual void EventBatchProcessingStart(string partitionId,
6971
/// <param name="identifier">A unique name used to identify the event processor.</param>
7072
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
7173
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
74+
/// <param name="operationId">An identifier for the processing operation, allowing its activities to be correlated.</param>
7275
///
73-
[Event(21, Level = EventLevel.Verbose, Message = "Completed processing a batch of events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}.")]
76+
[Event(21, Level = EventLevel.Verbose, Message = "Completed processing a batch of events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Operation Id: '{4}'")]
7477
public virtual void EventBatchProcessingComplete(string partitionId,
7578
string identifier,
7679
string eventHubName,
77-
string consumerGroup)
80+
string consumerGroup,
81+
string operationId)
7882
{
7983
if (IsEnabled())
8084
{
81-
WriteEvent(21, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
85+
WriteEvent(21, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, operationId ?? string.Empty);
8286
}
8387
}
8488

@@ -90,18 +94,20 @@ public virtual void EventBatchProcessingComplete(string partitionId,
9094
/// <param name="identifier">A unique name used to identify the event processor.</param>
9195
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
9296
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
97+
/// <param name="operationId">An identifier for the processing operation, allowing its activities to be correlated.</param>
9398
/// <param name="errorMessage">The message for the exception that occurred.</param>
9499
///
95-
[Event(22, Level = EventLevel.Error, Message = "An exception occurred while processing events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Error Message: '{4}'")]
100+
[Event(22, Level = EventLevel.Error, Message = "An exception occurred while processing events for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Operation Id: '{5}'; Error Message: '{4}'")]
96101
public virtual void EventBatchProcessingError(string partitionId,
97102
string identifier,
98103
string eventHubName,
99104
string consumerGroup,
105+
string operationId,
100106
string errorMessage)
101107
{
102108
if (IsEnabled())
103109
{
104-
WriteEvent(22, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty);
110+
WriteEvent(22, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty, operationId ?? string.Empty);
105111
}
106112
}
107113

@@ -191,6 +197,31 @@ public virtual void ValidationCleanupError(string identifier,
191197
}
192198
}
193199

200+
/// <summary>
201+
/// Indicates that an <see cref="EventProcessorClient" /> has begin processing a batch of events for a partition.
202+
/// </summary>
203+
///
204+
/// <param name="sequenceNumber">The sequence number of the event being passed to the handler for processing.</param>
205+
/// <param name="partitionId">The identifier of the Event Hub partition whose processing is taking place.</param>
206+
/// <param name="identifier">A unique name used to identify the event processor.</param>
207+
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
208+
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
209+
/// <param name="operationId">An identifier for the processing operation, allowing its activities to be correlated.</param>
210+
///
211+
[Event(27, Level = EventLevel.Verbose, Message = "Invoking the event processing handler for sequence number '{0}'. Partition '{1}'; Processor Identifier '{2}'; Event Hub: {3}; Consumer Group: {4}; Operation Id: '{5}'")]
212+
public virtual void EventBatchProcessingHandlerCall(string sequenceNumber,
213+
string partitionId,
214+
string identifier,
215+
string eventHubName,
216+
string consumerGroup,
217+
string operationId)
218+
{
219+
if (IsEnabled())
220+
{
221+
WriteEvent(27, sequenceNumber ?? string.Empty, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, operationId ?? string.Empty);
222+
}
223+
}
224+
194225
/// <summary>
195226
/// Writes an event with four string arguments into a stack allocated <see cref="EventSource.EventData"/> struct
196227
/// to avoid the parameter array allocation on the WriteEvent methods.
@@ -280,5 +311,59 @@ private unsafe void WriteEvent(int eventId,
280311
WriteEventCore(eventId, 5, eventPayload);
281312
}
282313
}
314+
315+
/// <summary>
316+
/// Writes an event with five string arguments into a stack allocated
317+
/// <see cref="EventSource.EventData"/> struct to avoid the parameter array allocation on the WriteEvent methods.
318+
/// </summary>
319+
///
320+
/// <param name="eventId">The identifier of the event.</param>
321+
/// <param name="arg1">The first argument.</param>
322+
/// <param name="arg2">The second argument.</param>
323+
/// <param name="arg3">The third argument.</param>
324+
/// <param name="arg4">The fourth argument.</param>
325+
/// <param name="arg5">The fifth argument.</param>
326+
/// <param name="arg6">The sixth argument.</param>
327+
///
328+
[NonEvent]
329+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
330+
private unsafe void WriteEvent(int eventId,
331+
string arg1,
332+
string arg2,
333+
string arg3,
334+
string arg4,
335+
string arg5,
336+
string arg6)
337+
{
338+
fixed (char* arg1Ptr = arg1)
339+
fixed (char* arg2Ptr = arg2)
340+
fixed (char* arg3Ptr = arg3)
341+
fixed (char* arg4Ptr = arg4)
342+
fixed (char* arg5Ptr = arg5)
343+
fixed (char* arg6Ptr = arg6)
344+
{
345+
var eventPayload = stackalloc EventData[6];
346+
347+
eventPayload[0].Size = (arg1.Length + 1) * sizeof(char);
348+
eventPayload[0].DataPointer = (IntPtr)arg1Ptr;
349+
350+
eventPayload[1].Size = (arg2.Length + 1) * sizeof(char);
351+
eventPayload[1].DataPointer = (IntPtr)arg2Ptr;
352+
353+
eventPayload[2].Size = (arg3.Length + 1) * sizeof(char);
354+
eventPayload[2].DataPointer = (IntPtr)arg3Ptr;
355+
356+
eventPayload[3].Size = (arg4.Length + 1) * sizeof(char);
357+
eventPayload[3].DataPointer = (IntPtr)arg4Ptr;
358+
359+
eventPayload[4].Size = (arg5.Length + 1) * sizeof(char);
360+
eventPayload[4].DataPointer = (IntPtr)arg5Ptr;
361+
362+
eventPayload[5].Size = (arg5.Length + 1) * sizeof(char);
363+
eventPayload[5].DataPointer = (IntPtr)arg5Ptr;
364+
365+
WriteEventCore(eventId, 6, eventPayload);
366+
}
367+
}
283368
}
284369
}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,14 +1015,15 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
10151015
{
10161016
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
10171017

1018+
var operation = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture);
10181019
var context = default(PartitionContext);
10191020
var eventArgs = default(ProcessEventArgs);
10201021
var caughtExceptions = default(List<Exception>);
10211022
var emptyBatch = true;
10221023

10231024
try
10241025
{
1025-
Logger.EventBatchProcessingStart(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
1026+
Logger.EventBatchProcessingStart(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation);
10261027

10271028
// Attempt to process each event in the batch, marking if the batch was non-empty. Exceptions during
10281029
// processing should be logged and cached, as the batch must be processed completely to avoid losing events.
@@ -1043,6 +1044,8 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
10431044

10441045
try
10451046
{
1047+
Logger.EventBatchProcessingHandlerCall(eventData.SequenceNumber.ToString(), partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation);
1048+
10461049
context ??= new ProcessorPartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition.PartitionId, () => ReadLastEnqueuedEventProperties(partition.PartitionId));
10471050
eventArgs = new ProcessEventArgs(context, eventData, updateToken => UpdateCheckpointAsync(partition.PartitionId, eventData.Offset, eventData.SequenceNumber, updateToken), cancellationToken);
10481051

@@ -1053,7 +1056,7 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
10531056
// This exception is not surfaced to the error handler or bubbled, as the entire batch must be
10541057
// processed or events will be lost. Preserve the exceptions, should any occur.
10551058

1056-
Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
1059+
Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation, ex.Message);
10571060

10581061
caughtExceptions ??= new List<Exception>();
10591062
caughtExceptions.Add(ex);
@@ -1065,6 +1068,8 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
10651068

10661069
if (emptyBatch)
10671070
{
1071+
Logger.EventBatchProcessingHandlerCall("<< No Event >>", partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation);
1072+
10681073
eventArgs = new ProcessEventArgs(new EmptyPartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition.PartitionId), null, EmptyEventUpdateCheckpoint, cancellationToken);
10691074
await _processEventAsync(eventArgs).ConfigureAwait(false);
10701075
}
@@ -1074,12 +1079,12 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
10741079
// This exception was either not related to processing events or was the result of sending an empty batch to be
10751080
// processed. Since there would be no other caught exceptions, tread this like a single case.
10761081

1077-
Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
1082+
Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation, ex.Message);
10781083
throw;
10791084
}
10801085
finally
10811086
{
1082-
Logger.EventBatchProcessingComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
1087+
Logger.EventBatchProcessingComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, operation);
10831088
}
10841089

10851090
// Deal with any exceptions that occurred while processing the batch. If more than one was

sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,15 +1130,17 @@ public async Task EventProcessingLogsExecution()
11301130
partitionId,
11311131
processorClient.Identifier,
11321132
processorClient.EventHubName,
1133-
processorClient.ConsumerGroup),
1133+
processorClient.ConsumerGroup,
1134+
It.IsAny<string>()),
11341135
Times.Once);
11351136

11361137
mockLogger
11371138
.Verify(log => log.EventBatchProcessingComplete(
11381139
partitionId,
11391140
processorClient.Identifier,
11401141
processorClient.EventHubName,
1141-
processorClient.ConsumerGroup),
1142+
processorClient.ConsumerGroup,
1143+
It.IsAny<string>()),
11421144
Times.Once);
11431145

11441146
cancellationSource.Cancel();
@@ -1176,6 +1178,7 @@ public async Task EventProcessingLogsExceptions()
11761178
processorClient.Identifier,
11771179
processorClient.EventHubName,
11781180
processorClient.ConsumerGroup,
1181+
It.IsAny<string>(),
11791182
expectedException.Message),
11801183
Times.Exactly(eventBatch.Length));
11811184

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,18 @@ public virtual void ShouldStealPartition(string identifier)
136136
/// Indicates that stealable partitions were found, so randomly picking one of them to claim.
137137
/// </summary>
138138
///
139+
/// <param name="partitionId">The identifier of the partition that was selected to be stolen.</param>
140+
/// <param name="stolenFrom">The identifier of the event processor that is being stolen from.</param>
139141
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
140142
///
141-
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, stealing from another event processor. (Identifier: '{0}')")]
142-
public virtual void StealPartition(string identifier)
143+
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}'. (Identifier: '{2}')")]
144+
public virtual void StealPartition(string partitionId,
145+
string stolenFrom,
146+
string identifier)
143147
{
144148
if (IsEnabled())
145149
{
146-
WriteEvent(7, identifier ?? string.Empty);
150+
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty);
147151
}
148152
}
149153

0 commit comments

Comments
 (0)