Skip to content

Commit f0cd63f

Browse files
pakrymazure-sdk
andauthored
Add client and model readmes and fix partition context (Azure#19114)
Co-authored-by: azure-sdk <azuresdk@microsoft.com>
1 parent a872a8e commit f0cd63f

File tree

9 files changed

+165
-24
lines changed

9 files changed

+165
-24
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ public static string Run([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer)
9898

9999
To send multiple events from a single Azure Function invocation you can apply the `EventHubAttribute` to the `IAsyncCollector<string>` or `IAsyncCollector<EventData>` parameter.
100100

101-
102101
```C# Snippet:BindingToCollector
103102
[FunctionName("BindingToCollector")]
104103
public static async Task Run(
@@ -111,6 +110,39 @@ public static async Task Run(
111110
}
112111
```
113112

113+
### Using binding to strongly-typed models
114+
115+
To use strongly-typed model classes with the EventHub binding apply the `EventHubAttribute` to the model parameter.
116+
117+
```C# Snippet:TriggerSingleModel
118+
[FunctionName("TriggerSingleModel")]
119+
public static void Run(
120+
[EventHubTrigger("<event_hub_name>", Connection = "<connection_name>")] Dog dog,
121+
ILogger logger)
122+
{
123+
logger.LogInformation($"Who's a good dog? {dog.Name} is!");
124+
}
125+
```
126+
127+
### Sending multiple events using EventHubProducerClient
128+
129+
You can also bind to the `EventHubProducerClient` directly to have the most control over the event sending.
130+
131+
```C# Snippet:BindingToProducerClient
132+
[FunctionName("BindingToProducerClient")]
133+
public static async Task Run(
134+
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
135+
[EventHub("<event_hub_name>", Connection = "<connection_name>")] EventHubProducerClient eventHubProducerClient)
136+
{
137+
// IAsyncCollector allows sending multiple events in a single function invocation
138+
await eventHubProducerClient.SendAsync(new[]
139+
{
140+
new EventData(new BinaryData($"Event 1 added at: {DateTime.Now}")),
141+
new EventData(new BinaryData($"Event 2 added at: {DateTime.Now}"))
142+
});
143+
}
144+
```
145+
114146
### Per-event triggers
115147

116148
To run a function every time an event is sent to Event Hub apply the `EventHubTriggerAttribute` to a `string` or `EventData` parameter.

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
141141
var triggerInput = new EventHubTriggerInput
142142
{
143143
Events = events,
144-
PartitionContext = context
144+
ProcessorPartition = context
145145
};
146146

147147
TriggeredFunctionData input = null;
@@ -271,9 +271,9 @@ private static string GetOperationDetails(EventProcessorHostPartition context, s
271271
{
272272
writer.WritePropertyName("runtimeInformation");
273273
writer.WriteStartObject();
274-
WritePropertyIfNotNull(writer, "lastEnqueuedOffset", context.LastEnqueuedEventProperties.Value.Offset?.ToString(CultureInfo.InvariantCulture));
275-
WritePropertyIfNotNull(writer, "lastSequenceNumber", context.LastEnqueuedEventProperties.Value.SequenceNumber?.ToString(CultureInfo.InvariantCulture));
276-
WritePropertyIfNotNull(writer, "lastEnqueuedTimeUtc", context.LastEnqueuedEventProperties.Value.EnqueuedTime?.ToString("o", CultureInfo.InvariantCulture));
274+
WritePropertyIfNotNull(writer, "lastEnqueuedOffset", context.LastEnqueuedEventProperties.Offset?.ToString(CultureInfo.InvariantCulture));
275+
WritePropertyIfNotNull(writer, "lastSequenceNumber", context.LastEnqueuedEventProperties.SequenceNumber?.ToString(CultureInfo.InvariantCulture));
276+
WritePropertyIfNotNull(writer, "lastEnqueuedTimeUtc", context.LastEnqueuedEventProperties.EnqueuedTime?.ToString("o", CultureInfo.InvariantCulture));
277277
writer.WriteEndObject();
278278
}
279279
writer.WriteEndObject();

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHostPartition.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ namespace Microsoft.Azure.WebJobs.EventHubs.Processor
1111
{
1212
internal class EventProcessorHostPartition : EventProcessorPartition
1313
{
14+
private PartitionContext _partitionContext;
15+
1416
public EventProcessorHostPartition()
1517
{
1618
}
@@ -20,17 +22,21 @@ public EventProcessorHostPartition(string partitionId)
2022
PartitionId = partitionId;
2123
}
2224

25+
public PartitionContext PartitionContext => _partitionContext ??= new EventProcessorHostPartitionContext(this);
26+
2327
public string Owner => ProcessorHost.Identifier;
2428
public string EventHubPath => ProcessorHost.EventHubName;
2529
public CheckpointInfo? Checkpoint { get; set; }
2630

27-
public LastEnqueuedEventProperties? LastEnqueuedEventProperties
31+
public LastEnqueuedEventProperties LastEnqueuedEventProperties
2832
{
2933
get
3034
{
35+
if (ReadLastEnqueuedEventPropertiesFunc == null) return default;
36+
3137
try
3238
{
33-
return ReadLastEnqueuedEventPropertiesFunc?.Invoke(PartitionId);
39+
return ReadLastEnqueuedEventPropertiesFunc.Invoke(PartitionId);
3440
}
3541
catch (EventHubsException e) when (e.Reason == EventHubsException.FailureReason.ClientClosed)
3642
{
@@ -49,5 +55,17 @@ public async Task CheckpointAsync(EventData checkpointEvent)
4955
await ProcessorHost.CheckpointAsync(PartitionId, checkpointEvent).ConfigureAwait(false);
5056
Checkpoint = new CheckpointInfo(checkpointEvent.Offset, checkpointEvent.SequenceNumber);
5157
}
58+
59+
private class EventProcessorHostPartitionContext : PartitionContext
60+
{
61+
private readonly EventProcessorHostPartition _hostPartition;
62+
63+
public EventProcessorHostPartitionContext(EventProcessorHostPartition hostPartition) : base(hostPartition.PartitionId)
64+
{
65+
_hostPartition = hostPartition;
66+
}
67+
68+
public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() => _hostPartition.LastEnqueuedEventProperties;
69+
}
5270
}
5371
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Dictionary<string, object> GetBindingData(EventHubTriggerInput value)
7878
}
7979

8080
var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
81-
SafeAddValue(() => bindingData.Add(nameof(value.PartitionContext), value.PartitionContext));
81+
SafeAddValue(() => bindingData.Add("PartitionContext", value.ProcessorPartition?.PartitionContext));
8282

8383
if (value.IsSingleDispatch)
8484
{

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
using Azure.Messaging.EventHubs;
55
using System.Collections.Generic;
66
using System.Globalization;
7+
using Azure.Messaging.EventHubs.Consumer;
78
using Azure.Messaging.EventHubs.Primitives;
9+
using Microsoft.Azure.WebJobs.EventHubs.Processor;
810

911
namespace Microsoft.Azure.WebJobs.EventHubs
1012
{
@@ -17,7 +19,7 @@ internal sealed class EventHubTriggerInput
1719

1820
internal EventData[] Events { get; set; }
1921

20-
internal EventProcessorPartition PartitionContext { get; set; }
22+
internal EventProcessorHostPartition ProcessorPartition { get; set; }
2123

2224
public bool IsSingleDispatch
2325
{
@@ -31,7 +33,7 @@ public static EventHubTriggerInput New(EventData eventData)
3133
{
3234
return new EventHubTriggerInput
3335
{
34-
PartitionContext = null,
36+
ProcessorPartition = null,
3537
Events = new EventData[]
3638
{
3739
eventData
@@ -45,7 +47,7 @@ public EventHubTriggerInput GetSingleEventTriggerInput(int idx)
4547
return new EventHubTriggerInput
4648
{
4749
Events = this.Events,
48-
PartitionContext = this.PartitionContext,
50+
ProcessorPartition = this.ProcessorPartition,
4951
_selector = idx
5052
};
5153
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public async Task EventHub_PocoBinding()
4949
var (jobHost, host) = BuildHost<EventHubTestBindToPocoJobs>();
5050
using (jobHost)
5151
{
52-
await jobHost.CallAsync(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub), new { input = "{ Name: 'foo', Value: 'data' }" });
52+
await jobHost.CallAsync(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub));
5353

5454
bool result = _eventWait.WaitOne(Timeout);
5555
Assert.True(result);
@@ -131,6 +131,19 @@ public async Task EventHub_SingleDispatch_BinaryData()
131131
AssertSingleDispatchLogs(host);
132132
}
133133

134+
[Test]
135+
public async Task EventHub_ProducerClient()
136+
{
137+
var (jobHost, host) = BuildHost<EventHubTestClientDispatch>();
138+
using (jobHost)
139+
{
140+
await jobHost.CallAsync(nameof(EventHubTestClientDispatch.SendEvents));
141+
142+
bool result = _eventWait.WaitOne(Timeout);
143+
Assert.True(result);
144+
}
145+
}
146+
134147
private static void AssertSingleDispatchLogs(IHost host)
135148
{
136149
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
@@ -385,13 +398,34 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec
385398

386399
public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt,
387400
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
388-
IDictionary<string, object> systemProperties)
401+
IDictionary<string, object> systemProperties,
402+
PartitionContext partitionContext)
389403
{
390404
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);
391405

392406
Assert.AreEqual("value1", properties["TestProp1"]);
393407
Assert.AreEqual("value2", properties["TestProp2"]);
394408

409+
Assert.NotNull(partitionContext.PartitionId);
410+
Assert.NotNull(partitionContext.ReadLastEnqueuedEventProperties());
411+
412+
_eventWait.Set();
413+
}
414+
}
415+
416+
public class EventHubTestClientDispatch
417+
{
418+
public static async Task SendEvents([EventHub(TestHubName, Connection = TestHubName)] EventHubProducerClient producer)
419+
{
420+
await producer.SendAsync(new[]
421+
{
422+
new EventData(new BinaryData("Event 1")),
423+
});
424+
}
425+
426+
public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData eventData)
427+
{
428+
Assert.AreEqual(eventData.EventBody.ToString(), "Event 1");
395429
_eventWait.Set();
396430
}
397431
}
@@ -429,16 +463,16 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
429463

430464
public class EventHubTestBindToPocoJobs
431465
{
432-
public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connection = TestHubName)] out EventData evt)
466+
public static void SendEvent_TestHub([EventHub(TestHubName, Connection = TestHubName)] out TestPoco evt)
433467
{
434-
evt = new EventData(Encoding.UTF8.GetBytes(input));
468+
evt = new TestPoco() {Value = "data", Name = "foo"};
435469
}
436470

437-
public static void BindToPoco([EventHubTrigger(TestHubName, Connection = TestHubName)] TestPoco input, string value, string name, ILogger logger)
471+
public static void BindToPoco([EventHubTrigger(TestHubName, Connection = TestHubName)] TestPoco input, ILogger logger)
438472
{
439-
Assert.AreEqual(input.Value, value);
440-
Assert.AreEqual(input.Name, name);
441-
logger.LogInformation($"PocoValues({name},{value})");
473+
Assert.AreEqual(input.Value, "data");
474+
Assert.AreEqual(input.Name, "foo");
475+
logger.LogInformation($"PocoValues(foo,data)");
442476
_eventWait.Set();
443477
}
444478
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@ public void GetBindingData_SingleDispatch_ReturnsExpectedValue()
7373
var evt = GetSystemProperties(new byte[] { });
7474

7575
var input = EventHubTriggerInput.New(evt);
76-
input.PartitionContext = GetPartitionContext();
76+
input.ProcessorPartition = GetPartitionContext();
7777

7878
var strategy = new EventHubTriggerBindingStrategy();
7979
var bindingData = strategy.GetBindingData(input);
8080

8181
Assert.AreEqual(7, bindingData.Count);
82-
Assert.AreSame(input.PartitionContext, bindingData["PartitionContext"]);
82+
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["PartitionContext"]);
8383
Assert.AreEqual(evt.PartitionKey, bindingData["PartitionKey"]);
8484
Assert.AreEqual(evt.Offset, bindingData["Offset"]);
8585
Assert.AreEqual(evt.SequenceNumber, bindingData["SequenceNumber"]);
@@ -118,13 +118,13 @@ public void GetBindingData_MultipleDispatch_ReturnsExpectedValue()
118118
var input = new EventHubTriggerInput
119119
{
120120
Events = events,
121-
PartitionContext = GetPartitionContext(),
121+
ProcessorPartition = GetPartitionContext(),
122122
};
123123
var strategy = new EventHubTriggerBindingStrategy();
124124
var bindingData = strategy.GetBindingData(input);
125125

126126
Assert.AreEqual(7, bindingData.Count);
127-
Assert.AreSame(input.PartitionContext, bindingData["PartitionContext"]);
127+
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["PartitionContext"]);
128128

129129
// verify an array was created for each binding data type
130130
Assert.AreEqual(events.Length, ((string[])bindingData["PartitionKeyArray"]).Length);
@@ -145,8 +145,8 @@ public void TriggerStrategy()
145145
string data = "123";
146146

147147
var strategy = new EventHubTriggerBindingStrategy();
148-
EventHubTriggerInput triggerInput = strategy.ConvertFromString(data);
149148

149+
EventHubTriggerInput triggerInput = strategy.ConvertFromString(data);
150150
var contract = strategy.GetBindingData(triggerInput);
151151

152152
EventData single = strategy.BindSingle(triggerInput, null);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
using Azure.Messaging.EventHubs;
7+
using Azure.Messaging.EventHubs.Producer;
8+
9+
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.Samples
10+
{
11+
public static class BindingToProducerClient
12+
{
13+
#region Snippet:BindingToProducerClient
14+
[FunctionName("BindingToProducerClient")]
15+
public static async Task Run(
16+
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
17+
[EventHub("<event_hub_name>", Connection = "<connection_name>")] EventHubProducerClient eventHubProducerClient)
18+
{
19+
// IAsyncCollector allows sending multiple events in a single function invocation
20+
await eventHubProducerClient.SendAsync(new[]
21+
{
22+
new EventData(new BinaryData($"Event 1 added at: {DateTime.Now}")),
23+
new EventData(new BinaryData($"Event 2 added at: {DateTime.Now}"))
24+
});
25+
}
26+
#endregion
27+
}
28+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.Samples
7+
{
8+
public static class TriggerSingleModel
9+
{
10+
public class Dog
11+
{
12+
public string Name { get; set; }
13+
public string Breed { get; set; }
14+
public int Age { get; set; }
15+
}
16+
17+
#region Snippet:TriggerSingleModel
18+
[FunctionName("TriggerSingleModel")]
19+
public static void Run(
20+
[EventHubTrigger("<event_hub_name>", Connection = "<connection_name>")] Dog dog,
21+
ILogger logger)
22+
{
23+
logger.LogInformation($"Who's a good dog? {dog.Name} is!");
24+
}
25+
#endregion
26+
}
27+
}

0 commit comments

Comments
 (0)