Skip to content

Commit 2ad2c21

Browse files
Add option for initial offset (Azure#17727)
Fixes Azure#17521
1 parent 36f50d5 commit 2ad2c21

File tree

8 files changed

+308
-106
lines changed

8 files changed

+308
-106
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public partial class EventHubOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsF
2929
public EventHubOptions() { }
3030
public int BatchCheckpointFrequency { get { throw null; } set { } }
3131
public Azure.Messaging.EventHubs.Primitives.EventProcessorOptions EventProcessorOptions { get { throw null; } }
32+
public Microsoft.Azure.WebJobs.EventHubs.InitialOffsetOptions InitialOffsetOptions { get { throw null; } }
3233
public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } }
3334
public int MaxBatchSize { get { throw null; } set { } }
3435
public void AddReceiver(string eventHubName, string receiverConnectionString) { }
@@ -42,6 +43,12 @@ public partial class EventHubsWebJobsStartup : Microsoft.Azure.WebJobs.Hosting.I
4243
public EventHubsWebJobsStartup() { }
4344
public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { }
4445
}
46+
public partial class InitialOffsetOptions
47+
{
48+
public InitialOffsetOptions() { }
49+
public string EnqueuedTimeUTC { get { throw null; } set { } }
50+
public string Type { get { throw null; } set { } }
51+
}
4552
}
4653
namespace Microsoft.Extensions.Hosting
4754
{

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public EventHubOptions()
3434
PrefetchCount = 300,
3535
DefaultStartingPosition = EventPosition.Earliest,
3636
};
37+
InitialOffsetOptions = new InitialOffsetOptions();
3738
}
3839

3940
public EventProcessorOptions EventProcessorOptions { get; }
@@ -81,6 +82,12 @@ public int MaxBatchSize
8182
/// </summary>
8283
public bool InvokeProcessorAfterReceiveTimeout { get; set; }
8384

85+
/// <summary>
86+
/// Gets the initial offset options to apply when processing. This only applies
87+
/// when no checkpoint information is available.
88+
/// </summary>
89+
public InitialOffsetOptions InitialOffsetOptions { get; }
90+
8491
/// <summary>
8592
/// Gets or sets the Azure Blobs container name that the event processor uses to coordinate load balancing listening on an event hub.
8693
/// </summary>
@@ -275,12 +282,23 @@ public string Format()
275282
};
276283
}
277284

285+
JObject initialOffsetOptions = null;
286+
if (InitialOffsetOptions != null)
287+
{
288+
initialOffsetOptions = new JObject
289+
{
290+
{ nameof(InitialOffsetOptions.Type), InitialOffsetOptions.Type },
291+
{ nameof(InitialOffsetOptions.EnqueuedTimeUTC), InitialOffsetOptions.EnqueuedTimeUTC },
292+
};
293+
}
294+
278295
JObject options = new JObject
279296
{
280297
{ nameof(MaxBatchSize), MaxBatchSize },
281298
{ nameof(InvokeProcessorAfterReceiveTimeout), InvokeProcessorAfterReceiveTimeout },
282299
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
283300
{ nameof(EventProcessorOptions), eventProcessorOptions },
301+
{ nameof(InitialOffsetOptions), initialOffsetOptions }
284302
};
285303

286304
return options.ToString(Formatting.Indented);

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Globalization;
6+
using Azure.Messaging.EventHubs.Consumer;
57
using Microsoft.Azure.WebJobs;
68
using Microsoft.Azure.WebJobs.EventHubs;
79
using Microsoft.Extensions.Azure;
@@ -19,8 +21,7 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder)
1921
throw new ArgumentNullException(nameof(builder));
2022
}
2123

22-
builder.AddEventHubs(p => {});
23-
24+
builder.AddEventHubs(p => { });
2425
return builder;
2526
}
2627

@@ -85,8 +86,42 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<
8586
builder.Services.AddAzureClientsCore();
8687
builder.Services.AddSingleton<EventHubClientFactory>();
8788
builder.Services.Configure<EventHubOptions>(configure);
89+
builder.Services.Configure<EventHubOptions>(ConfigureInitialOffsetOptions);
8890

8991
return builder;
9092
}
93+
94+
internal static void ConfigureInitialOffsetOptions(EventHubOptions options)
95+
{
96+
string offsetType = options?.InitialOffsetOptions?.Type?.ToLower(CultureInfo.InvariantCulture) ?? string.Empty;
97+
if (!string.IsNullOrEmpty(offsetType))
98+
{
99+
switch (offsetType)
100+
{
101+
case "fromstart":
102+
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.Earliest;
103+
break;
104+
case "fromend":
105+
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.Latest;
106+
break;
107+
case "fromenqueuedtime":
108+
try
109+
{
110+
DateTime enqueuedTimeUTC = DateTime.Parse(options.InitialOffsetOptions.EnqueuedTimeUTC, CultureInfo.InvariantCulture).ToUniversalTime();
111+
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.FromEnqueuedTime(enqueuedTimeUTC);
112+
}
113+
catch (FormatException fe)
114+
{
115+
string message = $"{nameof(EventHubOptions)}:{nameof(InitialOffsetOptions)}:{nameof(InitialOffsetOptions.EnqueuedTimeUTC)} is configured with an invalid format. " +
116+
"Please use a format supported by DateTime.Parse(). e.g. 'yyyy-MM-ddTHH:mm:ssZ'";
117+
throw new InvalidOperationException(message, fe);
118+
}
119+
break;
120+
default:
121+
throw new InvalidOperationException("An unsupported value was supplied for initialOffsetOptions.type");
122+
}
123+
// If not specified, EventProcessor's default offset will apply
124+
}
125+
}
91126
}
92127
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
namespace Microsoft.Azure.WebJobs.EventHubs
5+
{
6+
/// <summary>
7+
/// Configuration options to control the initial offset to use when processing an Event Hub. These options only apply
8+
/// when no checkpoint information is available.
9+
/// </summary>
10+
public class InitialOffsetOptions
11+
{
12+
/// <summary>
13+
/// Gets or sets the type of the initial offset.
14+
/// <list type="bullet">
15+
/// <item>
16+
/// <description>fromStart: The default option if not specified. Will start processing from the start of the stream.</description>
17+
/// </item>
18+
/// <item>
19+
/// <description>fromEnd: Will start processing events from the end of the stream.Use this option if you only want to process events
20+
/// that are added after the function starts.</description>
21+
/// </item>
22+
/// <item>
23+
/// <description>fromEnqueuedTime: Will process events that were enqueued by Event Hubs on or after the specified time.Note that this applies
24+
/// to all Event Hubs partitions and there is no support for specifying a per-partition value.</description>
25+
/// </item>
26+
/// </list>
27+
/// </summary>
28+
public string Type { get; set; } = "";
29+
30+
/// <summary>
31+
/// Gets or sets the time that events should be processed after. Any parsable format is accepted.
32+
/// Only applies when the <see cref="Type"/> is "fromEnqueuedTime".
33+
/// </summary>
34+
public string EnqueuedTimeUTC { get; set; } = "";
35+
}
36+
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubApplicationInsightsTests.cs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public async Task EventHub_SingleDispatch()
6363
var (jobHost, host) = BuildHost<EventHubTestSingleDispatchJobs>();
6464
using (host)
6565
{
66-
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), new { input = _testId });
66+
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), new { input = "data" });
6767
bool result = _eventWait.WaitOne(Timeout);
6868
Assert.True(result);
6969
}
@@ -120,7 +120,7 @@ public async Task EventHub_MultipleDispatch_BatchSend()
120120
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobs>();
121121
using (host)
122122
{
123-
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), new { input = _testId });
123+
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), new { input = "data" });
124124

125125
bool result = _eventWait.WaitOne(Timeout);
126126
Assert.True(result);
@@ -154,14 +154,14 @@ public async Task EventHub_MultipleDispatch_BatchSend()
154154
// EventHub can batch events in a different ways
155155
foreach (var ehTriggerRequest in ehTriggerRequests)
156156
{
157-
ValidateEventHubRequest(
158-
ehTriggerRequest,
159-
true,
160-
EventHubsTestEnvironment.Instance.FullyQualifiedNamespace,
161-
_eventHubScope.EventHubName,
162-
nameof(EventHubTestMultipleDispatchJobs.ProcessMultipleEvents),
163-
null,
164-
null);
157+
ValidateEventHubRequest(
158+
ehTriggerRequest,
159+
true,
160+
EventHubsTestEnvironment.Instance.FullyQualifiedNamespace,
161+
_eventHubScope.EventHubName,
162+
nameof(EventHubTestMultipleDispatchJobs.ProcessMultipleEvents),
163+
null,
164+
null);
165165

166166
Assert.NotNull(ehTriggerRequest.Context.Operation.Id);
167167
Assert.Null(ehTriggerRequest.Context.Operation.ParentId);
@@ -200,9 +200,9 @@ public async Task EventHub_MultipleDispatch_IndependentMessages()
200200
id = spanId
201201
};
202202

203-
messages[i] = new EventData(Encoding.UTF8.GetBytes(_testId + i))
203+
messages[i] = new EventData(Encoding.UTF8.GetBytes(i.ToString()))
204204
{
205-
Properties = {["Diagnostic-Id"] = $"00-{operationId}-{spanId}-01"}
205+
Properties = { ["Diagnostic-Id"] = $"00-{operationId}-{spanId}-01" }
206206
};
207207
}
208208

@@ -337,10 +337,7 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out E
337337

338338
public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt)
339339
{
340-
if (evt.StartsWith(_testId))
341-
{
342-
_eventWait.Set();
343-
}
340+
_eventWait.Set();
344341
}
345342
}
346343

@@ -365,15 +362,14 @@ public static void SendEvents_TestHub(string input, [EventHub(TestHubName)] out
365362

366363
public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[] events)
367364
{
368-
var eventsFromCurrentTest = events.Where(e => e.StartsWith(_testId)).ToArray();
369-
Activity.Current.AddTag("receivedMessages", eventsFromCurrentTest.Length.ToString());
365+
Activity.Current.AddTag("receivedMessages", events.Length.ToString());
370366
lock (MessageLock)
371367
{
372-
MessagesCount += eventsFromCurrentTest.Length;
368+
MessagesCount += events.Length;
373369

374-
if (eventsFromCurrentTest.Length > 0)
370+
if (events.Length > 0)
375371
{
376-
LinksCount.Add(eventsFromCurrentTest.Length);
372+
LinksCount.Add(events.Length);
377373
}
378374

379375
if (MessagesCount >= EventCount)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubConfigurationTests.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public void ConfigureOptions_AppliesValuesCorrectly()
4242
Assert.AreEqual(5, options.BatchCheckpointFrequency);
4343
Assert.AreEqual(31, options.EventProcessorOptions.PartitionOwnershipExpirationInterval.TotalSeconds);
4444
Assert.AreEqual(21, options.EventProcessorOptions.LoadBalancingUpdateInterval.TotalSeconds);
45+
Assert.AreEqual("FromEnqueuedTime", options.InitialOffsetOptions.Type);
46+
Assert.AreEqual("2020-09-13T12:00Z", options.InitialOffsetOptions.EnqueuedTimeUTC);
4547
}
4648

4749
[Test]
@@ -53,14 +55,16 @@ public void ConfigureOptions_Format_Returns_Expected()
5355
JObject iObj = JObject.Parse(format);
5456
EventHubOptions result = iObj.ToObject<EventHubOptions>();
5557

56-
Assert.AreEqual(123, options.MaxBatchSize);
57-
Assert.AreEqual(result.BatchCheckpointFrequency, 5);
58-
Assert.AreEqual(result.EventProcessorOptions.TrackLastEnqueuedEventProperties, true);
59-
Assert.AreEqual(result.InvokeProcessorAfterReceiveTimeout, true);
60-
Assert.AreEqual(result.EventProcessorOptions.PrefetchCount, 123);
61-
Assert.AreEqual(result.EventProcessorOptions.MaximumWaitTime, TimeSpan.FromSeconds(33));
62-
Assert.AreEqual(result.EventProcessorOptions.PartitionOwnershipExpirationInterval, TimeSpan.FromSeconds(31));
63-
Assert.AreEqual(result.EventProcessorOptions.LoadBalancingUpdateInterval, TimeSpan.FromSeconds(21));
58+
Assert.AreEqual(123, result.MaxBatchSize);
59+
Assert.AreEqual(5, result.BatchCheckpointFrequency);
60+
Assert.True(result.EventProcessorOptions.TrackLastEnqueuedEventProperties);
61+
Assert.True(result.InvokeProcessorAfterReceiveTimeout);
62+
Assert.AreEqual(123, result.EventProcessorOptions.PrefetchCount);
63+
Assert.AreEqual(TimeSpan.FromSeconds(33), result.EventProcessorOptions.MaximumWaitTime);
64+
Assert.AreEqual(TimeSpan.FromSeconds(31), result.EventProcessorOptions.PartitionOwnershipExpirationInterval);
65+
Assert.AreEqual(TimeSpan.FromSeconds(21), result.EventProcessorOptions.LoadBalancingUpdateInterval);
66+
Assert.AreEqual("FromEnqueuedTime", result.InitialOffsetOptions.Type);
67+
Assert.AreEqual("2020-09-13T12:00Z", result.InitialOffsetOptions.EnqueuedTimeUTC);
6468
}
6569

6670
private EventHubOptions CreateOptions()
@@ -76,6 +80,8 @@ private EventHubOptions CreateOptions()
7680
{ $"{extensionPath}:BatchCheckpointFrequency", "5" },
7781
{ $"{extensionPath}:PartitionManagerOptions:LeaseDuration", "00:00:31" },
7882
{ $"{extensionPath}:PartitionManagerOptions:RenewInterval", "00:00:21" },
83+
{ $"{extensionPath}:InitialOffsetOptions:Type", "FromEnqueuedTime" },
84+
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", "2020-09-13T12:00Z" },
7985
};
8086

8187
return TestHelpers.GetConfiguredOptions<EventHubOptions>(b =>

0 commit comments

Comments
 (0)