Skip to content

Commit cc30f80

Browse files
Use batch size of 1 for single dispatch functions to keep tracing aligned between function and SDK (Azure#28341)
1 parent be9ca03 commit cc30f80

File tree

6 files changed

+168
-51
lines changed

6 files changed

+168
-51
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ internal EventHubProducerClient GetEventHubProducerClient(string eventHubName, s
8181
});
8282
}
8383

84-
internal EventProcessorHost GetEventProcessorHost(string eventHubName, string connection, string consumerGroup)
84+
internal EventProcessorHost GetEventProcessorHost(string eventHubName, string connection, string consumerGroup, bool singleDispatch)
8585
{
8686
consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName;
8787

@@ -92,7 +92,7 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
9292
if (!string.IsNullOrEmpty(connection))
9393
{
9494
var info = ResolveConnectionInformation(connection);
95-
95+
var maxEventBatchSize = singleDispatch ? 1 : _options.MaxEventBatchSize;
9696
if (info.FullyQualifiedEndpoint != null &&
9797
info.TokenCredential != null)
9898
{
@@ -101,15 +101,15 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
101101
eventHubName: eventHubName,
102102
credential: info.TokenCredential,
103103
options: _options.EventProcessorOptions,
104-
eventBatchMaximumCount: _options.MaxEventBatchSize,
104+
eventBatchMaximumCount: maxEventBatchSize,
105105
exceptionHandler: _options.ExceptionHandler);
106106
}
107107

108108
return new EventProcessorHost(consumerGroup: consumerGroup,
109109
connectionString: NormalizeConnectionString(info.ConnectionString, eventHubName),
110110
eventHubName: eventHubName,
111111
options: _options.EventProcessorOptions,
112-
eventBatchMaximumCount: _options.MaxEventBatchSize,
112+
eventBatchMaximumCount: maxEventBatchSize,
113113
exceptionHandler: _options.ExceptionHandler);
114114
}
115115

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public int BatchCheckpointFrequency
115115
private int _maxEventBatchSize;
116116

117117
/// <summary>
118-
/// Gets or sets the maximum number of events delivered in a batch. Default 10.
118+
/// Gets or sets the maximum number of events delivered in a batch. This setting applies only to functions that
119+
/// receive multiple events. Default 10.
119120
/// </summary>
120121
public int MaxEventBatchSize
121122
{

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
6262
IListener listener = new EventHubListener(
6363
factoryContext.Descriptor.Id,
6464
factoryContext.Executor,
65-
_clientFactory.GetEventProcessorHost(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup),
65+
_clientFactory.GetEventProcessorHost(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup, singleDispatch),
6666
singleDispatch,
6767
_clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup),
6868
checkpointStore,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.Extensions.Configuration;
5+
using System.Collections.Generic;
6+
using Azure.Storage.Blobs;
7+
using Microsoft.Azure.WebJobs.EventHubs.Processor;
8+
using Microsoft.Extensions.Azure;
9+
using Microsoft.Extensions.Logging;
10+
using Microsoft.Extensions.Logging.Abstractions;
11+
using Microsoft.Extensions.Options;
12+
using Moq;
13+
14+
namespace Microsoft.Azure.WebJobs.EventHubs.Tests
15+
{
16+
public static class ConfigurationUtilities
17+
{
18+
public static IConfiguration CreateConfiguration(params KeyValuePair<string, string>[] data)
19+
{
20+
return new ConfigurationBuilder().AddInMemoryCollection(data).Build();
21+
}
22+
23+
internal static EventHubClientFactory CreateFactory(IConfiguration configuration, EventHubOptions options, AzureComponentFactory componentFactory = null)
24+
{
25+
componentFactory ??= Mock.Of<AzureComponentFactory>();
26+
var loggerFactory = new NullLoggerFactory();
27+
var azureEventSourceLogForwarder = new AzureEventSourceLogForwarder(loggerFactory);
28+
return new EventHubClientFactory(
29+
configuration,
30+
componentFactory,
31+
Options.Create(options),
32+
new DefaultNameResolver(configuration),
33+
azureEventSourceLogForwarder,
34+
new CheckpointClientProvider(configuration, componentFactory, azureEventSourceLogForwarder, loggerFactory.CreateLogger<BlobServiceClient>()));
35+
}
36+
}
37+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Collections.Generic;
5+
using System.Reflection;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Azure.Core;
9+
using Azure.Messaging.EventHubs;
10+
using Azure.Messaging.EventHubs.Primitives;
11+
using Azure.Storage.Blobs;
12+
using Microsoft.Azure.WebJobs.EventHubs;
13+
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
14+
using Microsoft.Azure.WebJobs.EventHubs.Processor;
15+
using Microsoft.Azure.WebJobs.EventHubs.Tests;
16+
using Microsoft.Azure.WebJobs.Host.Executors;
17+
using Microsoft.Azure.WebJobs.Host.Listeners;
18+
using Microsoft.Azure.WebJobs.Host.Protocols;
19+
using Microsoft.Azure.WebJobs.Host.Triggers;
20+
using Microsoft.Extensions.Azure;
21+
using Microsoft.Extensions.Configuration;
22+
using Microsoft.Extensions.Logging.Abstractions;
23+
using Microsoft.Extensions.Options;
24+
using Moq;
25+
using NUnit.Framework;
26+
27+
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.UnitTests
28+
{
29+
public class EventHubTriggerAttributeBindingProviderTests
30+
{
31+
private readonly EventHubTriggerAttributeBindingProvider _provider;
32+
33+
public EventHubTriggerAttributeBindingProviderTests()
34+
{
35+
var configuration =
36+
ConfigurationUtilities.CreateConfiguration(
37+
new KeyValuePair<string, string>("connection", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=;"),
38+
new KeyValuePair<string, string>("Storage", "Endpoint=sb://test.blob.core.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="));
39+
40+
var options = new EventHubOptions();
41+
42+
Mock<IConverterManager> convertManager = new Mock<IConverterManager>(MockBehavior.Default);
43+
44+
// mock the BlobServiceClient and BlobContainerClient which are used for the checkpointing
45+
var blobServiceClient = new Mock<BlobServiceClient>();
46+
blobServiceClient.Setup(client => client.GetBlobContainerClient(It.IsAny<string>()))
47+
.Returns(Mock.Of<BlobContainerClient>());
48+
var componentFactory = new Mock<AzureComponentFactory>();
49+
componentFactory.Setup(
50+
factory => factory.CreateClient(
51+
typeof(BlobServiceClient),
52+
It.IsAny<IConfiguration>(),
53+
It.IsAny<TokenCredential>(),
54+
It.IsAny<BlobClientOptions>())).Returns(blobServiceClient.Object);
55+
56+
var factory = ConfigurationUtilities.CreateFactory(configuration, options, componentFactory.Object);
57+
_provider = new EventHubTriggerAttributeBindingProvider(convertManager.Object, Options.Create(options), NullLoggerFactory.Instance, factory);
58+
}
59+
60+
[Test]
61+
[TestCase(nameof(SingleDispatch), 1)]
62+
[TestCase(nameof(MultipleDispatch), 10)]
63+
public async Task TryCreateAsync_BatchCountsDefaultedCorrectly(string function, int expectedBatchCount)
64+
{
65+
ParameterInfo parameter = GetType().GetMethod(function, BindingFlags.NonPublic | BindingFlags.Static).GetParameters()[0];
66+
TriggerBindingProviderContext context = new TriggerBindingProviderContext(parameter, CancellationToken.None);
67+
68+
ITriggerBinding binding = await _provider.TryCreateAsync(context);
69+
Assert.NotNull(binding);
70+
71+
var listener = await binding.CreateListenerAsync(new ListenerFactoryContext(new FunctionDescriptor(),
72+
new Mock<ITriggeredFunctionExecutor>().Object, CancellationToken.None));
73+
74+
var processorHost = (EventProcessorHost)typeof(EventHubListener)
75+
.GetField("_eventProcessorHost", BindingFlags.NonPublic | BindingFlags.Instance)
76+
.GetValue(listener);
77+
var batchCount = (int) typeof(EventProcessor<EventProcessorHostPartition>).GetProperty("EventBatchMaximumCount", BindingFlags.NonPublic | BindingFlags.Instance)
78+
.GetValue(processorHost);
79+
Assert.AreEqual(expectedBatchCount, batchCount);
80+
}
81+
82+
internal static void SingleDispatch(
83+
[EventHubTrigger("test", Connection = "connection")]
84+
EventData eventData)
85+
{
86+
}
87+
88+
internal static void MultipleDispatch(
89+
[EventHubTrigger("test", Connection = "connection")]
90+
EventData[] eventData)
91+
{
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)