From 1f732592dc6b0b3fbe7704cd13c9998f33dbc026 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Sun, 8 Dec 2024 10:00:51 +0100 Subject: [PATCH] Ensure Connection and Channel cancellation token properly float into handlers * Pass cancellation through the dispatcher * Quite likely no need to quiescing multiple times * Allow WaitForShutdown to be cancelled * Do not invoke OnException when operation was cancelled --- .../AsyncConsumerDispatcher.cs | 8 +- .../ConsumerDispatcherChannelBase.cs | 98 ++++++++++---- .../IConsumerDispatcher.cs | 2 +- projects/RabbitMQ.Client/IChannel.cs | 9 ++ .../RabbitMQ.Client/IConnectionExtensions.cs | 48 ++++++- .../Impl/AsyncEventingWrapper.cs | 4 + .../Impl/AutorecoveringChannel.cs | 9 +- projects/RabbitMQ.Client/Impl/Channel.cs | 33 +++-- .../Impl/Connection.Heartbeat.cs | 2 +- .../Impl/Connection.Receive.cs | 15 ++- projects/RabbitMQ.Client/Impl/Connection.cs | 18 ++- .../Impl/SocketFrameHandler.cs | 10 +- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 6 +- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 3 + projects/Test/Common/Common.csproj | 4 +- projects/Test/Integration/Integration.csproj | 2 +- .../Test/Integration/TestAsyncConsumer.cs | 4 +- .../TestAsyncConsumerCancellation.cs | 72 ++++++++++ .../Test/Integration/TestChannelShutdown.cs | 37 ++++++ .../Integration/TestConnectionShutdown.cs | 124 ++++++++++++++++-- .../SequentialIntegration.csproj | 4 +- projects/Test/Unit/Unit.csproj | 4 +- 22 files changed, 415 insertions(+), 101 deletions(-) create mode 100644 projects/Test/Integration/TestAsyncConsumerCancellation.cs diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index d9958fae02..cfbf2c5466 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync() { await work.Consumer.HandleBasicDeliverAsync( work.ConsumerTag!, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken) .ConfigureAwait(false); } break; case WorkType.Cancel: - await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.CancelOk: - await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.ConsumeOk: - await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.Shutdown: diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 0100fb3f32..be9ebf8027 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, private readonly ushort _concurrency; private long _isQuiescing; private bool _disposed; + private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource(); internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) { @@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s try { AddConsumer(consumer, consumerTag); - WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag); - var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); + var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); - WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -139,7 +140,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); - WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -147,10 +148,23 @@ await _writer.WriteAsync(work, cancellationToken) public void Quiesce() { + if (IsQuiescing) + { + return; + } + Interlocked.Exchange(ref _isQuiescing, 1); + try + { + _shutdownCts.Cancel(); + } + catch + { + // ignore + } } - public async Task WaitForShutdownAsync() + public async Task WaitForShutdownAsync(CancellationToken cancellationToken) { if (_disposed) { @@ -169,7 +183,7 @@ public async Task WaitForShutdownAsync() * * await _reader.Completion.ConfigureAwait(false); */ - await _worker + await _worker.WaitAsync(cancellationToken) .ConfigureAwait(false); } catch (AggregateException aex) @@ -203,18 +217,13 @@ protected bool IsQuiescing { get { - if (Interlocked.Read(ref _isQuiescing) == 1) - { - return true; - } - - return false; + return Interlocked.Read(ref _isQuiescing) == 1; } } protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) { - _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); + _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts)); } protected override Task InternalShutdownAsync() @@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync() public readonly RentedMemory Body; public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; + public readonly CancellationToken CancellationToken; + private readonly CancellationTokenSource? _cancellationTokenSource; - private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag) + private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) : this() { WorkType = type; Consumer = consumer; ConsumerTag = consumerTag; + CancellationToken = cancellationToken; + _cancellationTokenSource = null; } - private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) + private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource) : this() { WorkType = WorkType.Shutdown; Consumer = consumer; Reason = reason; + CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None; + this._cancellationTokenSource = cancellationTokenSource; } private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, + CancellationToken cancellationToken) { WorkType = WorkType.Deliver; Consumer = consumer; @@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv RoutingKey = routingKey; BasicProperties = basicProperties; Body = body; - Reason = default; + Reason = null; + CancellationToken = cancellationToken; + _cancellationTokenSource = null; } - public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.Cancel, consumer, consumerTag); + return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.CancelOk, consumer, consumerTag); + return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag); + return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) + public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(consumer, reason); + // Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token. + CancellationTokenSource? linked = null; + try + { + if (reason.CancellationToken.CanBeCanceled) + { + linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken); + } + } + catch + { + linked = null; + } + + CancellationToken token = linked?.Token ?? cancellationTokenSource.Token; + ShutdownEventArgs argsWithToken = reason.Exception != null ? + new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) : + new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token); + + return new WorkStruct(consumer, argsWithToken, linked); } public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource) { return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered, - exchange, routingKey, basicProperties, body); + exchange, routingKey, basicProperties, body, cancellationTokenSource.Token); } - public void Dispose() => Body.Dispose(); + public void Dispose() + { + Body.Dispose(); + _cancellationTokenSource?.Dispose(); + } } protected enum WorkType : byte @@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { Quiesce(); + _shutdownCts.Dispose(); } } catch diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs index 79676f37cf..47ed29cbd6 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs @@ -64,6 +64,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag, void Quiesce(); Task ShutdownAsync(ShutdownEventArgs reason); - Task WaitForShutdownAsync(); + Task WaitForShutdownAsync(CancellationToken cancellationToken); } } diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index b3d4cfb74a..dab7557b85 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -257,6 +257,14 @@ ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken = default); + /// + /// Asynchronously close this session. + /// + /// The instance containing the close data. + /// Whether or not the close is an abort (ignoring certain exceptions). + /// + Task CloseAsync(ShutdownEventArgs reason, bool abort); + /// /// Asynchronously close this session. /// @@ -264,6 +272,7 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort, /// Whether or not the close is an abort (ignoring certain exceptions). /// CancellationToken for this operation. /// + [Obsolete("7.2.0 - cancellationToken is ignored")] Task CloseAsync(ShutdownEventArgs reason, bool abort, CancellationToken cancellationToken = default); diff --git a/projects/RabbitMQ.Client/IConnectionExtensions.cs b/projects/RabbitMQ.Client/IConnectionExtensions.cs index 33b9bc64c4..bb32a00250 100644 --- a/projects/RabbitMQ.Client/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/IConnectionExtensions.cs @@ -93,21 +93,36 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// /// /// Note that all active channels and sessions will be closed if this method is called. - /// In comparison to normal method, will not throw + /// In comparison to normal method, will not throw /// during closing connection. ///This method waits infinitely for the in-progress close operation to complete. /// public static Task AbortAsync(this IConnection connection) { - return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", + InternalConstants.DefaultConnectionAbortTimeout, true, default); + } + + /// + /// Asynchronously abort this connection and all its channels. + /// + /// + /// Note that all active channels and sessions will be closed if this method is called. + /// In comparison to normal method, will not throw + /// during closing connection. + ///This method waits infinitely for the in-progress close operation to complete. + /// + public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default) + { + return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", + InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken); } /// /// Asynchronously abort this connection and all its channels. /// /// - /// The method behaves in the same way as , with the only + /// The method behaves in the same way as , with the only /// difference that the connection is closed with the given connection close code and message. /// /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification) @@ -118,8 +133,27 @@ public static Task AbortAsync(this IConnection connection) /// public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) { - return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + return connection.CloseAsync(reasonCode, reasonText, + InternalConstants.DefaultConnectionAbortTimeout, true, default); + } + + /// + /// Asynchronously abort this connection and all its channels. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the connection is closed with the given connection close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification) + /// + /// + /// A message indicating the reason for closing the connection + /// + /// + public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default) + { + return connection.CloseAsync(reasonCode, reasonText, + InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken); } /// @@ -127,7 +161,7 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st /// timeout for all the in-progress close operations to complete. /// /// - /// This method, behaves in a similar way as method with the + /// This method, behaves in a similar way as method with the /// only difference that it explicitly specifies a timeout given /// for all the in-progress close operations to complete. /// If timeout is reached and the close operations haven't finished, then socket is forced to close. diff --git a/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs index 7599129acf..f3b091b345 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs @@ -61,6 +61,10 @@ private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T await action(sender, @event) .ConfigureAwait(false); } + catch (OperationCanceledException) + { + // Ignore cancellation exceptions + } catch (Exception exception) { if (_onException != null) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 0ea8a26993..f6b7752e7a 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -234,13 +234,18 @@ await _connection.DeleteRecordedChannelAsync(this, } } - public async Task CloseAsync(ShutdownEventArgs args, bool abort, + public Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) + { + return CloseAsync(args, abort); + } + + public async Task CloseAsync(ShutdownEventArgs args, bool abort) { ThrowIfDisposed(); try { - await _innerChannel.CloseAsync(args, abort, cancellationToken) + await _innerChannel.CloseAsync(args, abort) .ConfigureAwait(false); } finally diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index a8ee311a07..33dbe9305a 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -201,22 +201,23 @@ protected void TakeOver(Channel other) public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { - var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); - return CloseAsync(args, abort, cancellationToken); + var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText, cancellationToken: cancellationToken); + return CloseAsync(args, abort); } - public async Task CloseAsync(ShutdownEventArgs args, bool abort, + public Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) { - CancellationToken argCancellationToken = cancellationToken; - if (IsOpen) - { - // Note: we really do need to try and close this channel! - cancellationToken = CancellationToken.None; - } + return CloseAsync(args, abort); + } + + public async Task CloseAsync(ShutdownEventArgs args, bool abort) + { + CancellationToken cancellationToken = args.CancellationToken; bool enqueued = false; - var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + // We should really try to close the channel and therefore we don't allow this to be canceled by the user + var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, IsOpen ? CancellationToken.None : cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -236,7 +237,7 @@ await ModelSendAsync(in method, k.CancellationToken) AssertResultIsTrue(await k); - await ConsumerDispatcher.WaitForShutdownAsync() + await ConsumerDispatcher.WaitForShutdownAsync(cancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -265,7 +266,6 @@ await ConsumerDispatcher.WaitForShutdownAsync() MaybeDisposeContinuation(enqueued, k); _rpcSemaphore.Release(); ChannelShutdownAsync -= k.OnConnectionShutdownAsync; - argCancellationToken.ThrowIfCancellationRequested(); } } @@ -851,7 +851,7 @@ await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId, cancellationToken: cancellationToken); try { /* @@ -863,7 +863,7 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance await ModelSendAsync(in replyMethod, cancellationToken) .ConfigureAwait(false); - await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) + await Session.Connection.ClosedViaPeerAsync(reason) .ConfigureAwait(false); SetCloseReason(Session.Connection.CloseReason!); @@ -896,10 +896,9 @@ protected async Task HandleConnectionStartAsync(IncomingCommand cmd, Cance { if (m_connectionStartCell is null) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start", cancellationToken: cancellationToken); await Session.Connection.CloseAsync(reason, false, - InternalConstants.DefaultConnectionCloseTimeout, - cancellationToken) + InternalConstants.DefaultConnectionCloseTimeout) .ConfigureAwait(false); } else diff --git a/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs index 3eb94272d7..5e82d2b2cc 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs @@ -109,7 +109,7 @@ private async void HeartbeatReadTimerCallback(object? state) { var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds"); LogCloseError(eose.Message, eose); - await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)) + await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose, _mainLoopCts.Token)) .ConfigureAwait(false); shouldTerminate = true; } diff --git a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs index ae67544e3e..5aa4c02816 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "Thread aborted (AppDomain unloaded?)", - exception: taex); + exception: taex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -73,7 +74,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", - exception: eose); + exception: eose, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -91,7 +93,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, fileLoadException.Message, - exception: fileLoadException); + exception: fileLoadException, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -106,7 +109,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ocex.Message, - exception: ocex); + exception: ocex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -116,7 +120,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ex.Message, - exception: ex); + exception: ex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index f661669af3..3a1aa6ba22 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -254,10 +254,9 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken) { try { - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen", cancellationToken: cancellationToken); await CloseAsync(ea, true, - InternalConstants.DefaultConnectionAbortTimeout, - cancellationToken).ConfigureAwait(false); + InternalConstants.DefaultConnectionAbortTimeout).ConfigureAwait(false); } catch { } @@ -299,8 +298,8 @@ internal void EnsureIsOpen() public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); - return CloseAsync(reason, abort, timeout, cancellationToken); + var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText, cancellationToken: cancellationToken); + return CloseAsync(reason, abort, timeout); } ///Asychronously try to close connection in a graceful way @@ -318,9 +317,9 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b ///to complete. /// /// - internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout, CancellationToken cancellationToken) + internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout) { - CancellationToken argCancellationToken = cancellationToken; + CancellationToken cancellationToken = reason.CancellationToken; if (abort && timeout < InternalConstants.DefaultConnectionAbortTimeout) { @@ -431,12 +430,11 @@ await _frameHandler.CloseAsync(cts.Token) throw; } } - - argCancellationToken.ThrowIfCancellationRequested(); } - internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason, CancellationToken cancellationToken) + internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) { + CancellationToken cancellationToken = reason.CancellationToken; if (false == SetCloseReason(reason)) { if (_closed) diff --git a/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs index 4a970f00d5..4ce54391c3 100644 --- a/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs @@ -158,7 +158,7 @@ public static async Task CreateAsync(AmqpTcpEndpoint amqpTcp } SocketFrameHandler socketFrameHandler = new(amqpTcpEndpoint, socket, stream); - socketFrameHandler._writerTask = Task.Run(socketFrameHandler.WriteLoop, cancellationToken); + socketFrameHandler._writerTask = Task.Run(socketFrameHandler.WriteLoopAsync, cancellationToken); return socketFrameHandler; } @@ -236,13 +236,11 @@ public ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationT frames.Dispose(); return default; } - else - { - return _channelWriter.WriteAsync(frames, cancellationToken); - } + + return _channelWriter.WriteAsync(frames, cancellationToken); } - private async Task WriteLoop() + private async Task WriteLoopAsync() { try { diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 8e5220768b..da6257103e 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -770,7 +770,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void ~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void @@ -784,7 +784,7 @@ static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Cli static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! @@ -876,7 +876,7 @@ RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitM RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object! channel, RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> System.Threading.Tasks.Task! RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! -RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort) -> System.Threading.Tasks.Task! RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index b78bae4b23..453d0f67ee 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -5,11 +5,14 @@ RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong p RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string! RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! +RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.Client.RabbitMQTracingOptions RabbitMQ.Client.RabbitMQTracingOptions.RabbitMQTracingOptions() -> void RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.get -> bool RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.set -> void RabbitMQ.Client.RabbitMQTracingOptions.UseRoutingKeyAsOperationName.get -> bool RabbitMQ.Client.RabbitMQTracingOptions.UseRoutingKeyAsOperationName.set -> void +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText) -> System.Threading.Tasks.Task! static RabbitMQ.Client.RabbitMQActivitySource.TracingOptions.get -> RabbitMQ.Client.RabbitMQTracingOptions! static RabbitMQ.Client.RabbitMQActivitySource.TracingOptions.set -> void diff --git a/projects/Test/Common/Common.csproj b/projects/Test/Common/Common.csproj index fbc1a81c65..1469d3c26e 100644 --- a/projects/Test/Common/Common.csproj +++ b/projects/Test/Common/Common.csproj @@ -2,13 +2,13 @@ net8.0;net472 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true net8.0 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true diff --git a/projects/Test/Integration/Integration.csproj b/projects/Test/Integration/Integration.csproj index 0bd80368fd..b5b08a2a66 100644 --- a/projects/Test/Integration/Integration.csproj +++ b/projects/Test/Integration/Integration.csproj @@ -2,7 +2,7 @@ net8.0;net472 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index e8e6849975..3c60969197 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -514,7 +514,7 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, Assert.True(await publishSyncSource.Task); Assert.Equal(messageCount, messagesReceived); await _channel.QueueDeleteAsync(queue: queueName); - await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); + await _channel.CloseAsync(_closeArgs, false); } [Fact] @@ -591,7 +591,7 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, Assert.Equal((uint)0, consumerCount); } - await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); + await _channel.CloseAsync(_closeArgs, false); } [Fact] diff --git a/projects/Test/Integration/TestAsyncConsumerCancellation.cs b/projects/Test/Integration/TestAsyncConsumerCancellation.cs new file mode 100644 index 0000000000..994b721ab5 --- /dev/null +++ b/projects/Test/Integration/TestAsyncConsumerCancellation.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration +{ + public class TestAsyncConsumerCancellation : IntegrationFixture + { + public TestAsyncConsumerCancellation(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestConsumerCancellation() + { + string exchangeName = GenerateExchangeName(); + string queueName = GenerateQueueName(); + string routingKey = string.Empty; + + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct); + await _channel.QueueDeclareAsync(queueName, false, false, true, null); + await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null); + + var tcsMessageReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcsReceivedCancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcsShutdownCancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ShutdownAsync += async (model, ea) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken); + } + catch (OperationCanceledException) + { + tcsShutdownCancelled.SetResult(true); + } + }; + consumer.ReceivedAsync += async (model, ea) => + { + tcsMessageReceived.SetResult(true); + try + { + await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken); + } + catch (OperationCanceledException) + { + tcsReceivedCancelled.SetResult(true); + } + }; + await _channel.BasicConsumeAsync(queueName, false, consumer); + + //publisher + await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions); + byte[] messageBodyBytes = "Hello, world!"u8.ToArray(); + var props = new BasicProperties(); + await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: messageBodyBytes); + + await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer received message"); + + await _channel.CloseAsync(); + + await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer closed"); + await WaitAsync(tcsShutdownCancelled, TimeSpan.FromSeconds(5), "Consumer closed"); + } + } +} diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 3b2e6f2d3d..52337841d8 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -64,6 +64,43 @@ public async Task TestConsumerDispatcherShutdown() Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + [Fact] + public async Task TestChannelClose() + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += (channel, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; + + await _channel.CloseAsync(); + await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); + } + + [Fact] + public async Task TestChannelCloseWithCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _channel.CloseAsync(cts.Token); + await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); + } + [Fact] public async Task TestConcurrentDisposeAsync_GH1749() { diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 0a7a380b87..8b13cdade7 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -31,6 +31,8 @@ using System; using System.IO; +using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; @@ -43,6 +45,9 @@ namespace Test.Integration { public class TestConnectionShutdown : IntegrationFixture { + // default Connection.Abort() timeout and then some + private readonly TimeSpan _waitSpan = TimeSpan.FromSeconds(6); + public TestConnectionShutdown(ITestOutputHelper output) : base(output) { } @@ -58,18 +63,31 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand() }; var c = (AutorecoveringConnection)_conn; - await c.CloseFrameHandlerAsync(); - + ValueTask frameHandlerCloseTask = c.CloseFrameHandlerAsync(); try { - await _conn.CloseAsync(TimeSpan.FromSeconds(4)); + await _conn.CloseAsync(_waitSpan); } catch (AlreadyClosedException ex) { Assert.IsAssignableFrom(ex.InnerException); } + catch (ChannelClosedException) + { + /* + * TODO: ideally we'd not see this exception! + */ + } - await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); + try + { + await WaitAllAsync(tcs, frameHandlerCloseTask); + } + finally + { + _conn = null; + _channel = null; + } } [Fact] @@ -83,12 +101,50 @@ public async Task TestAbortWithSocketClosedOutOfBand() }; var c = (AutorecoveringConnection)_conn; - await c.CloseFrameHandlerAsync(); + ValueTask frameHandlerCloseTask = c.CloseFrameHandlerAsync(); + try + { + await _conn.AbortAsync(); + await WaitAllAsync(tcs, frameHandlerCloseTask); + } + finally + { + _conn = null; + _channel = null; + } + } + + [Fact] + public async Task TestAbortWithSocketClosedOutOfBandAndCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; - await _conn.AbortAsync(); + var c = (AutorecoveringConnection)_conn; + ValueTask frameHandlerCloseTask = c.CloseFrameHandlerAsync(); - // default Connection.Abort() timeout and then some - await WaitAsync(tcs, TimeSpan.FromSeconds(6), "channel shutdown"); + try + { + await _conn.AbortAsync(cts.Token); + await WaitAllAsync(tcs, frameHandlerCloseTask); + } + finally + { + _conn = null; + _channel = null; + } } [Fact] @@ -134,6 +190,52 @@ public async Task TestShutdownSignalPropagationToChannels() await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); } + [Fact] + public async Task TestShutdownCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _conn.ConnectionShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _conn.CloseAsync(cancellationToken: cts.Token); + + await WaitAsync(tcs, TimeSpan.FromSeconds(3), "connection shutdown"); + } + + [Fact] + public async Task TestShutdownSignalPropagationWithCancellationToChannels() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _conn.CloseAsync(cts.Token); + + await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); + } + [Fact] public async Task TestShutdownSignalPropagationToChannelsUsingDispose() { @@ -174,5 +276,11 @@ public async Task TestDisposeAfterAbort_GH825() await _channel.AbortAsync(); await _channel.DisposeAsync(); } + + private async Task WaitAllAsync(TaskCompletionSource tcs, ValueTask frameHandlerCloseTask) + { + await WaitAsync(tcs, _waitSpan, "channel shutdown"); + await frameHandlerCloseTask.AsTask().WaitAsync(_waitSpan); + } } } diff --git a/projects/Test/SequentialIntegration/SequentialIntegration.csproj b/projects/Test/SequentialIntegration/SequentialIntegration.csproj index 44047e45af..d23c2d0702 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegration.csproj +++ b/projects/Test/SequentialIntegration/SequentialIntegration.csproj @@ -2,13 +2,13 @@ net8.0;net472 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true net8.0 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true diff --git a/projects/Test/Unit/Unit.csproj b/projects/Test/Unit/Unit.csproj index 3a18c796dd..26f99f98bb 100644 --- a/projects/Test/Unit/Unit.csproj +++ b/projects/Test/Unit/Unit.csproj @@ -2,13 +2,13 @@ net8.0;net472 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true net8.0 - $(NoWarn);CA2007 + $(NoWarn);CA2007;IDE1006 true