Skip to content

Commit a22eaa8

Browse files
Session manager cleanup (Azure#32575)
* Cleanup ReceiverManager CloseReceiverIfNeeded * Replace CancelSessionAsync to Cancel * Address Review comments
1 parent 7d5da2a commit a22eaa8

File tree

5 files changed

+20
-19
lines changed

5 files changed

+20
-19
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public virtual async Task SetSessionStateAsync(
140140
public virtual void ReleaseSession() =>
141141
// manager will be null if instance created using the public constructor which is exposed for testing purposes
142142
// This will be awaited when closing the receiver.
143-
_ = _manager?.CancelSessionAsync();
143+
_ = _manager?.CancelAsync();
144144

145145
///<inheritdoc cref="ServiceBusSessionReceiver.RenewSessionLockAsync(CancellationToken)"/>
146146
public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ await _sessionReceiver.DeferMessageAsync(
232232
public virtual void ReleaseSession() =>
233233
// manager will be null if instance created using the public constructor which is exposed for testing purposes
234234
// This will be awaited when closing the receiver.
235-
_ = _manager?.CancelSessionAsync();
235+
_ = _manager?.CancelAsync();
236236

237237
///<inheritdoc cref="ServiceBusSessionReceiver.RenewSessionLockAsync(CancellationToken)"/>
238238
public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ public ReceiverManager(
5959
_scopeFactory = scopeFactory;
6060
}
6161

62-
public virtual async Task CloseReceiverIfNeeded(
63-
CancellationToken cancellationToken,
64-
bool forceClose = false)
62+
public virtual async Task CloseReceiverIfNeeded(CancellationToken cancellationToken)
6563
{
6664
var capturedReceiver = Receiver;
6765
if (capturedReceiver != null)
@@ -123,6 +121,8 @@ await RaiseExceptionReceived(
123121
}
124122
}
125123

124+
public virtual Task CancelAsync() => Task.CompletedTask;
125+
126126
protected async Task ProcessOneMessageWithinScopeAsync(ServiceBusReceivedMessage message, string activityName, CancellationToken cancellationToken)
127127
{
128128
using DiagnosticScope scope = _scopeFactory.CreateScope(activityName, DiagnosticScope.ActivityKind.Consumer);

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
695695
_orphanedReceiverManagers.Add(_receiverManagers[0]);
696696

697697
// these tasks will be awaited when closing the orphaned receivers as part of CloseAsync
698-
_ = ((SessionReceiverManager) _receiverManagers[0]).CancelSessionAsync();
698+
_ = _receiverManagers[0].CancelAsync();
699699
_receiverManagers.RemoveAt(0);
700700
}
701701
}
@@ -1013,9 +1013,7 @@ public virtual async Task CloseAsync(
10131013

10141014
foreach (ReceiverManager receiverManager in _receiverManagers.Concat(_orphanedReceiverManagers))
10151015
{
1016-
await receiverManager.CloseReceiverIfNeeded(
1017-
cancellationToken,
1018-
forceClose: true)
1016+
await receiverManager.CloseReceiverIfNeeded(cancellationToken)
10191017
.ConfigureAwait(false);
10201018
}
10211019
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,21 +145,24 @@ private async Task CreateReceiver(CancellationToken processorCancellationToken)
145145
}
146146
}
147147

148-
public override async Task CloseReceiverIfNeeded(
149-
CancellationToken processorCancellationToken,
150-
bool forceClose = false)
148+
public override async Task CloseReceiverIfNeeded(CancellationToken cancellationToken)
149+
{
150+
await CloseReceiverCore(forceClose: true, cancellationToken).ConfigureAwait(false);
151+
}
152+
153+
private async Task CloseReceiverCore(bool forceClose, CancellationToken cancellationToken)
151154
{
152155
bool releaseSemaphore = false;
153156
try
154157
{
155-
// Intentionally not including processor cancellation token as
158+
// Intentionally not including cancellation token as
156159
// we need to ensure that we at least attempt to close the receiver if needed.
157160
await WaitSemaphore(CancellationToken.None).ConfigureAwait(false);
158161
releaseSemaphore = true;
159162

160163
if (forceClose)
161164
{
162-
await CloseReceiver(processorCancellationToken).ConfigureAwait(false);
165+
await CloseReceiver(cancellationToken).ConfigureAwait(false);
163166
return;
164167
}
165168

@@ -180,7 +183,7 @@ public override async Task CloseReceiverIfNeeded(
180183
// as this means the session lock was lost or the user requested to close the session.
181184
_sessionCancellationSource.IsCancellationRequested)
182185
{
183-
await CloseReceiver(processorCancellationToken).ConfigureAwait(false);
186+
await CloseReceiver(cancellationToken).ConfigureAwait(false);
184187
}
185188
}
186189
}
@@ -225,7 +228,7 @@ await RaiseExceptionReceived(
225228
// cancel the automatic session lock renewal
226229
try
227230
{
228-
await CancelSessionAsync().ConfigureAwait(false);
231+
await CancelAsync().ConfigureAwait(false);
229232
}
230233
catch (Exception ex) when (ex is TaskCanceledException)
231234
{
@@ -317,7 +320,7 @@ await ProcessOneMessageWithinScopeAsync(
317320
if (sbException.Reason == ServiceBusFailureReason.SessionLockLost)
318321
{
319322
// this will be awaited when closing the receiver
320-
_ = CancelSessionAsync();
323+
_ = CancelAsync();
321324
}
322325
}
323326
await RaiseExceptionReceived(
@@ -334,7 +337,7 @@ await RaiseExceptionReceived(
334337
{
335338
if (canProcess)
336339
{
337-
await CloseReceiverIfNeeded(processorCancellationToken).ConfigureAwait(false);
340+
await CloseReceiverCore(forceClose: false, processorCancellationToken).ConfigureAwait(false);
338341
}
339342
}
340343
}
@@ -406,7 +409,7 @@ protected override async Task RaiseExceptionReceived(ProcessErrorEventArgs event
406409
}
407410
}
408411

409-
internal async Task CancelSessionAsync()
412+
public override async Task CancelAsync()
410413
{
411414
if (_sessionCancellationSource is { IsCancellationRequested: false })
412415
{

0 commit comments

Comments
 (0)