Skip to content

Commit 74eccc4

Browse files
authored
feat(csharp): add transformation helpers for object indexing with a transformation (#5452)
1 parent fc185dc commit 74eccc4

File tree

10 files changed

+683
-53
lines changed

10 files changed

+683
-53
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text.Json;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Algolia.Search.Exceptions;
8+
using Algolia.Search.Http;
9+
using Algolia.Search.Models.Ingestion;
10+
using Algolia.Search.Serializer;
11+
using Algolia.Search.Utils;
12+
13+
namespace Algolia.Search.Clients;
14+
15+
public partial interface IIngestionClient
{
  /// <summary>
  /// Helper: sends <paramref name="objects"/> to the push endpoint in chunks of at most
  /// <paramref name="batchSize"/> records, optionally waiting for the resulting events to become available.
  /// </summary>
  /// <param name="indexName">Name of the index the objects are pushed to.</param>
  /// <param name="objects">Objects to push; each one is converted to a push record.</param>
  /// <param name="action">Action to perform on the pushed objects.</param>
  /// <param name="waitForTasks">When true, waits for the event of each push before returning.</param>
  /// <param name="batchSize">Maximum number of objects per push request. Defaults to 1000.</param>
  /// <param name="referenceIndexName">Optional name of another index, forwarded to the push request as its reference index.</param>
  /// <param name="options">Extra HTTP headers or query parameters to send with each request.</param>
  /// <param name="cancellationToken">Token used to cancel the operation.</param>
  /// <returns>One <see cref="WatchResponse"/> per push request, in the order the requests were sent.</returns>
  Task<List<WatchResponse>> ChunkedPushAsync(
    string indexName,
    IEnumerable<object> objects,
    Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  );

  /// <summary>
  /// Blocking counterpart of <see cref="ChunkedPushAsync"/>; takes the same arguments and
  /// returns the same list of responses.
  /// </summary>
  List<WatchResponse> ChunkedPush(
    string indexName,
    IEnumerable<object> objects,
    Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  );
}
46+
47+
public partial class IngestionClient : IIngestionClient
{
  /// <summary>
  /// Helper: Chunks the given `objects` list in subsets of `batchSize` elements max (1000 by default) in order
  /// to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector
  /// (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
  /// </summary>
  /// <param name="indexName">The `indexName` to push `objects` to.</param>
  /// <param name="objects">The array of `objects` to store in the given Algolia `indexName`.</param>
  /// <param name="action">The `action` to perform on the given array of `objects`.</param>
  /// <param name="waitForTasks">Whether or not we should wait until every push task has been processed. This operation may slow the total execution time of this method but is more reliable.</param>
  /// <param name="batchSize">The size of the chunk of `objects`. The number of push calls will be equal to `ceil(length(objects) / batchSize)`. Must be greater than 0. Defaults to 1000.</param>
  /// <param name="referenceIndexName">This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).</param>
  /// <param name="options">Add extra http header or query parameters to Algolia.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the request</param>
  /// <returns>List of WatchResponse objects from the push operations, one per chunk, in sending order</returns>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="objects"/> is null</exception>
  /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="batchSize"/> is lower than 1</exception>
  /// <exception cref="AlgoliaException">Thrown when a push response has no event ID, or when an event cannot be fetched within the allowed number of retries</exception>
  public async Task<List<WatchResponse>> ChunkedPushAsync(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  )
  {
    if (objects == null)
    {
      throw new ArgumentNullException(nameof(objects));
    }

    // A batch size of 0 (or less) would never advance the chunking loop below.
    if (batchSize <= 0)
    {
      throw new ArgumentOutOfRangeException(
        nameof(batchSize),
        batchSize,
        "The batch size must be greater than 0."
      );
    }

    var objectsList = objects.ToList();
    var responses = new List<WatchResponse>();
    // Pending events are awaited every `waitBatchSize` push calls, and after the last one.
    var waitBatchSize = Math.Max(batchSize / 10, 1);
    // Index of the first response whose event has not been awaited yet.
    var offset = 0;

    // Fetches the event attached to a push response; a 404 means the event is not
    // available yet, which is reported as `null` so that the caller retries.
    async Task<Algolia.Search.Models.Ingestion.Event> TryGetEventAsync(WatchResponse pushed)
    {
      try
      {
        return await GetEventAsync(
            pushed.RunID,
            pushed.EventID,
            cancellationToken: cancellationToken
          )
          .ConfigureAwait(false);
      }
      catch (AlgoliaApiException ex) when (ex.HttpErrorCode == 404)
      {
        return null;
      }
    }

    // Advancing by the actual chunk size keeps `i` within [0, Count] and thus free of overflow.
    var chunkSize = 0;
    for (var i = 0; i < objectsList.Count; i += chunkSize)
    {
      var remaining = objectsList.Count - i;
      var isLastChunk = remaining <= batchSize;
      chunkSize = isLastChunk ? remaining : batchSize;

      // Each object is round-tripped through JSON to turn it into a push record.
      var records = objectsList
        .GetRange(i, chunkSize)
        .Select(obj =>
          JsonSerializer.Deserialize<PushTaskRecords>(
            JsonSerializer.Serialize(obj, JsonConfig.Options),
            JsonConfig.Options
          )
        )
        .ToList();

      var payload = new PushTaskPayload(action, records);

      var response = await PushAsync(
          indexName,
          payload,
          watch: null,
          referenceIndexName: referenceIndexName,
          options: options,
          cancellationToken: cancellationToken
        )
        .ConfigureAwait(false);

      responses.Add(response);

      if (!waitForTasks || (responses.Count % waitBatchSize != 0 && !isLastChunk))
      {
        continue;
      }

      for (var j = offset; j < responses.Count; j++)
      {
        var resp = responses[j];
        if (string.IsNullOrEmpty(resp.EventID))
        {
          throw new AlgoliaException(
            "Received unexpected response from the push endpoint, eventID must not be null or empty"
          );
        }

        await RetryHelper
          .RetryUntil(
            () => TryGetEventAsync(resp),
            eventResponse => eventResponse != null,
            maxRetries: RetryHelper.DefaultMaxRetries,
            ct: cancellationToken
          )
          .ConfigureAwait(false);
      }

      offset = responses.Count;
    }

    return responses;
  }

  /// <summary>
  /// Synchronous version of ChunkedPushAsync: forwards every argument to it through
  /// <c>AsyncHelper.RunSync</c> and returns its result.
  /// </summary>
  public List<WatchResponse> ChunkedPush(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  ) =>
    AsyncHelper.RunSync(() =>
      ChunkedPushAsync(
        indexName,
        objects,
        action,
        waitForTasks,
        batchSize,
        referenceIndexName,
        options,
        cancellationToken
      )
    );
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Algolia.Search.Exceptions;
5+
6+
namespace Algolia.Search.Utils;
7+
8+
/// <summary>
/// A helper class to retry asynchronous operations until their result passes a validation
/// </summary>
public static class RetryHelper
{
  /// <summary>
  /// The default maximum number of retries
  /// </summary>
  public const int DefaultMaxRetries = 50;

  /// <summary>
  /// Retry the given function until the validation function returns true or the maximum number of retries is reached
  /// </summary>
  /// <typeparam name="T">The type of the function's return value</typeparam>
  /// <param name="func">The function to retry</param>
  /// <param name="validate">The validation function</param>
  /// <param name="maxRetries">The maximum number of times <paramref name="func"/> is called</param>
  /// <param name="timeout">A function that takes the retry count (starting at 0) and returns the timeout in milliseconds before the next retry. Defaults to <c>min(retryCount * 200, 5000)</c>.</param>
  /// <param name="ct">A cancellation token to cancel the wait between two retries</param>
  /// <returns>The result of the function if the validation function returns true</returns>
  /// <exception cref="ArgumentNullException">Thrown if <paramref name="func"/> or <paramref name="validate"/> is null</exception>
  /// <exception cref="AlgoliaException">Thrown if the maximum number of retries is reached</exception>
  public static async Task<T> RetryUntil<T>(
    Func<Task<T>> func,
    Func<T, bool> validate,
    int maxRetries = DefaultMaxRetries,
    Func<int, int> timeout = null,
    CancellationToken ct = default
  )
  {
    if (func == null)
    {
      throw new ArgumentNullException(nameof(func));
    }

    if (validate == null)
    {
      throw new ArgumentNullException(nameof(validate));
    }

    timeout ??= NextDelay;

    var attempts = 0;
    while (attempts < maxRetries)
    {
      var resp = await func().ConfigureAwait(false);
      attempts++;

      if (validate(resp))
      {
        return resp;
      }

      // Only wait when another attempt follows: sleeping after the last one
      // would just postpone the exception below.
      if (attempts < maxRetries)
      {
        await Task.Delay(timeout(attempts - 1), ct).ConfigureAwait(false);
      }
    }

    // Report the number of attempts actually made, which never exceeds `maxRetries`.
    throw new AlgoliaException(
      $"The maximum number of retries exceeded. ({attempts}/{maxRetries})"
    );
  }

  /// <summary>
  /// Default delay: 200 ms more for every retry already made, capped at 5000 ms
  /// </summary>
  private static int NextDelay(int retryCount)
  {
    return Math.Min(retryCount * 200, 5000);
  }
}

0 commit comments

Comments
 (0)