@@ -208,4 +208,259 @@ await sut.PublishAsync(
208208 . Should ( )
209209 . Be ( TimeSpan . MaxValue ) ;
210210 }
211+
212+ [ Theory , AutoNSubstituteData ]
213+ internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch (
214+ [ Frozen ] IServiceBusSenderProvider provider ,
215+ ServiceBusPublisher sut ,
216+ [ Substitute ] ServiceBusSender sender ,
217+ string topicName ,
218+ object messageBody ,
219+ IDictionary < string , string > properties ,
220+ TimeSpan timeToLive ,
221+ string sessionId ,
222+ CancellationToken cancellationToken )
223+ {
224+ var messageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , new List < ServiceBusMessage > ( ) ) ;
225+
226+ provider
227+ . GetSender ( default ! )
228+ . ReturnsForAnyArgs ( sender ) ;
229+
230+ sender
231+ . CreateMessageBatchAsync ( default ! )
232+ . ReturnsForAnyArgs ( messageBatch ) ;
233+
234+ await sut . PublishAsync (
235+ topicName ,
236+ new object [ ] { messageBody } ,
237+ sessionId ,
238+ properties ,
239+ timeToLive ,
240+ cancellationToken ) ;
241+
242+ _ = provider
243+ . Received ( 1 )
244+ . GetSender ( topicName ) ;
245+ }
246+
247+ [ Theory , AutoNSubstituteData ]
248+ internal async Task Should_Create_MessageBatch (
249+ [ Frozen ] IServiceBusSenderProvider provider ,
250+ ServiceBusPublisher sut ,
251+ [ Substitute ] ServiceBusSender sender ,
252+ string topicName ,
253+ object messageBody ,
254+ IDictionary < string , string > properties ,
255+ TimeSpan timeToLive ,
256+ string sessionId ,
257+ CancellationToken cancellationToken )
258+ {
259+ var messageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , new List < ServiceBusMessage > ( ) ) ;
260+
261+ provider
262+ . GetSender ( default ! )
263+ . ReturnsForAnyArgs ( sender ) ;
264+
265+ sender
266+ . CreateMessageBatchAsync ( default ! )
267+ . ReturnsForAnyArgs ( messageBatch ) ;
268+
269+ await sut . PublishAsync (
270+ topicName ,
271+ new object [ ] { messageBody } ,
272+ sessionId ,
273+ properties ,
274+ timeToLive ,
275+ cancellationToken ) ;
276+
277+ _ = await sender
278+ . Received ( 1 )
279+ . CreateMessageBatchAsync ( cancellationToken ) ;
280+ }
281+
282+ [ Theory , AutoNSubstituteData ]
283+ internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch (
284+ [ Frozen ] IServiceBusSenderProvider provider ,
285+ ServiceBusPublisher sut ,
286+ [ Substitute ] ServiceBusSender sender ,
287+ string topicName ,
288+ object messageBody ,
289+ IDictionary < string , string > properties ,
290+ TimeSpan timeToLive ,
291+ string sessionId ,
292+ CancellationToken cancellationToken )
293+ {
294+ var batchList = new List < ServiceBusMessage > ( ) ;
295+ var messageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , batchList ) ;
296+
297+ provider
298+ . GetSender ( default ! )
299+ . ReturnsForAnyArgs ( sender ) ;
300+
301+ sender
302+ . CreateMessageBatchAsync ( default ! )
303+ . ReturnsForAnyArgs ( messageBatch ) ;
304+
305+ await sut . PublishAsync (
306+ topicName ,
307+ new object [ ] { messageBody } ,
308+ sessionId ,
309+ properties ,
310+ timeToLive ,
311+ cancellationToken ) ;
312+
313+ var sendMessageBatch = sender
314+ . ReceivedCallWithArgument < ServiceBusMessageBatch > ( ) ;
315+
316+ sendMessageBatch . Count . Should ( ) . Be ( 1 ) ;
317+ batchList . Count . Should ( ) . Be ( 1 ) ;
318+
319+ batchList [ 0 ] . MessageId
320+ . Should ( )
321+ . NotBeNullOrEmpty ( ) ;
322+ batchList [ 0 ] . Body
323+ . ToString ( )
324+ . Should ( )
325+ . BeEquivalentTo ( JsonSerializer . Serialize ( messageBody ) ) ;
326+ batchList [ 0 ] . ApplicationProperties
327+ . Should ( )
328+ . BeEquivalentTo ( properties ) ;
329+ batchList [ 0 ] . TimeToLive
330+ . Should ( )
331+ . Be ( timeToLive ) ;
332+ }
333+
334+ [ Theory , AutoNSubstituteData ]
335+ internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full (
336+ [ Frozen ] IServiceBusSenderProvider provider ,
337+ ServiceBusPublisher sut ,
338+ [ Substitute ] ServiceBusSender sender ,
339+ string topicName ,
340+ object messageBody ,
341+ IDictionary < string , string > properties ,
342+ TimeSpan timeToLive ,
343+ string sessionId ,
344+ CancellationToken cancellationToken )
345+ {
346+ var firstBatchList = new List < ServiceBusMessage > ( ) ;
347+ var secondBatchList = new List < ServiceBusMessage > ( ) ;
348+ var firstMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , firstBatchList , tryAddCallback : _ => false ) ;
349+ var secondMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , secondBatchList ) ;
350+
351+ provider
352+ . GetSender ( default ! )
353+ . ReturnsForAnyArgs ( sender ) ;
354+
355+ sender
356+ . CreateMessageBatchAsync ( default ! )
357+ . ReturnsForAnyArgs ( firstMessageBatch , secondMessageBatch ) ;
358+
359+ await sut . PublishAsync (
360+ topicName ,
361+ new object [ ] { messageBody } ,
362+ sessionId ,
363+ properties ,
364+ timeToLive ,
365+ cancellationToken ) ;
366+
367+ _ = await sender
368+ . Received ( 2 )
369+ . CreateMessageBatchAsync ( cancellationToken ) ;
370+ }
371+
372+ [ Theory , AutoNSubstituteData ]
373+ internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch (
374+ [ Frozen ] IServiceBusSenderProvider provider ,
375+ ServiceBusPublisher sut ,
376+ [ Substitute ] ServiceBusSender sender ,
377+ string topicName ,
378+ object messageBody ,
379+ IDictionary < string , string > properties ,
380+ TimeSpan timeToLive ,
381+ string sessionId ,
382+ CancellationToken cancellationToken )
383+ {
384+ var firstBatchList = new List < ServiceBusMessage > ( ) ;
385+ var secondBatchList = new List < ServiceBusMessage > ( ) ;
386+ var firstMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , firstBatchList , tryAddCallback : _ => false ) ;
387+ var secondMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , secondBatchList ) ;
388+
389+ provider
390+ . GetSender ( default ! )
391+ . ReturnsForAnyArgs ( sender ) ;
392+
393+ sender
394+ . CreateMessageBatchAsync ( default ! )
395+ . ReturnsForAnyArgs ( firstMessageBatch , secondMessageBatch ) ;
396+
397+ await sut . PublishAsync (
398+ topicName ,
399+ new object [ ] { messageBody } ,
400+ sessionId ,
401+ properties ,
402+ timeToLive ,
403+ cancellationToken ) ;
404+
405+ _ = sender
406+ . Received ( 1 )
407+ . SendMessagesAsync ( firstMessageBatch , cancellationToken ) ;
408+
409+ _ = sender
410+ . Received ( 1 )
411+ . SendMessagesAsync ( secondMessageBatch , cancellationToken ) ;
412+
413+ firstBatchList . Should ( ) . BeEmpty ( ) ;
414+ secondBatchList . Count . Should ( ) . Be ( 1 ) ;
415+
416+ secondBatchList [ 0 ] . MessageId
417+ . Should ( )
418+ . NotBeNullOrEmpty ( ) ;
419+ secondBatchList [ 0 ] . Body
420+ . ToString ( )
421+ . Should ( )
422+ . BeEquivalentTo ( JsonSerializer . Serialize ( messageBody ) ) ;
423+ secondBatchList [ 0 ] . ApplicationProperties
424+ . Should ( )
425+ . BeEquivalentTo ( properties ) ;
426+ secondBatchList [ 0 ] . TimeToLive
427+ . Should ( )
428+ . Be ( timeToLive ) ;
429+ }
430+
431+ [ Theory , AutoNSubstituteData ]
432+ internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch (
433+ [ Frozen ] IServiceBusSenderProvider provider ,
434+ ServiceBusPublisher sut ,
435+ [ Substitute ] ServiceBusSender sender ,
436+ string topicName ,
437+ object messageBody ,
438+ IDictionary < string , string > properties ,
439+ TimeSpan timeToLive ,
440+ string sessionId ,
441+ CancellationToken cancellationToken )
442+ {
443+ var firstBatchList = new List < ServiceBusMessage > ( ) ;
444+ var secondBatchList = new List < ServiceBusMessage > ( ) ;
445+ var firstMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , firstBatchList , tryAddCallback : _ => false ) ;
446+ var secondMessageBatch = ServiceBusModelFactory . ServiceBusMessageBatch ( 265000 , secondBatchList , tryAddCallback : _ => false ) ;
447+
448+ provider
449+ . GetSender ( default ! )
450+ . ReturnsForAnyArgs ( sender ) ;
451+
452+ sender
453+ . CreateMessageBatchAsync ( default ! )
454+ . ReturnsForAnyArgs ( firstMessageBatch , secondMessageBatch ) ;
455+
456+ var act = async ( ) => await sut . PublishAsync (
457+ topicName ,
458+ new object [ ] { messageBody } ,
459+ sessionId ,
460+ properties ,
461+ timeToLive ,
462+ cancellationToken ) ;
463+
464+ return act . Should ( ) . ThrowAsync < InvalidOperationException > ( ) ;
465+ }
211466}
0 commit comments