Skip to content

Commit fdfd44a

Browse files
authored
Add configuration options to tune SearchIndexingBufferedSender<T>. (Azure#16695)
Add configuration options to tune SearchIndexingBufferedSender<T>. Also adds better behavior around swallowing notification errors and fixes Azure#16291 to make tests less flaky.
1 parent fdf5650 commit fdfd44a

19 files changed

+25964
-53
lines changed

sdk/search/Azure.Search.Documents/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
### Added
1111

1212
- Added `EncryptionKey` to `SearchIndexer`, `SearchIndexerDataSourceConnection`, and `SearchIndexerSkillset`.
13+
- Added configuration options to tune the performance of `SearchIndexingBufferedSender<T>`.
1314

1415
## 11.2.0-beta.1 (2020-10-09)
1516

sdk/search/Azure.Search.Documents/api/Azure.Search.Documents.netstandard2.0.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ public SearchIndexingBufferedSenderOptions() { }
8080
public bool AutoFlush { get { throw null; } set { } }
8181
public System.TimeSpan? AutoFlushInterval { get { throw null; } set { } }
8282
public System.Threading.CancellationToken FlushCancellationToken { get { throw null; } set { } }
83+
public int? InitialBatchActionCount { get { throw null; } set { } }
8384
public System.Func<T, string> KeyFieldAccessor { get { throw null; } set { } }
85+
public int MaxRetries { get { throw null; } set { } }
86+
public System.TimeSpan MaxRetryDelay { get { throw null; } set { } }
87+
public System.TimeSpan RetryDelay { get { throw null; } set { } }
8488
}
8589
public partial class SearchIndexingBufferedSender<T> : System.IAsyncDisposable, System.IDisposable
8690
{

sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.cs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ internal abstract partial class Publisher<T> : IDisposable, IAsyncDisposable
7171
/// Manual retry policy to add exponential back-off after throttled
7272
/// requests.
7373
/// </summary>
74-
private ManualRetryDelay _manualRetries = new ManualRetryDelay();
74+
private ManualRetryDelay _manualRetries;
7575

7676
/// <summary>
7777
/// Gets the number of indexing actions currently awaiting submission.
@@ -104,7 +104,7 @@ public int IndexingActionsCount
104104
/// Gets a value indicating the number of actions to group into a batch
105105
/// when tuning the behavior of the publisher.
106106
/// </summary>
107-
protected int BatchActionSize { get; } // TODO: Not automatically tuning yet
107+
protected int BatchActionCount { get; } // TODO: Not automatically tuning yet
108108

109109
/// <summary>
110110
/// Gets a value indicating the number of bytes to use when tuning the
@@ -115,7 +115,7 @@ public int IndexingActionsCount
115115
/// <summary>
116116
/// Gets the number of times to retry a failed document.
117117
/// </summary>
118-
protected int RetryCount { get; } // TODO: Not configurable yet
118+
protected int MaxRetries { get; }
119119

120120
/// <summary>
121121
/// Creates a new Publisher which immediately starts listening to
@@ -134,9 +134,16 @@ public int IndexingActionsCount
134134
/// The number of bytes to use when tuning the behavior of the
135135
/// publisher.
136136
/// </param>
137-
/// <param name="retryCount">
137+
/// <param name="maxRetries">
138138
/// The number of times to retry a failed document.
139139
/// </param>
140+
/// <param name="retryDelay">
141+
/// The initial retry delay on which to base calculations for a
142+
/// backoff-based approach.
143+
/// </param>
144+
/// <param name="maxRetryDelay">
145+
/// The maximum permissible delay between retry attempts.
146+
/// </param>
140147
/// <param name="publisherCancellationToken">
141148
/// A <see cref="CancellationToken"/> to use when publishing.
142149
/// </param>
@@ -145,15 +152,20 @@ public Publisher(
145152
TimeSpan? autoFlushInterval,
146153
int? batchActionSize,
147154
int? batchPayloadSize,
148-
int? retryCount,
155+
int maxRetries,
156+
TimeSpan retryDelay,
157+
TimeSpan maxRetryDelay,
149158
CancellationToken publisherCancellationToken)
150159
{
151160
AutoFlush = autoFlush;
152161
AutoFlushInterval = autoFlushInterval <= TimeSpan.Zero ? null : autoFlushInterval;
153162
PublisherCancellationToken = publisherCancellationToken;
154-
BatchActionSize = batchActionSize ?? SearchIndexingBufferedSenderOptions<T>.DefaultBatchActionSize;
163+
BatchActionCount = batchActionSize ?? SearchIndexingBufferedSenderOptions<T>.DefaultInitialBatchActionCount;
155164
BatchPayloadSize = batchPayloadSize ?? SearchIndexingBufferedSenderOptions<T>.DefaultBatchPayloadSize;
156-
RetryCount = retryCount ?? SearchIndexingBufferedSenderOptions<T>.DefaultRetryCount;
165+
MaxRetries = maxRetries;
166+
167+
// Setup manual retries
168+
_manualRetries = new ManualRetryDelay { Delay = retryDelay, MaxDelay = maxRetryDelay };
157169

158170
// Start the message loop
159171
_readerLoop = Task.Run(ProcessMessagesAsync, publisherCancellationToken);
@@ -372,7 +384,7 @@ private void EnsureNotDisposed()
372384
/// <param name="flush">Whether we're flushing.</param>
373385
/// <returns>If we have a full batch ready to send.</returns>
374386
private bool HasBatch(bool flush = false) =>
375-
IndexingActionsCount > (flush ? 0 : BatchActionSize);
387+
IndexingActionsCount > (flush ? 0 : BatchActionCount);
376388

377389
/// <summary>
378390
/// Publish as many batches as are ready.
@@ -389,7 +401,7 @@ private async Task PublishAsync(bool flush, CancellationToken cancellationToken)
389401
do
390402
{
391403
List<PublisherAction<T>> batch = new List<PublisherAction<T>>(
392-
capacity: Math.Min(BatchActionSize, IndexingActionsCount));
404+
capacity: Math.Min(BatchActionCount, IndexingActionsCount));
393405

394406
// Prefer pulling from the _retry queue first
395407
if (!FillBatchFromQueue(batch, _retry))
@@ -415,7 +427,7 @@ bool FillBatchFromQueue(List<PublisherAction<T>> batch, Queue< PublisherAction<T
415427

416428
while (queue.Count > 0)
417429
{
418-
if (batch.Count < BatchActionSize)
430+
if (batch.Count < BatchActionCount)
419431
{
420432
batch.Add(queue.Dequeue());
421433
}
@@ -471,7 +483,7 @@ protected bool EnqueueRetry(
471483
PublisherAction<T> action,
472484
bool skipIncrement = false)
473485
{
474-
bool retriable = skipIncrement || action.RetryAttempts++ < RetryCount;
486+
bool retriable = skipIncrement || action.RetryAttempts++ < MaxRetries;
475487
if (retriable)
476488
{
477489
_retry.Enqueue(action);

sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderOptions{T}.cs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ public class SearchIndexingBufferedSenderOptions<T>
4040
public CancellationToken FlushCancellationToken { get; set; }
4141

4242
/// <summary>
43-
/// Gets or sets a value indicating the number of actions to group into
44-
/// a batch when tuning the behavior of the sender. The default value
45-
/// is 512. The current service maximum is 32000.
43+
/// Gets or sets a value indicating the initial number of actions to
44+
/// group into a batch when tuning the behavior of the sender. The
45+
/// default value will be 512 if unset. The current service maximum is
46+
/// 32000.
4647
/// </summary>
47-
internal int? BatchActionSize { get; set; } = DefaultBatchActionSize;
48-
internal const int DefaultBatchActionSize = 512;
48+
public int? InitialBatchActionCount { get; set; } = null;
49+
internal const int DefaultInitialBatchActionCount = 512;
4950

5051
/// <summary>
5152
/// Gets or sets a value indicating the number of bytes to use when
@@ -56,10 +57,32 @@ public class SearchIndexingBufferedSenderOptions<T>
5657
internal const int DefaultBatchPayloadSize = 500 * 1024;
5758

5859
/// <summary>
59-
/// Gets or sets the number of times to retry a failed document.
60+
/// Gets or sets the number of times to retry a failed document. Note
61+
/// that this is different than <see cref="Azure.Core.RetryOptions.MaxRetries"/>
62+
/// which will try to resend the same request. This property is used
63+
/// to control the number of attempts we will make to submit an indexing
64+
/// action.
6065
/// </summary>
61-
internal int? RetryCount { get; set; } = DefaultRetryCount;
62-
internal const int DefaultRetryCount = 3;
66+
public int MaxRetries { get; set; } = 3;
67+
68+
/// <summary>
69+
/// The initial retry delay. The delay will increase exponentially with
70+
/// subsequent retries and add random jitter. Note that this is
71+
/// different than <see cref="Azure.Core.RetryOptions.Delay"/> which
72+
/// will only delay before resending the same request. This property
73+
/// is used to add delay between additional batch submissions when our
74+
/// requests are being throttled by the service.
75+
/// </summary>
76+
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(0.8);
77+
78+
/// <summary>
79+
/// The maximum permissible delay between retry attempts. Note that
80+
/// this is different than <see cref="Azure.Core.RetryOptions.MaxDelay"/>
81+
/// which will only delay before resending the same request. This
82+
/// property is used to add delay between additional batch
83+
/// submissions when our requests are being throttled by the service.
84+
/// </summary>
85+
public TimeSpan MaxRetryDelay { get; set; } = TimeSpan.FromMinutes(1);
6386

6487
/// <summary>
6588
/// Gets or sets a function that can be used to access the index key

sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSender{T}.cs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ internal SearchIndexingBufferedSender(
127127
this,
128128
options.AutoFlush,
129129
options.AutoFlushInterval,
130-
options.BatchActionSize,
130+
options.InitialBatchActionCount,
131131
options.BatchPayloadSize,
132-
options.RetryCount,
132+
options.MaxRetries,
133+
options.RetryDelay,
134+
options.MaxRetryDelay,
133135
options.FlushCancellationToken);
134136
}
135137

@@ -331,10 +333,20 @@ static Func<T, string> CompileAccessor(string key)
331333
/// </returns>
332334
internal async Task RaiseActionAddedAsync(
333335
IndexDocumentsAction<T> action,
334-
CancellationToken cancellationToken) =>
335-
await ActionAddedAsync
336-
.RaiseAsync(action, cancellationToken)
337-
.ConfigureAwait(false);
336+
CancellationToken cancellationToken)
337+
{
338+
try
339+
{
340+
await ActionAddedAsync
341+
.RaiseAsync(action, cancellationToken)
342+
.ConfigureAwait(false);
343+
}
344+
catch
345+
{
346+
// TODO: #16706 - Log any exceptions raised from async events that
347+
// we can't let bubble out because they'd tear down the process
348+
}
349+
}
338350

339351
/// <summary>
340352
/// Raise the <see cref="ActionSentAsync"/> event.
@@ -346,10 +358,20 @@ await ActionAddedAsync
346358
/// </returns>
347359
internal async Task RaiseActionSentAsync(
348360
IndexDocumentsAction<T> action,
349-
CancellationToken cancellationToken) =>
350-
await ActionSentAsync
351-
.RaiseAsync(action, cancellationToken)
352-
.ConfigureAwait(false);
361+
CancellationToken cancellationToken)
362+
{
363+
try
364+
{
365+
await ActionSentAsync
366+
.RaiseAsync(action, cancellationToken)
367+
.ConfigureAwait(false);
368+
}
369+
catch
370+
{
371+
// TODO: #16706 - Log any exceptions raised from async events that
372+
// we can't let bubble out because they'd tear down the process
373+
}
374+
}
353375

354376
/// <summary>
355377
/// Raise the <see cref="ActionCompletedAsync"/> event.
@@ -363,10 +385,20 @@ await ActionSentAsync
363385
internal async Task RaiseActionCompletedAsync(
364386
IndexDocumentsAction<T> action,
365387
IndexingResult result,
366-
CancellationToken cancellationToken) =>
367-
await ActionCompletedAsync
368-
.RaiseAsync(action, result, cancellationToken)
369-
.ConfigureAwait(false);
388+
CancellationToken cancellationToken)
389+
{
390+
try
391+
{
392+
await ActionCompletedAsync
393+
.RaiseAsync(action, result, cancellationToken)
394+
.ConfigureAwait(false);
395+
}
396+
catch
397+
{
398+
// TODO: #16706 - Log any exceptions raised from async events that
399+
// we can't let bubble out because they'd tear down the process
400+
}
401+
}
370402

371403
/// <summary>
372404
/// Raise the <see cref="ActionFailedAsync"/> event.
@@ -382,10 +414,20 @@ internal async Task RaiseActionFailedAsync(
382414
IndexDocumentsAction<T> action,
383415
IndexingResult result,
384416
Exception exception,
385-
CancellationToken cancellationToken) =>
386-
await ActionFailedAsync
387-
.RaiseAsync(action, result, exception, cancellationToken)
388-
.ConfigureAwait(false);
417+
CancellationToken cancellationToken)
418+
{
419+
try
420+
{
421+
await ActionFailedAsync
422+
.RaiseAsync(action, result, exception, cancellationToken)
423+
.ConfigureAwait(false);
424+
}
425+
catch
426+
{
427+
// TODO: #16706 - Log any exceptions raised from async events that
428+
// we can't let bubble out because they'd tear down the process
429+
}
430+
}
389431
#endregion
390432

391433
#region Index Documents

sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingPublisher{T}.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,16 @@ internal class SearchIndexingPublisher<T> : Publisher<IndexDocumentsAction<T>>
4545
/// The number of bytes to use when tuning the behavior of the
4646
/// publisher.
4747
/// </param>
48-
/// <param name="retryCount">
48+
/// <param name="maxRetries">
4949
/// The number of times to retry a failed document.
5050
/// </param>
51+
/// <param name="retryDelay">
52+
/// The initial retry delay on which to base calculations for a
53+
/// backoff-based approach.
54+
/// </param>
55+
/// <param name="maxRetryDelay">
56+
/// The maximum permissible delay between retry attempts.
57+
/// </param>
5158
/// <param name="publisherCancellationToken">
5259
/// A <see cref="CancellationToken"/> to use when publishing.
5360
/// </param>
@@ -57,9 +64,19 @@ public SearchIndexingPublisher(
5764
TimeSpan? autoFlushInterval,
5865
int? batchActionSize,
5966
int? batchPayloadSize,
60-
int? retryCount,
67+
int maxRetries,
68+
TimeSpan retryDelay,
69+
TimeSpan maxRetryDelay,
6170
CancellationToken publisherCancellationToken)
62-
: base(autoFlush, autoFlushInterval, batchActionSize, batchPayloadSize, retryCount, publisherCancellationToken)
71+
: base(
72+
autoFlush,
73+
autoFlushInterval,
74+
batchActionSize,
75+
batchPayloadSize,
76+
maxRetries,
77+
retryDelay,
78+
maxRetryDelay,
79+
publisherCancellationToken)
6380
{
6481
_sender = sender;
6582
}

sdk/search/Azure.Search.Documents/tests/Batching/AsyncEventExtensionsTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ public async Task Cancels_AlreadyFinished()
369369
[Test]
370370
public async Task Cancels_StillRunning()
371371
{
372-
TestHandler test = new TestHandler() { Delay = TimeSpan.FromMilliseconds(100) };
372+
TestHandler test = new TestHandler() { Delay = TimeSpan.FromMilliseconds(500) };
373373
CancellationTokenSource cancellation = new CancellationTokenSource();
374374

375375
Func<EventArgs, CancellationToken, Task> handler = null;
@@ -391,9 +391,9 @@ await Task.WhenAll(
391391
[Test]
392392
public async Task Cancels_All()
393393
{
394-
TestHandler first = new TestHandler() { Delay = TimeSpan.FromMilliseconds(100) };
395-
TestHandler second = new TestHandler() { Delay = TimeSpan.FromMilliseconds(100) };
396-
TestHandler third = new TestHandler() { Delay = TimeSpan.FromMilliseconds(100) };
394+
TestHandler first = new TestHandler() { Delay = TimeSpan.FromMilliseconds(500) };
395+
TestHandler second = new TestHandler() { Delay = TimeSpan.FromMilliseconds(500) };
396+
TestHandler third = new TestHandler() { Delay = TimeSpan.FromMilliseconds(500) };
397397
CancellationTokenSource cancellation = new CancellationTokenSource();
398398

399399
Func<EventArgs, CancellationToken, Task> handler = null;

0 commit comments

Comments
 (0)