Skip to content

Commit f3f35dd

Browse files
authored
Fixes ServiceBusProcessor.DisposeAsync exceptions (Azure#18027)
This fixes an issues when a ServiceBusProcessor is disposed after it has been closed or is disposed multiple times. If a processor is used within a using block and also has Close explicitly called before the processor goes out of scope, it will throw an NRE. Regarding multiple calls to dispose, I can't think of a good use case for why somebody would do that but it is documented behavior for IAsyncDisposable. https://docs.microsoft.com/en-us/dotnet/api/system.iasyncdisposable.disposeasync?view=dotnet-plat-ext-5.0#remarks > If an object's DisposeAsync method is called more than once, the object must ignore all calls after the first one and synchronously return a successfully completed ValueTask. The object must not throw an exception if its DisposeAsync method is called multiple times.
1 parent eab29fa commit f3f35dd

File tree

3 files changed

+67
-6
lines changed

3 files changed

+67
-6
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,17 @@ public virtual async Task CloseReceiverIfNeeded(
7272
CancellationToken cancellationToken,
7373
bool forceClose = false)
7474
{
75-
try
75+
var capturedReceiver = Receiver;
76+
if (capturedReceiver != null)
7677
{
77-
await Receiver.DisposeAsync().ConfigureAwait(false);
78-
}
79-
finally
80-
{
81-
Receiver = null;
78+
try
79+
{
80+
await capturedReceiver.DisposeAsync().ConfigureAwait(false);
81+
}
82+
finally
83+
{
84+
Receiver = null;
85+
}
8286
}
8387
}
8488

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Security.Cryptography;
7+
using System.Threading;
78
using System.Threading.Tasks;
9+
using Azure.Messaging.ServiceBus.Core;
810
using Moq;
911
using NUnit.Framework;
1012

@@ -79,11 +81,31 @@ protected byte[] GetRandomBuffer(long size)
7981

8082
internal ServiceBusConnection GetMockedConnection()
8183
{
84+
var mockTransportReceiver = new Mock<TransportReceiver>();
85+
mockTransportReceiver
86+
.Setup(receiver => receiver.ReceiveMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<CancellationToken>()))
87+
.Returns(async (int maximumMessageCount, TimeSpan? maxWaitTime, CancellationToken cancellationToken) =>
88+
{
89+
await Task.Delay(Timeout.Infinite, cancellationToken);
90+
throw new NotImplementedException();
91+
});
92+
8293
var mockConnection = new Mock<ServiceBusConnection>();
8394

8495
mockConnection
8596
.Setup(connection => connection.RetryOptions)
8697
.Returns(new ServiceBusRetryOptions());
98+
99+
mockConnection
100+
.Setup(connection => connection.CreateTransportReceiver(
101+
It.IsAny<string>(),
102+
It.IsAny<ServiceBusRetryPolicy>(),
103+
It.IsAny<ServiceBusReceiveMode>(),
104+
It.IsAny<uint>(),
105+
It.IsAny<string>(),
106+
It.IsAny<string>(),
107+
It.IsAny<bool>()))
108+
.Returns(mockTransportReceiver.Object);
87109
return mockConnection.Object;
88110
}
89111
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,5 +281,40 @@ public void UserSettledPropertySetCorrectlyOnException()
281281
Throws.InstanceOf<Exception>());
282282
Assert.IsFalse(msg.IsSettled);
283283
}
284+
285+
[Test]
286+
public async Task CanDisposeStartedProcessorMultipleTimes()
287+
{
288+
var processor = new ServiceBusProcessor(
289+
GetMockedConnection(),
290+
"entityPath",
291+
false,
292+
new ServiceBusPlugin[] { },
293+
new ServiceBusProcessorOptions());
294+
processor.ProcessMessageAsync += _ => Task.CompletedTask;
295+
processor.ProcessErrorAsync += _ => Task.CompletedTask;
296+
await processor.StartProcessingAsync().ConfigureAwait(false);
297+
298+
await processor.DisposeAsync();
299+
await processor.DisposeAsync();
300+
}
301+
302+
[Test]
303+
public async Task CanDisposeClosedProcessor()
304+
{
305+
var processor = new ServiceBusProcessor(
306+
GetMockedConnection(),
307+
"entityPath",
308+
false,
309+
new ServiceBusPlugin[] { },
310+
new ServiceBusProcessorOptions());
311+
312+
processor.ProcessMessageAsync += _ => Task.CompletedTask;
313+
processor.ProcessErrorAsync += _ => Task.CompletedTask;
314+
await processor.StartProcessingAsync().ConfigureAwait(false);
315+
await processor.CloseAsync();
316+
317+
await processor.DisposeAsync();
318+
}
284319
}
285320
}

0 commit comments

Comments
 (0)