44using System ;
55using System . Collections . Generic ;
66using System . Diagnostics ;
7- using System . Globalization ;
87using System . Linq ;
98using System . Runtime . ExceptionServices ;
109using System . Threading ;
2524
2625namespace Microsoft . Azure . WebJobs . Extensions . Storage . Common . Listeners
2726{
28- internal sealed partial class QueueListener : IListener , ITaskSeriesCommand , INotificationCommand , IScaleMonitor < QueueTriggerMetrics >
27+ internal sealed partial class QueueListener : IListener , ITaskSeriesCommand , INotificationCommand , ITargetScalerProvider , IScaleMonitorProvider
2928 {
30- private const int NumberOfSamplesToConsider = 5 ;
31-
3229 private readonly ITaskSeriesTimer _timer ;
3330 private readonly IDelayStrategy _delayStrategy ;
3431 private readonly QueueClient _queue ;
@@ -44,8 +41,9 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo
4441 private readonly ILogger < QueueListener > _logger ;
4542 private readonly FunctionDescriptor _functionDescriptor ;
4643 private readonly string _functionId ;
47- private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor ;
4844 private readonly CancellationTokenSource _shutdownCancellationTokenSource ;
45+ private readonly Lazy < QueueTargetScaler > _targetScaler ;
46+ private readonly Lazy < QueueScaleMonitor > _scaleMonitor ;
4947
5048 private bool ? _queueExists ;
5149 private bool _foundMessageSinceLastDelay ;
@@ -57,6 +55,8 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo
5755 // for mock testing only
5856 internal QueueListener ( )
5957 {
58+ _scaleMonitor = new Lazy < QueueScaleMonitor > ( ( ) => new QueueScaleMonitor ( ) ) ;
59+ _targetScaler = new Lazy < QueueTargetScaler > ( ( ) => new QueueTargetScaler ( ) ) ;
6060 }
6161
6262 public QueueListener ( QueueClient queue ,
@@ -130,10 +130,19 @@ public QueueListener(QueueClient queue,
130130
131131 _delayStrategy = new RandomizedExponentialBackoffStrategy ( QueuePollingIntervals . Minimum , maximumInterval ) ;
132132
133- _scaleMonitorDescriptor = new ScaleMonitorDescriptor ( $ "{ _functionId } -QueueTrigger-{ _queue . Name } ". ToLower ( CultureInfo . InvariantCulture ) , _functionId ) ;
134133 _shutdownCancellationTokenSource = new CancellationTokenSource ( ) ;
135134
136135 _concurrencyManager = concurrencyManager ;
136+
137+ _targetScaler = new Lazy < QueueTargetScaler > (
138+ ( ) => new QueueTargetScaler (
139+ _functionId ,
140+ queue ,
141+ queueOptions ,
142+ loggerFactory
143+ ) ) ;
144+
145+ _scaleMonitor = new Lazy < QueueScaleMonitor > ( ( ) => new QueueScaleMonitor ( _functionId , _queue , loggerFactory ) ) ;
137146 }
138147
139148 // for testing
@@ -444,185 +453,14 @@ internal static void RegisterSharedWatcherWithQueueProcessor(QueueProcessor queu
444453 }
445454 }
446455
447- public ScaleMonitorDescriptor Descriptor
448- {
449- get
450- {
451- return _scaleMonitorDescriptor ;
452- }
453- }
454-
455- async Task < ScaleMetrics > IScaleMonitor . GetMetricsAsync ( )
456- {
457- return await GetMetricsAsync ( ) . ConfigureAwait ( false ) ;
458- }
459-
460- public async Task < QueueTriggerMetrics > GetMetricsAsync ( )
461- {
462- int queueLength = 0 ;
463- TimeSpan queueTime = TimeSpan . Zero ;
464-
465- try
466- {
467- QueueProperties queueProperties = await _queue . GetPropertiesAsync ( ) . ConfigureAwait ( false ) ;
468- queueLength = queueProperties . ApproximateMessagesCount ;
469-
470- if ( queueLength > 0 )
471- {
472- PeekedMessage message = ( await _queue . PeekMessagesAsync ( 1 ) . ConfigureAwait ( false ) ) . Value . FirstOrDefault ( ) ;
473- if ( message != null )
474- {
475- if ( message . InsertedOn . HasValue )
476- {
477- queueTime = DateTime . UtcNow . Subtract ( message . InsertedOn . Value . DateTime ) ;
478- }
479- }
480- else
481- {
482- // ApproximateMessageCount often returns a stale value,
483- // especially when the queue is empty.
484- queueLength = 0 ;
485- }
486- }
487- }
488- catch ( RequestFailedException ex )
489- {
490- if ( ex . IsNotFoundQueueNotFound ( ) ||
491- ex . IsConflictQueueBeingDeletedOrDisabled ( ) ||
492- ex . IsServerSideError ( ) )
493- {
494- // ignore transient errors, and return default metrics
495- // E.g. if the queue doesn't exist, we'll return a zero queue length
496- // and scale in
497- _logger . LogWarning ( $ "Error querying for queue scale status: { ex . Message } ") ;
498- }
499- }
500-
501- return new QueueTriggerMetrics
502- {
503- QueueLength = queueLength ,
504- QueueTime = queueTime ,
505- Timestamp = DateTime . UtcNow
506- } ;
507- }
508-
509- ScaleStatus IScaleMonitor . GetScaleStatus ( ScaleStatusContext context )
510- {
511- return GetScaleStatusCore ( context . WorkerCount , context . Metrics ? . Cast < QueueTriggerMetrics > ( ) . ToArray ( ) ) ;
512- }
513-
514- public ScaleStatus GetScaleStatus ( ScaleStatusContext < QueueTriggerMetrics > context )
456+ public ITargetScaler GetTargetScaler ( )
515457 {
516- return GetScaleStatusCore ( context . WorkerCount , context . Metrics ? . ToArray ( ) ) ;
458+ return _targetScaler . Value ;
517459 }
518460
519- private ScaleStatus GetScaleStatusCore ( int workerCount , QueueTriggerMetrics [ ] metrics )
461+ public IScaleMonitor GetMonitor ( )
520462 {
521- ScaleStatus status = new ScaleStatus
522- {
523- Vote = ScaleVote . None
524- } ;
525-
526- // verify we have enough samples to make a scale decision.
527- if ( metrics == null || ( metrics . Length < NumberOfSamplesToConsider ) )
528- {
529- return status ;
530- }
531-
532- // Maintain a minimum ratio of 1 worker per 1,000 queue messages.
533- long latestQueueLength = metrics . Last ( ) . QueueLength ;
534- if ( latestQueueLength > workerCount * 1000 )
535- {
536- status . Vote = ScaleVote . ScaleOut ;
537- _logger . LogInformation ( $ "QueueLength ({ latestQueueLength } ) > workerCount ({ workerCount } ) * 1,000") ;
538- _logger . LogInformation ( $ "Length of queue ({ _queue . Name } , { latestQueueLength } ) is too high relative to the number of instances ({ workerCount } ).") ;
539- return status ;
540- }
541-
542- // Check to see if the queue has been empty for a while.
543- bool queueIsIdle = metrics . All ( p => p . QueueLength == 0 ) ;
544- if ( queueIsIdle )
545- {
546- status . Vote = ScaleVote . ScaleIn ;
547- _logger . LogInformation ( $ "Queue '{ _queue . Name } ' is idle") ;
548- return status ;
549- }
550-
551- // Samples are in chronological order. Check for a continuous increase in time or length.
552- // If detected, this results in an automatic scale out.
553- if ( metrics [ 0 ] . QueueLength > 0 )
554- {
555- bool queueLengthIncreasing =
556- IsTrueForLastN (
557- metrics ,
558- NumberOfSamplesToConsider ,
559- ( prev , next ) => prev . QueueLength < next . QueueLength ) ;
560- if ( queueLengthIncreasing )
561- {
562- status . Vote = ScaleVote . ScaleOut ;
563- _logger . LogInformation ( $ "Queue length is increasing for '{ _queue . Name } '") ;
564- return status ;
565- }
566- }
567-
568- if ( metrics [ 0 ] . QueueTime > TimeSpan . Zero && metrics [ 0 ] . QueueTime < metrics [ NumberOfSamplesToConsider - 1 ] . QueueTime )
569- {
570- bool queueTimeIncreasing =
571- IsTrueForLastN (
572- metrics ,
573- NumberOfSamplesToConsider ,
574- ( prev , next ) => prev . QueueTime <= next . QueueTime ) ;
575- if ( queueTimeIncreasing )
576- {
577- status . Vote = ScaleVote . ScaleOut ;
578- _logger . LogInformation ( $ "Queue time is increasing for '{ _queue . Name } '") ;
579- return status ;
580- }
581- }
582-
583- bool queueLengthDecreasing =
584- IsTrueForLastN (
585- metrics ,
586- NumberOfSamplesToConsider ,
587- ( prev , next ) => prev . QueueLength > next . QueueLength ) ;
588- if ( queueLengthDecreasing )
589- {
590- status . Vote = ScaleVote . ScaleIn ;
591- _logger . LogInformation ( $ "Queue length is decreasing for '{ _queue . Name } '") ;
592- return status ;
593- }
594-
595- bool queueTimeDecreasing = IsTrueForLastN (
596- metrics ,
597- NumberOfSamplesToConsider ,
598- ( prev , next ) => prev . QueueTime > next . QueueTime ) ;
599- if ( queueTimeDecreasing )
600- {
601- status . Vote = ScaleVote . ScaleIn ;
602- _logger . LogInformation ( $ "Queue time is decreasing for '{ _queue . Name } '") ;
603- return status ;
604- }
605-
606- _logger . LogInformation ( $ "Queue '{ _queue . Name } ' is steady") ;
607-
608- return status ;
609- }
610-
611- private static bool IsTrueForLastN ( IList < QueueTriggerMetrics > samples , int count , Func < QueueTriggerMetrics , QueueTriggerMetrics , bool > predicate )
612- {
613- Debug . Assert ( count > 1 , "count must be greater than 1." ) ;
614- Debug . Assert ( count <= samples . Count , "count must be less than or equal to the list size." ) ;
615-
616- // Walks through the list from left to right starting at len(samples) - count.
617- for ( int i = samples . Count - count ; i < samples . Count - 1 ; i ++ )
618- {
619- if ( ! predicate ( samples [ i ] , samples [ i + 1 ] ) )
620- {
621- return false ;
622- }
623- }
624-
625- return true ;
463+ return _scaleMonitor . Value ;
626464 }
627465 }
628466}
0 commit comments