From 75f74dabc6835351a2d99f623788d62e940dc5ad Mon Sep 17 00:00:00 2001 From: witskeeper Date: Thu, 30 Oct 2025 15:37:00 +0800 Subject: [PATCH] fix: SendAsync break FlushAsync when RentConnection throw exception When `_connectionPool.RentConnection() ` throw exception The `publisher.PublishAsync` will throw exception, --- src/DotNetCore.CAP.NATS/ITransport.NATS.cs | 51 ++++++++++++---------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs index 1049989e4..7804a5ca4 100644 --- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -30,40 +30,47 @@ public NATSTransport(ILogger logger, IConnectionPool connectionPo public async Task SendAsync(TransportMessage message) { - var connection = _connectionPool.RentConnection(); - try { - var msg = new Msg(message.GetName(), message.Body.ToArray()); - foreach (var header in message.Headers) + var connection = _connectionPool.RentConnection(); + try { - msg.Header[header.Key] = header.Value; - } + var msg = new Msg(message.GetName(), message.Body.ToArray()); + foreach (var header in message.Headers) + { + msg.Header[header.Key] = header.Value; + } - var js = connection.CreateJetStreamContext(_jetStreamOptions); + var js = connection.CreateJetStreamContext(_jetStreamOptions); - var builder = PublishOptions.Builder().WithMessageId(message.GetId()); + var builder = PublishOptions.Builder().WithMessageId(message.GetId()); - var resp = await js.PublishAsync(msg, builder.Build()); + var resp = await js.PublishAsync(msg, builder.Build()); - if (resp.Seq > 0) - { - _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); + if (resp.Seq > 0) + { + _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); - return OperateResult.Success; + return OperateResult.Success; + } + + throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); } + catch (Exception ex) + { + var warpEx = new PublisherSentFailedException(ex.Message, ex); - throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); + return OperateResult.Failed(warpEx); + } + finally + { + _connectionPool.Return(connection); + } } - catch (Exception ex) + catch (Exception e) { - var warpEx = new PublisherSentFailedException(ex.Message, ex); - + var warpEx = new PublisherSentFailedException(e.Message, e); return OperateResult.Failed(warpEx); } - finally - { - _connectionPool.Return(connection); - } } -} \ No newline at end of file +}