Skip to content

Commit f8ab748

Browse files
authored
Target base scale support for ServiceBus (Azure#31367)
1 parent e9bce61 commit f8ab748

File tree

9 files changed

+510
-210
lines changed

9 files changed

+510
-210
lines changed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@
99
using System.Threading.Tasks;
1010
using Azure.Core.Pipeline;
1111
using Azure.Messaging.ServiceBus;
12+
using Azure.Messaging.ServiceBus.Administration;
1213
using Azure.Messaging.ServiceBus.Diagnostics;
1314
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
15+
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners;
1416
using Microsoft.Azure.WebJobs.Host.Executors;
1517
using Microsoft.Azure.WebJobs.Host.Listeners;
1618
using Microsoft.Azure.WebJobs.Host.Scale;
1719
using Microsoft.Extensions.Logging;
1820

1921
namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners
2022
{
21-
internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
23+
internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
2224
{
2325
private readonly ITriggeredFunctionExecutor _triggerExecutor;
2426
private readonly string _entityPath;
@@ -34,6 +36,8 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
3436
private readonly Lazy<ServiceBusClient> _client;
3537
private readonly Lazy<SessionMessageProcessor> _sessionMessageProcessor;
3638
private readonly Lazy<ServiceBusScaleMonitor> _scaleMonitor;
39+
private readonly Lazy<ServiceBusTargetScaler> _targetScaler;
40+
private readonly Lazy<ServiceBusAdministrationClient> _administrationClient;
3741
private readonly ConcurrencyUpdateManager _concurrencyUpdateManager;
3842

3943
// internal for testing
@@ -72,8 +76,7 @@ public ServiceBusListener(
7276
_functionId = functionId;
7377

7478
_client = new Lazy<ServiceBusClient>(
75-
() =>
76-
clientFactory.CreateClientFromSetting(connection));
79+
() => clientFactory.CreateClientFromSetting(connection));
7780

7881
_batchReceiver = new Lazy<ServiceBusReceiver>(
7982
() => messagingProvider.CreateBatchMessageReceiver(
@@ -95,15 +98,31 @@ public ServiceBusListener(
9598
return messagingProvider.CreateSessionMessageProcessor(_client.Value,_entityPath, sessionProcessorOptions);
9699
});
97100

101+
_administrationClient = new Lazy<ServiceBusAdministrationClient>(
102+
() => clientFactory.CreateAdministrationClient(connection));
103+
98104
_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(
99105
() => new ServiceBusScaleMonitor(
100106
functionId,
107+
_entityPath,
101108
entityType,
109+
_batchReceiver,
110+
_administrationClient,
111+
loggerFactory
112+
));
113+
114+
_targetScaler = new Lazy<ServiceBusTargetScaler>(
115+
() => new ServiceBusTargetScaler(
116+
functionId,
102117
_entityPath,
103-
connection,
118+
entityType,
104119
_batchReceiver,
105-
loggerFactory,
106-
clientFactory));
120+
_administrationClient,
121+
options,
122+
_isSessionsEnabled,
123+
_singleDispatch,
124+
loggerFactory
125+
));
107126

108127
_scopeFactory = new Lazy<EntityScopeFactory>(
109128
() => new EntityScopeFactory(_batchReceiver.Value.EntityPath, _batchReceiver.Value.FullyQualifiedNamespace));
@@ -536,6 +555,11 @@ public IScaleMonitor GetMonitor()
536555
return _scaleMonitor.Value;
537556
}
538557

558+
public ITargetScaler GetTargetScaler()
559+
{
560+
return _targetScaler.Value;
561+
}
562+
539563
/// <summary>
540564
/// Responsible for handling dynamic concurrency concurrency updates for message processors.
541565
/// </summary>
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.WebJobs.ServiceBus.Listeners;
5+
using Microsoft.Azure.WebJobs.ServiceBus;
6+
using System;
7+
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Logging;
9+
using Azure.Messaging.ServiceBus;
10+
using Azure.Messaging.ServiceBus.Administration;
11+
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
12+
13+
namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners
14+
{
15+
internal class ServiceBusMetricsProvider
16+
{
17+
private const string DeadLetterQueuePath = @"/$DeadLetterQueue";
18+
19+
private readonly ILogger _logger;
20+
private readonly string _entityPath;
21+
private readonly ServiceBusEntityType _serviceBusEntityType;
22+
private readonly Lazy<ServiceBusReceiver> _receiver;
23+
private readonly bool _isListeningOnDeadLetterQueue;
24+
private readonly Lazy<ServiceBusAdministrationClient> _administrationClient;
25+
26+
private DateTime _nextWarningTime;
27+
28+
public ServiceBusMetricsProvider(
29+
string entityPath,
30+
ServiceBusEntityType serviceBusEntityType,
31+
Lazy<ServiceBusReceiver> receiver,
32+
Lazy<ServiceBusAdministrationClient> administrationClient,
33+
ILoggerFactory loggerFactory)
34+
{
35+
_serviceBusEntityType = serviceBusEntityType;
36+
_receiver = receiver;
37+
_entityPath = entityPath;
38+
_isListeningOnDeadLetterQueue = entityPath.EndsWith(DeadLetterQueuePath, StringComparison.OrdinalIgnoreCase);
39+
_administrationClient = administrationClient;
40+
_logger = loggerFactory.CreateLogger<ServiceBusMetricsProvider>();
41+
_nextWarningTime = DateTime.UtcNow;
42+
}
43+
44+
public async Task<long> GetMessageCountAsync()
45+
{
46+
long activeMessageCount = 0;
47+
long deadLetterCount = 0;
48+
string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic";
49+
try
50+
{
51+
if (_serviceBusEntityType == ServiceBusEntityType.Queue)
52+
{
53+
QueueRuntimeProperties queueRuntimeProperties = await _administrationClient.Value.GetQueueRuntimePropertiesAsync(_entityPath).ConfigureAwait(false);
54+
activeMessageCount = queueRuntimeProperties.ActiveMessageCount;
55+
deadLetterCount = queueRuntimeProperties.DeadLetterMessageCount;
56+
}
57+
else
58+
{
59+
ServiceBusEntityPathHelper.ParseTopicAndSubscription(_entityPath, out string topicPath, out string subscriptionPath);
60+
61+
SubscriptionRuntimeProperties subscriptionProperties = await _administrationClient.Value.GetSubscriptionRuntimePropertiesAsync(topicPath, subscriptionPath).ConfigureAwait(false);
62+
activeMessageCount = subscriptionProperties.ActiveMessageCount;
63+
deadLetterCount = subscriptionProperties.DeadLetterMessageCount;
64+
}
65+
}
66+
catch (ServiceBusException ex)
67+
when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
68+
{
69+
_logger.LogWarning($"ServiceBus {entityName} '{_entityPath}' was not found.");
70+
}
71+
catch (UnauthorizedAccessException ex)
72+
{
73+
if (TimeToLogWarning())
74+
{
75+
_logger.LogWarning($"Connection string does not have 'Manage Claim' for {entityName} '{_entityPath}'. Unable to determine active message count.", ex);
76+
}
77+
throw ex;
78+
}
79+
catch (Exception e)
80+
{
81+
_logger.LogWarning($"Error querying for Service Bus {entityName} scale status: {e.Message}");
82+
}
83+
84+
long totalNewMessageCount = 0;
85+
if ((!_isListeningOnDeadLetterQueue && activeMessageCount > 0) || (_isListeningOnDeadLetterQueue && deadLetterCount > 0))
86+
{
87+
totalNewMessageCount = _isListeningOnDeadLetterQueue ? deadLetterCount : activeMessageCount;
88+
}
89+
90+
return totalNewMessageCount;
91+
}
92+
93+
public async Task<ServiceBusTriggerMetrics> GetMetricsAsync()
94+
{
95+
ServiceBusReceivedMessage activeMessage = null;
96+
string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic";
97+
98+
try
99+
{
100+
// Do a first attempt to peek one message from the head of the queue
101+
var peekedMessage = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false);
102+
if (peekedMessage == null)
103+
{
104+
// ignore it. The Get[Queue|Topic]MetricsAsync methods deal with activeMessage being null
105+
}
106+
else if (peekedMessage.State == ServiceBusMessageState.Active)
107+
{
108+
activeMessage = peekedMessage;
109+
}
110+
else
111+
{
112+
// Do another attempt to peek ten message from last peek sequence number
113+
var peekedMessages = await _receiver.Value.PeekMessagesAsync(10, fromSequenceNumber: peekedMessage.SequenceNumber).ConfigureAwait(false);
114+
foreach (var receivedMessage in peekedMessages)
115+
{
116+
if (receivedMessage.State == ServiceBusMessageState.Active)
117+
{
118+
activeMessage = receivedMessage;
119+
break;
120+
}
121+
}
122+
123+
// Batch contains messages but none are active in the peeked batch
124+
if (peekedMessages.Count > 0 && activeMessage == null)
125+
{
126+
_logger.LogDebug("{_serviceBusEntityType} {_entityPath} contains multiple messages but none are active in the peeked batch.");
127+
}
128+
}
129+
130+
if (_serviceBusEntityType == ServiceBusEntityType.Queue)
131+
{
132+
return await GetQueueMetricsAsync(activeMessage).ConfigureAwait(false);
133+
}
134+
else
135+
{
136+
return await GetTopicMetricsAsync(activeMessage).ConfigureAwait(false);
137+
}
138+
}
139+
catch (ServiceBusException ex)
140+
when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
141+
{
142+
_logger.LogWarning($"ServiceBus {entityName} '{_entityPath}' was not found.");
143+
}
144+
catch (UnauthorizedAccessException) // When manage claim is not used on Service Bus connection string
145+
{
146+
if (TimeToLogWarning())
147+
{
148+
_logger.LogWarning($"Connection string does not have Manage claim for {entityName} '{_entityPath}'. Failed to get {entityName} description to " +
149+
$"derive {entityName} length metrics. Falling back to using first message enqueued time.");
150+
}
151+
}
152+
catch (Exception e)
153+
{
154+
_logger.LogWarning($"Error querying for Service Bus {entityName} scale status: {e.Message}");
155+
}
156+
157+
// Path for connection strings with no manage claim
158+
return CreateTriggerMetrics(activeMessage, 0, 0, 0, _isListeningOnDeadLetterQueue);
159+
}
160+
161+
private async Task<ServiceBusTriggerMetrics> GetQueueMetricsAsync(ServiceBusReceivedMessage message)
162+
{
163+
QueueRuntimeProperties queueRuntimeProperties;
164+
QueueProperties queueProperties;
165+
long activeMessageCount = 0;
166+
long deadLetterCount = 0;
167+
int partitionCount = 0;
168+
169+
queueRuntimeProperties = await _administrationClient.Value.GetQueueRuntimePropertiesAsync(_entityPath).ConfigureAwait(false);
170+
activeMessageCount = queueRuntimeProperties.ActiveMessageCount;
171+
deadLetterCount = queueRuntimeProperties.DeadLetterMessageCount;
172+
173+
// If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions
174+
// See more information here: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning#standard
175+
queueProperties = await _administrationClient.Value.GetQueueAsync(_entityPath).ConfigureAwait(false);
176+
partitionCount = queueProperties.EnablePartitioning ? 16 : 0;
177+
178+
return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue);
179+
}
180+
181+
private async Task<ServiceBusTriggerMetrics> GetTopicMetricsAsync(ServiceBusReceivedMessage message)
182+
{
183+
TopicProperties topicProperties;
184+
SubscriptionRuntimeProperties subscriptionProperties;
185+
string topicPath, subscriptionPath;
186+
long activeMessageCount = 0;
187+
long deadLetterCount = 0;
188+
int partitionCount = 0;
189+
190+
ServiceBusEntityPathHelper.ParseTopicAndSubscription(_entityPath, out topicPath, out subscriptionPath);
191+
192+
subscriptionProperties = await _administrationClient.Value.GetSubscriptionRuntimePropertiesAsync(topicPath, subscriptionPath).ConfigureAwait(false);
193+
activeMessageCount = subscriptionProperties.ActiveMessageCount;
194+
deadLetterCount = subscriptionProperties.DeadLetterMessageCount;
195+
196+
// If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions
197+
// See more information here: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning#standard
198+
topicProperties = await _administrationClient.Value.GetTopicAsync(topicPath).ConfigureAwait(false);
199+
partitionCount = topicProperties.EnablePartitioning ? 16 : 0;
200+
201+
return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue);
202+
}
203+
204+
private bool TimeToLogWarning()
205+
{
206+
DateTime currentTime = DateTime.UtcNow;
207+
bool timeToLog = currentTime >= _nextWarningTime;
208+
if (timeToLog)
209+
{
210+
_nextWarningTime = currentTime.AddHours(1);
211+
}
212+
return timeToLog;
213+
}
214+
215+
internal static ServiceBusTriggerMetrics CreateTriggerMetrics(ServiceBusReceivedMessage message, long activeMessageCount, long deadLetterCount, int partitionCount, bool isListeningOnDeadLetterQueue)
216+
{
217+
long totalNewMessageCount = 0;
218+
TimeSpan queueTime = TimeSpan.Zero;
219+
220+
if (message != null)
221+
{
222+
queueTime = DateTimeOffset.UtcNow.Subtract(message.EnqueuedTime);
223+
totalNewMessageCount = 1; // There's at least one if message != null. Default for connection string with no manage claim
224+
}
225+
226+
if ((!isListeningOnDeadLetterQueue && activeMessageCount > 0) || (isListeningOnDeadLetterQueue && deadLetterCount > 0))
227+
{
228+
totalNewMessageCount = isListeningOnDeadLetterQueue ? deadLetterCount : activeMessageCount;
229+
}
230+
231+
return new ServiceBusTriggerMetrics
232+
{
233+
MessageCount = totalNewMessageCount,
234+
PartitionCount = partitionCount,
235+
QueueTime = queueTime
236+
};
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)