Skip to content

Commit 0899d47

Browse files
authored
[Event Hubs] Client Identifier Option (#22614)
The focus of these changes is to add an option for setting an Identifier for each Event Hubs client type. The identifier is informational and is associated with the AMQP links used, allowing the service to provide additional context in error messages and the SDK logs to provide an additional point of correlation.
1 parent 02f27a2 commit 0899d47

34 files changed

+1084
-501
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Each Event Hubs client type now offers an option to set an Identifier. The identifier is informational and is associated with the AMQP links used, allowing the service to provide additional context in error messages and the SDK logs to provide an additional point of correlation.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ public EventHubConsumerClient(string consumerGroup, string connectionString, str
204204
public string ConsumerGroup { get { throw null; } }
205205
public string EventHubName { get { throw null; } }
206206
public string FullyQualifiedNamespace { get { throw null; } }
207+
public string Identifier { get { throw null; } }
207208
public bool IsClosed { get { throw null; } protected set { } }
208209
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
209210
public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
@@ -226,6 +227,7 @@ public partial class EventHubConsumerClientOptions
226227
{
227228
public EventHubConsumerClientOptions() { }
228229
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
230+
public string Identifier { get { throw null; } set { } }
229231
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
230232
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
231233
public override bool Equals(object obj) { throw null; }
@@ -400,6 +402,7 @@ public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messagi
400402
public string ConsumerGroup { get { throw null; } }
401403
public string EventHubName { get { throw null; } }
402404
public string FullyQualifiedNamespace { get { throw null; } }
405+
public string Identifier { get { throw null; } }
403406
public Azure.Messaging.EventHubs.Consumer.EventPosition InitialPosition { get { throw null; } }
404407
public bool IsClosed { get { throw null; } protected set { } }
405408
public string PartitionId { get { throw null; } }
@@ -421,6 +424,7 @@ public partial class PartitionReceiverOptions
421424
public PartitionReceiverOptions() { }
422425
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
423426
public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } }
427+
public string Identifier { get { throw null; } set { } }
424428
public long? OwnerLevel { get { throw null; } set { } }
425429
public int PrefetchCount { get { throw null; } set { } }
426430
public long? PrefetchSizeInBytes { get { throw null; } set { } }
@@ -519,6 +523,7 @@ public EventHubProducerClient(string fullyQualifiedNamespace, string eventHubNam
519523
public EventHubProducerClient(string connectionString, string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClientOptions clientOptions) { }
520524
public string EventHubName { get { throw null; } }
521525
public string FullyQualifiedNamespace { get { throw null; } }
526+
public string Identifier { get { throw null; } }
522527
public bool IsClosed { get { throw null; } protected set { } }
523528
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
524529
public virtual System.Threading.Tasks.ValueTask<Azure.Messaging.EventHubs.Producer.EventDataBatch> CreateBatchAsync(Azure.Messaging.EventHubs.Producer.CreateBatchOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -541,6 +546,7 @@ public partial class EventHubProducerClientOptions
541546
{
542547
public EventHubProducerClientOptions() { }
543548
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
549+
public string Identifier { get { throw null; } set { } }
544550
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
545551
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
546552
public override bool Equals(object obj) { throw null; }

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,13 +403,15 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
403403
/// </summary>
404404
///
405405
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
406+
/// <param name="producerIdentifier">The identifier to associate with the consumer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
406407
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
407408
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
408409
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
409410
///
410411
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
411412
///
412413
public override TransportProducer CreateProducer(string partitionId,
414+
string producerIdentifier,
413415
TransportProducerFeatures requestedFeatures,
414416
PartitionPublishingOptions partitionOptions,
415417
EventHubsRetryPolicy retryPolicy)
@@ -420,6 +422,7 @@ public override TransportProducer CreateProducer(string partitionId,
420422
(
421423
EventHubName,
422424
partitionId,
425+
producerIdentifier,
423426
ConnectionScope,
424427
MessageConverter,
425428
retryPolicy,
@@ -447,6 +450,7 @@ public override TransportProducer CreateProducer(string partitionId,
447450
///
448451
/// <param name="consumerGroup">The name of the consumer group this consumer is associated with. Events are read in the context of this group.</param>
449452
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
453+
/// <param name="consumerIdentifier">The identifier to associate with the consumer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
450454
/// <param name="eventPosition">The position within the partition where the consumer should begin reading events.</param>
451455
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
452456
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
@@ -459,6 +463,7 @@ public override TransportProducer CreateProducer(string partitionId,
459463
///
460464
public override TransportConsumer CreateConsumer(string consumerGroup,
461465
string partitionId,
466+
string consumerIdentifier,
462467
EventPosition eventPosition,
463468
EventHubsRetryPolicy retryPolicy,
464469
bool trackLastEnqueuedEventProperties,
@@ -474,6 +479,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
474479
EventHubName,
475480
consumerGroup,
476481
partitionId,
482+
consumerIdentifier,
477483
eventPosition,
478484
trackLastEnqueuedEventProperties,
479485
invalidateConsumerWhenPartitionStolen,

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(TimeS
308308
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
309309
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
310310
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
311+
/// <param name="linkIdentifier">The identifier to assign to the link; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
311312
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
312313
///
313314
/// <returns>A link for use with consumer operations.</returns>
@@ -320,6 +321,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
320321
long? prefetchSizeInBytes,
321322
long? ownerLevel,
322323
bool trackLastEnqueuedEventProperties,
324+
string linkIdentifier,
323325
CancellationToken cancellationToken)
324326
{
325327
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
@@ -341,6 +343,11 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
341343
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
342344
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
343345

346+
if (string.IsNullOrEmpty(linkIdentifier))
347+
{
348+
linkIdentifier = Guid.NewGuid().ToString();
349+
}
350+
344351
var link = await CreateReceivingLinkAsync(
345352
connection,
346353
consumerEndpoint,
@@ -350,6 +357,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
350357
prefetchSizeInBytes,
351358
ownerLevel,
352359
trackLastEnqueuedEventProperties,
360+
linkIdentifier,
353361
cancellationToken
354362
).ConfigureAwait(false);
355363

@@ -379,6 +387,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
379387
/// <param name="features">The set of features which are active for the producer requesting the link.</param>
380388
/// <param name="options">The set of options to consider when creating the link.</param>
381389
/// <param name="timeout">The timeout to apply when creating the link.</param>
390+
/// <param name="linkIdentifier">The identifier to assign to the link; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
382391
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
383392
///
384393
/// <returns>A link for use with producer operations.</returns>
@@ -387,6 +396,7 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
387396
TransportProducerFeatures features,
388397
PartitionPublishingOptions options,
389398
TimeSpan timeout,
399+
string linkIdentifier,
390400
CancellationToken cancellationToken)
391401
{
392402
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
@@ -405,7 +415,12 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
405415
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
406416
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
407417

408-
var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
418+
if (string.IsNullOrEmpty(linkIdentifier))
419+
{
420+
linkIdentifier = Guid.NewGuid().ToString();
421+
}
422+
423+
var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), linkIdentifier, cancellationToken).ConfigureAwait(false);
409424
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
410425

411426
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
@@ -587,12 +602,13 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
587602
///
588603
/// <param name="connection">The active and opened AMQP connection to use for this link.</param>
589604
/// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
605+
/// <param name="timeout">The timeout to apply when creating the link.</param>
590606
/// <param name="eventPosition">The position of the event in the partition where the link should be filtered to.</param>
591607
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested.</param>
592608
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
593609
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
594610
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
595-
/// <param name="timeout">The timeout to apply when creating the link.</param>
611+
/// <param name="linkIdentifier">The identifier to assign to the link; this is assumed to be a non-null value.</param>
596612
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
597613
///
598614
/// <returns>A link for use for operations related to receiving events.</returns>
@@ -605,6 +621,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
605621
long? prefetchSizeInBytes,
606622
long? ownerLevel,
607623
bool trackLastEnqueuedEventProperties,
624+
string linkIdentifier,
608625
CancellationToken cancellationToken)
609626
{
610627
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
@@ -641,7 +658,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
641658
AutoSendFlow = prefetchCount > 0,
642659
SettleType = SettleMode.SettleOnSend,
643660
Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
644-
Target = new Target { Address = Guid.NewGuid().ToString() },
661+
Target = new Target { Address = linkIdentifier },
645662
TotalCacheSizeInBytes = prefetchSizeInBytes
646663
};
647664

@@ -652,6 +669,11 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
652669
linkSettings.AddProperty(AmqpProperty.ConsumerOwnerLevel, ownerLevel.Value);
653670
}
654671

672+
if (linkIdentifier != null)
673+
{
674+
linkSettings.AddProperty(AmqpProperty.ConsumerIdentifier, linkIdentifier);
675+
}
676+
655677
if (trackLastEnqueuedEventProperties)
656678
{
657679
linkSettings.DesiredCapabilities ??= new Multiple<AmqpSymbol>();
@@ -705,6 +727,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
705727
/// <param name="features">The set of features which are active for the producer for which the link is being created.</param>
706728
/// <param name="options">The set of options to consider when creating the link.</param>
707729
/// <param name="timeout">The timeout to apply when creating the link.</param>
730+
/// <param name="linkIdentifier">The identifier to assign to the link; this is assumed to be a non-null value.</param>
708731
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
709732
///
710733
/// <returns>A link for use for operations related to receiving events.</returns>
@@ -714,6 +737,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnect
714737
TransportProducerFeatures features,
715738
PartitionPublishingOptions options,
716739
TimeSpan timeout,
740+
string linkIdentifier,
717741
CancellationToken cancellationToken)
718742
{
719743
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
@@ -744,7 +768,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnect
744768
{
745769
Role = false,
746770
InitialDeliveryCount = 0,
747-
Source = new Source { Address = Guid.NewGuid().ToString() },
771+
Source = new Source { Address = linkIdentifier },
748772
Target = new Target { Address = endpoint.AbsolutePath }
749773
};
750774

0 commit comments

Comments
 (0)