Skip to content

Commit 61d3b59

Browse files
authored
Open up Buffered Sender Configurations (Azure#12297)
* Open up Buffered Sender Configurations * Response to PR Comments * Update API File * Added Jitter value * PR Comments II * Add jitterValue to the correct place * Format * Sync Retry Logic with core-https * Changed Name to initialBatchActionCount * Update API
1 parent 9190ef5 commit 61d3b59

File tree

4 files changed

+88
-9
lines changed

4 files changed

+88
-9
lines changed

sdk/search/search-documents/review/search-documents.api.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,6 +1275,11 @@ export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = Operatio
12751275
// @public
12761276
export interface SearchIndexingBufferedSenderOptions {
12771277
autoFlush?: boolean;
1278+
flushWindowInMs?: number;
1279+
initialBatchActionCount?: number;
1280+
maxRetries?: number;
1281+
maxRetryDelayInMs?: number;
1282+
retryDelayInMs?: number;
12781283
}
12791284

12801285
// @public

sdk/search/search-documents/src/indexModels.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,32 @@ export interface SearchIndexingBufferedSenderOptions {
3737
* Indicates if autoFlush is enabled.
3838
*/
3939
autoFlush?: boolean;
40+
/**
41+
* Initial Batch Action Count.
42+
*
43+
* A batch request will be sent once the documents
44+
* reach the initialBatchActionCount.
45+
*/
46+
initialBatchActionCount?: number;
47+
/**
48+
* Flush Window.
49+
*
50+
* A batch request will be sent after flushWindowInMs is
51+
* reached.
52+
*/
53+
flushWindowInMs?: number;
54+
/**
55+
* Maximum number of Retries
56+
*/
57+
maxRetries?: number;
58+
/**
59+
* Delay between retries
60+
*/
61+
retryDelayInMs?: number;
62+
/**
63+
* Max Delay between retries
64+
*/
65+
maxRetryDelayInMs?: number;
4066
}
4167

4268
/**

sdk/search/search-documents/src/searchIndexingBufferedSenderImpl.ts

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,29 @@ import EventEmitter from "events";
1818
import { createSpan } from "./tracing";
1919
import { CanonicalCode } from "@opentelemetry/api";
2020
import { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender";
21+
import { delay } from "@azure/core-http";
22+
import { getRandomIntegerInclusive } from "./serviceUtils";
2123

2224
/**
2325
* Default Batch Size
2426
*/
25-
export const DEFAULT_BATCH_SIZE: number = 1000;
27+
export const DEFAULT_BATCH_SIZE: number = 512;
2628
/**
2729
* Default window flush interval
2830
*/
2931
export const DEFAULT_FLUSH_WINDOW: number = 60000;
3032
/**
31-
* Default number of times to retry
33+
* Default number of times to retry.
3234
*/
3335
export const DEFAULT_RETRY_COUNT: number = 3;
36+
/**
37+
* Default retry delay.
38+
*/
39+
export const DEFAULT_RETRY_DELAY: number = 800;
40+
/**
41+
* Default Max Delay between retries.
42+
*/
43+
export const DEFAULT_MAX_RETRY_DELAY: number = 60000;
3444

3545
/**
3646
* Class used to perform buffered operations against a search index,
@@ -49,10 +59,22 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
4959
* Interval between flushes (in milliseconds).
5060
*/
5161
private flushWindowInMs: number;
62+
/**
63+
* Delay between retries
64+
*/
65+
private retryDelayInMs: number;
66+
/**
67+
* Maximum number of Retries
68+
*/
69+
private maxRetries: number;
70+
/**
71+
* Max Delay between retries
72+
*/
73+
private maxRetryDelayInMs: number;
5274
/**
5375
* Size of the batch.
5476
*/
55-
private batchSize: number;
77+
private initialBatchActionCount: number;
5678
/**
5779
* Batch object used to complete the service call.
5880
*/
@@ -75,9 +97,15 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
7597
*/
7698
constructor(client: SearchClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
7799
this.client = client;
100+
// General Configuration properties
78101
this.autoFlush = options.autoFlush ?? false;
79-
this.flushWindowInMs = DEFAULT_FLUSH_WINDOW;
80-
this.batchSize = DEFAULT_BATCH_SIZE;
102+
this.initialBatchActionCount = options.initialBatchActionCount ?? DEFAULT_BATCH_SIZE;
103+
this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW;
104+
// Retry specific configuration properties
105+
this.retryDelayInMs = options.retryDelayInMs ?? DEFAULT_FLUSH_WINDOW;
106+
this.maxRetries = options.maxRetries ?? DEFAULT_RETRY_COUNT;
107+
this.maxRetryDelayInMs = options.maxRetryDelayInMs ?? DEFAULT_MAX_RETRY_DELAY;
108+
81109
this.batchObject = new IndexDocumentsBatch<T>();
82110
if (this.autoFlush) {
83111
const interval = setInterval(() => this.flush(), this.flushWindowInMs);
@@ -323,7 +351,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
323351
}
324352

325353
private isBatchReady(): boolean {
326-
return this.batchObject.actions.length >= this.batchSize;
354+
return this.batchObject.actions.length >= this.initialBatchActionCount;
327355
}
328356

329357
private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
@@ -332,7 +360,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
332360
const actions: IndexDocumentsAction<T>[] = this.batchObject.actions;
333361
this.batchObject = new IndexDocumentsBatch<T>();
334362
while (actions.length > 0) {
335-
const actionsToSend = actions.splice(0, this.batchSize);
363+
const actionsToSend = actions.splice(0, this.initialBatchActionCount);
336364
await this.submitDocuments(actionsToSend, options);
337365
}
338366
}
@@ -341,7 +369,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
341369
private async submitDocuments(
342370
actionsToSend: IndexDocumentsAction<T>[],
343371
options: OperationOptions,
344-
retryAttempt: number = 0
372+
retryAttempt: number = 1
345373
): Promise<void> {
346374
try {
347375
for (const action of actionsToSend) {
@@ -354,7 +382,16 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
354382
// raise success event
355383
this.emitter.emit("batchSucceeded", result);
356384
} catch (e) {
357-
if (this.isRetryAbleError(e) && retryAttempt < DEFAULT_RETRY_COUNT) {
385+
if (this.isRetryAbleError(e) && retryAttempt <= this.maxRetries) {
386+
// Exponentially increase the delay each time
387+
const exponentialDelay = this.retryDelayInMs * Math.pow(2, retryAttempt);
388+
// Don't let the delay exceed the maximum
389+
const clampedExponentialDelay = Math.min(this.maxRetryDelayInMs, exponentialDelay);
390+
// Allow the final value to have some "jitter" (within 50% of the delay size) so
391+
// that retries across multiple clients don't occur simultaneously.
392+
const delayWithJitter =
393+
clampedExponentialDelay / 2 + getRandomIntegerInclusive(0, clampedExponentialDelay / 2);
394+
await delay(delayWithJitter);
358395
this.submitDocuments(actionsToSend, options, retryAttempt + 1);
359396
} else {
360397
this.emitter.emit("batchFailed", e);

sdk/search/search-documents/src/serviceUtils.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,3 +574,14 @@ export function convertDataDeletionDetectionPolicyToPublic(
574574

575575
return dataDeletionDetectionPolicy as SoftDeleteColumnDeletionDetectionPolicy;
576576
}
577+
578+
export function getRandomIntegerInclusive(min: number, max: number): number {
579+
// Make sure inputs are integers.
580+
min = Math.ceil(min);
581+
max = Math.floor(max);
582+
// Pick a random offset from zero to the size of the range.
583+
// Since Math.random() can never return 1, we have to make the range one larger
584+
// in order to be inclusive of the maximum value after we take the floor.
585+
const offset = Math.floor(Math.random() * (max - min + 1));
586+
return offset + min;
587+
}

0 commit comments

Comments
 (0)