Skip to content

Commit df46c56

Browse files
authored
[Event Hubs Client] Custom Endpoint Support (Azure#18014)
The focus of these changes is to add an option for specifying a custom address to use for establishing the connection to the Event Hubs service; the namespace address will continue to be used for communication with the service once the connection is established when the custom endpoint is configured.
1 parent a99c3e0 commit df46c56

14 files changed

+192
-70
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework",
1111
EndProject
1212
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{797FF941-76FD-45FD-AC17-A73DFE2BA621}"
1313
EndProject
14+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4}"
15+
EndProject
1416
Global
1517
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1618
Debug|Any CPU = Debug|Any CPU
@@ -29,12 +31,17 @@ Global
2931
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU
3032
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU
3133
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.Build.0 = Release|Any CPU
34+
{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
35+
{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4}.Debug|Any CPU.Build.0 = Debug|Any CPU
36+
{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4}.Release|Any CPU.ActiveCfg = Release|Any CPU
37+
{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4}.Release|Any CPU.Build.0 = Release|Any CPU
3238
EndGlobalSection
3339
GlobalSection(SolutionProperties) = preSolution
3440
HideSolutionNode = FALSE
3541
EndGlobalSection
3642
GlobalSection(NestedProjects) = preSolution
3743
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
44+
{9E1B2729-AB07-4C4D-A05E-8B1ACDE968A4} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
3845
EndGlobalSection
3946
GlobalSection(ExtensibilityGlobals) = postSolution
4047
SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
<ItemGroup>
1515
<!-- TEMP -->
16-
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.3.0-beta.4" /><!-- This override will be removed when v5.3.0 is released for GA -->
16+
<ProjectReference Include="..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" /><!-- This will be changed to a package reference when v5.3.0 is released for GA -->
1717
<!--END TEMP-->
1818

1919
<PackageReference Include="Azure.Storage.Blobs" />

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Core/EventHubConnectionOptionsExtensions.cs

100755100644
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ public static EventHubConnectionOptions Clone(this EventHubConnectionOptions ins
2222
new EventHubConnectionOptions
2323
{
2424
TransportType = instance.TransportType,
25-
Proxy = instance.Proxy
25+
Proxy = instance.Proxy,
26+
CustomEndpointAddress = instance.CustomEndpointAddress
2627
};
2728
}
2829
}

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Azure.Messaging.EventHubs.Shared.Tests.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
<ItemGroup>
2222
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
2323
<ProjectReference Include="$(AzureCoreTestFramework)" />
24-
<ProjectReference Include="..\..\..\core\Azure.Core\src\Azure.Core.csproj" />
2524
</ItemGroup>
2625

2726
<!-- Import Azure.Core shared source -->

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Core/EventHubConnectionOptionsExtensionsTests.cs

100755100644
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
45
using System.Net;
56
using Azure.Messaging.EventHubs.Core;
67
using Moq;
@@ -27,13 +28,15 @@ public void CloneProducesACopy()
2728
var options = new EventHubConnectionOptions
2829
{
2930
TransportType = EventHubsTransportType.AmqpWebSockets,
30-
Proxy = Mock.Of<IWebProxy>()
31+
Proxy = Mock.Of<IWebProxy>(),
32+
CustomEndpointAddress = new Uri("https://fake.servciebus.net")
3133
};
3234

3335
EventHubConnectionOptions clone = options.Clone();
3436
Assert.That(clone, Is.Not.Null, "The clone should not be null.");
3537
Assert.That(clone.TransportType, Is.EqualTo(options.TransportType), "The connection type of the clone should match.");
3638
Assert.That(clone.Proxy, Is.EqualTo(options.Proxy), "The proxy of the clone should match.");
39+
Assert.That(clone.CustomEndpointAddress, Is.EqualTo(options.CustomEndpointAddress), "The custom endpoint address clone should match.");
3740
}
3841
}
3942
}

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public EventHubConnection(string connectionString, string eventHubName, Azure.Me
4949
public partial class EventHubConnectionOptions
5050
{
5151
public EventHubConnectionOptions() { }
52+
public System.Uri CustomEndpointAddress { get { throw null; } set { } }
5253
public System.Net.IWebProxy Proxy { get { throw null; } set { } }
5354
public Azure.Messaging.EventHubs.EventHubsTransportType TransportType { get { throw null; } set { } }
5455
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ internal class AmqpClient : TransportClient
5656
///
5757
public override Uri ServiceEndpoint { get; }
5858

59+
/// <summary>
60+
/// The endpoint to used establishing a connection to the Event Hubs service to which the scope is associated.
61+
/// </summary>
62+
///
63+
public Uri ConnectionEndpoint { get; }
64+
5965
/// <summary>
6066
/// The name of the Event Hub to which the client is bound.
6167
/// </summary>
@@ -154,10 +160,21 @@ protected AmqpClient(string host,
154160
Host = host
155161
}.Uri;
156162

163+
ConnectionEndpoint = clientOptions.CustomEndpointAddress switch
164+
{
165+
null => ServiceEndpoint,
166+
167+
_ => new UriBuilder
168+
{
169+
Scheme = ServiceEndpoint.Scheme,
170+
Host = clientOptions.CustomEndpointAddress.Host
171+
}.Uri
172+
};
173+
157174
EventHubName = eventHubName;
158175
Credential = credential;
159176
MessageConverter = messageConverter ?? new AmqpMessageConverter();
160-
ConnectionScope = connectionScope ?? new AmqpConnectionScope(ServiceEndpoint, eventHubName, credential, clientOptions.TransportType, clientOptions.Proxy);
177+
ConnectionScope = connectionScope ?? new AmqpConnectionScope(ServiceEndpoint, ConnectionEndpoint, eventHubName, credential, clientOptions.TransportType, clientOptions.Proxy);
161178

162179
ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
163180
timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None),
@@ -233,7 +250,7 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry
233250
++failedAttemptCount;
234251
retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
235252

236-
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
253+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
237254
{
238255
EventHubsEventSource.Log.GetPropertiesError(EventHubName, activeEx.Message);
239256
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
@@ -333,7 +350,7 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
333350
++failedAttemptCount;
334351
retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
335352

336-
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
353+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
337354
{
338355
EventHubsEventSource.Log.GetPartitionPropertiesError(EventHubName, partitionId, activeEx.Message);
339356
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ public bool IsDisposed
126126
///
127127
private Uri ServiceEndpoint { get; }
128128

129+
/// <summary>
130+
/// The endpoint to used establishing a connection to the Event Hubs service to which the scope is associated.
131+
/// </summary>
132+
///
133+
private Uri ConnectionEndpoint { get; }
134+
129135
/// <summary>
130136
/// The name of the Event Hub to which the scope is associated.
131137
/// </summary>
@@ -160,36 +166,37 @@ public bool IsDisposed
160166
/// Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
161167
/// </summary>
162168
///
163-
/// <param name="serviceEndpoint">Endpoint for the Event Hubs service to which the scope is associated.</param>
169+
/// <param name="serviceEndpoint">The endpoint for the Event Hubs service to which the scope is associated.</param>
170+
/// <param name="connectionEndpoint">The endpoint to used establishing a connection to the Event Hubs service to which the scope is associated.</param>
164171
/// <param name="eventHubName"> The name of the Event Hub to which the scope is associated.</param>
165172
/// <param name="credential">The credential to use for authorization with the Event Hubs service.</param>
166173
/// <param name="transport">The transport to use for communication.</param>
167174
/// <param name="proxy">The proxy, if any, to use for communication.</param>
168175
/// <param name="identifier">The identifier to assign this scope; if not provided, one will be generated.</param>
169176
///
170177
public AmqpConnectionScope(Uri serviceEndpoint,
178+
Uri connectionEndpoint,
171179
string eventHubName,
172180
EventHubTokenCredential credential,
173181
EventHubsTransportType transport,
174182
IWebProxy proxy,
175183
string identifier = default)
176184
{
177185
Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
186+
Argument.AssertNotNull(connectionEndpoint, nameof(connectionEndpoint));
178187
Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
179188
Argument.AssertNotNull(credential, nameof(credential));
180189
ValidateTransport(transport);
181190

182191
ServiceEndpoint = serviceEndpoint;
192+
ConnectionEndpoint = connectionEndpoint;
183193
EventHubName = eventHubName;
184194
Transport = transport;
185195
Proxy = proxy;
186196
TokenProvider = new CbsTokenProvider(new EventHubTokenCredential(credential, serviceEndpoint.ToString()), OperationCancellationSource.Token);
187197
Id = identifier ?? $"{ eventHubName }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substring(0, 8) }";
188198

189-
#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
190-
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
191-
#pragma warning restore CA2214 // Do not call overridable methods in constructors
192-
199+
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, ConnectionEndpoint, Transport, Proxy, Id, timeout);
193200
ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(connectionFactory, CloseConnection);
194201
}
195202

@@ -351,6 +358,7 @@ public void Dispose()
351358
///
352359
/// <param name="amqpVersion">The version of AMQP to use for the connection.</param>
353360
/// <param name="serviceEndpoint">The endpoint for the Event Hubs service to which the scope is associated.</param>
361+
/// <param name="connectionEndpoint">The endpoint to used establishing a connection to the Event Hubs service to which the scope is associated.</param>
354362
/// <param name="transportType">The type of transport to use for communication.</param>
355363
/// <param name="proxy">The proxy, if any, to use for communication.</param>
356364
/// <param name="scopeIdentifier">The unique identifier for the associated scope.</param>
@@ -360,35 +368,34 @@ public void Dispose()
360368
///
361369
protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(Version amqpVersion,
362370
Uri serviceEndpoint,
371+
Uri connectionEndpoint,
363372
EventHubsTransportType transportType,
364373
IWebProxy proxy,
365374
string scopeIdentifier,
366375
TimeSpan timeout)
367376
{
368-
var hostName = serviceEndpoint.Host;
369-
AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
370-
AmqpConnectionSettings connectionSetings = CreateAmqpConnectionSettings(hostName, scopeIdentifier);
377+
var amqpSettings = CreateAmpqSettings(AmqpVersion);
378+
var connectionSetings = CreateAmqpConnectionSettings(serviceEndpoint.Host, scopeIdentifier);
371379

372-
TransportSettings transportSettings = transportType.IsWebSocketTransport()
373-
? CreateTransportSettingsForWebSockets(hostName, proxy)
374-
: CreateTransportSettingsforTcp(hostName, serviceEndpoint.Port);
380+
var transportSettings = transportType.IsWebSocketTransport()
381+
? CreateTransportSettingsForWebSockets(connectionEndpoint.Host, proxy)
382+
: CreateTransportSettingsforTcp(connectionEndpoint.Host, connectionEndpoint.Port);
375383

376384
// Create and open the connection, respecting the timeout constraint
377385
// that was received.
378386

379387
var stopWatch = ValueStopwatch.StartNew();
380388

381389
var initiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
382-
TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
390+
var transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
383391

384392
var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
385393
await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
386394

387-
#pragma warning disable CA1806 // Do not ignore method results
388395
// Create the CBS link that will be used for authorization. The act of creating the link will associate
389396
// it with the connection.
390-
new AmqpCbsLink(connection);
391-
#pragma warning restore CA1806 // Do not ignore method results
397+
398+
_ = new AmqpCbsLink(connection);
392399

393400
// When the connection is closed, close each of the links associated with it.
394401

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public override async Task<IReadOnlyList<EventData>> ReceiveAsync(int maximumEve
301301
++failedAttemptCount;
302302
retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
303303

304-
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
304+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
305305
{
306306
EventHubsEventSource.Log.EventReceiveError(EventHubName, ConsumerGroup, PartitionId, operationId, activeEx.Message);
307307
await Task.Delay(UseMinimum(retryDelay.Value, waitTime.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
264264
var activeEx = ex.TranslateServiceException(EventHubName);
265265
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
266266

267-
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
267+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
268268
{
269269
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
270270
tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
@@ -478,7 +478,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
478478
++failedAttemptCount;
479479
retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
480480

481-
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
481+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
482482
{
483483
EventHubsEventSource.Log.EventPublishError(EventHubName, logPartition, operationId, activeEx.Message);
484484
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

0 commit comments

Comments
 (0)