@@ -27,22 +27,25 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
2727 [ LiveOnly ( true ) ]
2828 public class EventHubEndToEndTests : WebJobsEventHubTestBase
2929 {
30+ private static readonly TimeSpan NoEventReadTimeout = TimeSpan . FromSeconds ( 5 ) ;
31+
3032 private static EventWaitHandle _eventWait ;
3133 private static List < string > _results ;
3234 private static DateTimeOffset _initialOffsetEnqueuedTimeUTC ;
3335
34- /// <summary>
35- /// Performs the tasks needed to initialize the test fixture. This
36- /// method runs once for the entire fixture, prior to running any tests.
37- /// </summary>
38- ///
3936 [ SetUp ]
4037 public void SetUp ( )
4138 {
4239 _results = new List < string > ( ) ;
4340 _eventWait = new ManualResetEvent ( initialState : false ) ;
4441 }
4542
43+ [ TearDown ]
44+ public void TearDown ( )
45+ {
46+ _eventWait . Dispose ( ) ;
47+ }
48+
4649 [ Test ]
4750 public async Task EventHub_PocoBinding ( )
4851 {
@@ -317,9 +320,7 @@ public async Task EventHub_PartitionKey()
317320 var ( jobHost , host ) = BuildHost < EventHubPartitionKeyTestJobs > ( ) ;
318321 using ( jobHost )
319322 {
320- _eventWait = new ManualResetEvent ( initialState : false ) ;
321323 await jobHost . CallAsync ( nameof ( EventHubPartitionKeyTestJobs . SendEvents_TestHub ) , new { input = "data" } ) ;
322-
323324 bool result = _eventWait . WaitOne ( Timeout ) ;
324325
325326 Assert . True ( result ) ;
@@ -329,7 +330,7 @@ public async Task EventHub_PartitionKey()
329330 [ Test ]
330331 public async Task EventHub_InitialOffsetFromStart ( )
331332 {
332- var producer = new EventHubProducerClient ( EventHubsTestEnvironment . Instance . EventHubsConnectionString , _eventHubScope . EventHubName ) ;
333+ await using var producer = new EventHubProducerClient ( EventHubsTestEnvironment . Instance . EventHubsConnectionString , _eventHubScope . EventHubName ) ;
333334 await producer . SendAsync ( new EventData [ ] { new EventData ( new BinaryData ( "data" ) ) } ) ;
334335
335336 var ( jobHost , host ) = BuildHost < EventHubTestInitialOffsetFromStartEndJobs > (
@@ -355,7 +356,7 @@ public async Task EventHub_InitialOffsetFromStart()
355356 public async Task EventHub_InitialOffsetFromEnd ( )
356357 {
357358 // Send a message to ensure the stream is not empty as we are trying to validate that no messages are delivered in this case
358- var producer = new EventHubProducerClient ( EventHubsTestEnvironment . Instance . EventHubsConnectionString , _eventHubScope . EventHubName ) ;
359+ await using var producer = new EventHubProducerClient ( EventHubsTestEnvironment . Instance . EventHubsConnectionString , _eventHubScope . EventHubName ) ;
359360 await producer . SendAsync ( new EventData [ ] { new EventData ( new BinaryData ( "data" ) ) } ) ;
360361
361362 var ( jobHost , host ) = BuildHost < EventHubTestInitialOffsetFromStartEndJobs > (
@@ -372,13 +373,29 @@ public async Task EventHub_InitialOffsetFromEnd()
372373 } ) ;
373374 using ( jobHost )
374375 {
375- // We don't expect to get signalled as there should be no messages received with a FromEnd initial offset
376- bool result = _eventWait . WaitOne ( Timeout ) ;
376+ // We don't expect to get signaled as there should be no messages received with a FromEnd initial offset
377+ bool result = _eventWait . WaitOne ( NoEventReadTimeout ) ;
377378 Assert . False ( result , "An event was received while none were expected." ) ;
378379
379- // send a new event which should be received
380- await producer . SendAsync ( new EventData [ ] { new EventData ( new BinaryData ( "data" ) ) } ) ;
380+ // send events which should be received. To ensure that the test is
381+ // resilient to any errors where the link needs to be reestablished,
382+ // continue sending events until cancellation takes place.
383+
384+ using var cts = new CancellationTokenSource ( ) ;
385+
386+ var sendTask = Task . Run ( async ( ) =>
387+ {
388+ while ( ! cts . IsCancellationRequested )
389+ {
390+ await producer . SendAsync ( new [ ] { new EventData ( "data" ) } , cts . Token ) . ConfigureAwait ( false ) ;
391+ }
392+ } ) ;
393+
381394 result = _eventWait . WaitOne ( Timeout ) ;
395+
396+ cts . Cancel ( ) ;
397+ try { await sendTask ; } catch { /* Ignore, we're not testing sends */ }
398+
382399 Assert . True ( result ) ;
383400 }
384401 }
@@ -387,7 +404,8 @@ public async Task EventHub_InitialOffsetFromEnd()
387404 public async Task EventHub_InitialOffsetFromEnqueuedTime ( )
388405 {
389406 await using var producer = new EventHubProducerClient ( EventHubsTestEnvironment . Instance . EventHubsConnectionString , _eventHubScope . EventHubName ) ;
390- for ( int i = 0 ; i < 3 ; i ++ )
407+
408+ for ( int i = 0 ; i < 5 ; i ++ )
391409 {
392410 // send one at a time so they will have slightly different enqueued times
393411 await producer . SendAsync ( new EventData [ ] { new EventData ( new BinaryData ( "data" ) ) } ) ;
@@ -398,9 +416,7 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
398416 EventHubsTestEnvironment . Instance . EventHubsConnectionString ,
399417 _eventHubScope . EventHubName ) ;
400418
401- var events = consumer . ReadEventsAsync ( ) ;
402- _initialOffsetEnqueuedTimeUTC = DateTime . UtcNow ;
403- await foreach ( PartitionEvent evt in events )
419+ await foreach ( PartitionEvent evt in consumer . ReadEventsAsync ( ) )
404420 {
405421 // use the timestamp from the first event for our FromEnqueuedTime
406422 _initialOffsetEnqueuedTimeUTC = evt . Data . EnqueuedTime ;
@@ -416,9 +432,9 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
416432 services . Configure < EventHubOptions > ( options =>
417433 {
418434 options . InitialOffsetOptions . Type = OffsetType . FromEnqueuedTime ;
435+
419436 // Reads from enqueue time are non-inclusive. To ensure that we start with the desired event, set the time slightly in the past.
420- var dto = DateTimeOffset . Parse ( _initialOffsetEnqueuedTimeUTC . AddMilliseconds ( - 150 ) . ToString ( "yyyy-MM-ddTHH:mm:ssZ" ) ) ;
421- options . InitialOffsetOptions . EnqueuedTimeUtc = dto ;
437+ options . InitialOffsetOptions . EnqueuedTimeUtc = _initialOffsetEnqueuedTimeUTC . Subtract ( TimeSpan . FromMilliseconds ( 250 ) ) ;
422438 } ) ;
423439 } ) ;
424440 ConfigureTestEventHub ( builder ) ;
@@ -450,8 +466,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
450466 PartitionContext partitionContext ,
451467 TriggerPartitionContext triggerPartitionContext )
452468 {
453- Assert . True ( ( DateTime . Now - enqueuedTimeUtc ) . TotalSeconds < 30 ) ;
454-
455469 Assert . AreEqual ( "value1" , properties [ "TestProp1" ] ) ;
456470 Assert . AreEqual ( "value2" , properties [ "TestProp2" ] ) ;
457471
@@ -509,7 +523,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
509523 string partitionKey , DateTime enqueuedTimeUtc , IDictionary < string , object > properties ,
510524 IDictionary < string , object > systemProperties )
511525 {
512- Assert . True ( ( DateTime . Now - enqueuedTimeUtc ) . TotalSeconds < 30 ) ;
513526 Assert . AreEqual ( "data" , evt . ToString ( ) ) ;
514527 _eventWait . Set ( ) ;
515528 }
@@ -631,7 +644,7 @@ public class EventHubPartitionKeyTestJobs
631644 // so that we get coverage for receiving events from the same partition in multiple chunks
632645 private const int EventsPerPartition = 15 ;
633646 private const int PartitionCount = 5 ;
634- private const int TotalEventsCount = EventsPerPartition * PartitionCount ;
647+ private const string PartitionKeyPrefix = "test_pk" ;
635648
636649 public static async Task SendEvents_TestHub (
637650 string input ,
@@ -647,7 +660,7 @@ public static async Task SendEvents_TestHub(
647660 for ( int i = 0 ; i < PartitionCount ; i ++ )
648661 {
649662 evt = new EventData ( Encoding . UTF8 . GetBytes ( input ) ) ;
650- tasks . Add ( client . SendAsync ( Enumerable . Repeat ( evt , EventsPerPartition ) , new SendEventOptions ( ) { PartitionKey = "test_pk" + i } ) ) ;
663+ tasks . Add ( client . SendAsync ( Enumerable . Repeat ( evt , EventsPerPartition ) , new SendEventOptions ( ) { PartitionKey = PartitionKeyPrefix + i } ) ) ;
651664 }
652665
653666 await Task . WhenAll ( tasks ) ;
@@ -658,15 +671,17 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName,
658671 foreach ( EventData eventData in events )
659672 {
660673 string message = Encoding . UTF8 . GetString ( eventData . Body . ToArray ( ) ) ;
661-
662674 _results . Add ( eventData . PartitionKey ) ;
663- _results . Sort ( ) ;
664675
665- // count is 1 more because we sent an event without PK
666- if ( _results . Count == TotalEventsCount + 1 && _results [ TotalEventsCount ] == "test_pk4" )
667- {
668- _eventWait . Set ( ) ;
669- }
676+ Assert . True ( eventData . PartitionKey . StartsWith ( PartitionKeyPrefix ) ) ;
677+ }
678+
679+ // The size of the batch read may not contain all events sent. If any
680+ // were read, the format of the partition key was read and verified.
681+
682+ if ( _results . Count > 0 )
683+ {
684+ _eventWait . Set ( ) ;
670685 }
671686 }
672687 }
@@ -682,8 +697,6 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec
682697
683698 public static void ProcessSingleEvent ( [ EventHubTrigger ( TestHubName , Connection = "TestConnection" ) ] string evt , DateTime enqueuedTimeUtc , IDictionary < string , object > properties )
684699 {
685- Assert . True ( ( DateTime . Now - enqueuedTimeUtc ) . TotalSeconds < 30 ) ;
686-
687700 Assert . AreEqual ( "value1" , properties [ "TestProp1" ] ) ;
688701 Assert . AreEqual ( "value2" , properties [ "TestProp2" ] ) ;
689702
@@ -712,18 +725,20 @@ public class EventHubTestInitialOffsetFromEnqueuedTimeJobs
712725
713726 public static void ProcessMultipleEvents ( [ EventHubTrigger ( TestHubName , Connection = TestHubName ) ] EventData [ ] events )
714727 {
715- Assert . LessOrEqual ( events . Length , ExpectedEventsCount ) ;
728+ // there's potentially some level of rewind due to clock differences; allow a small delta when validating.
729+ var earliestAllowedOffset = _initialOffsetEnqueuedTimeUTC . Subtract ( TimeSpan . FromMilliseconds ( 500 ) ) ;
730+
716731 foreach ( EventData eventData in events )
717732 {
718733 string message = Encoding . UTF8 . GetString ( eventData . Body . ToArray ( ) ) ;
719734
720735 _results . Add ( eventData . EnqueuedTime . ToString ( "MM/dd/yyyy hh:mm:ss.fff tt" ) ) ;
721736
722- if ( _results . Count = = ExpectedEventsCount )
737+ if ( _results . Count > = ExpectedEventsCount )
723738 {
724739 foreach ( var result in _results )
725740 {
726- Assert . GreaterOrEqual ( DateTimeOffset . Parse ( result ) , _initialOffsetEnqueuedTimeUTC ) ;
741+ Assert . GreaterOrEqual ( DateTimeOffset . Parse ( result ) , earliestAllowedOffset ) ;
727742 }
728743 _eventWait . Set ( ) ;
729744 }
0 commit comments