Skip to content

Commit 26ed740

Browse files
authored
[Event Hubs Client] AMQP Cancellation Support (Azure#25654)
The focus of these changes is to take advantage of the support for cancellation tokens that was added to the AMQP transport library, preserving existing timeout behavior by specifying an operation timeout for the link.
1 parent 73cbe40 commit 26ed740

15 files changed

+322
-265
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Support for cancellation tokens has been improved for AMQP operations, enabling earlier detection of cancellation requests without needing to wait for the configured timeout to elapse.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ internal class AmqpClient : TransportClient
9999
///
100100
/// <param name="host">The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
101101
/// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
102+
/// <param name="operationTimeout">The amount of time to allow for an AMQP operation using the link to complete before attempting to cancel it.</param>
102103
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.</param>
103104
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
104105
///
@@ -113,8 +114,9 @@ internal class AmqpClient : TransportClient
113114
///
114115
public AmqpClient(string host,
115116
string eventHubName,
117+
TimeSpan operationTimeout,
116118
EventHubTokenCredential credential,
117-
EventHubConnectionOptions clientOptions) : this(host, eventHubName, credential, clientOptions, null, null)
119+
EventHubConnectionOptions clientOptions) : this(host, eventHubName, operationTimeout, credential, clientOptions, null, null)
118120
{
119121
}
120122

@@ -124,6 +126,7 @@ public AmqpClient(string host,
124126
///
125127
/// <param name="host">The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
126128
/// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
129+
/// <param name="operationTimeout">The amount of time to allow for an AMQP operation using the link to complete before attempting to cancel it.</param>
127130
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.</param>
128131
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
129132
/// <param name="connectionScope">The optional scope to use for AMQP connection management. If <c>null</c>, a new scope will be created.</param>
@@ -140,13 +143,15 @@ public AmqpClient(string host,
140143
///
141144
protected AmqpClient(string host,
142145
string eventHubName,
146+
TimeSpan operationTimeout,
143147
EventHubTokenCredential credential,
144148
EventHubConnectionOptions clientOptions,
145149
AmqpConnectionScope connectionScope,
146150
AmqpMessageConverter messageConverter)
147151
{
148152
Argument.AssertNotNullOrEmpty(host, nameof(host));
149153
Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
154+
Argument.AssertNotNegative(operationTimeout, nameof(operationTimeout));
150155
Argument.AssertNotNull(credential, nameof(credential));
151156
Argument.AssertNotNull(clientOptions, nameof(clientOptions));
152157

@@ -189,7 +194,7 @@ protected AmqpClient(string host,
189194
clientOptions.CertificateValidationCallback);
190195

191196
ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
192-
timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None),
197+
linkTimeout => ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None),
193198
link =>
194199
{
195200
link.Session?.SafeClose();
@@ -238,17 +243,13 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry
238243

239244
var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
240245
using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token);
241-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
242246

243247
RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime()))).ConfigureAwait(false);
244248
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
245249

246-
// Send the request and wait for the response.
247-
248-
using AmqpMessage response = await link.RequestAsync(request, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
249-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
250+
// Send the request and process the response.
250251

251-
// Process the response.
252+
using AmqpMessage response = await link.RequestAsync(request, cancellationToken).ConfigureAwait(false);
252253

253254
AmqpError.ThrowIfErrorResponse(response, EventHubName);
254255
return MessageConverter.CreateEventHubPropertiesFromResponse(response);
@@ -338,17 +339,13 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
338339

339340
token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
340341
using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, partitionId, token);
341-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
342342

343343
link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime()))).ConfigureAwait(false);
344344
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
345345

346-
// Send the request and wait for the response.
346+
// Send the request and process the response.
347347

348-
using AmqpMessage response = await link.RequestAsync(request, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
349-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
350-
351-
// Process the response.
348+
using AmqpMessage response = await link.RequestAsync(request, cancellationToken).ConfigureAwait(false);
352349

353350
AmqpError.ThrowIfErrorResponse(response, EventHubName);
354351
return MessageConverter.CreatePartitionPropertiesFromResponse(response);
@@ -506,6 +503,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
506503
return;
507504
}
508505

506+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
509507
_closed = true;
510508

511509
var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture);
@@ -514,11 +512,9 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
514512
try
515513
{
516514
EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId);
517-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
518515

519516
if (ManagementLink?.TryGetOpenedObject(out var _) == true)
520517
{
521-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
522518
await ManagementLink.CloseAsync().ConfigureAwait(false);
523519
}
524520

0 commit comments

Comments
 (0)