Skip to content

Commit 4006c07

Browse files
Add overrideable OnProcess* methods (Azure#19772)
* Add overrideable OnProcess* methods * PR FB
1 parent abcc17b commit 4006c07

File tree

7 files changed

+339
-176
lines changed

7 files changed

+339
-176
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ protected ServiceBusProcessor() { }
203203
public virtual int MaxConcurrentCalls { get { throw null; } }
204204
public virtual int PrefetchCount { get { throw null; } }
205205
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
206-
public virtual string TransactionGroup { get { throw null; } }
207206
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
208207
public event System.Func<Azure.Messaging.ServiceBus.ProcessMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
209208
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -212,6 +211,8 @@ protected ServiceBusProcessor() { }
212211
public override bool Equals(object obj) { throw null; }
213212
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
214213
public override int GetHashCode() { throw null; }
214+
protected internal virtual System.Threading.Tasks.Task OnProcessErrorAsync(Azure.Messaging.ServiceBus.ProcessErrorEventArgs args) { throw null; }
215+
protected internal virtual System.Threading.Tasks.Task OnProcessMessageAsync(Azure.Messaging.ServiceBus.ProcessMessageEventArgs args) { throw null; }
215216
public virtual System.Threading.Tasks.Task StartProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
216217
public virtual System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
217218
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -368,6 +369,7 @@ protected ServiceBusSessionProcessor() { }
368369
public virtual bool AutoCompleteMessages { get { throw null; } }
369370
public virtual string EntityPath { get { throw null; } }
370371
public virtual string FullyQualifiedNamespace { get { throw null; } }
372+
protected virtual Azure.Messaging.ServiceBus.ServiceBusProcessor InnerProcessor { get { throw null; } }
371373
public virtual bool IsClosed { get { throw null; } }
372374
public virtual bool IsProcessing { get { throw null; } }
373375
public virtual System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } }
@@ -376,7 +378,6 @@ protected ServiceBusSessionProcessor() { }
376378
public virtual int PrefetchCount { get { throw null; } }
377379
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
378380
public virtual System.TimeSpan? SessionIdleTimeout { get { throw null; } }
379-
public virtual string TransactionGroup { get { throw null; } }
380381
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
381382
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
382383
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionClosingAsync { add { } remove { } }
@@ -387,6 +388,10 @@ protected ServiceBusSessionProcessor() { }
387388
public override bool Equals(object obj) { throw null; }
388389
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
389390
public override int GetHashCode() { throw null; }
391+
protected internal virtual System.Threading.Tasks.Task OnProcessErrorAsync(Azure.Messaging.ServiceBus.ProcessErrorEventArgs args) { throw null; }
392+
protected internal virtual System.Threading.Tasks.Task OnProcessSessionMessageAsync(Azure.Messaging.ServiceBus.ProcessSessionMessageEventArgs args) { throw null; }
393+
protected internal virtual System.Threading.Tasks.Task OnSessionClosingAsync(Azure.Messaging.ServiceBus.ProcessSessionEventArgs args) { throw null; }
394+
protected internal virtual System.Threading.Tasks.Task OnSessionInitializingAsync(Azure.Messaging.ServiceBus.ProcessSessionEventArgs args) { throw null; }
390395
public virtual System.Threading.Tasks.Task StartProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
391396
public virtual System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
392397
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 31 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,59 +12,44 @@
1212
namespace Azure.Messaging.ServiceBus
1313
{
1414
/// <summary>
15-
/// Represents a single receiver instance that multiple threads spawned by the
15+
/// Represents a single receiver instance that multiple tasks spawned by the
1616
/// <see cref="ServiceBusProcessor"/> may be using to receive and process messages.
1717
/// The manager will delegate to the user provided callbacks and handle automatic
1818
/// locking of messages.
1919
/// </summary>
2020
internal class ReceiverManager
2121
{
2222
protected virtual ServiceBusReceiver Receiver { get; set; }
23-
protected readonly ServiceBusConnection _connection;
24-
protected readonly string _fullyQualifiedNamespace;
25-
protected readonly string _entityPath;
26-
protected readonly string _identifier;
23+
24+
protected readonly ServiceBusProcessor Processor;
2725
protected readonly TimeSpan? _maxReceiveWaitTime;
2826
private readonly ServiceBusReceiverOptions _receiverOptions;
29-
protected readonly ServiceBusProcessorOptions _processorOptions;
30-
private readonly Func<ProcessErrorEventArgs, Task> _errorHandler;
31-
private readonly Func<ProcessMessageEventArgs, Task> _messageHandler;
27+
protected readonly ServiceBusProcessorOptions ProcessorOptions;
3228
protected readonly EntityScopeFactory _scopeFactory;
3329
protected readonly IList<ServiceBusPlugin> _plugins;
3430

35-
protected bool AutoRenewLock => _processorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero;
31+
protected bool AutoRenewLock => ProcessorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero;
3632

3733
public ReceiverManager(
38-
ServiceBusConnection connection,
39-
string fullyQualifiedNamespace,
40-
string entityPath,
41-
string identifier,
42-
ServiceBusProcessorOptions processorOptions,
43-
Func<ProcessMessageEventArgs, Task> messageHandler,
44-
Func<ProcessErrorEventArgs, Task> errorHandler,
34+
ServiceBusProcessor processor,
4535
EntityScopeFactory scopeFactory,
4636
IList<ServiceBusPlugin> plugins)
4737
{
48-
_connection = connection;
49-
_fullyQualifiedNamespace = fullyQualifiedNamespace;
50-
_entityPath = entityPath;
51-
_processorOptions = processorOptions;
38+
Processor = processor;
39+
ProcessorOptions = processor.Options;
5240
_receiverOptions = new ServiceBusReceiverOptions
5341
{
54-
ReceiveMode = _processorOptions.ReceiveMode,
55-
PrefetchCount = _processorOptions.PrefetchCount,
42+
ReceiveMode = ProcessorOptions.ReceiveMode,
43+
PrefetchCount = ProcessorOptions.PrefetchCount,
5644
};
57-
_maxReceiveWaitTime = _processorOptions.MaxReceiveWaitTime;
58-
_identifier = identifier;
45+
_maxReceiveWaitTime = ProcessorOptions.MaxReceiveWaitTime;
5946
_plugins = plugins;
6047
Receiver = new ServiceBusReceiver(
61-
connection: _connection,
62-
entityPath: _entityPath,
48+
connection: Processor.Connection,
49+
entityPath: Processor.EntityPath,
6350
isSessionEntity: false,
6451
plugins: _plugins,
6552
options: _receiverOptions);
66-
_errorHandler = errorHandler;
67-
_messageHandler = messageHandler;
6853
_scopeFactory = scopeFactory;
6954
}
7055

@@ -121,8 +106,8 @@ await RaiseExceptionReceived(
121106
new ProcessErrorEventArgs(
122107
ex,
123108
errorSource,
124-
_fullyQualifiedNamespace,
125-
_entityPath,
109+
Processor.FullyQualifiedNamespace,
110+
Processor.EntityPath,
126111
cancellationToken))
127112
.ConfigureAwait(false);
128113
}
@@ -147,12 +132,6 @@ await ProcessOneMessage(
147132
}
148133
}
149134

150-
/// <summary>
151-
///
152-
/// </summary>
153-
/// <param name="message"></param>
154-
/// <param name="cancellationToken"></param>
155-
/// <returns></returns>
156135
private async Task ProcessOneMessage(
157136
ServiceBusReceivedMessage message,
158137
CancellationToken cancellationToken)
@@ -177,18 +156,18 @@ private async Task ProcessOneMessage(
177156

178157
try
179158
{
180-
ServiceBusEventSource.Log.ProcessorMessageHandlerStart(_identifier, message.SequenceNumber);
159+
ServiceBusEventSource.Log.ProcessorMessageHandlerStart(Processor.Identifier, message.SequenceNumber);
181160
await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
182-
ServiceBusEventSource.Log.ProcessorMessageHandlerComplete(_identifier, message.SequenceNumber);
161+
ServiceBusEventSource.Log.ProcessorMessageHandlerComplete(Processor.Identifier, message.SequenceNumber);
183162
}
184163
catch (Exception ex)
185164
{
186-
ServiceBusEventSource.Log.ProcessorMessageHandlerException(_identifier, message.SequenceNumber, ex.ToString());
165+
ServiceBusEventSource.Log.ProcessorMessageHandlerException(Processor.Identifier, message.SequenceNumber, ex.ToString());
187166
throw;
188167
}
189168

190169
if (Receiver.ReceiveMode == ServiceBusReceiveMode.PeekLock &&
191-
_processorOptions.AutoCompleteMessages &&
170+
ProcessorOptions.AutoCompleteMessages &&
192171
!message.IsSettled)
193172
{
194173
errorSource = ServiceBusErrorSource.Complete;
@@ -214,8 +193,8 @@ await RaiseExceptionReceived(
214193
new ProcessErrorEventArgs(
215194
ex,
216195
errorSource,
217-
_fullyQualifiedNamespace,
218-
_entityPath,
196+
Processor.FullyQualifiedNamespace,
197+
Processor.EntityPath,
219198
cancellationToken))
220199
.ConfigureAwait(false);
221200

@@ -244,8 +223,8 @@ await RaiseExceptionReceived(
244223
new ProcessErrorEventArgs(
245224
exception,
246225
ServiceBusErrorSource.Abandon,
247-
_fullyQualifiedNamespace,
248-
_entityPath,
226+
Processor.FullyQualifiedNamespace,
227+
Processor.EntityPath,
249228
cancellationToken))
250229
.ConfigureAwait(false);
251230
}
@@ -264,26 +243,20 @@ protected virtual async Task OnMessageHandler(ServiceBusReceivedMessage message,
264243
message,
265244
Receiver,
266245
processorCancellationToken);
267-
await _messageHandler(args).ConfigureAwait(false);
246+
await Processor.OnProcessMessageAsync(args).ConfigureAwait(false);
268247
}
269248

270-
/// <summary>
271-
///
272-
/// </summary>
273-
/// <param name="message"></param>
274-
/// <param name="cancellationTokenSource"></param>
275-
/// <returns></returns>
276249
private async Task RenewMessageLock(
277250
ServiceBusReceivedMessage message,
278251
CancellationTokenSource cancellationTokenSource)
279252
{
280-
cancellationTokenSource.CancelAfter(_processorOptions.MaxAutoLockRenewalDuration);
253+
cancellationTokenSource.CancelAfter(ProcessorOptions.MaxAutoLockRenewalDuration);
281254
CancellationToken cancellationToken = cancellationTokenSource.Token;
282255
while (!cancellationToken.IsCancellationRequested)
283256
{
284257
try
285258
{
286-
ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(_identifier, 1, message.LockToken);
259+
ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(Processor.Identifier, 1, message.LockToken);
287260
TimeSpan delay = CalculateRenewDelay(message.LockedUntil);
288261

289262
// We're awaiting the task created by 'ContinueWith' to avoid awaiting the Delay task which may be canceled
@@ -300,11 +273,11 @@ private async Task RenewMessageLock(
300273
}
301274

302275
await Receiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
303-
ServiceBusEventSource.Log.ProcessorRenewMessageLockComplete(_identifier);
276+
ServiceBusEventSource.Log.ProcessorRenewMessageLockComplete(Processor.Identifier);
304277
}
305278
catch (Exception ex) when (!(ex is TaskCanceledException))
306279
{
307-
ServiceBusEventSource.Log.ProcessorRenewMessageLockException(_identifier, ex.ToString());
280+
ServiceBusEventSource.Log.ProcessorRenewMessageLockException(Processor.Identifier, ex.ToString());
308281
await HandleRenewLockException(ex, cancellationToken).ConfigureAwait(false);
309282

310283
// if the error was not transient, break out of the loop
@@ -363,22 +336,17 @@ await RaiseExceptionReceived(
363336
new ProcessErrorEventArgs(
364337
ex,
365338
ServiceBusErrorSource.RenewLock,
366-
_fullyQualifiedNamespace,
367-
_entityPath,
339+
Processor.FullyQualifiedNamespace,
340+
Processor.EntityPath,
368341
cancellationToken)).ConfigureAwait(false);
369342
}
370343
}
371344

372-
/// <summary>
373-
///
374-
/// </summary>
375-
/// <param name="eventArgs"></param>
376-
/// <returns></returns>
377345
protected async Task RaiseExceptionReceived(ProcessErrorEventArgs eventArgs)
378346
{
379347
try
380348
{
381-
await _errorHandler(eventArgs).ConfigureAwait(false);
349+
await Processor.OnProcessErrorAsync(eventArgs).ConfigureAwait(false);
382350
}
383351
catch (Exception exception)
384352
{
@@ -387,11 +355,6 @@ protected async Task RaiseExceptionReceived(ProcessErrorEventArgs eventArgs)
387355
}
388356
}
389357

390-
/// <summary>
391-
///
392-
/// </summary>
393-
/// <param name="lockedUntil"></param>
394-
/// <returns></returns>
395358
protected static TimeSpan CalculateRenewDelay(DateTimeOffset lockedUntil)
396359
{
397360
var remainingTime = lockedUntil - DateTimeOffset.UtcNow;

0 commit comments

Comments
 (0)