Skip to content

Commit a23a511

Browse files
Merge pull request #4 from Anders-Toegersen/feature/added-schedule-publish-for-servicebus
Added Scheduled Publish for ServiceBus
2 parents d91ea78 + c6e3a4c commit a23a511

File tree

3 files changed

+177
-17
lines changed

3 files changed

+177
-17
lines changed

src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,53 @@ namespace Atc.Azure.Messaging.ServiceBus;
55
/// </summary>
66
public interface IServiceBusPublisher
77
{
8+
/// <summary>
9+
/// Publishes a message.
10+
/// </summary>
11+
/// <param name="topicOrQueue">The topic or queue name.</param>
12+
/// <param name="message">The message to be published.</param>
13+
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
14+
/// <param name="properties">Optional custom metadata about the message.</param>
15+
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
16+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
17+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
818
Task PublishAsync(
919
string topicOrQueue,
1020
object message,
1121
string? sessionId = null,
1222
IDictionary<string, string>? properties = null,
1323
TimeSpan? timeToLive = null,
24+
CancellationToken cancellationToken = default);
25+
26+
/// <summary>
27+
/// Schedules a message for publishing at a later time.
28+
/// </summary>
29+
/// <param name="topicOrQueue">The topic or queue name.</param>
30+
/// <param name="message">The message to be published.</param>
31+
/// <param name="scheduledEnqueueTime">The time for the message to be published.</param>
32+
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
33+
/// <param name="properties">Optional custom metadata about the message.</param>
34+
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
35+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
36+
/// <returns>A <see cref="Task"/> containing the sequence number of the scheduled message.</returns>
37+
Task<long> SchedulePublishAsync(
38+
string topicOrQueue,
39+
object message,
40+
DateTimeOffset scheduledEnqueueTime,
41+
string? sessionId = null,
42+
IDictionary<string, string>? properties = null,
43+
TimeSpan? timeToLive = null,
44+
CancellationToken cancellationToken = default);
45+
46+
/// <summary>
47+
/// Cansels a scheduled publish of a message if it has not been published yet.
48+
/// </summary>
49+
/// <param name="topicOrQueue">The topic or queue name.</param>
50+
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>
51+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
52+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
53+
Task CancelScheduledPublishAsync(
54+
string topicOrQueue,
55+
long sequenceNumber,
1456
CancellationToken cancellationToken = default);
1557
}

src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,39 @@ public Task PublishAsync(
2626
properties,
2727
timeToLive),
2828
cancellationToken);
29+
}
30+
31+
public Task<long> SchedulePublishAsync(
32+
string topicOrQueue,
33+
object message,
34+
DateTimeOffset scheduledEnqueueTime,
35+
string? sessionId = null,
36+
IDictionary<string, string>? properties = null,
37+
TimeSpan? timeToLive = null,
38+
CancellationToken cancellationToken = default)
39+
{
40+
return clientProvider
41+
.GetSender(topicOrQueue)
42+
.ScheduleMessageAsync(
43+
CreateServiceBusMessage(
44+
sessionId,
45+
JsonSerializer.Serialize(message),
46+
properties,
47+
timeToLive),
48+
scheduledEnqueueTime,
49+
cancellationToken);
50+
}
51+
52+
public Task CancelScheduledPublishAsync(
53+
string topicOrQueue,
54+
long sequenceNumber,
55+
CancellationToken cancellationToken = default)
56+
{
57+
return clientProvider
58+
.GetSender(topicOrQueue)
59+
.CancelScheduledMessageAsync(
60+
sequenceNumber,
61+
cancellationToken);
2962
}
3063

3164
private static ServiceBusMessage CreateServiceBusMessage(

test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs

Lines changed: 102 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,106 @@ await sut.PublishAsync(
7575
.Should()
7676
.BeEquivalentTo(properties);
7777
message.TimeToLive
78-
.Should()
79-
.Be(timeToLive);
80-
}
81-
82-
[Theory, AutoNSubstituteData]
83-
internal async Task Should_Handle_Default_Parameters(
84-
[Frozen] IServiceBusSenderProvider provider,
85-
ServiceBusPublisher sut,
86-
[Substitute] ServiceBusSender sender,
87-
string topicName,
88-
object messageBody,
89-
string sessionId)
90-
{
91-
provider
92-
.GetSender(topicName)
78+
.Should()
79+
.Be(timeToLive);
80+
}
81+
82+
[Theory, AutoNSubstituteData]
83+
internal async Task Should_Schedule_Message_On_ServiceBusSender(
84+
[Frozen] IServiceBusSenderProvider provider,
85+
ServiceBusPublisher sut,
86+
[Substitute] ServiceBusSender sender,
87+
long expectedSequenceNumber,
88+
string topicName,
89+
object messageBody,
90+
DateTimeOffset scheduleTime,
91+
IDictionary<string, string> properties,
92+
TimeSpan timeToLive,
93+
string sessionId,
94+
CancellationToken cancellationToken)
95+
{
96+
provider
97+
.GetSender(topicName)
98+
.Returns(sender);
99+
100+
sender
101+
.ScheduleMessageAsync(default!, default, default)
102+
.ReturnsForAnyArgs(expectedSequenceNumber);
103+
104+
var actualSequenceNumber = await sut.SchedulePublishAsync(
105+
topicName,
106+
messageBody,
107+
scheduleTime,
108+
sessionId,
109+
properties,
110+
timeToLive,
111+
cancellationToken);
112+
113+
_ = sender
114+
.Received(1)
115+
.ScheduleMessageAsync(
116+
Arg.Any<ServiceBusMessage>(),
117+
scheduleTime,
118+
cancellationToken);
119+
120+
actualSequenceNumber
121+
.Should()
122+
.Be(expectedSequenceNumber);
123+
124+
var message = sender
125+
.ReceivedCallWithArgument<ServiceBusMessage>();
126+
127+
message.MessageId
128+
.Should()
129+
.NotBeNullOrEmpty();
130+
message.Body
131+
.ToString()
132+
.Should()
133+
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
134+
message.ApplicationProperties
135+
.Should()
136+
.BeEquivalentTo(properties);
137+
message.TimeToLive
138+
.Should()
139+
.Be(timeToLive);
140+
}
141+
142+
[Theory, AutoNSubstituteData]
143+
internal async Task Should_Cancel_Scheduled_Message_On_ServiceBusSender(
144+
[Frozen] IServiceBusSenderProvider provider,
145+
ServiceBusPublisher sut,
146+
[Substitute] ServiceBusSender sender,
147+
long sequenceNumber,
148+
string topicName,
149+
CancellationToken cancellationToken)
150+
{
151+
provider
152+
.GetSender(topicName)
153+
.Returns(sender);
154+
155+
await sut.CancelScheduledPublishAsync(
156+
topicName,
157+
sequenceNumber,
158+
cancellationToken);
159+
160+
_ = sender
161+
.Received(1)
162+
.CancelScheduledMessageAsync(
163+
sequenceNumber,
164+
cancellationToken);
165+
}
166+
167+
[Theory, AutoNSubstituteData]
168+
internal async Task Should_Handle_Default_Parameters(
169+
[Frozen] IServiceBusSenderProvider provider,
170+
ServiceBusPublisher sut,
171+
[Substitute] ServiceBusSender sender,
172+
string topicName,
173+
object messageBody,
174+
string sessionId)
175+
{
176+
provider
177+
.GetSender(topicName)
93178
.Returns(sender);
94179

95180
await sut.PublishAsync(
@@ -120,7 +205,7 @@ await sut.PublishAsync(
120205
.Should()
121206
.BeEmpty();
122207
message.TimeToLive
123-
.Should()
124-
.Be(TimeSpan.MaxValue);
208+
.Should()
209+
.Be(TimeSpan.MaxValue);
125210
}
126211
}

0 commit comments

Comments
 (0)