33
44using System ;
55using System . Collections . Generic ;
6+ using System . Globalization ;
67using System . Text ;
78using Azure . Messaging . EventHubs ;
89using Azure . Messaging . EventHubs . Consumer ;
@@ -202,7 +203,8 @@ public void InitializeFromHostMetadata()
202203 { "AzureWebJobs:extensions:EventHubs:EventProcessorOptions:PrefetchCount" , "200" } ,
203204 { "AzureWebJobs:extensions:EventHubs:BatchCheckpointFrequency" , "5" } ,
204205 { "AzureWebJobs:extensions:EventHubs:PartitionManagerOptions:LeaseDuration" , "00:00:31" } ,
205- { "AzureWebJobs:extensions:EventHubs:PartitionManagerOptions:RenewInterval" , "00:00:21" }
206+ { "AzureWebJobs:extensions:EventHubs:PartitionManagerOptions:RenewInterval" , "00:00:21" } ,
207+ { "AzureWebJobs:extensions:EventHubs:InitialOffsetOptions:Type" , "FromEnd" } ,
206208 } ) ;
207209 } )
208210 . Build ( ) ;
@@ -217,6 +219,85 @@ public void InitializeFromHostMetadata()
217219 Assert . AreEqual ( 100 , options . MaxBatchSize ) ;
218220 Assert . AreEqual ( 31 , options . EventProcessorOptions . PartitionOwnershipExpirationInterval . TotalSeconds ) ;
219221 Assert . AreEqual ( 21 , options . EventProcessorOptions . LoadBalancingUpdateInterval . TotalSeconds ) ;
222+ Assert . AreEqual ( EventPosition . Latest , eventProcessorOptions . DefaultStartingPosition ) ;
223+ }
224+
225+ [ Test ]
226+ public void InitializeFromCodeRespectsFinalOffsetOptions_FromStart ( )
227+ {
228+ IHost host = new HostBuilder ( )
229+ . ConfigureDefaultTestHost ( builder =>
230+ {
231+ builder . AddEventHubs ( options => options . InitialOffsetOptions . Type = "FromEnd" ) ;
232+ } )
233+ . ConfigureServices ( services =>
234+ {
235+ services . Configure < EventHubOptions > ( options =>
236+ {
237+ options . InitialOffsetOptions . Type = "FromStart" ;
238+ } ) ;
239+ } )
240+ . Build ( ) ;
241+
242+ // Force the ExtensionRegistryFactory to run, which will initialize the EventHubConfiguration.
243+ var extensionRegistry = host . Services . GetService < IExtensionRegistry > ( ) ;
244+ var options = host . Services . GetService < IOptions < EventHubOptions > > ( ) . Value ;
245+
246+ var eventProcessorOptions = options . EventProcessorOptions ;
247+ Assert . AreEqual ( EventPosition . Earliest , eventProcessorOptions . DefaultStartingPosition ) ;
248+ }
249+
250+ [ Test ]
251+ public void InitializeFromCodeRespectsFinalOffsetOptions_FromEnd ( )
252+ {
253+ var host = new HostBuilder ( )
254+ . ConfigureDefaultTestHost ( builder =>
255+ {
256+ builder . AddEventHubs ( options => options . InitialOffsetOptions . Type = "FromStart" ) ;
257+ } )
258+ . ConfigureServices ( services =>
259+ {
260+ services . Configure < EventHubOptions > ( options =>
261+ {
262+ options . InitialOffsetOptions . Type = "FromEnd" ;
263+ } ) ;
264+ } )
265+ . Build ( ) ;
266+
267+ // Force the ExtensionRegistryFactory to run, which will initialize the EventHubConfiguration.
268+ var extensionRegistry = host . Services . GetService < IExtensionRegistry > ( ) ;
269+ var options = host . Services . GetService < IOptions < EventHubOptions > > ( ) . Value ;
270+
271+ var eventProcessorOptions = options . EventProcessorOptions ;
272+ Assert . AreEqual ( EventPosition . Latest , eventProcessorOptions . DefaultStartingPosition ) ;
273+ }
274+
275+ [ Test ]
276+ public void InitializeFromCodeRespectsFinalOffsetOptions_FromEnqueuedTime ( )
277+ {
278+ var host = new HostBuilder ( )
279+ . ConfigureDefaultTestHost ( builder =>
280+ {
281+ builder . AddEventHubs ( options => options . InitialOffsetOptions . Type = "FromStart" ) ;
282+ } )
283+ . ConfigureServices ( services =>
284+ {
285+ services . Configure < EventHubOptions > ( options =>
286+ {
287+ options . InitialOffsetOptions . Type = "FromEnqueuedTime" ;
288+ options . InitialOffsetOptions . EnqueuedTimeUTC = DateTimeOffset . UtcNow . ToString ( ) ;
289+ } ) ;
290+ } )
291+ . Build ( ) ;
292+
293+ // Force the ExtensionRegistryFactory to run, which will initialize the EventHubConfiguration.
294+ var extensionRegistry = host . Services . GetService < IExtensionRegistry > ( ) ;
295+ var options = host . Services . GetService < IOptions < EventHubOptions > > ( ) . Value ;
296+
297+ var eventProcessorOptions = options . EventProcessorOptions ;
298+ Assert . AreEqual (
299+ EventPosition . FromEnqueuedTime ( DateTime . Parse ( options . InitialOffsetOptions . EnqueuedTimeUTC ,
300+ CultureInfo . InvariantCulture ) . ToUniversalTime ( ) ) , eventProcessorOptions . DefaultStartingPosition ) ;
220301 }
221302
222303 internal static EventProcessorHostPartition GetPartitionContext ( string partitionId = "0" , string eventHubPath = "path" ,
0 commit comments