Skip to content

Commit fc4e224

Browse files
authored
Merge pull request #5 from atc-net/feature/service-bus-publish-batch
Add new PublishAsync to send multiple messages in batches
2 parents a23a511 + 7fc9130 commit fc4e224

File tree

4 files changed

+350
-2
lines changed

4 files changed

+350
-2
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,27 @@ internal class BarPublisher
115115
}
116116
```
117117

118+
### Batch Publish
119+
120+
Multiple messages can also be published in batches to a topic or queue. Simply call the `PublishAsync` method with a list of messages. The messages will be added to a batch until the batch is full before it the batch is published and continue to work on the remaining messages. This process continues until all messages are consumed.
121+
122+
An `InvalidOperationException` is thrown if a single message cannot fit inside a batch by itself. In this case, any previous published batches will not be rolled back and any remaining messages will remain unpublished.
123+
124+
```csharp
125+
internal class BarBatchPublisher
126+
{
127+
private readonly IServiceBusPublisher publisher;
128+
129+
public BarBatchPublisher(IServiceBusPublisher publisher)
130+
{
131+
this.publisher = publisher;
132+
}
133+
134+
public Task Publish(object[] messages)
135+
=> publisher.PublishAsync("[existing servicebus topic]", messages);
136+
}
137+
```
138+
118139
Here's a full example of how to use the publishers above using a Minimal API setup (SwaggerUI enabled) with a single endpoint called `POST /data` that accepts a simple request body `{ "a": "string", "b": "string", "c": "string" }` which publishes the request to an EventHub and a ServiceBus topic
119140

120141
```csharp

src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,24 @@ Task PublishAsync(
2323
TimeSpan? timeToLive = null,
2424
CancellationToken cancellationToken = default);
2525

26+
/// <summary>
27+
/// Publishes multiple messages in batches. The list of messages will be split in multiple batches if the messages exceeds a single batch size.
28+
/// </summary>
29+
/// <param name="topicOrQueue">The topic or queue name.</param>
30+
/// <param name="messages">The messages to be published.</param>
31+
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
32+
/// <param name="properties">Optional custom metadata about the message.</param>
33+
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
34+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
35+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
36+
Task PublishAsync(
37+
string topicOrQueue,
38+
IReadOnlyCollection<object> messages,
39+
string? sessionId = null,
40+
IDictionary<string, string>? properties = null,
41+
TimeSpan? timeToLive = null,
42+
CancellationToken cancellationToken = default);
43+
2644
/// <summary>
2745
/// Schedules a message for publishing at a later time.
2846
/// </summary>
@@ -44,7 +62,7 @@ Task<long> SchedulePublishAsync(
4462
CancellationToken cancellationToken = default);
4563

4664
/// <summary>
47-
/// Cansels a scheduled publish of a message if it has not been published yet.
65+
/// Cancels a scheduled publish of a message if it has not been published yet.
4866
/// </summary>
4967
/// <param name="topicOrQueue">The topic or queue name.</param>
5068
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>

src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,60 @@ public Task PublishAsync(
2828
cancellationToken);
2929
}
3030

31+
public async Task PublishAsync(
32+
string topicOrQueue,
33+
IReadOnlyCollection<object> messages,
34+
string? sessionId = null,
35+
IDictionary<string, string>? properties = null,
36+
TimeSpan? timeToLive = null,
37+
CancellationToken cancellationToken = default)
38+
{
39+
var sender = clientProvider.GetSender(topicOrQueue);
40+
41+
var batch = await sender
42+
.CreateMessageBatchAsync(cancellationToken)
43+
.ConfigureAwait(false);
44+
45+
foreach (var message in messages)
46+
{
47+
if (cancellationToken.IsCancellationRequested)
48+
{
49+
break;
50+
}
51+
52+
var busMessage = CreateServiceBusMessage(
53+
sessionId,
54+
JsonSerializer.Serialize(message),
55+
properties,
56+
timeToLive);
57+
58+
if (batch.TryAddMessage(busMessage))
59+
{
60+
continue;
61+
}
62+
63+
await sender
64+
.SendMessagesAsync(batch, cancellationToken)
65+
.ConfigureAwait(false);
66+
67+
batch.Dispose();
68+
batch = await sender
69+
.CreateMessageBatchAsync(cancellationToken)
70+
.ConfigureAwait(false);
71+
72+
if (!batch.TryAddMessage(busMessage))
73+
{
74+
throw new InvalidOperationException("Unable to add message to batch. The message size exceeds what can be send in a batch");
75+
}
76+
}
77+
78+
await sender
79+
.SendMessagesAsync(batch, cancellationToken)
80+
.ConfigureAwait(false);
81+
82+
batch.Dispose();
83+
}
84+
3185
public Task<long> SchedulePublishAsync(
3286
string topicOrQueue,
3387
object message,
@@ -89,5 +143,5 @@ private static ServiceBusMessage CreateServiceBusMessage(
89143
}
90144

91145
return message;
92-
}
146+
}
93147
}

test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,259 @@ await sut.PublishAsync(
208208
.Should()
209209
.Be(TimeSpan.MaxValue);
210210
}
211+
212+
[Theory, AutoNSubstituteData]
213+
internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch(
214+
[Frozen] IServiceBusSenderProvider provider,
215+
ServiceBusPublisher sut,
216+
[Substitute] ServiceBusSender sender,
217+
string topicName,
218+
object messageBody,
219+
IDictionary<string, string> properties,
220+
TimeSpan timeToLive,
221+
string sessionId,
222+
CancellationToken cancellationToken)
223+
{
224+
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());
225+
226+
provider
227+
.GetSender(default!)
228+
.ReturnsForAnyArgs(sender);
229+
230+
sender
231+
.CreateMessageBatchAsync(default!)
232+
.ReturnsForAnyArgs(messageBatch);
233+
234+
await sut.PublishAsync(
235+
topicName,
236+
new object[] { messageBody },
237+
sessionId,
238+
properties,
239+
timeToLive,
240+
cancellationToken);
241+
242+
_ = provider
243+
.Received(1)
244+
.GetSender(topicName);
245+
}
246+
247+
[Theory, AutoNSubstituteData]
248+
internal async Task Should_Create_MessageBatch(
249+
[Frozen] IServiceBusSenderProvider provider,
250+
ServiceBusPublisher sut,
251+
[Substitute] ServiceBusSender sender,
252+
string topicName,
253+
object messageBody,
254+
IDictionary<string, string> properties,
255+
TimeSpan timeToLive,
256+
string sessionId,
257+
CancellationToken cancellationToken)
258+
{
259+
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());
260+
261+
provider
262+
.GetSender(default!)
263+
.ReturnsForAnyArgs(sender);
264+
265+
sender
266+
.CreateMessageBatchAsync(default!)
267+
.ReturnsForAnyArgs(messageBatch);
268+
269+
await sut.PublishAsync(
270+
topicName,
271+
new object[] { messageBody },
272+
sessionId,
273+
properties,
274+
timeToLive,
275+
cancellationToken);
276+
277+
_ = await sender
278+
.Received(1)
279+
.CreateMessageBatchAsync(cancellationToken);
280+
}
281+
282+
[Theory, AutoNSubstituteData]
283+
internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch(
284+
[Frozen] IServiceBusSenderProvider provider,
285+
ServiceBusPublisher sut,
286+
[Substitute] ServiceBusSender sender,
287+
string topicName,
288+
object messageBody,
289+
IDictionary<string, string> properties,
290+
TimeSpan timeToLive,
291+
string sessionId,
292+
CancellationToken cancellationToken)
293+
{
294+
var batchList = new List<ServiceBusMessage>();
295+
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, batchList);
296+
297+
provider
298+
.GetSender(default!)
299+
.ReturnsForAnyArgs(sender);
300+
301+
sender
302+
.CreateMessageBatchAsync(default!)
303+
.ReturnsForAnyArgs(messageBatch);
304+
305+
await sut.PublishAsync(
306+
topicName,
307+
new object[] { messageBody },
308+
sessionId,
309+
properties,
310+
timeToLive,
311+
cancellationToken);
312+
313+
var sendMessageBatch = sender
314+
.ReceivedCallWithArgument<ServiceBusMessageBatch>();
315+
316+
sendMessageBatch.Count.Should().Be(1);
317+
batchList.Count.Should().Be(1);
318+
319+
batchList[0].MessageId
320+
.Should()
321+
.NotBeNullOrEmpty();
322+
batchList[0].Body
323+
.ToString()
324+
.Should()
325+
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
326+
batchList[0].ApplicationProperties
327+
.Should()
328+
.BeEquivalentTo(properties);
329+
batchList[0].TimeToLive
330+
.Should()
331+
.Be(timeToLive);
332+
}
333+
334+
[Theory, AutoNSubstituteData]
335+
internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full(
336+
[Frozen] IServiceBusSenderProvider provider,
337+
ServiceBusPublisher sut,
338+
[Substitute] ServiceBusSender sender,
339+
string topicName,
340+
object messageBody,
341+
IDictionary<string, string> properties,
342+
TimeSpan timeToLive,
343+
string sessionId,
344+
CancellationToken cancellationToken)
345+
{
346+
var firstBatchList = new List<ServiceBusMessage>();
347+
var secondBatchList = new List<ServiceBusMessage>();
348+
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
349+
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);
350+
351+
provider
352+
.GetSender(default!)
353+
.ReturnsForAnyArgs(sender);
354+
355+
sender
356+
.CreateMessageBatchAsync(default!)
357+
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
358+
359+
await sut.PublishAsync(
360+
topicName,
361+
new object[] { messageBody },
362+
sessionId,
363+
properties,
364+
timeToLive,
365+
cancellationToken);
366+
367+
_ = await sender
368+
.Received(2)
369+
.CreateMessageBatchAsync(cancellationToken);
370+
}
371+
372+
[Theory, AutoNSubstituteData]
373+
internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch(
374+
[Frozen] IServiceBusSenderProvider provider,
375+
ServiceBusPublisher sut,
376+
[Substitute] ServiceBusSender sender,
377+
string topicName,
378+
object messageBody,
379+
IDictionary<string, string> properties,
380+
TimeSpan timeToLive,
381+
string sessionId,
382+
CancellationToken cancellationToken)
383+
{
384+
var firstBatchList = new List<ServiceBusMessage>();
385+
var secondBatchList = new List<ServiceBusMessage>();
386+
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
387+
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);
388+
389+
provider
390+
.GetSender(default!)
391+
.ReturnsForAnyArgs(sender);
392+
393+
sender
394+
.CreateMessageBatchAsync(default!)
395+
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
396+
397+
await sut.PublishAsync(
398+
topicName,
399+
new object[] { messageBody },
400+
sessionId,
401+
properties,
402+
timeToLive,
403+
cancellationToken);
404+
405+
_ = sender
406+
.Received(1)
407+
.SendMessagesAsync(firstMessageBatch, cancellationToken);
408+
409+
_ = sender
410+
.Received(1)
411+
.SendMessagesAsync(secondMessageBatch, cancellationToken);
412+
413+
firstBatchList.Should().BeEmpty();
414+
secondBatchList.Count.Should().Be(1);
415+
416+
secondBatchList[0].MessageId
417+
.Should()
418+
.NotBeNullOrEmpty();
419+
secondBatchList[0].Body
420+
.ToString()
421+
.Should()
422+
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
423+
secondBatchList[0].ApplicationProperties
424+
.Should()
425+
.BeEquivalentTo(properties);
426+
secondBatchList[0].TimeToLive
427+
.Should()
428+
.Be(timeToLive);
429+
}
430+
431+
[Theory, AutoNSubstituteData]
432+
internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch(
433+
[Frozen] IServiceBusSenderProvider provider,
434+
ServiceBusPublisher sut,
435+
[Substitute] ServiceBusSender sender,
436+
string topicName,
437+
object messageBody,
438+
IDictionary<string, string> properties,
439+
TimeSpan timeToLive,
440+
string sessionId,
441+
CancellationToken cancellationToken)
442+
{
443+
var firstBatchList = new List<ServiceBusMessage>();
444+
var secondBatchList = new List<ServiceBusMessage>();
445+
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
446+
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList, tryAddCallback: _ => false);
447+
448+
provider
449+
.GetSender(default!)
450+
.ReturnsForAnyArgs(sender);
451+
452+
sender
453+
.CreateMessageBatchAsync(default!)
454+
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
455+
456+
var act = async () => await sut.PublishAsync(
457+
topicName,
458+
new object[] { messageBody },
459+
sessionId,
460+
properties,
461+
timeToLive,
462+
cancellationToken);
463+
464+
return act.Should().ThrowAsync<InvalidOperationException>();
465+
}
211466
}

0 commit comments

Comments
 (0)