Skip to content

Commit d75996a

Browse files
Add ServiceBusTransportMetrics (Azure#27663)
* Add ServiceBusTransportMetrics * PR fb * Fix tests * Fix test
1 parent 1c83dd4 commit d75996a

17 files changed

+339
-66
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,13 @@ public ServiceBusClient(string connectionString, Azure.Messaging.ServiceBus.Serv
8787
public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(string queueName, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options = null) { throw null; }
8888
public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(string topicName, string subscriptionName, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options = null) { throw null; }
8989
public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
90+
public virtual Azure.Messaging.ServiceBus.ServiceBusTransportMetrics GetTransportMetrics() { throw null; }
9091
}
9192
public partial class ServiceBusClientOptions
9293
{
9394
public ServiceBusClientOptions() { }
9495
public bool EnableCrossEntityTransactions { get { throw null; } set { } }
96+
public bool EnableTransportMetrics { get { throw null; } set { } }
9597
public Azure.Messaging.ServiceBus.ServiceBusRetryOptions RetryOptions { get { throw null; } set { } }
9698
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
9799
public System.Net.IWebProxy WebProxy { get { throw null; } set { } }
@@ -205,6 +207,7 @@ public static partial class ServiceBusModelFactory
205207
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
206208
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body, string messageId, string partitionKey, string viaPartitionKey, string sessionId, string replyToSessionId, System.TimeSpan timeToLive, string correlationId, string subject, string to, string contentType, string replyTo, System.DateTimeOffset scheduledEnqueueTime, System.Collections.Generic.IDictionary<string, object> properties, System.Guid lockTokenGuid, int deliveryCount, System.DateTimeOffset lockedUntil, long sequenceNumber, string deadLetterSource, long enqueuedSequenceNumber, System.DateTimeOffset enqueuedTime) { throw null; }
207209
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary<string, object> properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset), Azure.Messaging.ServiceBus.ServiceBusMessageState serviceBusMessageState = Azure.Messaging.ServiceBus.ServiceBusMessageState.Active) { throw null; }
210+
public static Azure.Messaging.ServiceBus.ServiceBusTransportMetrics ServiceBusTransportMetrics(System.DateTimeOffset? lastHeartbeat = default(System.DateTimeOffset?), System.DateTimeOffset? lastConnectionOpen = default(System.DateTimeOffset?), System.DateTimeOffset? lastConnectionClose = default(System.DateTimeOffset?)) { throw null; }
208211
public static Azure.Messaging.ServiceBus.Administration.SubscriptionProperties SubscriptionProperties(string topicName, string subscriptionName, System.TimeSpan lockDuration = default(System.TimeSpan), bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null) { throw null; }
209212
public static Azure.Messaging.ServiceBus.Administration.SubscriptionRuntimeProperties SubscriptionRuntimeProperties(string topicName, string subscriptionName, long activeMessageCount = (long)0, long deadLetterMessageCount = (long)0, long transferDeadLetterMessageCount = (long)0, long transferMessageCount = (long)0, long totalMessageCount = (long)0, System.DateTimeOffset createdAt = default(System.DateTimeOffset), System.DateTimeOffset updatedAt = default(System.DateTimeOffset), System.DateTimeOffset accessedAt = default(System.DateTimeOffset)) { throw null; }
210213
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -485,6 +488,13 @@ public ServiceBusSessionReceiverOptions() { }
485488
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
486489
public override string ToString() { throw null; }
487490
}
491+
public partial class ServiceBusTransportMetrics
492+
{
493+
protected internal ServiceBusTransportMetrics() { }
494+
public System.DateTimeOffset? LastConnectionClose { get { throw null; } }
495+
public System.DateTimeOffset? LastConnectionOpen { get { throw null; } }
496+
public System.DateTimeOffset? LastHeartBeat { get { throw null; } }
497+
}
488498
public enum ServiceBusTransportType
489499
{
490500
AmqpTcp = 0,

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ internal class AmqpClient : TransportClient
6161
///
6262
private AmqpConnectionScope ConnectionScope { get; }
6363

64+
public override ServiceBusTransportMetrics TransportMetrics { get; }
65+
6466
/// <summary>
6567
/// Initializes a new instance of the <see cref="AmqpClient"/> class.
6668
/// </summary>
@@ -94,13 +96,18 @@ internal AmqpClient(
9496
}.Uri;
9597

9698
Credential = credential;
99+
if (options.EnableTransportMetrics)
100+
{
101+
TransportMetrics = new ServiceBusTransportMetrics();
102+
}
97103
ConnectionScope = new AmqpConnectionScope(
98104
ServiceEndpoint,
99105
credential,
100106
options.TransportType,
101107
options.WebProxy,
102108
options.EnableCrossEntityTransactions,
103-
options.RetryOptions.TryTimeout);
109+
options.RetryOptions.TryTimeout,
110+
TransportMetrics);
104111
}
105112

106113
/// <summary>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,15 @@ public override bool IsDisposed
178178
/// <param name="proxy">The proxy, if any, to use for communication.</param>
179179
/// <param name="useSingleSession">If true, all links will use a single session.</param>
180180
/// <param name="operationTimeout">The timeout for operations associated with the connection.</param>
181+
/// <param name="metrics">The metrics instance to populate transport metrics. May be null.</param>
181182
public AmqpConnectionScope(
182183
Uri serviceEndpoint,
183184
ServiceBusTokenCredential credential,
184185
ServiceBusTransportType transport,
185186
IWebProxy proxy,
186187
bool useSingleSession,
187-
TimeSpan operationTimeout)
188+
TimeSpan operationTimeout,
189+
ServiceBusTransportMetrics metrics)
188190
{
189191
Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
190192
Argument.AssertNotNull(credential, nameof(credential));
@@ -198,7 +200,7 @@ public AmqpConnectionScope(
198200
TokenProvider = new CbsTokenProvider(new ServiceBusTokenCredential(credential), AuthorizationTokenExpirationBuffer, OperationCancellationSource.Token);
199201
_useSingleSession = useSingleSession;
200202
#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
201-
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
203+
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout, metrics);
202204
#pragma warning restore CA2214 // Do not call overridable methods in constructors
203205

204206
ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(
@@ -436,15 +438,16 @@ public override void Dispose()
436438
/// <param name="proxy">The proxy, if any, to use for communication.</param>
437439
/// <param name="scopeIdentifier">The unique identifier for the associated scope.</param>
438440
/// <param name="timeout">The timeout to consider when creating the connection.</param>
439-
///
441+
/// <param name="metrics">The metrics instance to populate transport metrics. May be null.</param>
440442
/// <returns>An AMQP connection that may be used for communicating with the Service Bus service.</returns>
441443
protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
442444
Version amqpVersion,
443445
Uri serviceEndpoint,
444446
ServiceBusTransportType transportType,
445447
IWebProxy proxy,
446448
string scopeIdentifier,
447-
TimeSpan timeout)
449+
TimeSpan timeout,
450+
ServiceBusTransportMetrics metrics)
448451
{
449452
var hostName = serviceEndpoint.Host;
450453
AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
@@ -463,6 +466,11 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
463466
TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
464467

465468
var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
469+
if (metrics != null)
470+
{
471+
connection.UsageMeter = new AmqpUsageMeter(metrics);
472+
}
473+
466474
await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), CancellationToken.None).ConfigureAwait(false);
467475

468476
// Create the CBS link that will be used for authorization. The act of creating the link will associate
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Threading;
6+
using Microsoft.Azure.Amqp;
7+
8+
namespace Azure.Messaging.ServiceBus.Amqp
9+
{
10+
internal class AmqpUsageMeter : IAmqpUsageMeter
11+
{
12+
private readonly ServiceBusTransportMetrics _metrics;
13+
14+
public AmqpUsageMeter(ServiceBusTransportMetrics metrics)
15+
{
16+
_metrics = metrics;
17+
}
18+
public void OnTransportWrite(int bufferSize, int writeSize, long queueSize, long latencyTicks)
19+
{
20+
// not implemented
21+
}
22+
23+
public void OnTransportRead(int bufferSize, int readSize, int cacheHits, long latencyTicks)
24+
{
25+
// not implemented
26+
}
27+
28+
public void OnRead(AmqpConnection connection, ulong frameCode, int numberOfBytes)
29+
{
30+
switch (frameCode)
31+
{
32+
case 0x00:
33+
_metrics.LastHeartBeat = DateTimeOffset.UtcNow;
34+
break;
35+
case 0x10:
36+
_metrics.LastConnectionOpen = DateTimeOffset.UtcNow;
37+
break;
38+
case 0x18:
39+
_metrics.LastConnectionClose = DateTimeOffset.UtcNow;
40+
break;
41+
}
42+
}
43+
44+
public void OnWrite(AmqpConnection connection, ulong frameCode, int numberOfBytes)
45+
{
46+
// not implemented
47+
}
48+
}
49+
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,26 @@ public virtual async ValueTask DisposeAsync()
8787
}
8888

8989
/// <summary>
90-
/// Can be used for mocking.
90+
/// Gets the metrics associated with this <see cref="ServiceBusClient"/> instance. The metrics returned represent a snapshot and will not be updated.
91+
/// To get updated metrics, this method should be called again.
92+
/// In order to use this property, <see cref="ServiceBusClientOptions.EnableTransportMetrics"/> must be set to <value>true</value>.
9193
/// </summary>
92-
protected ServiceBusClient()
93-
{
94-
}
94+
public virtual ServiceBusTransportMetrics GetTransportMetrics()
95+
=> Connection.InnerClient.TransportMetrics?.Clone() ??
96+
throw new InvalidOperationException("Transport metrics are not enabled. To enable transport metrics, set the EnableTransportMetrics property on the ServiceBusClientOptions.");
9597

9698
/// <summary>
9799
/// The connection that is used for the client.
98100
/// </summary>
99101
internal ServiceBusConnection Connection { get; }
100102

103+
/// <summary>
104+
/// Can be used for mocking.
105+
/// </summary>
106+
protected ServiceBusClient()
107+
{
108+
}
109+
101110
/// <summary>
102111
/// Initializes a new instance of the <see cref="ServiceBusClient"/> class.
103112
/// </summary>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClientOptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ public ServiceBusRetryOptions RetryOptions
6161
///</value>
6262
public bool EnableCrossEntityTransactions { get; set; }
6363

64+
/// <summary>
65+
/// Gets or sets whether or not to enable metrics for the associated <see cref="ServiceBusClient"/> instance.
66+
/// If set to <value>true</value>, <see cref="ServiceBusClient.GetTransportMetrics"/> can be called.
67+
/// </summary>
68+
public bool EnableTransportMetrics { get; set; }
69+
6470
/// <summary>
6571
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
6672
/// </summary>
@@ -103,6 +109,7 @@ internal ServiceBusClientOptions Clone() =>
103109
WebProxy = WebProxy,
104110
RetryOptions = RetryOptions.Clone(),
105111
EnableCrossEntityTransactions = EnableCrossEntityTransactions,
112+
EnableTransportMetrics = EnableTransportMetrics
106113
};
107114
}
108115
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
6+
namespace Azure.Messaging.ServiceBus
7+
{
8+
/// <summary>
9+
/// A set of metrics that can be used to monitor communication between the client and service.
10+
/// </summary>
11+
public class ServiceBusTransportMetrics
12+
{
13+
/// <summary>
14+
/// Initializes a new instance of the <see cref="ServiceBusTransportMetrics"/> class for mocking.
15+
/// </summary>
16+
protected internal ServiceBusTransportMetrics()
17+
{
18+
}
19+
20+
/// <summary>
21+
/// Gets the last time that a heartbeat was received from the Service Bus service. These heartbeats are sent from the
22+
/// service approximately every 30 seconds.
23+
/// </summary>
24+
public DateTimeOffset? LastHeartBeat { get; internal set; }
25+
26+
/// <summary>
27+
/// Gets the last time that a connection was opened for the associated <see cref="ServiceBusClient"/> instance.
28+
/// </summary>
29+
public DateTimeOffset? LastConnectionOpen { get; internal set; }
30+
31+
/// <summary>
32+
/// Gets the last time that a connection was closed for the associated <see cref="ServiceBusClient"/> instance. If the <see cref="ServiceBusClient"/>
33+
/// was disposed, then this time will not be updated again. It may be updated multiple times if the close is initiated by the service.
34+
/// </summary>
35+
public DateTimeOffset? LastConnectionClose { get; internal set; }
36+
37+
internal ServiceBusTransportMetrics Clone() =>
38+
new()
39+
{
40+
LastHeartBeat = LastHeartBeat,
41+
LastConnectionOpen = LastConnectionOpen,
42+
LastConnectionClose = LastConnectionClose
43+
};
44+
}
45+
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportClient.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ internal abstract class TransportClient : IAsyncDisposable
3535
///
3636
public virtual Uri ServiceEndpoint { get; }
3737

38+
/// <summary>
39+
/// The metrics related to the client.
40+
/// </summary>
41+
public virtual ServiceBusTransportMetrics TransportMetrics { get; }
42+
3843
/// <summary>
3944
/// Creates a sender strongly aligned with the active protocol and transport,
4045
/// responsible for sending <see cref="ServiceBusMessage" /> to the entity.

0 commit comments

Comments
 (0)