Skip to content

Commit 096fee7

Browse files
m-reddingjsquireJoshLove-msft
authored
[Event Hubs Web Jobs Extensions] Adding MinBatchSize and MaxWaitTime Properties (Azure#34698)
* init * changes * class with semaphore * bug fixes * fixing issue where the background task wasn't waiting the full maxWaitTime * updates * adding tests 1 * added end to end test * additional tests * test fix * initial feedback * regenerating API * WIP feedback addressing * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * Apply suggestions from code review Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * WIP feedback 2 * WIP feedback 3 * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/PartitionProcessorEventsManager.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/PartitionProcessorEventsManager.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * feedback 4 * add new validation * adding test * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs Co-authored-by: Jesse Squire <jesse.squire@gmail.com> * feedback * some fixes * additional changes * trying to fix indenting * accidental file check in * trying to fix tab / adding some comments * added some comments * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/PartitionProcessorEventsManager.cs Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> * feedback --------- Co-authored-by: Jesse Squire <jesse.squire@gmail.com> Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
1 parent bc94ecb commit 096fee7

13 files changed

+774
-231
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public EventHubOptions() { }
3333
public Microsoft.Azure.WebJobs.EventHubs.InitialOffsetOptions InitialOffsetOptions { get { throw null; } }
3434
public System.TimeSpan LoadBalancingUpdateInterval { get { throw null; } set { } }
3535
public int MaxEventBatchSize { get { throw null; } set { } }
36+
public System.TimeSpan MaxWaitTime { get { throw null; } set { } }
37+
public int MinEventBatchSize { get { throw null; } set { } }
3638
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
3739
public int PrefetchCount { get { throw null; } set { } }
3840
public long? PrefetchSizeInBytes { get { throw null; } set { } }

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public class EventHubOptions : IOptionsFormatter
1919
public EventHubOptions()
2020
{
2121
MaxEventBatchSize = 10;
22+
MinEventBatchSize = 1;
23+
MaxWaitTime = TimeSpan.FromSeconds(60);
2224
ConnectionOptions = new EventHubConnectionOptions()
2325
{
2426
TransportType = EventHubsTransportType.AmqpTcp
@@ -132,6 +134,53 @@ public int MaxEventBatchSize
132134
}
133135
}
134136

137+
private int _minEventBatchSize;
138+
139+
/// <summary>
140+
/// Gets or sets the minimum number of events desired for a batch. This setting applies only to functions that
141+
/// receive multiple events. This value must be less than <see cref="MaxEventBatchSize"/> and is used in
142+
/// conjunction with <see cref="MaxWaitTime"/>. Default 1.
143+
/// </summary>
144+
public int MinEventBatchSize
145+
{
146+
get => _minEventBatchSize;
147+
148+
set
149+
{
150+
if (value < 1)
151+
{
152+
throw new ArgumentException("Batch size must be larger than or equal to 1.");
153+
}
154+
_minEventBatchSize = value;
155+
}
156+
}
157+
158+
private TimeSpan _maxWaitTime;
159+
160+
/// <summary>
161+
/// Gets or sets the maximum time that the trigger should wait to fill a batch before invoking the function.
162+
/// This is only considered when <see cref="MinEventBatchSize"/> is set to larger than 1 and is otherwise unused.
163+
/// If less than <see cref="MinEventBatchSize" /> events were available before the wait time elapses, the function
164+
/// will be invoked with a partial batch. Default is 60 seconds. The longest allowed wait time is 10 minutes.
165+
/// </summary>
166+
public TimeSpan MaxWaitTime
167+
{
168+
get => _maxWaitTime;
169+
170+
set
171+
{
172+
if (value < TimeSpan.Zero)
173+
{
174+
throw new ArgumentException("Max Wait Time must be larger than or equal to 0.");
175+
}
176+
if (value > TimeSpan.FromMinutes(10))
177+
{
178+
throw new ArgumentException("Max Wait Time must be less than or equal to 10 minutes.");
179+
}
180+
_maxWaitTime = value;
181+
}
182+
}
183+
135184
private int? _targetUnprocessedEventThreshold;
136185

137186
/// <summary>
@@ -211,6 +260,8 @@ string IOptionsFormatter.Format()
211260
{
212261
{ nameof(TargetUnprocessedEventThreshold), TargetUnprocessedEventThreshold },
213262
{ nameof(MaxEventBatchSize), MaxEventBatchSize },
263+
{ nameof(MinEventBatchSize), MinEventBatchSize },
264+
{ nameof(MaxWaitTime), MaxWaitTime },
214265
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
215266
{ nameof(TransportType), TransportType.ToString()},
216267
{ nameof(WebProxy), WebProxy is WebProxy proxy ? proxy.Address.AbsoluteUri : string.Empty },

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<
8686
builder.Services.AddSingleton<EventHubClientFactory>();
8787
builder.Services.AddSingleton<CheckpointClientProvider>();
8888
builder.Services.Configure<EventHubOptions>(configure);
89-
builder.Services.PostConfigure<EventHubOptions>(ConfigureInitialOffsetOptions);
89+
builder.Services.PostConfigure<EventHubOptions>(ConfigureOptions);
9090

9191
return builder;
9292
}
9393

94-
internal static void ConfigureInitialOffsetOptions(EventHubOptions options)
94+
internal static void ConfigureOptions(EventHubOptions options)
9595
{
9696
OffsetType? type = options?.InitialOffsetOptions?.Type;
9797
if (type.HasValue)
@@ -121,6 +121,11 @@ internal static void ConfigureInitialOffsetOptions(EventHubOptions options)
121121
}
122122
// If not specified, EventProcessor's default offset will apply
123123
}
124+
125+
if (options.MinEventBatchSize > options.MaxEventBatchSize)
126+
{
127+
throw new InvalidOperationException("The minimum event batch size cannot be larger than the maximum event batch size.");
128+
}
124129
}
125130
}
126131
}

0 commit comments

Comments
 (0)