Skip to content

Commit 68bf6c9

Browse files
Add MessageActions/SessionMessageActions (Azure#19074)
* Add MessageActions/SessionMessageActions * PR FB * Export API
1 parent f0cd63f commit 68bf6c9

18 files changed

+390
-137
lines changed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public enum EntityType
5252
public partial class MessageProcessor
5353
{
5454
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
55-
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
56-
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
55+
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
56+
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
5757
}
5858
public partial class MessagingProvider
5959
{
@@ -65,6 +65,15 @@ public MessagingProvider(Microsoft.Extensions.Options.IOptions<Microsoft.Azure.W
6565
public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(string entityPath, string connectionString) { throw null; }
6666
public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(string entityPath, string connectionString) { throw null; }
6767
}
68+
public partial class ServiceBusMessageActions
69+
{
70+
internal ServiceBusMessageActions() { }
71+
public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
72+
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
73+
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
74+
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
75+
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
76+
}
6877
public partial class ServiceBusOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
6978
{
7079
public ServiceBusOptions() { }
@@ -75,13 +84,19 @@ public ServiceBusOptions() { }
7584
public int MaxConcurrentCalls { get { throw null; } set { } }
7685
public int MaxConcurrentSessions { get { throw null; } set { } }
7786
public int MaxMessages { get { throw null; } set { } }
78-
public System.TimeSpan? MaxWaitTime { get { throw null; } set { } }
7987
public int PrefetchCount { get { throw null; } set { } }
8088
public Azure.Messaging.ServiceBus.ServiceBusRetryOptions RetryOptions { get { throw null; } set { } }
89+
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
8190
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
8291
public System.Net.IWebProxy WebProxy { get { throw null; } set { } }
8392
public string Format() { throw null; }
8493
}
94+
public partial class ServiceBusSessionMessageActions : Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions
95+
{
96+
internal ServiceBusSessionMessageActions() { }
97+
public virtual System.Threading.Tasks.Task<System.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
98+
public virtual System.Threading.Tasks.Task SetSessionStateAsync(System.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
99+
}
85100
public partial class ServiceBusWebJobsStartup : Microsoft.Azure.WebJobs.Hosting.IWebJobsStartup
86101
{
87102
public ServiceBusWebJobsStartup() { }
@@ -90,8 +105,8 @@ public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { }
90105
public partial class SessionMessageProcessor
91106
{
92107
public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { }
93-
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusSessionReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
94-
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusSessionReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
108+
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
109+
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
95110
}
96111
}
97112
namespace Microsoft.Extensions.Hosting

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action
7575
"SessionHandlerOptions:MaxConcurrentSessions",
7676
options.MaxConcurrentSessions);
7777

78+
options.SessionIdleTimeout = section.GetValue("SessionHandlerOptions:MessageWaitTime", options.SessionIdleTimeout);
79+
7880
section.Bind(options);
7981

8082
configure(options);

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,20 @@ public int MaxConcurrentSessions
131131
/// </summary>
132132
public Func<ProcessErrorEventArgs, Task> ExceptionHandler { get; set; }
133133

134-
/// <summary>
135-
/// Gets or sets an optional <see cref="TimeSpan"/> specifying the maximum time to wait when attempting to receive messages.
136-
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used. This only applies for functions that receive
137-
/// a batch of messages.
138-
/// </summary>
139-
public TimeSpan? MaxWaitTime { get; set; } // TODO this should probably be exposed in the Processor as well and would then also apply for functions that receive a single message.
140-
141134
/// <summary>
142135
/// Gets or sets the maximum number of messages that will be passed to each function call. This only applies for functions that receive
143136
/// a batch of messages. The default value is 1000.
144137
/// </summary>
145138
public int MaxMessages { get; set; } = 1000;
146139

140+
/// <summary>
141+
/// Gets or sets the maximum amount of time to wait for a message to be received for the
142+
/// currently active session. After this time has elapsed, the processor will close the session
143+
/// and attempt to process another session.
144+
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.
145+
/// </summary>
146+
public TimeSpan? SessionIdleTimeout { get; set; }
147+
147148
/// <summary>
148149
/// Formats the options as JSON objects for display.
149150
/// </summary>
@@ -171,7 +172,7 @@ public string Format()
171172
{ nameof(MaxConcurrentCalls), MaxConcurrentCalls },
172173
{ nameof(MaxConcurrentSessions), MaxConcurrentSessions },
173174
{ nameof(MaxMessages), MaxMessages },
174-
{ nameof(MaxWaitTime), MaxWaitTime.ToString() ?? string.Empty }
175+
{ nameof(SessionIdleTimeout), SessionIdleTimeout.ToString() ?? string.Empty }
175176
};
176177

177178
return options.ToString(Formatting.Indented);
@@ -199,7 +200,8 @@ internal ServiceBusSessionProcessorOptions ToSessionProcessorOptions() =>
199200
AutoCompleteMessages = AutoCompleteMessages,
200201
PrefetchCount = PrefetchCount,
201202
MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration,
202-
MaxConcurrentSessions = MaxConcurrentSessions
203+
MaxConcurrentSessions = MaxConcurrentSessions,
204+
SessionIdleTimeout = SessionIdleTimeout
203205
};
204206

205207
internal ServiceBusClientOptions ToClientOptions() =>

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.Collections.Generic;
66
using System.Diagnostics.CodeAnalysis;
77
using System.Linq;
8-
using System.Reflection;
98
using System.Threading;
109
using System.Threading.Tasks;
1110
using Azure.Core.Pipeline;
@@ -194,40 +193,37 @@ internal async Task ProcessMessageAsync(ProcessMessageEventArgs args)
194193
{
195194
using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _cancellationTokenSource.Token))
196195
{
197-
//TODO consider using internals visible or exposing the Receiver property instead of reflection
198-
ServiceBusReceiver receiver = (ServiceBusReceiver) typeof(ProcessMessageEventArgs).GetField("_receiver", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(args);
199-
200-
if (!await _messageProcessor.BeginProcessingMessageAsync(receiver, args.Message, linkedCts.Token).ConfigureAwait(false))
196+
var actions = new ServiceBusMessageActions(args);
197+
if (!await _messageProcessor.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false))
201198
{
202199
return;
203200
}
204201

205202
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(args.Message);
206-
input.Receiver = receiver;
203+
input.MessageActions = actions;
207204

208205
TriggeredFunctionData data = input.GetTriggerFunctionData();
209206
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token).ConfigureAwait(false);
210-
await _messageProcessor.CompleteProcessingMessageAsync(receiver, args.Message, result, linkedCts.Token).ConfigureAwait(false);
207+
await _messageProcessor.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false);
211208
}
212209
}
213210

214211
internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs args)
215212
{
216213
using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _cancellationTokenSource.Token))
217214
{
218-
ServiceBusSessionReceiver receiver = (ServiceBusSessionReceiver)typeof(ProcessSessionMessageEventArgs).GetField("_sessionReceiver", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(args);
219-
220-
if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(receiver, args.Message, linkedCts.Token).ConfigureAwait(false))
215+
var actions = new ServiceBusSessionMessageActions(args);
216+
if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false))
221217
{
222218
return;
223219
}
224220

225221
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(args.Message);
226-
input.SessionReceiver = receiver;
222+
input.MessageActions = actions;
227223

228224
TriggeredFunctionData data = input.GetTriggerFunctionData();
229225
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token).ConfigureAwait(false);
230-
await _sessionMessageProcessor.CompleteProcessingMessageAsync(receiver, args.Message, result, linkedCts.Token).ConfigureAwait(false);
226+
await _sessionMessageProcessor.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false);
231227
}
232228
}
233229

@@ -273,21 +269,20 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
273269
}
274270
}
275271

276-
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(
277-
_serviceBusOptions.MaxMessages,
278-
_serviceBusOptions.MaxWaitTime).ConfigureAwait(false);
272+
IReadOnlyList<ServiceBusReceivedMessage> messages =
273+
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxMessages).ConfigureAwait(false);
279274

280275
if (messages != null)
281276
{
282277
ServiceBusReceivedMessage[] messagesArray = messages.ToArray();
283278
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(messagesArray);
284279
if (_isSessionsEnabled)
285280
{
286-
input.SessionReceiver = (ServiceBusSessionReceiver) receiver;
281+
input.MessageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver) receiver);
287282
}
288283
else
289284
{
290-
input.Receiver = receiver;
285+
input.MessageActions = new ServiceBusMessageActions(receiver);
291286
}
292287
FunctionResult result = await _triggerExecutor.TryExecuteAsync(input.GetTriggerFunctionData(), cancellationToken).ConfigureAwait(false);
293288

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/MessageProcessor.cs renamed to sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,37 +10,33 @@
1010
namespace Microsoft.Azure.WebJobs.ServiceBus
1111
{
1212
/// <summary>
13-
/// This class defines a strategy used for processing ServiceBus messages.
13+
/// This class defines a strategy used for processing Service Bus messages.
1414
/// </summary>
15-
/// <remarks>
16-
/// Custom <see cref="ServiceBus.MessageProcessor"/> implementations can be specified by implementing
17-
/// a custom <see cref="MessagingProvider"/> and setting it via ServiceBusOptions.MessagingProvider.
18-
/// </remarks>
1915
public class MessageProcessor
2016
{
2117
/// <summary>
22-
/// Constructs a new instance.
18+
/// Initializes a new instance of <see cref="MessageProcessor"/>.
2319
/// </summary>
24-
/// <param name="processor">The <see cref="Processor"/>.</param>
20+
/// <param name="processor">The <see cref="ServiceBusProcessor"/> to use.</param>
2521
public MessageProcessor(ServiceBusProcessor processor)
2622
{
2723
Processor = processor ?? throw new ArgumentNullException(nameof(processor));
2824
}
2925

3026
/// <summary>
31-
/// Gets or sets the <see cref="Processor"/> that will be used by the <see cref="Processor"/>.
27+
/// Gets or sets the <see cref="ServiceBusProcessor"/> that will be used by the <see cref="Processor"/>.
3228
/// </summary>
3329
internal ServiceBusProcessor Processor { get; set; }
3430

3531
/// <summary>
3632
/// This method is called when there is a new message to process, before the job function is invoked.
3733
/// This allows any preprocessing to take place on the message before processing begins.
3834
/// </summary>
39-
/// <param name="receiver"></param>
40-
/// <param name="message"></param>
41-
/// <param name="cancellationToken"></param>
35+
/// <param name="messageActions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
36+
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
37+
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
4238
/// <returns>A <see cref="Task"/> that returns true if the message processing should continue, false otherwise.</returns>
43-
public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
39+
public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
4440
{
4541
return Task.FromResult<bool>(true);
4642
}
@@ -53,12 +49,12 @@ public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusReceiver receive
5349
/// is configured. E.g. if <see cref="ServiceBusProcessorOptions.AutoCompleteMessages"/> is false, it is up to the job function to complete
5450
/// the message.
5551
/// </remarks>
56-
/// <param name="receiver"></param>
57-
/// <param name="message"></param>
52+
/// <param name="messageActions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
53+
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
5854
/// <param name="result">The <see cref="FunctionResult"/> from the job invocation.</param>
59-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to use</param>
55+
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
6056
/// <returns>A <see cref="Task"/> that will complete the message processing.</returns>
61-
public virtual Task CompleteProcessingMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
57+
public virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
6258
{
6359
if (message is null)
6460
{

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/MessagingProvider.cs renamed to sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs

File renamed without changes.

0 commit comments

Comments
 (0)