Skip to content

Commit 8155a11

Browse files
Batch receive fixes (Azure#21566)
* Batch receive fixes * Use AwaitWithCancellation * volatile * Remove unnecessary catch * Fix tests * Fix flaky tests * Fix test
1 parent 012db17 commit 8155a11

File tree

7 files changed

+330
-231
lines changed

7 files changed

+330
-231
lines changed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs

Lines changed: 157 additions & 142 deletions
Large diffs are not rendered by default.

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" />
1515
<PackageReference Include="Microsoft.Azure.WebJobs" />
1616
<PackageReference Include="Microsoft.Extensions.Azure" />
17-
<PackageReference Include="Azure.Messaging.ServiceBus" />
17+
<!--Revert after Service Bus release-->
18+
<!--<PackageReference Include="Azure.Messaging.ServiceBus" />-->
1819
</ItemGroup>
1920

2021
<ItemGroup>
@@ -28,4 +29,9 @@
2829
<Compile Include="..\..\..\extensions\Microsoft.Azure.WebJobs.Extensions.Clients\src\Shared\WebJobsConfigurationExtensions.cs" LinkBase="Shared" />
2930

3031
</ItemGroup>
32+
33+
<ItemGroup>
34+
<!--Revert after Service Bus release-->
35+
<ProjectReference Include="..\..\Azure.Messaging.ServiceBus\src\Azure.Messaging.ServiceBus.csproj" />
36+
</ItemGroup>
3137
</Project>

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class ServiceBusMessageActions
1717
private readonly ProcessMessageEventArgs _eventArgs;
1818
private readonly ProcessSessionMessageEventArgs _sessionEventArgs;
1919

20+
internal HashSet<ServiceBusReceivedMessage> SettledMessages { get; } = new();
21+
2022
internal ServiceBusMessageActions(ProcessSessionMessageEventArgs sessionEventArgs)
2123
{
2224
_sessionEventArgs = sessionEventArgs;
@@ -50,6 +52,8 @@ public virtual async Task AbandonMessageAsync(
5052
{
5153
await _sessionEventArgs.AbandonMessageAsync(message, propertiesToModify, cancellationToken).ConfigureAwait(false);
5254
}
55+
56+
SettledMessages.Add(message);
5357
}
5458

5559
///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
@@ -69,6 +73,8 @@ public virtual async Task CompleteMessageAsync(
6973
{
7074
await _sessionEventArgs.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
7175
}
76+
77+
SettledMessages.Add(message);
7278
}
7379

7480
///<inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, string, string, CancellationToken)"/>
@@ -105,6 +111,8 @@ await _sessionEventArgs.DeadLetterMessageAsync(
105111
cancellationToken)
106112
.ConfigureAwait(false);
107113
}
114+
115+
SettledMessages.Add(message);
108116
}
109117

110118
///<inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
@@ -137,6 +145,8 @@ await _sessionEventArgs.DeadLetterMessageAsync(
137145
cancellationToken)
138146
.ConfigureAwait(false);
139147
}
148+
149+
SettledMessages.Add(message);
140150
}
141151

142152
///<inheritdoc cref="ServiceBusReceiver.DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
@@ -169,6 +179,8 @@ await _sessionEventArgs.DeferMessageAsync(
169179
cancellationToken)
170180
.ConfigureAwait(false);
171181
}
182+
183+
SettledMessages.Add(message);
172184
}
173185
}
174186
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private bool GetAutoCompleteMessagesOptionToUse(ServiceBusTriggerAttribute attri
101101
{
102102
if (attribute.IsAutoCompleteMessagesOptionSet)
103103
{
104-
_logger.LogInformation($"The 'AutoCompleteMessages' option has been overrriden to '{attribute.AutoCompleteMessages}' value for '{functionName}' function.");
104+
_logger.LogInformation($"The 'AutoCompleteMessages' option has been overriden to '{attribute.AutoCompleteMessages}' value for '{functionName}' function.");
105105

106106
return attribute.AutoCompleteMessages;
107107
}

0 commit comments

Comments
 (0)