Skip to content

Commit a7a784f

Browse files
Allow a callback for Session[Initializing|Closing]Async (Azure#29945)
* Allow a callback for Session[Initializing|Closing]Async Closes Azure#29917 * Remove remarks * Defensive * Approve API * Change log * Make things private and sneakishly hide a problem even the reviewers didn't spot * Properly use the session * Reduce to a single test
1 parent c5a793f commit a7a784f

File tree

6 files changed

+123
-2
lines changed

6 files changed

+123
-2
lines changed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Added ability to register a callback for ` SessionInitializingAsync` and `SessionClosingAsync` to the `ServiceBusOptions`. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ public ServiceBusOptions() { }
8383
public int MaxMessageBatchSize { get { throw null; } set { } }
8484
public int PrefetchCount { get { throw null; } set { } }
8585
public System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { get { throw null; } set { } }
86+
public System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionClosingAsync { get { throw null; } set { } }
8687
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
88+
public System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionInitializingAsync { get { throw null; } set { } }
8789
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
8890
public System.Net.IWebProxy WebProxy { get { throw null; } set { } }
8991
string Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter.Format() { throw null; }

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ public void Initialize(ExtensionConfigContext context)
8181
LogExceptionReceivedEvent(e, _loggerFactory);
8282
return Task.CompletedTask;
8383
};
84+
Options.SessionInitializingAsync ??= (e) =>
85+
{
86+
LogSessionInitializingEvent(e, _loggerFactory);
87+
return Task.CompletedTask;
88+
};
89+
Options.SessionClosingAsync ??= (e) =>
90+
{
91+
LogSessionClosingEvent(e, _loggerFactory);
92+
return Task.CompletedTask;
93+
};
8494

8595
context
8696
.AddConverter(new MessageToStringConverter())
@@ -132,5 +142,33 @@ private static LogLevel GetLogLevel(Exception ex)
132142
return LogLevel.Information;
133143
}
134144
}
145+
146+
internal static void LogSessionInitializingEvent(ProcessSessionEventArgs e, ILoggerFactory loggerFactory)
147+
{
148+
try
149+
{
150+
var logger = loggerFactory?.CreateLogger<ServiceBusListener>();
151+
string message = $"Session initializing (SessionId={e.SessionId}, SessionLockedUntil={e.SessionLockedUntil})";
152+
logger?.LogInformation(0, message);
153+
}
154+
catch (Exception)
155+
{
156+
// best effort logging
157+
}
158+
}
159+
160+
internal static void LogSessionClosingEvent(ProcessSessionEventArgs e, ILoggerFactory loggerFactory)
161+
{
162+
try
163+
{
164+
var logger = loggerFactory?.CreateLogger<ServiceBusListener>();
165+
string message = $"Session closing (SessionId={e.SessionId}, SessionLockedUntil={e.SessionLockedUntil})";
166+
logger?.LogInformation(0, message);
167+
}
168+
catch (Exception)
169+
{
170+
// best effort logging
171+
}
172+
}
135173
}
136174
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ public int MaxConcurrentSessions
130130
/// </summary>
131131
public Func<ProcessErrorEventArgs, Task> ProcessErrorAsync { get; set; }
132132

133+
/// <summary>
134+
/// Optional handler that can be set to be notified when a new session is about to be processed.
135+
/// </summary>
136+
public Func<ProcessSessionEventArgs, Task> SessionInitializingAsync { get; set; }
137+
138+
/// <summary>
139+
/// Optional handler that can be set to be notified when a session is about to be closed for processing.
140+
/// </summary>
141+
public Func<ProcessSessionEventArgs, Task> SessionClosingAsync { get; set; }
142+
133143
/// <summary>
134144
/// Gets or sets the maximum number of messages that will be passed to each function call. This only applies for functions that receive
135145
/// a batch of messages. The default value is 1000.
@@ -200,9 +210,28 @@ string IOptionsFormatter.Format()
200210

201211
internal async Task ExceptionReceivedHandler(ProcessErrorEventArgs args)
202212
{
203-
if (ProcessErrorAsync != null)
213+
var processErrorAsync = ProcessErrorAsync;
214+
if (processErrorAsync != null)
215+
{
216+
await processErrorAsync(args).ConfigureAwait(false);
217+
}
218+
}
219+
220+
internal async Task SessionInitializingHandler(ProcessSessionEventArgs args)
221+
{
222+
var sessionInitializingAsync = SessionInitializingAsync;
223+
if (sessionInitializingAsync != null)
224+
{
225+
await sessionInitializingAsync(args).ConfigureAwait(false);
226+
}
227+
}
228+
229+
internal async Task SessionClosingHandler(ProcessSessionEventArgs args)
230+
{
231+
var sessionClosingAsync = SessionClosingAsync;
232+
if (sessionClosingAsync != null)
204233
{
205-
await ProcessErrorAsync(args).ConfigureAwait(false);
234+
await sessionClosingAsync(args).ConfigureAwait(false);
206235
}
207236
}
208237

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ protected internal virtual ServiceBusSessionProcessor CreateSessionProcessor(Ser
208208
processor = client.CreateSessionProcessor(entityPath, options);
209209
}
210210
processor.ProcessErrorAsync += Options.ExceptionReceivedHandler;
211+
processor.SessionInitializingAsync += Options.SessionInitializingHandler;
212+
processor.SessionClosingAsync += Options.SessionClosingHandler;
211213
return processor;
212214
}
213215
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,30 @@ public async Task TestSingle_ReceiveFromFunction()
574574
}
575575
}
576576

577+
[Test]
578+
public async Task TestSingle_CustomSessionHandlers()
579+
{
580+
await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}", "session1");
581+
var host = BuildHost<TestCustomSessionHandlers>(SetCustomSessionHandlers);
582+
using (host)
583+
{
584+
bool result1 = _waitHandle1.WaitOne(SBTimeoutMills);
585+
bool result2 = _waitHandle2.WaitOne(SBTimeoutMills);
586+
587+
Assert.True(result1);
588+
Assert.True(result2);
589+
await host.StopAsync();
590+
}
591+
}
592+
593+
private static Action<IHostBuilder> SetCustomSessionHandlers =>
594+
builder => builder.ConfigureWebJobs(b =>
595+
b.AddServiceBus(sbOptions =>
596+
{
597+
sbOptions.SessionInitializingAsync = TestCustomSessionHandlers.SessionInitializingHandler;
598+
sbOptions.SessionClosingAsync = TestCustomSessionHandlers.SessionClosingHandler;
599+
}));
600+
577601
[Test]
578602
public async Task TestBatch_ReceiveFromFunction()
579603
{
@@ -1106,6 +1130,30 @@ public static async Task RunAsync(
11061130
}
11071131
}
11081132

1133+
public class TestCustomSessionHandlers
1134+
{
1135+
public static async Task RunAsync(
1136+
[ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)]
1137+
ServiceBusReceivedMessage message,
1138+
ServiceBusSessionMessageActions sessionActions)
1139+
{
1140+
await sessionActions.CompleteMessageAsync(message);
1141+
sessionActions.ReleaseSession();
1142+
}
1143+
1144+
public static Task SessionInitializingHandler(ProcessSessionEventArgs arg)
1145+
{
1146+
_waitHandle1.Set();
1147+
return Task.CompletedTask;
1148+
}
1149+
1150+
public static Task SessionClosingHandler(ProcessSessionEventArgs arg)
1151+
{
1152+
_waitHandle2.Set();
1153+
return Task.CompletedTask;
1154+
}
1155+
}
1156+
11091157
public class TestReceiveFromFunction_Batch
11101158
{
11111159
public static ServiceBusReceiveActions ReceiveActions { get; private set; }

0 commit comments

Comments
 (0)