Skip to content

Commit e9bce61

Browse files
Add new TargetUnprocessedEventThreshold parameter in EventHub Options + Target Based Scaling for Event Hubs (Azure#33675)
1 parent 4142499 commit e9bce61

11 files changed

+805
-340
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public EventHubOptions() { }
3636
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
3737
public int PrefetchCount { get { throw null; } set { } }
3838
public long? PrefetchSizeInBytes { get { throw null; } set { } }
39+
public int? TargetUnprocessedEventThreshold { get { throw null; } set { } }
3940
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
4041
public Azure.Messaging.EventHubs.EventHubsTransportType TransportType { get { throw null; } set { } }
4142
public System.Net.IWebProxy WebProxy { get { throw null; } set { } }

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,27 @@ public int MaxEventBatchSize
132132
}
133133
}
134134

135+
private int? _targetUnprocessedEventThreshold;
136+
137+
/// <summary>
138+
/// Get or sets the target number of unprocessed events per worker for Event Hub-triggered functions. This is used in target-based scaling to override the default scaling threshold inferred from the <see cref="MaxEventBatchSize" /> option.
139+
///
140+
/// If TargetUnprocessedEventThreshold is set, the total unprocessed event count will be divided by this value to determine the number of worker instances, which will then be rounded up to a worker instance count that creates a balanced partition distribution.
141+
/// </summary>
142+
public int? TargetUnprocessedEventThreshold
143+
{
144+
get => _targetUnprocessedEventThreshold;
145+
146+
set
147+
{
148+
if (value < 1)
149+
{
150+
throw new ArgumentException("Unprocessed Event Threshold must be larger than 0.");
151+
}
152+
_targetUnprocessedEventThreshold = value;
153+
}
154+
}
155+
135156
/// <summary>
136157
/// Gets the initial offset options to apply when processing. This only applies
137158
/// when no checkpoint information is available.
@@ -188,6 +209,7 @@ string IOptionsFormatter.Format()
188209
{
189210
JObject options = new JObject
190211
{
212+
{ nameof(TargetUnprocessedEventThreshold), TargetUnprocessedEventThreshold },
191213
{ nameof(MaxEventBatchSize), MaxEventBatchSize },
192214
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
193215
{ nameof(TransportType), TransportType.ToString()},

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Azure.Messaging.EventHubs.Primitives;
1313
using Azure.Messaging.EventHubs.Processor;
1414
using Microsoft.Azure.WebJobs.EventHubs.Processor;
15+
using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners;
1516
using Microsoft.Azure.WebJobs.Host.Executors;
1617
using Microsoft.Azure.WebJobs.Host.Listeners;
1718
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -20,7 +21,7 @@
2021

2122
namespace Microsoft.Azure.WebJobs.EventHubs.Listeners
2223
{
23-
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider
24+
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider, ITargetScalerProvider
2425
{
2526
private readonly ITriggeredFunctionExecutor _executor;
2627
private readonly EventProcessorHost _eventProcessorHost;
@@ -29,6 +30,7 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca
2930
private readonly EventHubOptions _options;
3031

3132
private Lazy<EventHubsScaleMonitor> _scaleMonitor;
33+
private Lazy<EventHubsTargetScaler> _targetScaler;
3234
private readonly ILoggerFactory _loggerFactory;
3335
private readonly ILogger _logger;
3436
private string _details;
@@ -51,13 +53,23 @@ public EventHubListener(
5153
_options = options;
5254
_logger = _loggerFactory.CreateLogger<EventHubListener>();
5355

56+
EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger<EventHubMetricsProvider>());
57+
5458
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(
5559
() => new EventHubsScaleMonitor(
5660
functionId,
5761
consumerClient,
5862
checkpointStore,
5963
_loggerFactory.CreateLogger<EventHubsScaleMonitor>()));
6064

65+
_targetScaler = new Lazy<EventHubsTargetScaler>(
66+
() => new EventHubsTargetScaler(
67+
functionId,
68+
consumerClient,
69+
options,
70+
metricsProvider,
71+
_loggerFactory.CreateLogger<EventHubsTargetScaler>()));
72+
6173
_details = $"'namespace='{eventProcessorHost?.FullyQualifiedNamespace}', eventHub='{eventProcessorHost?.EventHubName}', " +
6274
$"consumerGroup='{eventProcessorHost?.ConsumerGroup}', functionId='{functionId}', singleDispatch='{singleDispatch}'";
6375
}
@@ -104,6 +116,11 @@ public IScaleMonitor GetMonitor()
104116
return _scaleMonitor.Value;
105117
}
106118

119+
public ITargetScaler GetTargetScaler()
120+
{
121+
return _targetScaler.Value;
122+
}
123+
107124
// We get a new instance each time Start() is called.
108125
// We'll get a listener per partition - so they can potentially run in parallel even on a single machine.
109126
internal class EventProcessor : IEventProcessor, IDisposable
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Azure.Messaging.EventHubs;
10+
using Azure.Messaging.EventHubs.Primitives;
11+
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
12+
using Microsoft.Extensions.Logging;
13+
14+
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners
15+
{
16+
internal class EventHubMetricsProvider
17+
{
18+
private const int PartitionLogIntervalInMinutes = 5;
19+
20+
private readonly string _functionId;
21+
private readonly IEventHubConsumerClient _client;
22+
private readonly ILogger _logger;
23+
private readonly BlobCheckpointStoreInternal _checkpointStore;
24+
25+
private DateTime _nextPartitionLogTime;
26+
private DateTime _nextPartitionWarningTime;
27+
28+
// Used for mocking.
29+
public EventHubMetricsProvider() { }
30+
31+
public EventHubMetricsProvider(string functionId, IEventHubConsumerClient client, BlobCheckpointStoreInternal checkpointStore, ILogger logger)
32+
{
33+
_functionId = functionId;
34+
_logger = logger;
35+
_checkpointStore = checkpointStore;
36+
_nextPartitionLogTime = DateTime.UtcNow;
37+
_nextPartitionWarningTime = DateTime.UtcNow;
38+
_client = client;
39+
}
40+
41+
public async Task<EventHubsTriggerMetrics> GetMetricsAsync()
42+
{
43+
EventHubsTriggerMetrics metrics = new EventHubsTriggerMetrics();
44+
string[] partitions = null;
45+
46+
try
47+
{
48+
partitions = await _client.GetPartitionsAsync().ConfigureAwait(false);
49+
metrics.PartitionCount = partitions.Length;
50+
}
51+
catch (Exception e)
52+
{
53+
_logger.LogWarning($"Encountered an exception while checking EventHub '{_client.EventHubName}'. Error: {e.Message}");
54+
return metrics;
55+
}
56+
57+
// Get the PartitionRuntimeInformation for all partitions
58+
_logger.LogInformation($"Querying partition information for {partitions.Length} partitions.");
59+
var partitionPropertiesTasks = new Task<PartitionProperties>[partitions.Length];
60+
var checkpointTasks = new Task<EventProcessorCheckpoint>[partitionPropertiesTasks.Length];
61+
62+
for (int i = 0; i < partitions.Length; i++)
63+
{
64+
partitionPropertiesTasks[i] = _client.GetPartitionPropertiesAsync(partitions[i]);
65+
66+
checkpointTasks[i] = _checkpointStore.GetCheckpointAsync(
67+
_client.FullyQualifiedNamespace,
68+
_client.EventHubName,
69+
_client.ConsumerGroup,
70+
partitions[i],
71+
CancellationToken.None);
72+
}
73+
74+
await Task.WhenAll(partitionPropertiesTasks).ConfigureAwait(false);
75+
EventProcessorCheckpoint[] checkpoints;
76+
77+
try
78+
{
79+
checkpoints = await Task.WhenAll(checkpointTasks).ConfigureAwait(false);
80+
}
81+
catch
82+
{
83+
// GetCheckpointsAsync would log
84+
return metrics;
85+
}
86+
87+
return CreateTriggerMetrics(partitionPropertiesTasks.Select(t => t.Result).ToList(), checkpoints);
88+
}
89+
90+
private EventHubsTriggerMetrics CreateTriggerMetrics(List<PartitionProperties> partitionRuntimeInfo, EventProcessorCheckpoint[] checkpoints, bool alwaysLog = false)
91+
{
92+
long totalUnprocessedEventCount = 0;
93+
94+
DateTime utcNow = DateTime.UtcNow;
95+
bool logPartitionInfo = alwaysLog ? true : utcNow >= _nextPartitionLogTime;
96+
bool logPartitionWarning = alwaysLog ? true : utcNow >= _nextPartitionWarningTime;
97+
98+
// For each partition, get the last enqueued sequence number.
99+
// If the last enqueued sequence number does not equal the SequenceNumber from the lease info in storage,
100+
// accumulate new event counts across partitions to derive total new event counts.
101+
List<string> partitionErrors = new List<string>();
102+
for (int i = 0; i < partitionRuntimeInfo.Count; i++)
103+
{
104+
var partitionProperties = partitionRuntimeInfo[i];
105+
106+
var checkpoint = (BlobCheckpointStoreInternal.BlobStorageCheckpoint)checkpoints.SingleOrDefault(c => c?.PartitionId == partitionProperties.Id);
107+
108+
// Check for the unprocessed messages when there are messages on the Event Hub partition
109+
// In that case, LastEnqueuedSequenceNumber will be >= 0
110+
111+
if ((partitionProperties.LastEnqueuedSequenceNumber != -1 && partitionProperties.LastEnqueuedSequenceNumber != (checkpoint?.SequenceNumber ?? -1))
112+
|| (checkpoint == null && partitionProperties.LastEnqueuedSequenceNumber >= 0))
113+
{
114+
long partitionUnprocessedEventCount = GetUnprocessedEventCount(partitionProperties, checkpoint);
115+
totalUnprocessedEventCount += partitionUnprocessedEventCount;
116+
}
117+
}
118+
119+
// Only log if not all partitions are failing or it's time to log
120+
if (partitionErrors.Count > 0 && (partitionErrors.Count != partitionRuntimeInfo.Count || logPartitionWarning))
121+
{
122+
_logger.LogWarning($"Function '{_functionId}': Unable to deserialize partition or checkpoint info with the " +
123+
$"following errors: {string.Join(" ", partitionErrors)}");
124+
_nextPartitionWarningTime = DateTime.UtcNow.AddMinutes(PartitionLogIntervalInMinutes);
125+
}
126+
127+
if (totalUnprocessedEventCount > 0 && logPartitionInfo)
128+
{
129+
_logger.LogInformation($"Function '{_functionId}', Total new events: {totalUnprocessedEventCount}");
130+
_nextPartitionLogTime = DateTime.UtcNow.AddMinutes(PartitionLogIntervalInMinutes);
131+
}
132+
133+
return new EventHubsTriggerMetrics
134+
{
135+
Timestamp = DateTime.UtcNow,
136+
PartitionCount = partitionRuntimeInfo.Count,
137+
EventCount = totalUnprocessedEventCount
138+
};
139+
}
140+
141+
// Get the number of unprocessed events by deriving the delta between the server side info and the partition lease info,
142+
private static long GetUnprocessedEventCount(PartitionProperties partitionInfo, BlobCheckpointStoreInternal.BlobStorageCheckpoint checkpoint)
143+
{
144+
// If the partition is empty, there are no events to process.
145+
146+
if (partitionInfo.IsEmpty)
147+
{
148+
return 0;
149+
}
150+
151+
// If there is no checkpoint and the beginning and last sequence numbers for the partition are the same
152+
// this partition received its first event.
153+
154+
if (checkpoint == null
155+
&& partitionInfo.LastEnqueuedSequenceNumber == partitionInfo.BeginningSequenceNumber)
156+
{
157+
return 1;
158+
}
159+
160+
var startingSequenceNumber = checkpoint?.SequenceNumber switch
161+
{
162+
// There was no checkpoint, use the beginning sequence number - 1, since
163+
// that event hasn't been processed yet.
164+
165+
null => partitionInfo.BeginningSequenceNumber - 1,
166+
167+
// Use the checkpoint.
168+
169+
long seq => seq
170+
};
171+
172+
// For normal scenarios, the last sequence number will be greater than the starting number and
173+
// simple subtraction can be used.
174+
175+
if (partitionInfo.LastEnqueuedSequenceNumber > startingSequenceNumber)
176+
{
177+
return (partitionInfo.LastEnqueuedSequenceNumber - startingSequenceNumber);
178+
}
179+
180+
// Partition is a circular buffer, so it is possible that
181+
// LastEnqueuedSequenceNumber < startingSequenceNumber
182+
183+
long count = 0;
184+
unchecked
185+
{
186+
count = (long.MaxValue - partitionInfo.LastEnqueuedSequenceNumber) + startingSequenceNumber;
187+
}
188+
189+
// It's possible for the starting sequence number to be ahead of the last sequence number,
190+
// especially if checkpointing is happening often and load is very low. If count is negative,
191+
// we need to know that this read is invalid, so return 0.
192+
// e.g., (9223372036854775807 - 10) + 11 = -9223372036854775808
193+
194+
return (count < 0) ? 0 : count;
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)