Skip to content

Commit 0c7d99f

Browse files
Add OTel support for Service Bus (Azure#34492)
* Add OTel support for Service Bus * Update extension * ref docs * Don't include new shared source in Azure.Core.csproj * Fix ref * Fix tests * PR fb * Add batch count test cases * Core updates to support setting tracestate * PR fb * PR fb * Use destination.name for everything other than receive * receive or process
1 parent 336f34b commit 0c7d99f

31 files changed

+914
-288
lines changed

sdk/core/Azure.Core.TestFramework/src/TestActivitySourceListener.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.Diagnostics;
77
using System.Linq;
8+
using NUnit.Framework;
89

910
namespace Azure.Core.Tests
1011
{
@@ -16,11 +17,11 @@ public class TestActivitySourceListener: IDisposable
1617
public Queue<Activity> Activities { get; } =
1718
new Queue<Activity>();
1819

19-
public TestActivitySourceListener(string name) : this(source => source.Name == name)
20+
public TestActivitySourceListener(string name, Action<Activity> activityStartedCallback = default) : this(source => source.Name.StartsWith(name), activityStartedCallback)
2021
{
2122
}
2223

23-
public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
24+
public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector, Action<Activity> activityStartedCallback = default)
2425
{
2526
_listener = new ActivityListener
2627
{
@@ -31,6 +32,7 @@ public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
3132
{
3233
Activities.Enqueue(activity);
3334
}
35+
activityStartedCallback?.Invoke(activity);
3436
},
3537
Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
3638
{
@@ -46,6 +48,13 @@ public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
4648
ActivitySource.AddActivityListener(_listener);
4749
}
4850

51+
public Activity AssertAndRemoveActivity(string name)
52+
{
53+
var activity = Activities.Dequeue();
54+
Assert.AreEqual(name, activity.OperationName);
55+
return activity;
56+
}
57+
4958
public void Dispose()
5059
{
5160
_listener?.Dispose();

sdk/core/Azure.Core/src/Shared/DiagnosticScope.cs

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void AddAttribute<T>(string name, T value, Func<T, string> format)
107107
/// <param name="traceparent">The traceparent for the link.</param>
108108
/// <param name="tracestate">The tracestate for the link.</param>
109109
/// <param name="attributes">Optional attributes to associate with the link.</param>
110-
public void AddLink(string traceparent, string tracestate, IDictionary<string, string>? attributes = null)
110+
public void AddLink(string traceparent, string? tracestate, IDictionary<string, string>? attributes = null)
111111
{
112112
_activityAdapter?.AddLink(traceparent, tracestate, attributes);
113113
}
@@ -124,12 +124,13 @@ public void SetStartTime(DateTime dateTime)
124124
}
125125

126126
/// <summary>
127-
/// Sets the trace parent for the current scope.
127+
/// Sets the trace context for the current scope.
128128
/// </summary>
129129
/// <param name="traceparent">The trace parent to set for the current scope.</param>
130-
public void SetTraceparent(string traceparent)
130+
/// <param name="tracestate">The trace state to set for the current scope.</param>
131+
public void SetTraceContext(string traceparent, string? tracestate = default)
131132
{
132-
_activityAdapter?.SetTraceparent(traceparent);
133+
_activityAdapter?.SetTraceContext(traceparent, tracestate);
133134
}
134135

135136
public void Dispose()
@@ -199,6 +200,7 @@ private class ActivityAdapter : IDisposable
199200
private DateTimeOffset _startTime;
200201
private List<Activity>? _links;
201202
private string? _traceparent;
203+
private string? _tracestate;
202204

203205
public ActivityAdapter(object? activitySource, DiagnosticSource diagnosticSource, string activityName, ActivityKind kind, object? diagnosticSourceArgs)
204206
{
@@ -258,7 +260,7 @@ public void AddTag(string name, object value)
258260
return linkCollection;
259261
}
260262

261-
public void AddLink(string traceparent, string tracestate, IDictionary<string, string>? attributes)
263+
public void AddLink(string traceparent, string? tracestate, IDictionary<string, string>? attributes)
262264
{
263265
var linkedActivity = new Activity("LinkedActivity");
264266
linkedActivity.SetW3CFormat();
@@ -334,6 +336,11 @@ public void AddLink(string traceparent, string tracestate, IDictionary<string, s
334336
_currentActivity.SetParentId(_traceparent);
335337
}
336338

339+
if (_tracestate != null)
340+
{
341+
_currentActivity.SetTraceState(_tracestate);
342+
}
343+
337344
_currentActivity.Start();
338345
}
339346

@@ -351,7 +358,8 @@ public void AddLink(string traceparent, string tracestate, IDictionary<string, s
351358
startTime: _startTime,
352359
tags: _tagCollection,
353360
links: GetActivitySourceLinkCollection(),
354-
parentId: _traceparent);
361+
traceparent: _traceparent,
362+
tracestate: _tracestate);
355363
}
356364

357365
public void SetStartTime(DateTime startTime)
@@ -365,13 +373,14 @@ public void MarkFailed(Exception exception)
365373
_diagnosticSource?.Write(_activityName + ".Exception", exception);
366374
}
367375

368-
public void SetTraceparent(string traceparent)
376+
public void SetTraceContext(string traceparent, string? tracestate)
369377
{
370378
if (_currentActivity != null)
371379
{
372380
throw new InvalidOperationException("Traceparent can not be set after the activity is started.");
373381
}
374382
_traceparent = traceparent;
383+
_tracestate = tracestate;
375384
}
376385

377386
public void Dispose()
@@ -418,13 +427,14 @@ static ActivityExtensions()
418427
private static Action<Activity, string?>? SetTraceStateStringMethod;
419428
private static Func<Activity, int>? GetIdFormatMethod;
420429
private static Action<Activity, string, object?>? ActivityAddTagMethod;
421-
private static Func<object, string, int, string?, ICollection<KeyValuePair<string, object>>?, IList?, DateTimeOffset, Activity?>? ActivitySourceStartActivityMethod;
430+
private static Func<object, string, int, object?, ICollection<KeyValuePair<string, object>>?, IList?, DateTimeOffset, Activity?>? ActivitySourceStartActivityMethod;
422431
private static Func<object, bool>? ActivitySourceHasListenersMethod;
423432
private static Func<string, string?, ICollection<KeyValuePair<string, object>>?, object?>? CreateActivityLinkMethod;
424433
private static Func<ICollection<KeyValuePair<string,object>>?>? CreateTagsCollectionMethod;
425434
private static Func<Activity, string, object?>? GetCustomPropertyMethod;
426435
private static Action<Activity, string, object>? SetCustomPropertyMethod;
427436
private static readonly ParameterExpression ActivityParameter = Expression.Parameter(typeof(Activity));
437+
private static MethodInfo? ParseActivityContextMethod;
428438

429439
public static object? GetCustomProperty(this Activity activity, string propertyName)
430440
{
@@ -687,13 +697,23 @@ public static bool ActivitySourceHasListeners(object? activitySource)
687697
return ActivitySourceHasListenersMethod.Invoke(activitySource);
688698
}
689699

690-
public static Activity? ActivitySourceStartActivity(object? activitySource, string activityName, int kind, DateTimeOffset startTime, ICollection<KeyValuePair<string, object>>? tags, IList? links, string? parentId)
700+
public static Activity? ActivitySourceStartActivity(
701+
object? activitySource,
702+
string activityName,
703+
int kind,
704+
DateTimeOffset startTime,
705+
ICollection<KeyValuePair<string, object>>? tags,
706+
IList? links,
707+
string? traceparent,
708+
string? tracestate)
691709
{
692710
if (activitySource == null)
693711
{
694712
return null;
695713
}
696714

715+
object? activityContext = default;
716+
697717
if (ActivitySourceStartActivityMethod == null)
698718
{
699719
if (ActivityLinkType == null ||
@@ -709,7 +729,7 @@ public static bool ActivitySourceHasListeners(object? activitySource)
709729
{
710730
typeof(string),
711731
ActivityKindType,
712-
typeof(string),
732+
ActivityContextType,
713733
typeof(IEnumerable<KeyValuePair<string, object>>),
714734
typeof(IEnumerable<>).MakeGenericType(ActivityLinkType),
715735
typeof(DateTimeOffset)
@@ -724,27 +744,38 @@ public static bool ActivitySourceHasListeners(object? activitySource)
724744
var sourceParameter = Expression.Parameter(typeof(object));
725745
var nameParameter = Expression.Parameter(typeof(string));
726746
var kindParameter = Expression.Parameter(typeof(int));
727-
var parentIdParameter = Expression.Parameter(typeof(string));
747+
var contextParameter = Expression.Parameter(typeof(object));
728748
var startTimeParameter = Expression.Parameter(typeof(DateTimeOffset));
729749
var tagsParameter = Expression.Parameter(typeof(ICollection<KeyValuePair<string, object>>));
730750
var linksParameter = Expression.Parameter(typeof(IList));
731751
var methodParameter = method.GetParameters();
732-
ActivitySourceStartActivityMethod = Expression.Lambda<Func<object, string, int, string?, ICollection<KeyValuePair<string, object>>?, IList?, DateTimeOffset, Activity?>>(
752+
ParseActivityContextMethod = ActivityContextType.GetMethod("Parse", BindingFlags.Static | BindingFlags.Public);
753+
754+
ActivitySourceStartActivityMethod = Expression.Lambda<Func<object, string, int, object?, ICollection<KeyValuePair<string, object>>?, IList?, DateTimeOffset, Activity?>>(
733755
Expression.Call(
734756
Expression.Convert(sourceParameter, method.DeclaringType!),
735757
method,
736758
nameParameter,
737759
Expression.Convert(kindParameter, methodParameter[1].ParameterType),
738-
Expression.Convert(parentIdParameter, methodParameter[2].ParameterType),
760+
Expression.Convert(contextParameter, methodParameter[2].ParameterType),
739761
Expression.Convert(tagsParameter, methodParameter[3].ParameterType),
740762
Expression.Convert(linksParameter, methodParameter[4].ParameterType),
741763
Expression.Convert(startTimeParameter, methodParameter[5].ParameterType)),
742-
sourceParameter, nameParameter, kindParameter, parentIdParameter, tagsParameter, linksParameter, startTimeParameter).Compile();
764+
sourceParameter, nameParameter, kindParameter, contextParameter, tagsParameter, linksParameter, startTimeParameter).Compile();
743765
}
744766
}
745767
}
746768

747-
return ActivitySourceStartActivityMethod.Invoke(activitySource, activityName, kind, parentId, tags, links, startTime);
769+
if (ActivityContextType != null && ParseActivityContextMethod != null)
770+
{
771+
if (traceparent != null)
772+
activityContext = ParseActivityContextMethod.Invoke(null, new[] {traceparent, tracestate})!;
773+
else
774+
// because ActivityContext is a struct, we need to create a default instance rather than allowing the argument to be null
775+
activityContext = Activator.CreateInstance(ActivityContextType);
776+
}
777+
778+
return ActivitySourceStartActivityMethod.Invoke(activitySource, activityName, kind, activityContext, tags, links, startTime);
748779
}
749780

750781
public static object? CreateActivitySource(string name)
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using Azure.Core.Pipeline;
7+
8+
#nullable enable
9+
10+
namespace Azure.Core.Shared
11+
{
12+
/// <summary>
13+
/// Client Diagnostics support for messaging clients. Currently, this is only used for AMQP clients.
14+
/// HTTP libraries should use the ClientDiagnostics type instead.
15+
/// </summary>
16+
internal class MessagingClientDiagnostics
17+
{
18+
private readonly string _fullyQualifiedNamespace;
19+
private readonly string _entityPath;
20+
private readonly string _messagingSystem;
21+
private readonly DiagnosticScopeFactory _scopeFactory;
22+
23+
#region OTel-specific messaging attributes
24+
public const string MessagingSystem = "messaging.system";
25+
public const string DestinationName = "messaging.destination.name";
26+
public const string SourceName = "messaging.source.name";
27+
public const string MessagingOperation = "messaging.operation";
28+
public const string NetPeerName = "net.peer.name";
29+
public const string BatchCount = "messaging.batch.message_count";
30+
public const string TraceParent = "traceparent";
31+
public const string TraceState = "tracestate";
32+
#endregion
33+
34+
#region legacy compat attributes
35+
public const string MessageBusDestination = "message_bus.destination";
36+
public const string PeerAddress = "peer.address";
37+
public const string Component = "component";
38+
#endregion
39+
40+
public const string DiagnosticIdAttribute = "Diagnostic-Id";
41+
42+
public MessagingClientDiagnostics(string clientNamespace, string? resourceProviderNamespace, string messagingSystem, string fullyQualifiedNamespace, string entityPath)
43+
{
44+
Argument.AssertNotNull(messagingSystem, nameof(messagingSystem));
45+
Argument.AssertNotNull(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
46+
Argument.AssertNotNull(entityPath, nameof(entityPath));
47+
48+
_messagingSystem = messagingSystem;
49+
_fullyQualifiedNamespace = fullyQualifiedNamespace;
50+
_entityPath = entityPath;
51+
_scopeFactory = new DiagnosticScopeFactory(clientNamespace, resourceProviderNamespace, true, false);
52+
}
53+
54+
public DiagnosticScope CreateScope(
55+
string activityName,
56+
DiagnosticScope.ActivityKind kind,
57+
MessagingDiagnosticOperation operation = default)
58+
{
59+
DiagnosticScope scope = _scopeFactory.CreateScope(activityName, kind);
60+
if (ActivityExtensions.SupportsActivitySource())
61+
{
62+
scope.AddAttribute(MessagingSystem, _messagingSystem);
63+
if (operation != default)
64+
{
65+
scope.AddAttribute(MessagingOperation, operation.ToString());
66+
}
67+
68+
scope.AddAttribute(NetPeerName, _fullyQualifiedNamespace);
69+
scope.AddAttribute(operation == MessagingDiagnosticOperation.Receive || operation == MessagingDiagnosticOperation.Process ? SourceName : DestinationName, _entityPath);
70+
}
71+
else
72+
{
73+
scope.AddAttribute(Component, _messagingSystem);
74+
scope.AddAttribute(PeerAddress, _fullyQualifiedNamespace);
75+
scope.AddAttribute(MessageBusDestination, _entityPath);
76+
}
77+
78+
return scope;
79+
}
80+
81+
/// <summary>
82+
/// Attempts to extract the trace context from a message's properties.
83+
/// </summary>
84+
///
85+
/// <param name="properties">The properties holding the trace context.</param>
86+
/// <param name="traceparent">The trace parent of the message.</param>
87+
/// <param name="tracestate">The trace state of the message.</param>
88+
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
89+
public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object> properties, out string? traceparent, out string? tracestate)
90+
{
91+
traceparent = null;
92+
tracestate = null;
93+
94+
if (properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
95+
{
96+
traceparent = traceParentString;
97+
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
98+
{
99+
tracestate = stateString;
100+
}
101+
return true;
102+
}
103+
104+
// trace state is not valid without trace parent, so we don't need to check for it for the legacy attribute
105+
if (properties.TryGetValue(DiagnosticIdAttribute, out var diagnosticId) && diagnosticId is string diagnosticIdString)
106+
{
107+
traceparent = diagnosticIdString;
108+
return true;
109+
}
110+
return false;
111+
}
112+
113+
/// <summary>
114+
/// Attempts to extract the trace context from a message's properties.
115+
/// </summary>
116+
///
117+
/// <param name="properties">The properties holding the trace context.</param>
118+
/// <param name="traceparent">The trace parent of the message.</param>
119+
/// <param name="tracestate">The trace state of the message.</param>
120+
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
121+
public static bool TryExtractTraceContext(IDictionary<string, object> properties, out string? traceparent, out string? tracestate)
122+
{
123+
traceparent = null;
124+
tracestate = null;
125+
126+
if (properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
127+
{
128+
traceparent = traceParentString;
129+
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
130+
{
131+
tracestate = stateString;
132+
}
133+
return true;
134+
}
135+
136+
// trace state is not valid without trace parent, so we don't need to check for it for the legacy attribute
137+
if (properties.TryGetValue(DiagnosticIdAttribute, out var diagnosticId) && diagnosticId is string diagnosticIdString)
138+
{
139+
traceparent = diagnosticIdString;
140+
return true;
141+
}
142+
return false;
143+
}
144+
145+
/// <summary>
146+
/// Instrument the message properties for tracing. If tracing is enabled, a diagnostic id will be added to the message properties,
147+
/// which alters the message size.
148+
/// </summary>
149+
/// <param name="properties">The dictionary of application message properties.</param>
150+
/// <param name="activityName">The activity name to use for the diagnostic scope.</param>
151+
public void InstrumentMessage(IDictionary<string, object> properties, string activityName)
152+
{
153+
if (!properties.ContainsKey(DiagnosticIdAttribute) && !properties.ContainsKey(TraceParent))
154+
{
155+
using DiagnosticScope messageScope = CreateScope(
156+
activityName,
157+
DiagnosticScope.ActivityKind.Producer);
158+
messageScope.Start();
159+
160+
Activity activity = Activity.Current;
161+
if (activity != null)
162+
{
163+
properties[DiagnosticIdAttribute] = activity.Id;
164+
properties[TraceParent] = activity.Id;
165+
if (activity.TraceStateString != null)
166+
properties[TraceState] = activity.TraceStateString;
167+
}
168+
}
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)