3232using System ;
3333using System . IO ;
3434using System . Threading ;
35+ using System . Threading . Channels ;
3536using System . Threading . Tasks ;
3637using RabbitMQ . Client ;
3738using RabbitMQ . Client . Exceptions ;
@@ -44,6 +45,9 @@ namespace Test.Integration
4445{
4546 public class TestConnectionShutdown : IntegrationFixture
4647 {
48+ // default Connection.Abort() timeout and then some
49+ private readonly TimeSpan _waitSpan = TimeSpan . FromSeconds ( 6 ) ;
50+
4751 public TestConnectionShutdown ( ITestOutputHelper output ) : base ( output )
4852 {
4953 }
@@ -59,18 +63,31 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand()
5963 } ;
6064
6165 var c = ( AutorecoveringConnection ) _conn ;
62- await c . CloseFrameHandlerAsync ( ) ;
63-
66+ ValueTask frameHandlerCloseTask = c . CloseFrameHandlerAsync ( ) ;
6467 try
6568 {
66- await _conn . CloseAsync ( TimeSpan . FromSeconds ( 4 ) ) ;
69+ await _conn . CloseAsync ( _waitSpan ) ;
6770 }
6871 catch ( AlreadyClosedException ex )
6972 {
7073 Assert . IsAssignableFrom < IOException > ( ex . InnerException ) ;
7174 }
75+ catch ( ChannelClosedException )
76+ {
77+ /*
78+ * TODO: ideally we'd not see this exception!
79+ */
80+ }
7281
73- await WaitAsync ( tcs , TimeSpan . FromSeconds ( 5 ) , "channel shutdown" ) ;
82+ try
83+ {
84+ await WaitAllAsync ( tcs , frameHandlerCloseTask ) ;
85+ }
86+ finally
87+ {
88+ _conn = null ;
89+ _channel = null ;
90+ }
7491 }
7592
7693 [ Fact ]
@@ -84,12 +101,17 @@ public async Task TestAbortWithSocketClosedOutOfBand()
84101 } ;
85102
86103 var c = ( AutorecoveringConnection ) _conn ;
87- await c . CloseFrameHandlerAsync ( ) ;
88-
89- await _conn . AbortAsync ( ) ;
90-
91- // default Connection.Abort() timeout and then some
92- await WaitAsync ( tcs , TimeSpan . FromSeconds ( 6 ) , "channel shutdown" ) ;
104+ ValueTask frameHandlerCloseTask = c . CloseFrameHandlerAsync ( ) ;
105+ try
106+ {
107+ await _conn . AbortAsync ( ) ;
108+ await WaitAllAsync ( tcs , frameHandlerCloseTask ) ;
109+ }
110+ finally
111+ {
112+ _conn = null ;
113+ _channel = null ;
114+ }
93115 }
94116
95117 [ Fact ]
@@ -111,12 +133,18 @@ public async Task TestAbortWithSocketClosedOutOfBandAndCancellation()
111133 } ;
112134
113135 var c = ( AutorecoveringConnection ) _conn ;
114- await c . CloseFrameHandlerAsync ( ) ;
115-
116- await _conn . AbortAsync ( cts . Token ) ;
136+ ValueTask frameHandlerCloseTask = c . CloseFrameHandlerAsync ( ) ;
117137
118- // default Connection.Abort() timeout and then some
119- await WaitAsync ( tcs , TimeSpan . FromSeconds ( 6 ) , "channel shutdown" ) ;
138+ try
139+ {
140+ await _conn . AbortAsync ( cts . Token ) ;
141+ await WaitAllAsync ( tcs , frameHandlerCloseTask ) ;
142+ }
143+ finally
144+ {
145+ _conn = null ;
146+ _channel = null ;
147+ }
120148 }
121149
122150 [ Fact ]
@@ -248,5 +276,11 @@ public async Task TestDisposeAfterAbort_GH825()
248276 await _channel . AbortAsync ( ) ;
249277 await _channel . DisposeAsync ( ) ;
250278 }
279+
280+ private async Task WaitAllAsync ( TaskCompletionSource < bool > tcs , ValueTask frameHandlerCloseTask )
281+ {
282+ await WaitAsync ( tcs , _waitSpan , "channel shutdown" ) ;
283+ await frameHandlerCloseTask . AsTask ( ) . WaitAsync ( _waitSpan ) ;
284+ }
251285 }
252286}
0 commit comments