Skip to content

Commit b44f595

Browse files
Don't close session when closing link if cross entity transactions are enabled (Azure#28351)
* Don't close session when closing link if cross entity transactions are enabled * sender * flaky tests
1 parent 24f8132 commit b44f595

File tree

5 files changed

+69
-27
lines changed

5 files changed

+69
-27
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,5 +1330,23 @@ private static void ValidateTransport(ServiceBusTransportType transport)
13301330
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, transport), nameof(transport));
13311331
}
13321332
}
1333+
1334+
internal void CloseLink(AmqpLink link)
1335+
{
1336+
if (!_useSingleSession)
1337+
{
1338+
link.Session?.SafeClose();
1339+
}
1340+
link.SafeClose();
1341+
}
1342+
1343+
internal void CloseLink(RequestResponseAmqpLink link)
1344+
{
1345+
if (!_useSingleSession)
1346+
{
1347+
link.Session?.SafeClose();
1348+
}
1349+
link.SafeClose();
1350+
}
13331351
}
13341352
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,11 @@ public AmqpReceiver(
167167
// it is okay to register the user provided cancellationToken from the AcceptNextSessionAsync call in
168168
// the fault tolerant object because session receivers are never reconnected.
169169
cancellationToken: cancellationToken),
170-
link => CloseLink(link));
170+
link => _connectionScope.CloseLink(link));
171171

172172
_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
173173
timeout => OpenManagementLinkAsync(timeout),
174-
link => CloseLink(link));
174+
link => _connectionScope.CloseLink(link));
175175
}
176176

177177
private async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
@@ -254,18 +254,6 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
254254
}
255255
}
256256

257-
private static void CloseLink(ReceivingAmqpLink link)
258-
{
259-
link.Session?.SafeClose();
260-
link.SafeClose();
261-
}
262-
263-
private static void CloseLink(RequestResponseAmqpLink link)
264-
{
265-
link.Session?.SafeClose();
266-
link.SafeClose();
267-
}
268-
269257
/// <summary>
270258
/// Receives a list of <see cref="ServiceBusReceivedMessage" /> from the entity using <see cref="ServiceBusReceiveMode"/> mode.
271259
/// </summary>
@@ -1325,10 +1313,10 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
13251313
private void OnReceiverLinkClosed(object receiver, EventArgs e)
13261314
{
13271315
var receivingAmqpLink = (ReceivingAmqpLink)receiver;
1328-
if (_isSessionReceiver && receivingAmqpLink != null)
1316+
if (receivingAmqpLink != null)
13291317
{
13301318
Exception exception = receivingAmqpLink.GetInnerException();
1331-
if (((exception is ServiceBusException sbException) && sbException.Reason != ServiceBusFailureReason.SessionLockLost) ||
1319+
if (_isSessionReceiver && ((exception is ServiceBusException sbException) && sbException.Reason != ServiceBusFailureReason.SessionLockLost) ||
13321320
!(exception is ServiceBusException))
13331321
{
13341322
exception = new ServiceBusException(

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,11 @@ public AmqpSender(
116116

117117
_sendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
118118
timeout => CreateLinkAndEnsureSenderStateAsync(timeout, CancellationToken.None),
119-
link =>
120-
{
121-
link.Session?.SafeClose();
122-
link.SafeClose();
123-
});
119+
link => _connectionScope.CloseLink(link));
124120

125121
_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
126122
timeout => OpenManagementLinkAsync(timeout),
127-
link =>
128-
{
129-
link.Session?.SafeClose();
130-
link.SafeClose();
131-
});
123+
link => _connectionScope.CloseLink(link));
132124
}
133125

134126
private async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public async Task MetricsAreUpdatedCorrectly()
265265
await client.DisposeAsync();
266266
// The close frame does not come back from the service before the DisposeAsync
267267
// call is returned.
268-
await Task.Delay(500);
268+
await Task.Delay(1000);
269269
metrics = client.GetTransportMetrics();
270270
Assert.Greater(metrics.LastConnectionClose, thirdOpen);
271271
Assert.Greater(metrics.LastHeartBeat, firstHeartBeat);

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,5 +487,49 @@ public void LogsMessageEvents()
487487
ServiceBusEventSource.PartitionKeyValueOverwritten,
488488
e => e.Payload.Contains("sessionId1") && e.Payload.Contains("sessionId2") && e.Payload.Contains("messageId"));
489489
}
490+
491+
[Test]
492+
public async Task ClosingSendLinkDoesNotCloseSessionWithCrossEntityTransactionEnabled()
493+
{
494+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
495+
{
496+
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString,
497+
new ServiceBusClientOptions { EnableCrossEntityTransactions = true });
498+
var sender = client.CreateSender(scope.QueueName);
499+
var receiver = client.CreateReceiver(scope.QueueName);
500+
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
501+
var message = await receiver.ReceiveMessageAsync();
502+
Assert.IsNotNull(message);
503+
await sender.CloseAsync();
504+
505+
// link closed event is fired asynchronously, so add a small delay
506+
await Task.Delay(TimeSpan.FromSeconds(5));
507+
508+
_listener.SingleEventById(ServiceBusEventSource.SendLinkClosedEvent, e => e.Payload.Contains(sender.Identifier));
509+
Assert.False(_listener.EventsById(ServiceBusEventSource.ReceiveLinkClosedEvent).Any(e => e.Payload.Contains(receiver.Identifier)));
510+
}
511+
}
512+
513+
[Test]
514+
public async Task ClosingReceiveLinkDoesNotCloseSessionWithCrossEntityTransactionEnabled()
515+
{
516+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
517+
{
518+
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString,
519+
new ServiceBusClientOptions { EnableCrossEntityTransactions = true });
520+
var sender = client.CreateSender(scope.QueueName);
521+
var receiver = client.CreateReceiver(scope.QueueName);
522+
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
523+
var message = await receiver.ReceiveMessageAsync();
524+
Assert.IsNotNull(message);
525+
await receiver.CloseAsync();
526+
527+
// link closed event is fired asynchronously, so add a small delay
528+
await Task.Delay(TimeSpan.FromSeconds(5));
529+
530+
_listener.SingleEventById(ServiceBusEventSource.ReceiveLinkClosedEvent, e => e.Payload.Contains(receiver.Identifier));
531+
Assert.False(_listener.EventsById(ServiceBusEventSource.SendLinkClosedEvent).Any(e => e.Payload.Contains(sender.Identifier)));
532+
}
533+
}
490534
}
491535
}

0 commit comments

Comments
 (0)