-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix: SendAsync break FlushAsync when RentConnection throw exception #1762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,40 +30,47 @@ public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPo | |
|
|
||
| public async Task<OperateResult> SendAsync(TransportMessage message) | ||
| { | ||
| var connection = _connectionPool.RentConnection(); | ||
|
|
||
| try | ||
| { | ||
| var msg = new Msg(message.GetName(), message.Body.ToArray()); | ||
| foreach (var header in message.Headers) | ||
| var connection = _connectionPool.RentConnection(); | ||
| try | ||
| { | ||
| msg.Header[header.Key] = header.Value; | ||
| } | ||
| var msg = new Msg(message.GetName(), message.Body.ToArray()); | ||
| foreach (var header in message.Headers) | ||
| { | ||
| msg.Header[header.Key] = header.Value; | ||
| } | ||
|
|
||
| var js = connection.CreateJetStreamContext(_jetStreamOptions); | ||
| var js = connection.CreateJetStreamContext(_jetStreamOptions); | ||
|
|
||
| var builder = PublishOptions.Builder().WithMessageId(message.GetId()); | ||
| var builder = PublishOptions.Builder().WithMessageId(message.GetId()); | ||
|
|
||
| var resp = await js.PublishAsync(msg, builder.Build()); | ||
| var resp = await js.PublishAsync(msg, builder.Build()); | ||
|
|
||
| if (resp.Seq > 0) | ||
| { | ||
| _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); | ||
| if (resp.Seq > 0) | ||
| { | ||
| _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); | ||
|
|
||
| return OperateResult.Success; | ||
| return OperateResult.Success; | ||
| } | ||
|
|
||
| throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| var warpEx = new PublisherSentFailedException(ex.Message, ex); | ||
|
||
|
|
||
| throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); | ||
| return OperateResult.Failed(warpEx); | ||
| } | ||
|
Comment on lines
+59
to
+64
|
||
| finally | ||
| { | ||
| _connectionPool.Return(connection); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| catch (Exception e) | ||
| { | ||
| var warpEx = new PublisherSentFailedException(ex.Message, ex); | ||
|
|
||
| var warpEx = new PublisherSentFailedException(e.Message, e); | ||
|
||
| return OperateResult.Failed(warpEx); | ||
| } | ||
|
Comment on lines
+70
to
74
|
||
| finally | ||
| { | ||
| _connectionPool.Return(connection); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outer try-catch structure introduces a bug. If
RentConnection()at line 35 throws an exception, theconnectionvariable will not be initialized, but the finally block at line 67 will still attempt to call_connectionPool.Return(connection)with an uninitialized variable. Consider removing the outer try-catch and handlingRentConnection()exceptions separately, or ensure the finally block checks if the connection was successfully initialized before returning it.