Skip to content

Commit 76c6774

Browse files
[Event Hubs Extensions] Update EventHubClientFactory (Azure#31964)
* [Event Hubs Extensions] Update EventHubClientFactory * Fix comments and add tests. * Fix consumerCache and add tests
1 parent 34f6985 commit 76c6774

File tree

2 files changed

+120
-61
lines changed

2 files changed

+120
-61
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs

Lines changed: 54 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Concurrent;
66
using Azure.Core;
7+
using Azure.Messaging.EventHubs;
78
using Azure.Messaging.EventHubs.Consumer;
89
using Azure.Messaging.EventHubs.Producer;
910
using Azure.Storage.Blobs;
@@ -48,37 +49,36 @@ internal EventHubProducerClient GetEventHubProducerClient(string eventHubName, s
4849
eventHubName = _nameResolver.ResolveWholeString(eventHubName);
4950
connection = _nameResolver.ResolveWholeString(connection);
5051

51-
return _producerCache.GetOrAdd(eventHubName, key =>
52+
if (!string.IsNullOrWhiteSpace(connection))
5253
{
53-
if (!string.IsNullOrWhiteSpace(connection))
54+
var info = ResolveConnectionInformation(connection);
55+
var eventHubProducerClientOptions = new EventHubProducerClientOptions
5456
{
55-
var info = ResolveConnectionInformation(connection);
56-
57-
if (info.FullyQualifiedEndpoint != null &&
58-
info.TokenCredential != null)
59-
{
60-
return new EventHubProducerClient(
61-
info.FullyQualifiedEndpoint,
62-
eventHubName,
63-
info.TokenCredential,
64-
new EventHubProducerClientOptions
65-
{
66-
RetryOptions = _options.ClientRetryOptions,
67-
ConnectionOptions = _options.ConnectionOptions
68-
});
69-
}
57+
RetryOptions = _options.ClientRetryOptions,
58+
ConnectionOptions = _options.ConnectionOptions
59+
};
7060

71-
return new EventHubProducerClient(
72-
NormalizeConnectionString(info.ConnectionString, eventHubName),
73-
new EventHubProducerClientOptions
74-
{
75-
RetryOptions = _options.ClientRetryOptions,
76-
ConnectionOptions = _options.ConnectionOptions
77-
});
61+
EventHubConnection eventHubConnection;
62+
63+
if (info.FullyQualifiedEndpoint != null &&
64+
info.TokenCredential != null)
65+
{
66+
eventHubConnection = new EventHubConnection(info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential, eventHubProducerClientOptions.ConnectionOptions);
67+
}
68+
else
69+
{
70+
eventHubConnection = new EventHubConnection(NormalizeConnectionString(info.ConnectionString, eventHubName), eventHubProducerClientOptions.ConnectionOptions);
7871
}
7972

80-
throw new InvalidOperationException("No event hub sender named " + eventHubName);
81-
});
73+
return _producerCache.GetOrAdd(GenerateCacheKey(eventHubConnection), key =>
74+
{
75+
return new EventHubProducerClient(
76+
eventHubConnection,
77+
eventHubProducerClientOptions);
78+
});
79+
}
80+
81+
throw new InvalidOperationException("No event hub sender named " + eventHubName);
8282
}
8383

8484
internal EventProcessorHost GetEventProcessorHost(string eventHubName, string connection, string consumerGroup, bool singleDispatch)
@@ -123,48 +123,36 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName,
123123
connection = _nameResolver.ResolveWholeString(connection);
124124
consumerGroup = _nameResolver.ResolveWholeString(consumerGroup);
125125

126-
return _consumerCache.GetOrAdd(eventHubName, name =>
126+
if (!string.IsNullOrEmpty(connection))
127127
{
128-
EventHubConsumerClient client = null;
129-
130-
if (!string.IsNullOrEmpty(connection))
128+
var info = ResolveConnectionInformation(connection);
129+
var eventHubConsumerClientOptions = new EventHubConsumerClientOptions
131130
{
132-
var info = ResolveConnectionInformation(connection);
131+
RetryOptions = _options.ClientRetryOptions,
132+
ConnectionOptions = _options.ConnectionOptions
133+
};
133134

134-
if (info.FullyQualifiedEndpoint != null &&
135-
info.TokenCredential != null)
136-
{
137-
client = new EventHubConsumerClient(
138-
consumerGroup,
139-
info.FullyQualifiedEndpoint,
140-
eventHubName,
141-
info.TokenCredential,
142-
new EventHubConsumerClientOptions
143-
{
144-
RetryOptions = _options.ClientRetryOptions,
145-
ConnectionOptions = _options.ConnectionOptions
146-
});
147-
}
148-
else
149-
{
150-
client = new EventHubConsumerClient(
151-
consumerGroup,
152-
NormalizeConnectionString(info.ConnectionString, eventHubName),
153-
new EventHubConsumerClientOptions
154-
{
155-
RetryOptions = _options.ClientRetryOptions,
156-
ConnectionOptions = _options.ConnectionOptions
157-
});
158-
}
159-
}
135+
EventHubConnection eventHubConnection;
160136

161-
if (client != null)
137+
if (info.FullyQualifiedEndpoint != null &&
138+
info.TokenCredential != null)
162139
{
163-
return new EventHubConsumerClientImpl(client);
140+
eventHubConnection = new EventHubConnection(info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential, eventHubConsumerClientOptions.ConnectionOptions);
164141
}
142+
else
143+
{
144+
eventHubConnection = new EventHubConnection(NormalizeConnectionString(info.ConnectionString, eventHubName), eventHubConsumerClientOptions.ConnectionOptions);
145+
}
146+
147+
return _consumerCache.GetOrAdd(GenerateCacheKey(eventHubConnection, consumerGroup), key =>
148+
new EventHubConsumerClientImpl(
149+
new EventHubConsumerClient(
150+
consumerGroup,
151+
eventHubConnection,
152+
eventHubConsumerClientOptions)));
153+
}
165154

166-
throw new InvalidOperationException("No event hub receiver named " + eventHubName);
167-
});
155+
throw new InvalidOperationException("No event hub receiver named " + eventHubName);
168156
}
169157

170158
internal BlobContainerClient GetCheckpointStoreClient()
@@ -212,6 +200,11 @@ private EventHubsConnectionInformation ResolveConnectionInformation(string conne
212200
return new EventHubsConnectionInformation(fullyQualifiedNamespace, credential);
213201
}
214202

203+
private static string GenerateCacheKey(EventHubConnection eventHubConnection, string consumerGroup = null) =>
204+
consumerGroup == null
205+
? $"{eventHubConnection.FullyQualifiedNamespace}/{eventHubConnection.EventHubName}"
206+
: $"{eventHubConnection.FullyQualifiedNamespace}/{eventHubConnection.EventHubName}/{consumerGroup}";
207+
215208
private record EventHubsConnectionInformation
216209
{
217210
public EventHubsConnectionInformation(string connectionString)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
2424
public class EventHubsClientFactoryTests
2525
{
2626
private const string ConnectionString = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey";
27+
private const string AnotherConnectionString = "Endpoint=sb://test12345-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey";
2728
private const string ConnectionStringWithEventHub = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2";
2829

2930
// Validate that if connection string has EntityPath, that takes precedence over the parameter.
@@ -101,6 +102,71 @@ public void ConsumersAndProducersAreCached()
101102
Assert.AreSame(consumer, consumer2);
102103
}
103104

105+
[Test]
106+
public void ConsumersWithSameNameAreProperlyCached()
107+
{
108+
EventHubOptions options = new EventHubOptions();
109+
110+
var componentFactoryMock = new Mock<AzureComponentFactory>();
111+
componentFactoryMock.Setup(c => c.CreateTokenCredential(
112+
It.Is<IConfiguration>(c => c["fullyQualifiedNamespace"] != null)))
113+
.Returns(new DefaultAzureCredential());
114+
115+
var configuration = ConfigurationUtilities.CreateConfiguration(
116+
new KeyValuePair<string, string>("connection1", ConnectionString),
117+
new KeyValuePair<string, string>("connection2", AnotherConnectionString),
118+
new KeyValuePair<string, string>("connection3:fullyQualifiedNamespace", "test89123-ns-x.servicebus.windows.net"),
119+
new KeyValuePair<string, string>("connection4:fullyQualifiedNamespace", "test12345-ns-x.servicebus.windows.net"));
120+
121+
var factory = ConfigurationUtilities.CreateFactory(configuration, options, componentFactoryMock.Object);
122+
var consumer1 = factory.GetEventHubConsumerClient("k1", "connection1", null);
123+
var consumer2 = factory.GetEventHubConsumerClient("k1", "connection2", "csg");
124+
var consumer3 = factory.GetEventHubConsumerClient("k1", "connection3", "csg");
125+
var consumer4 = factory.GetEventHubConsumerClient("k1", "connection4", "csg");
126+
127+
// Create different consumers for different eventhub namespaces.
128+
Assert.AreNotSame(consumer1, consumer2);
129+
Assert.AreNotSame(consumer3, consumer4);
130+
// Create different consumers for different consumer groups.
131+
Assert.AreNotSame(consumer1, consumer3);
132+
// Use the same consumer client for the same namespace/eventhub/consumergroup
133+
Assert.AreSame(consumer2, consumer4);
134+
}
135+
136+
[Test]
137+
public void ProducersWithSameNameAreProperlyCached()
138+
{
139+
EventHubOptions options = new EventHubOptions();
140+
141+
var componentFactoryMock = new Mock<AzureComponentFactory>();
142+
componentFactoryMock.Setup(c => c.CreateTokenCredential(
143+
It.Is<IConfiguration>(c => c["fullyQualifiedNamespace"] != null)))
144+
.Returns(new DefaultAzureCredential());
145+
146+
var configuration = ConfigurationUtilities.CreateConfiguration(
147+
new KeyValuePair<string, string>("connection1", ConnectionString),
148+
new KeyValuePair<string, string>("connection2", AnotherConnectionString),
149+
new KeyValuePair<string, string>("connection3:fullyQualifiedNamespace", "test89123-ns-x.servicebus.windows.net"),
150+
new KeyValuePair<string, string>("connection4:fullyQualifiedNamespace", "test12345-ns-x.servicebus.windows.net"));
151+
152+
var factory = ConfigurationUtilities.CreateFactory(configuration, options, componentFactoryMock.Object);
153+
var producer1 = factory.GetEventHubProducerClient("k1", "connection1");
154+
var producer2 = factory.GetEventHubProducerClient("k1", "connection2");
155+
var producer3 = factory.GetEventHubProducerClient("k1", "connection3");
156+
var producer4 = factory.GetEventHubProducerClient("k1", "connection4");
157+
158+
Assert.AreEqual("k1", producer1.EventHubName);
159+
Assert.AreEqual("k1", producer2.EventHubName);
160+
Assert.AreNotSame(producer1, producer2);
161+
162+
Assert.AreEqual("k1", producer3.EventHubName);
163+
Assert.AreEqual("k1", producer4.EventHubName);
164+
Assert.AreNotSame(producer3, producer4);
165+
166+
Assert.AreSame(producer1, producer3);
167+
Assert.AreSame(producer2, producer4);
168+
}
169+
104170
[Test]
105171
public void UsesDefaultConnectionToStorageAccount()
106172
{

0 commit comments

Comments
 (0)