Skip to content

Commit 22cb7ad

Browse files
Tracing batch fix (Azure#19091)
* Instrument messages when adding to batch * Fix tests * Add docs
1 parent 4128795 commit 22cb7ad

File tree

8 files changed

+148
-97
lines changed

8 files changed

+148
-97
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/EntityScopeFactory.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System.Collections.Generic;
5-
using System.Threading.Tasks;
5+
using System.Diagnostics;
66
using Azure.Core.Pipeline;
77

88
namespace Azure.Messaging.ServiceBus.Diagnostics
@@ -71,5 +71,22 @@ public DiagnosticScope CreateScope(
7171
scope.AddAttribute(DiagnosticProperty.EndpointAttribute, _fullyQualifiedNamespace);
7272
return scope;
7373
}
74+
75+
public void InstrumentMessage(ServiceBusMessage message)
76+
{
77+
if (!message.ApplicationProperties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute))
78+
{
79+
using DiagnosticScope messageScope = CreateScope(
80+
DiagnosticProperty.MessageActivityName,
81+
DiagnosticProperty.ProducerKind);
82+
messageScope.Start();
83+
84+
Activity activity = Activity.Current;
85+
if (activity != null)
86+
{
87+
message.ApplicationProperties[DiagnosticProperty.DiagnosticIdAttribute] = activity.Id;
88+
}
89+
}
90+
}
7491
}
7592
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.ComponentModel;
76
using Azure.Core.Amqp;
87
using Azure.Messaging.ServiceBus.Amqp;
98
using Azure.Messaging.ServiceBus.Administration;
109
using Azure.Messaging.ServiceBus.Core;
10+
using Azure.Messaging.ServiceBus.Diagnostics;
1111

1212
namespace Azure.Messaging.ServiceBus
1313
{
@@ -237,7 +237,7 @@ public static ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes,
237237
batchOptions.MaxSizeInBytes ??= long.MaxValue;
238238

239239
var transportBatch = new ListTransportBatch(batchOptions.MaxSizeInBytes.Value, batchSizeBytes, batchMessageStore, tryAddCallback);
240-
return new ServiceBusMessageBatch(transportBatch);
240+
return new ServiceBusMessageBatch(transportBatch, new EntityScopeFactory("mock", "mock"));
241241
}
242242

243243
/// <summary>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using Azure.Core;
77
using Azure.Messaging.ServiceBus.Core;
8+
using Azure.Messaging.ServiceBus.Diagnostics;
89

910
namespace Azure.Messaging.ServiceBus
1011
{
@@ -29,32 +30,35 @@ public sealed class ServiceBusMessageBatch : IDisposable
2930
/// well as any overhead for the batch itself when sent to the Queue/Topic.
3031
/// </summary>
3132
///
32-
public long MaxSizeInBytes => InnerBatch.MaxSizeInBytes;
33+
public long MaxSizeInBytes => _innerBatch.MaxSizeInBytes;
3334

3435
/// <summary>
3536
/// The size of the batch, in bytes, as it will be sent to the Queue/Topic.
3637
/// </summary>
3738
///
38-
public long SizeInBytes => InnerBatch.SizeInBytes;
39+
public long SizeInBytes => _innerBatch.SizeInBytes;
3940

4041
/// <summary>
4142
/// The count of messages contained in the batch.
4243
/// </summary>
4344
///
44-
public int Count => InnerBatch.Count;
45+
public int Count => _innerBatch.Count;
4546

4647
/// <summary>
4748
/// The transport-specific batch responsible for performing the batch operations
4849
/// in a manner compatible with the associated <see cref="TransportSender" />.
4950
/// </summary>
5051
///
51-
private TransportMessageBatch InnerBatch { get; }
52+
private readonly TransportMessageBatch _innerBatch;
53+
54+
private readonly EntityScopeFactory _scopeFactory;
5255

5356
/// <summary>
5457
/// Initializes a new instance of the <see cref="ServiceBusMessageBatch"/> class.
5558
/// </summary>
5659
///
5760
/// <param name="transportBatch">The transport-specific batch responsible for performing the batch operations.</param>
61+
/// <param name="entityScope">The entity scope used for instrumentation.</param>
5862
///
5963
/// <remarks>
6064
/// As an internal type, this class performs only basic sanity checks against its arguments. It
@@ -65,15 +69,19 @@ public sealed class ServiceBusMessageBatch : IDisposable
6569
/// caller.
6670
/// </remarks>
6771
///
68-
internal ServiceBusMessageBatch(TransportMessageBatch transportBatch)
72+
internal ServiceBusMessageBatch(TransportMessageBatch transportBatch, EntityScopeFactory entityScope)
6973
{
7074
Argument.AssertNotNull(transportBatch, nameof(transportBatch));
71-
InnerBatch = transportBatch;
75+
_innerBatch = transportBatch;
76+
_scopeFactory = entityScope;
7277
}
7378

7479
/// <summary>
7580
/// Attempts to add a message to the batch, ensuring that the size
76-
/// of the batch does not exceed its maximum.
81+
/// of the batch does not exceed its maximum. If the message is modified
82+
/// after being added to the batch, the batch will fail to send if the modification
83+
/// caused the batch to exceed the maximum allowable size. Therefore it is best
84+
/// to not modify a message after adding it to the batch.
7785
/// </summary>
7886
///
7987
/// <param name="message">The message to attempt to add to the batch.</param>
@@ -91,7 +99,9 @@ public bool TryAddMessage(ServiceBusMessage message)
9199
lock (_syncGuard)
92100
{
93101
AssertNotLocked();
94-
return InnerBatch.TryAddMessage(message);
102+
103+
_scopeFactory.InstrumentMessage(message);
104+
return _innerBatch.TryAddMessage(message);
95105
}
96106
}
97107

@@ -104,7 +114,7 @@ public void Dispose()
104114
lock (_syncGuard)
105115
{
106116
AssertNotLocked();
107-
InnerBatch.Dispose();
117+
_innerBatch.Dispose();
108118
}
109119
}
110120

@@ -118,7 +128,7 @@ internal void Clear()
118128
lock (_syncGuard)
119129
{
120130
AssertNotLocked();
121-
InnerBatch.Clear();
131+
_innerBatch.Clear();
122132
}
123133
}
124134

@@ -130,7 +140,7 @@ internal void Clear()
130140
///
131141
/// <returns>The set of messages as an enumerable of the requested type.</returns>
132142
///
133-
internal IEnumerable<T> AsEnumerable<T>() => InnerBatch.AsEnumerable<T>();
143+
internal IEnumerable<T> AsEnumerable<T>() => _innerBatch.AsEnumerable<T>();
134144

135145
/// <summary>
136146
/// Locks the batch to prevent new messages from being added while a service

sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,10 @@ private async Task ApplyPlugins(IReadOnlyList<ServiceBusMessage> messages)
246246

247247
private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> messages, string activityName)
248248
{
249-
InstrumentMessages(messages);
249+
foreach (ServiceBusMessage message in messages)
250+
{
251+
_scopeFactory.InstrumentMessage(message);
252+
}
250253

251254
// create a new scope for the specified operation
252255
DiagnosticScope scope = _scopeFactory.CreateScope(
@@ -257,32 +260,6 @@ private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> mes
257260
return scope;
258261
}
259262

260-
/// <summary>
261-
/// Performs the actions needed to instrument a set of messages.
262-
/// </summary>
263-
///
264-
/// <param name="messages">The messages to instrument.</param>
265-
///
266-
private void InstrumentMessages(IEnumerable<ServiceBusMessage> messages)
267-
{
268-
foreach (ServiceBusMessage message in messages)
269-
{
270-
if (!message.ApplicationProperties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute))
271-
{
272-
using DiagnosticScope messageScope = _scopeFactory.CreateScope(
273-
DiagnosticProperty.MessageActivityName,
274-
DiagnosticProperty.ProducerKind);
275-
messageScope.Start();
276-
277-
Activity activity = Activity.Current;
278-
if (activity != null)
279-
{
280-
message.ApplicationProperties[DiagnosticProperty.DiagnosticIdAttribute] = activity.Id;
281-
}
282-
}
283-
}
284-
}
285-
286263
/// <summary>
287264
/// Creates a size-constraint batch to which <see cref="ServiceBusMessage" /> may be added using
288265
/// a <see cref="ServiceBusMessageBatch.TryAddMessage"/>. If a message would exceed the maximum
@@ -329,7 +306,7 @@ public virtual async ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(
329306
try
330307
{
331308
TransportMessageBatch transportBatch = await _innerSender.CreateMessageBatchAsync(options, cancellationToken).ConfigureAwait(false);
332-
batch = new ServiceBusMessageBatch(transportBatch);
309+
batch = new ServiceBusMessageBatch(transportBatch, _scopeFactory);
333310
}
334311
catch (Exception ex)
335312
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
using System.Linq;
77
using System.Net;
88
using System.Threading.Tasks;
9-
using Azure.Messaging.ServiceBus;
10-
using Azure.Messaging.ServiceBus.Tests;
119
using NUnit.Framework;
1210

1311
namespace Azure.Messaging.ServiceBus.Tests.Sender
@@ -131,48 +129,6 @@ public async Task CanSendAnEmptyBodyMessageBatch()
131129
}
132130
}
133131

134-
[Test]
135-
public async Task CanSendLargeMessageBatch()
136-
{
137-
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: true, enableSession: true))
138-
{
139-
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
140-
ServiceBusSender sender = client.CreateSender(scope.QueueName);
141-
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
142-
143-
await AddAndSendMessages();
144-
145-
batch.Clear();
146-
Assert.AreEqual(0, batch.Count);
147-
Assert.AreEqual(0, batch.SizeInBytes);
148-
149-
await AddAndSendMessages();
150-
151-
async Task AddAndSendMessages()
152-
{
153-
// service limits to 4500 messages but we have not added this to our client validation yet
154-
while (batch.Count < 4500 && batch.TryAddMessage(
155-
new ServiceBusMessage(new byte[50])
156-
{
157-
MessageId = "new message ID that takes up some space",
158-
SessionId = "sessionId",
159-
PartitionKey = "sessionId",
160-
ApplicationProperties = { { "key", "value" } }
161-
}))
162-
{
163-
}
164-
165-
if (batch.Count < 4500)
166-
{
167-
// the difference in size from the max allowable size should be less than the size of 1 message
168-
Assert.IsTrue(batch.MaxSizeInBytes - batch.SizeInBytes < 180);
169-
}
170-
Assert.Greater(batch.Count, 0);
171-
await sender.SendMessagesAsync(batch);
172-
}
173-
}
174-
}
175-
176132
[Test]
177133
public async Task CannotSendLargerThanMaximumSize()
178134
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using Azure.Messaging.ServiceBus.Core;
11+
using Azure.Messaging.ServiceBus.Diagnostics;
1112
using Azure.Messaging.ServiceBus.Plugins;
1213
using Moq;
1314
using NUnit.Framework;
@@ -145,7 +146,8 @@ public async Task SendBatchManagesLockingTheBatch()
145146

146147
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
147148
var mockTransportBatch = new Mock<TransportMessageBatch>();
148-
var batch = new ServiceBusMessageBatch(mockTransportBatch.Object);
149+
var mockScope = new EntityScopeFactory("mock", "mock");
150+
var batch = new ServiceBusMessageBatch(mockTransportBatch.Object, mockScope);
149151
var mockTransportSender = new Mock<TransportSender>();
150152
var mockConnection = new Mock<ServiceBusConnection>();
151153

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Threading.Tasks;
5+
using Azure.Core.Tests;
6+
using Azure.Messaging.ServiceBus.Diagnostics;
7+
using NUnit.Framework;
8+
9+
namespace Azure.Messaging.ServiceBus.Tests.Sender
10+
{
11+
[NonParallelizable]
12+
public class ServiceBusMessageBatchLiveTests : ServiceBusLiveTestBase
13+
{
14+
[Test]
15+
[TestCase(true)]
16+
[TestCase(false)]
17+
public async Task CanSendLargeMessageBatch(bool enableTracing)
18+
{
19+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: true, enableSession: true))
20+
{
21+
TestDiagnosticListener listener = null;
22+
if (enableTracing)
23+
{
24+
listener = new TestDiagnosticListener(EntityScopeFactory.DiagnosticNamespace);
25+
}
26+
try
27+
{
28+
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
29+
ServiceBusSender sender = client.CreateSender(scope.QueueName);
30+
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
31+
32+
await AddAndSendMessages();
33+
34+
batch.Clear();
35+
Assert.AreEqual(0, batch.Count);
36+
Assert.AreEqual(0, batch.SizeInBytes);
37+
38+
await AddAndSendMessages();
39+
40+
async Task AddAndSendMessages()
41+
{
42+
// service limits to 4500 messages but we have not added this to our client validation yet
43+
while (batch.Count < 4500 && batch.TryAddMessage(
44+
new ServiceBusMessage(new byte[50])
45+
{
46+
MessageId = "new message ID that takes up some space",
47+
SessionId = "sessionId",
48+
PartitionKey = "sessionId",
49+
ApplicationProperties = { { "key", "value" } }
50+
}))
51+
{
52+
}
53+
54+
if (batch.Count < 4500)
55+
{
56+
var diff = batch.MaxSizeInBytes - batch.SizeInBytes;
57+
// the difference in size from the max allowable size should be less than the size of 1 message
58+
Assert.IsTrue(diff < 220, diff.ToString());
59+
}
60+
Assert.Greater(batch.Count, 0);
61+
await sender.SendMessagesAsync(batch);
62+
}
63+
}
64+
finally
65+
{
66+
listener?.Dispose();
67+
}
68+
}
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)