Skip to content

Commit aca2b75

Browse files
Remove link from active links when creation fails (Azure#23628)
* Remove link from active links when creation fails * PR FB
1 parent 31431b9 commit aca2b75

File tree

1 file changed

+27
-12
lines changed

1 file changed

+27
-12
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
514514

515515
var session = default(AmqpSession);
516516
var stopWatch = ValueStopwatch.StartNew();
517+
RequestResponseAmqpLink link = null;
517518

518519
try
519520
{
@@ -548,7 +549,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
548549
identifier: identifier)
549550
.ConfigureAwait(false);
550551

551-
var link = new RequestResponseAmqpLink(
552+
link = new RequestResponseAmqpLink(
552553
AmqpClientConstants.EntityTypeManagement,
553554
session,
554555
entityPath,
@@ -575,11 +576,13 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
575576

576577
// Track the link before returning it, so that it can be managed with the scope.
577578

578-
BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
579+
StartTrackingLinkAsActive(entityPath, link, refreshTimer);
579580
return link;
580581
}
581582
catch (Exception exception)
582583
{
584+
StopTrackingLinkAsActive(link);
585+
583586
// Aborting the session will perform any necessary cleanup of
584587
// the associated link as well.
585588

@@ -626,6 +629,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
626629

627630
var session = default(AmqpSession);
628631
var stopWatch = ValueStopwatch.StartNew();
632+
ReceivingAmqpLink link = null;
629633

630634
try
631635
{
@@ -668,7 +672,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
668672
OperationTimeout = _operationTimeout
669673
};
670674

671-
var link = new ReceivingAmqpLink(linkSettings);
675+
link = new ReceivingAmqpLink(linkSettings);
672676
linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}:{linkSettings.Source.ToString()}";
673677

674678
link.AttachTo(session);
@@ -694,14 +698,14 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
694698

695699
// Track the link before returning it, so that it can be managed with the scope.
696700

697-
BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
701+
StartTrackingLinkAsActive(entityPath, link, refreshTimer);
698702
return link;
699703
}
700704
catch (Exception exception)
701705
{
706+
StopTrackingLinkAsActive(link);
702707
// Aborting the session will perform any necessary cleanup of
703708
// the associated link as well.
704-
705709
session?.Abort();
706710
ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
707711
exception,
@@ -764,6 +768,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
764768

765769
var session = default(AmqpSession);
766770
var stopWatch = ValueStopwatch.StartNew();
771+
SendingAmqpLink link = null;
767772

768773
ValidateCanCreateSenderLink(entityPath);
769774

@@ -809,7 +814,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
809814

810815
linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
811816

812-
var link = new SendingAmqpLink(linkSettings);
817+
link = new SendingAmqpLink(linkSettings);
813818
linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
814819
link.AttachTo(session);
815820

@@ -835,11 +840,13 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
835840

836841
// Track the link before returning it, so that it can be managed with the scope.
837842

838-
BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
843+
StartTrackingLinkAsActive(entityPath, link, refreshTimer);
839844
return link;
840845
}
841846
catch (Exception exception)
842847
{
848+
StopTrackingLinkAsActive(link);
849+
843850
// Aborting the session will perform any necessary cleanup of
844851
// the associated link as well.
845852

@@ -895,7 +902,7 @@ private void ValidateCanCreateSenderLink(string entityPath)
895902
/// for active tracking; no assumptions are made about the open/connected state of the link nor are
896903
/// its communication properties modified.
897904
/// </remarks>
898-
protected virtual void BeginTrackingLinkAsActive(
905+
protected virtual void StartTrackingLinkAsActive(
899906
string entityPath,
900907
AmqpObject link,
901908
Timer authorizationRefreshTimer = null)
@@ -930,17 +937,25 @@ protected virtual void BeginTrackingLinkAsActive(
930937

931938
closeHandler = (snd, args) =>
932939
{
933-
ActiveLinks.TryRemove(link, out var timer);
934-
935-
timer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
936-
timer?.Dispose();
940+
StopTrackingLinkAsActive(link);
937941

938942
link.Closed -= closeHandler;
939943
};
940944

941945
link.Closed += closeHandler;
942946
}
943947

948+
private void StopTrackingLinkAsActive(AmqpObject link)
949+
{
950+
if (link != null)
951+
{
952+
ActiveLinks.TryRemove(link, out var timer);
953+
954+
timer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
955+
timer?.Dispose();
956+
}
957+
}
958+
944959
/// <summary>
945960
/// Performs the tasks needed to close a connection.
946961
/// </summary>

0 commit comments

Comments
 (0)