Skip to content

Commit 78fb87e

Browse files
Binary data binding (Azure#17728)
Fixes Azure#17473
1 parent b3e61dd commit 78fb87e

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public void Initialize(ExtensionConfigContext context)
6464
.AddConverter<EventData, string>(ConvertEventDataToString)
6565
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
6666
.AddConverter<EventData, byte[]>(ConvertEventDataToBytes)
67+
.AddConverter<BinaryData, EventData>(ConvertBinaryDataToEventData)
68+
.AddConverter<EventData, BinaryData>(ConvertEventDataToBinaryData)
6769
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);
6870

6971
// register our trigger binding provider
@@ -114,5 +116,11 @@ private static Task<object> ConvertPocoToEventData(object arg, Attribute attrRes
114116
{
115117
return Task.FromResult<object>(ConvertStringToEventData(JsonConvert.SerializeObject(arg)));
116118
}
119+
120+
private static EventData ConvertBinaryDataToEventData(BinaryData input)
121+
=> new EventData(input);
122+
123+
private static BinaryData ConvertEventDataToBinaryData(EventData input)
124+
=> input.EventBody;
117125
}
118126
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
2424
{
2525
[NonParallelizable]
2626
[LiveOnly]
27-
public class EventHubEndToEndTests: WebJobsEventHubTestBase
27+
public class EventHubEndToEndTests : WebJobsEventHubTestBase
2828
{
2929
private static EventWaitHandle _eventWait;
3030
private static List<string> _results;
@@ -48,7 +48,7 @@ public async Task EventHub_PocoBinding()
4848
using (jobHost)
4949
{
5050
var method = typeof(EventHubTestBindToPocoJobs).GetMethod(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public);
51-
await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId +"' }" });
51+
await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId + "' }" });
5252

5353
bool result = _eventWait.WaitOne(Timeout);
5454
Assert.True(result);
@@ -90,6 +90,26 @@ public async Task EventHub_SingleDispatch()
9090
Assert.True(result);
9191
}
9292

93+
AssertSingleDispatchLogs(host);
94+
}
95+
96+
[Test]
97+
public async Task EventHub_SingleDispatch_BinaryData()
98+
{
99+
var (jobHost, host) = BuildHost<EventHubTestSingleDispatchJobsBinaryData>();
100+
using (jobHost)
101+
{
102+
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = _testId });
103+
104+
bool result = _eventWait.WaitOne(Timeout);
105+
Assert.True(result);
106+
}
107+
108+
AssertSingleDispatchLogs(host);
109+
}
110+
111+
private static void AssertSingleDispatchLogs(IHost host)
112+
{
93113
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
94114
.GetAllLogMessages();
95115

@@ -165,14 +185,35 @@ public async Task EventHub_MultipleDispatch()
165185
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobs>();
166186
using (jobHost)
167187
{
168-
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
188+
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), BindingFlags.Static | BindingFlags.Public);
169189
int numEvents = 5;
170190
await jobHost.CallAsync(method, new { numEvents = numEvents, input = _testId });
171191

172192
bool result = _eventWait.WaitOne(Timeout);
173193
Assert.True(result);
174194
}
175195

196+
AssertMultipleDispatchLogs(host);
197+
}
198+
199+
[Test]
200+
public async Task EventHub_MultipleDispatch_BinaryData()
201+
{
202+
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobsBinaryData>();
203+
using (jobHost)
204+
{
205+
int numEvents = 5;
206+
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = _testId });
207+
208+
bool result = _eventWait.WaitOne(Timeout);
209+
Assert.True(result);
210+
}
211+
212+
AssertMultipleDispatchLogs(host);
213+
}
214+
215+
private static void AssertMultipleDispatchLogs(IHost host)
216+
{
176217
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
177218
.GetAllLogMessages();
178219

@@ -235,6 +276,26 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt,
235276
}
236277
}
237278

279+
public class EventHubTestSingleDispatchJobsBinaryData
280+
{
281+
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out BinaryData evt)
282+
{
283+
evt = new BinaryData(input);
284+
}
285+
286+
public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] BinaryData evt,
287+
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
288+
IDictionary<string, object> systemProperties)
289+
{
290+
// filter for the ID the current test is using
291+
if (evt.ToString() == _testId)
292+
{
293+
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);
294+
_eventWait.Set();
295+
}
296+
}
297+
}
298+
238299
public class EventHubTestBindToPocoJobs
239300
{
240301
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt)
@@ -312,6 +373,39 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[]
312373
}
313374
}
314375

376+
public class EventHubTestMultipleDispatchJobsBinaryData
377+
{
378+
private static int s_eventCount;
379+
private static int s_processedEventCount;
380+
public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out BinaryData[] events)
381+
{
382+
s_eventCount = numEvents;
383+
events = new BinaryData[numEvents];
384+
for (int i = 0; i < numEvents; i++)
385+
{
386+
events[i] = new BinaryData(input);
387+
}
388+
}
389+
390+
public static void ProcessMultipleEventsBinaryData([EventHubTrigger(TestHubName)] BinaryData[] events,
391+
string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary<string, object>[] propertiesArray,
392+
IDictionary<string, object>[] systemPropertiesArray)
393+
{
394+
Assert.AreEqual(events.Length, partitionKeyArray.Length);
395+
Assert.AreEqual(events.Length, enqueuedTimeUtcArray.Length);
396+
Assert.AreEqual(events.Length, propertiesArray.Length);
397+
Assert.AreEqual(events.Length, systemPropertiesArray.Length);
398+
399+
s_processedEventCount += events.Length;
400+
401+
// filter for the ID the current test is using
402+
if (events[0].ToString() == _testId && s_processedEventCount == s_eventCount)
403+
{
404+
_eventWait.Set();
405+
}
406+
}
407+
}
408+
315409
public class EventHubPartitionKeyTestJobs
316410
{
317411
// send more events per partition than the EventHubsOptions.MaxBatchSize

0 commit comments

Comments
 (0)