Skip to content

Commit 6e4c8ea

Browse files
Cross entity transactions (Azure#18402)
* cross entity txn * Add processor support and address pr fb * Export API * PR FB * Export API
1 parent ac67a38 commit 6e4c8ea

33 files changed

+1186
-121
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/MigrationGuide.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ We assume that you are familiar with the `Microsoft.Azure.ServiceBus` library. I
1414
- [Sending messages](#sending-messages)
1515
- [Receiving messages](#receiving-messages)
1616
- [Working with sessions](#working-with-sessions)
17+
- [Cross-entity transactions](#cross-entity-transactions)
1718
- [Known gaps](#known-gaps-from-previous-library)
1819
- [Additional samples](#additional-samples)
1920

@@ -284,7 +285,7 @@ Previously, in `Microsoft.Azure.ServiceBus`, you had the below options to receiv
284285

285286
While the first option is similar to what you would do in a non-session scenario, the second that allows you finer-grained control is very different from any other pattern used in the library.
286287

287-
Now in `Azure.Messaging.ServiceBus`, we simplfify this by giving session variants of the same methods and classes that are available when working with queues/subscriptions that do not have sessions enabled.
288+
Now in `Azure.Messaging.ServiceBus`, we simplify this by giving session variants of the same methods and classes that are available when working with queues/subscriptions that do not have sessions enabled.
288289

289290
The below code snippet shows you the session variation of the `ServiceBusProcessor`.
290291

@@ -367,6 +368,49 @@ ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync()
367368
Console.WriteLine(receivedMessage.SessionId);
368369
```
369370

371+
### Cross-Entity transactions (Unreleased)
372+
373+
Previously, in `Microsoft.Azure.ServiceBus`, when performing a transaction that spanned multiple queues, topics, or subscriptions you would need to use the "Send-Via" option
374+
in the `MessageSender`.
375+
376+
Now in `Azure.Messaging.ServiceBus`, there is a `TransactionGroup` property on the sender, receiver, and processor options bags. This property is used to group senders, receivers, and processors that will be involved in entities that span transactions. When using this property, the first operation that occurs among this group implicitly becomes the send-via entity. Because of this, subsequent operations must either be by senders, or if they are by receivers, the receiver must be receiving from the send-via entity. For this reason, it probably makes more sense to have your first operation be a receive rather than a send when using transaction groups.
377+
378+
The below code snippet shows you how to use transaction groups.
379+
380+
```C# Snippet:ServiceBusTransactionGroup
381+
// The first sender won't be part of our transaction group.
382+
ServiceBusSender senderA = client.CreateSender(queueA.QueueName);
383+
384+
string transactionGroup = "myTxn";
385+
386+
ServiceBusReceiver receiverA = client.CreateReceiver(queueA.QueueName, new ServiceBusReceiverOptions
387+
{
388+
TransactionGroup = transactionGroup
389+
});
390+
ServiceBusSender senderB = client.CreateSender(queueB.QueueName, new ServiceBusSenderOptions
391+
{
392+
TransactionGroup = transactionGroup
393+
});
394+
ServiceBusSender senderC = client.CreateSender(topicC.TopicName, new ServiceBusSenderOptions
395+
{
396+
TransactionGroup = transactionGroup
397+
});
398+
399+
var message = new ServiceBusMessage();
400+
401+
await senderA.SendMessageAsync(message);
402+
403+
ServiceBusReceivedMessage receivedMessage = await receiverA.ReceiveMessageAsync();
404+
405+
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
406+
{
407+
await receiverA.CompleteMessageAsync(receivedMessage);
408+
await senderB.SendMessageAsync(message);
409+
await senderC.SendMessageAsync(message);
410+
ts.Complete();
411+
}
412+
```
413+
370414
## Known Gaps from Previous Library
371415
There are a few features that are yet to be implemented in `Azure.Messaging.ServiceBus`, but were present in the previous library `Microsoft.Azure.ServiceBus`. The plan is to add these features in upcoming releases (unless otherwise noted), but they will not be available in the version 7.0.0:
372416
- **Cross entity transactions** - In the previous library, Microsoft.Azure.ServiceBus, transactions could work across multiple entities by leveraging the [`viaEntityPath`](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs#L118) parameter of the `MessageSender` constructor. The service is planning to make backward compatible updates that would make it possible to do cross-entity transactions without specificing a "via" entity. The new library plans to take advantage of this new feature and therefore, the cross entity transactions feature will be available in the upcoming release instead of the current version 7.0.0. Support for this feature can be tracked via https://github.com/Azure/azure-sdk-for-net/issues/17355.

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

100755100644
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public ServiceBusMessage(string body) { }
168168
public string Subject { get { throw null; } set { } }
169169
public System.TimeSpan TimeToLive { get { throw null; } set { } }
170170
public string To { get { throw null; } set { } }
171+
public string TransactionPartitionKey { get { throw null; } set { } }
171172
public Azure.Core.Amqp.AmqpAnnotatedMessage GetRawAmqpMessage() { throw null; }
172173
public override string ToString() { throw null; }
173174
}
@@ -201,6 +202,7 @@ protected ServiceBusProcessor() { }
201202
public virtual int MaxConcurrentCalls { get { throw null; } }
202203
public virtual int PrefetchCount { get { throw null; } }
203204
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
205+
public virtual string TransactionGroup { get { throw null; } }
204206
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
205207
public event System.Func<Azure.Messaging.ServiceBus.ProcessMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
206208
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -222,6 +224,7 @@ public ServiceBusProcessorOptions() { }
222224
public int MaxConcurrentCalls { get { throw null; } set { } }
223225
public int PrefetchCount { get { throw null; } set { } }
224226
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
227+
public string TransactionGroup { get { throw null; } set { } }
225228
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
226229
public override bool Equals(object obj) { throw null; }
227230
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -255,6 +258,7 @@ internal ServiceBusReceivedMessage() { }
255258
public string Subject { get { throw null; } }
256259
public System.TimeSpan TimeToLive { get { throw null; } }
257260
public string To { get { throw null; } }
261+
public string TransactionPartitionKey { get { throw null; } }
258262
public Azure.Core.Amqp.AmqpAnnotatedMessage GetRawAmqpMessage() { throw null; }
259263
public override string ToString() { throw null; }
260264
}
@@ -271,6 +275,7 @@ protected ServiceBusReceiver() { }
271275
public virtual bool IsClosed { get { throw null; } }
272276
public virtual int PrefetchCount { get { throw null; } }
273277
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
278+
public virtual string TransactionGroup { get { throw null; } }
274279
public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
275280
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
276281
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -299,6 +304,7 @@ public ServiceBusReceiverOptions() { }
299304
public int PrefetchCount { get { throw null; } set { } }
300305
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
301306
public Azure.Messaging.ServiceBus.SubQueue SubQueue { get { throw null; } set { } }
307+
public string TransactionGroup { get { throw null; } set { } }
302308
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
303309
public override bool Equals(object obj) { throw null; }
304310
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -339,6 +345,7 @@ protected ServiceBusSender() { }
339345
public virtual string EntityPath { get { throw null; } }
340346
public virtual string FullyQualifiedNamespace { get { throw null; } }
341347
public virtual bool IsClosed { get { throw null; } }
348+
public virtual string TransactionGroup { get { throw null; } }
342349
public virtual System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
343350
public virtual System.Threading.Tasks.Task CancelScheduledMessagesAsync(System.Collections.Generic.IEnumerable<long> sequenceNumbers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
344351
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -357,6 +364,17 @@ protected ServiceBusSender() { }
357364
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
358365
public override string ToString() { throw null; }
359366
}
367+
public partial class ServiceBusSenderOptions
368+
{
369+
public ServiceBusSenderOptions() { }
370+
public string TransactionGroup { get { throw null; } set { } }
371+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
372+
public override bool Equals(object obj) { throw null; }
373+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
374+
public override int GetHashCode() { throw null; }
375+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
376+
public override string ToString() { throw null; }
377+
}
360378
public partial class ServiceBusSessionProcessor : System.IAsyncDisposable
361379
{
362380
protected ServiceBusSessionProcessor() { }
@@ -371,6 +389,7 @@ protected ServiceBusSessionProcessor() { }
371389
public virtual int PrefetchCount { get { throw null; } }
372390
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
373391
public virtual System.TimeSpan? SessionIdleTimeout { get { throw null; } }
392+
public virtual string TransactionGroup { get { throw null; } }
374393
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
375394
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
376395
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionClosingAsync { add { } remove { } }
@@ -397,6 +416,7 @@ public ServiceBusSessionProcessorOptions() { }
397416
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
398417
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
399418
public System.Collections.Generic.IList<string> SessionIds { get { throw null; } }
419+
public string TransactionGroup { get { throw null; } set { } }
400420
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
401421
public override bool Equals(object obj) { throw null; }
402422
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -418,6 +438,7 @@ public partial class ServiceBusSessionReceiverOptions
418438
public ServiceBusSessionReceiverOptions() { }
419439
public int PrefetchCount { get { throw null; } set { } }
420440
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
441+
public string TransactionGroup { get { throw null; } set { } }
421442
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
422443
public override bool Equals(object obj) { throw null; }
423444
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]

sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample06_Transactions.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,84 @@ using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
4444
}
4545
```
4646

47+
### Transactions across entities (Unreleased)
48+
49+
When creating senders and receivers that should be part of a cross-entity transaction, you can specify the same `TransactionGroup` string in the options as shown below. When using transaction groups, the first entity that an operation occurs on becomes the entity through which all subsequent sends will be routed through. This enables the service to perform a transaction that is meant to span multiple entities. This means that subsequent entities that perform their first operation need to either be senders, or if they are receivers they need to be on the same entity as the initial entity through which all sends are routed through (otherwise the service would not be able to ensure that the transaction is committed because it cannot route a receive operation through a different entity). For instance, if you have SenderA and ReceiverB that are part of the same transaction group, you would need to receive first with ReceiverB to allow this to work. If you first used SenderA to send to QueueA, and then attempted to receive from QueueB, an `InvalidOperationException` would be thrown. You could still add a ReceiverA to the same transaction group after initially sending to SenderA, since they are both using the same queue. This would be useful if you also had a SenderB that you want to include as part of the transaction group (otherwise there would be no need to use transaction groups as you would be dealing with only one entity).
50+
51+
```C# Snippet:ServiceBusTransactionGroup
52+
// The first sender won't be part of our transaction group.
53+
ServiceBusSender senderA = client.CreateSender(queueA.QueueName);
54+
55+
string transactionGroup = "myTxn";
56+
57+
ServiceBusReceiver receiverA = client.CreateReceiver(queueA.QueueName, new ServiceBusReceiverOptions
58+
{
59+
TransactionGroup = transactionGroup
60+
});
61+
ServiceBusSender senderB = client.CreateSender(queueB.QueueName, new ServiceBusSenderOptions
62+
{
63+
TransactionGroup = transactionGroup
64+
});
65+
ServiceBusSender senderC = client.CreateSender(topicC.TopicName, new ServiceBusSenderOptions
66+
{
67+
TransactionGroup = transactionGroup
68+
});
69+
70+
var message = new ServiceBusMessage();
71+
72+
await senderA.SendMessageAsync(message);
73+
74+
ServiceBusReceivedMessage receivedMessage = await receiverA.ReceiveMessageAsync();
75+
76+
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
77+
{
78+
await receiverA.CompleteMessageAsync(receivedMessage);
79+
await senderB.SendMessageAsync(message);
80+
await senderC.SendMessageAsync(message);
81+
ts.Complete();
82+
}
83+
```
84+
In the following snippet, we will look at an incorrect ordering that would cause an `InvalidOperationException` to be thrown.
85+
Bad:
86+
```C# Snippet:ServiceBusTransactionGroupWrongOrder
87+
// The first sender won't be part of our transaction group.
88+
ServiceBusSender senderA = client.CreateSender(queueA.QueueName);
89+
90+
string transactionGroup = "myTxn";
91+
92+
ServiceBusReceiver receiverA = client.CreateReceiver(queueA.QueueName, new ServiceBusReceiverOptions
93+
{
94+
TransactionGroup = transactionGroup
95+
});
96+
ServiceBusSender senderB = client.CreateSender(queueB.QueueName, new ServiceBusSenderOptions
97+
{
98+
TransactionGroup = transactionGroup
99+
});
100+
ServiceBusSender senderC = client.CreateSender(topicC.TopicName, new ServiceBusSenderOptions
101+
{
102+
TransactionGroup = transactionGroup
103+
});
104+
105+
var message = new ServiceBusMessage();
106+
107+
// SenderB becomes the entity through which subsequent "sends" are routed through.
108+
await senderB.SendMessageAsync(message);
109+
110+
await senderA.SendMessageAsync(message);
111+
112+
ServiceBusReceivedMessage receivedMessage = await receiverA.ReceiveMessageAsync();
113+
114+
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
115+
{
116+
// This will through an InvalidOperationException because a "receive" cannot be
117+
// routed through a different entity.
118+
await receiverA.CompleteMessageAsync(receivedMessage);
119+
await senderB.SendMessageAsync(message);
120+
await senderC.SendMessageAsync(message);
121+
ts.Complete();
122+
}
123+
```
124+
47125
## Source
48126

49127
To see the full example source, see:

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,14 @@ internal AmqpClient(
109109
/// <param name="entityPath">The entity path to send the message to.</param>
110110
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
111111
/// <param name="identifier">The identifier for the sender.</param>
112+
/// <param name="transactionGroup"></param>
112113
///
113114
/// <returns>A <see cref="TransportSender"/> configured in the requested manner.</returns>
114115
public override TransportSender CreateSender(
115116
string entityPath,
116117
ServiceBusRetryPolicy retryPolicy,
117-
string identifier)
118+
string identifier,
119+
string transactionGroup)
118120
{
119121
Argument.AssertNotDisposed(_closed, nameof(AmqpClient));
120122

@@ -123,7 +125,8 @@ public override TransportSender CreateSender(
123125
entityPath,
124126
ConnectionScope,
125127
retryPolicy,
126-
identifier
128+
identifier,
129+
transactionGroup
127130
);
128131
}
129132

@@ -139,6 +142,7 @@ public override TransportSender CreateSender(
139142
/// <param name="identifier"></param>
140143
/// <param name="sessionId"></param>
141144
/// <param name="isSessionReceiver"></param>
145+
/// <param name="transactionGroup"></param>
142146
///
143147
/// <returns>A <see cref="TransportReceiver" /> configured in the requested manner.</returns>
144148
///
@@ -149,7 +153,8 @@ public override TransportReceiver CreateReceiver(
149153
uint prefetchCount,
150154
string identifier,
151155
string sessionId,
152-
bool isSessionReceiver)
156+
bool isSessionReceiver,
157+
string transactionGroup)
153158
{
154159
Argument.AssertNotDisposed(_closed, nameof(AmqpClient));
155160

@@ -162,7 +167,8 @@ public override TransportReceiver CreateReceiver(
162167
retryPolicy,
163168
identifier,
164169
sessionId,
165-
isSessionReceiver
170+
isSessionReceiver,
171+
transactionGroup
166172
);
167173
}
168174

0 commit comments

Comments
 (0)