Skip to content

Commit 5694731

Browse files
Add byte array output binding (Azure#25813)
* Add byte array output binding * PR fb
1 parent 32edecc commit 5694731

File tree

2 files changed

+92
-103
lines changed

2 files changed

+92
-103
lines changed

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/src/OutputBinding/EventGridAsyncCollector.cs

Lines changed: 58 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -51,128 +51,83 @@ public EventGridAsyncCollector(EventGridPublisherClient client)
5151
{
5252
// determine the schema by inspecting the first event (a topic can only support a single schema)
5353
var firstEvent = events.First();
54-
if (firstEvent is string str)
54+
switch (firstEvent)
5555
{
56-
bool isEventGridEvent = false;
57-
try
58-
{
59-
var ev = EventGridEvent.Parse(new BinaryData(str));
60-
isEventGridEvent = true;
61-
}
62-
catch (ArgumentException)
63-
{
64-
}
65-
66-
if (isEventGridEvent)
56+
case string:
57+
await SendAsync(events, evt => new BinaryData((string)evt), cancellationToken)
58+
.ConfigureAwait(false);
59+
break;
60+
case BinaryData:
61+
await SendAsync(events, evt => (BinaryData) evt, cancellationToken)
62+
.ConfigureAwait(false);
63+
break;
64+
case byte[]:
65+
await SendAsync(events, evt => new BinaryData((byte[])evt), cancellationToken)
66+
.ConfigureAwait(false);
67+
break;
68+
case JObject:
69+
await SendAsync(events, evt => new BinaryData(((JObject)evt).ToString()), cancellationToken)
70+
.ConfigureAwait(false);
71+
break;
72+
case EventGridEvent:
6773
{
6874
List<EventGridEvent> egEvents = new(events.Count);
69-
foreach (string evt in events)
75+
foreach (object evt in events)
7076
{
71-
egEvents.Add(EventGridEvent.Parse(new BinaryData(evt)));
77+
egEvents.Add((EventGridEvent) evt);
7278
}
73-
7479
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
80+
break;
7581
}
76-
else
82+
case CloudEvent:
7783
{
7884
List<CloudEvent> cloudEvents = new(events.Count);
79-
foreach (string evt in events)
85+
foreach (object evt in events)
8086
{
81-
cloudEvents.Add(CloudEvent.Parse(new BinaryData(evt)));
87+
cloudEvents.Add((CloudEvent) evt);
8288
}
83-
8489
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
90+
break;
8591
}
92+
default:
93+
throw new InvalidOperationException(
94+
$"{firstEvent?.GetType().ToString()} is not a valid event type.");
8695
}
87-
else if (firstEvent is BinaryData data)
88-
{
89-
bool isEventGridEvent = false;
90-
try
91-
{
92-
var ev = EventGridEvent.Parse(data);
93-
isEventGridEvent = true;
94-
}
95-
catch (ArgumentException)
96-
{
97-
}
98-
99-
if (isEventGridEvent)
100-
{
101-
List<EventGridEvent> egEvents = new(events.Count);
102-
foreach (BinaryData evt in events)
103-
{
104-
egEvents.Add(EventGridEvent.Parse(evt));
105-
}
106-
107-
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
108-
}
109-
else
110-
{
111-
List<CloudEvent> cloudEvents = new(events.Count);
112-
foreach (BinaryData evt in events)
113-
{
114-
cloudEvents.Add(CloudEvent.Parse(evt));
115-
}
116-
117-
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
118-
}
119-
}
120-
else if (firstEvent is JObject jObject)
121-
{
122-
bool isEventGridEvent = false;
123-
try
124-
{
125-
var ev = EventGridEvent.Parse(new BinaryData(jObject.ToString()));
126-
isEventGridEvent = true;
127-
}
128-
catch (ArgumentException)
129-
{
130-
}
131-
132-
if (isEventGridEvent)
133-
{
134-
List<EventGridEvent> egEvents = new(events.Count);
135-
foreach (JObject evt in events)
136-
{
137-
egEvents.Add(EventGridEvent.Parse(new BinaryData(evt.ToString())));
138-
}
139-
140-
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
141-
}
142-
else
143-
{
144-
List<CloudEvent> cloudEvents = new(events.Count);
145-
foreach (JObject evt in events)
146-
{
147-
cloudEvents.Add(CloudEvent.Parse(new BinaryData(evt.ToString())));
148-
}
96+
}
97+
}
14998

150-
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
151-
}
152-
}
153-
else if (firstEvent is EventGridEvent)
154-
{
155-
List<EventGridEvent> egEvents = new(events.Count);
156-
foreach (object evt in events)
157-
{
158-
egEvents.Add((EventGridEvent) evt);
159-
}
160-
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
161-
}
162-
else if (firstEvent is CloudEvent)
99+
private async Task SendAsync(IList<object> events, Func<object, BinaryData> binaryDataFactory, CancellationToken cancellationToken)
100+
{
101+
bool isEventGridEvent = false;
102+
try
103+
{
104+
// test the first event to determine CloudEvent vs EventGridEvent
105+
// both event types are NOT supported in same list
106+
EventGridEvent.Parse(binaryDataFactory(events.First()));
107+
isEventGridEvent = true;
108+
}
109+
catch (ArgumentException)
110+
{
111+
}
112+
if (isEventGridEvent)
113+
{
114+
List<EventGridEvent> egEvents = new(events.Count);
115+
foreach (object evt in events)
163116
{
164-
List<CloudEvent> cloudEvents = new(events.Count);
165-
foreach (object evt in events)
166-
{
167-
cloudEvents.Add((CloudEvent) evt);
168-
}
169-
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
117+
egEvents.Add(EventGridEvent.Parse(binaryDataFactory(evt)));
170118
}
171-
else
119+
120+
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
121+
}
122+
else
123+
{
124+
List<CloudEvent> cloudEvents = new(events.Count);
125+
foreach (object evt in events)
172126
{
173-
throw new InvalidOperationException(
174-
$"{firstEvent?.GetType().ToString()} is not a valid event type.");
127+
cloudEvents.Add(CloudEvent.Parse(binaryDataFactory(evt)));
175128
}
129+
130+
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
176131
}
177132
}
178133
}

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/JobhostEndToEnd.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ public void OutputBindingInvalidCredentialTests()
236236
[TestCase("AsyncCollectorEvent", "0 1 2 3 4 5 6")]
237237
[TestCase("StringEvents", "0 1 2 3 4")]
238238
[TestCase("BinaryDataEvents", "0 1 2 3 4")]
239+
[TestCase("ByteArrayEvents", "0 1 2 3 4")]
239240
[TestCase("JObjectEvents", "0 1 2 3 4")]
240241
public async Task OutputBindingParamsTests(string functionName, string expectedCollection)
241242
{
@@ -353,6 +354,7 @@ public void TestEventGridToCustomCollection_Batch([EventGridTrigger] ValidPoco[]
353354
[TestCase("AsyncCollectorEvent", "0 1 2 3 4 5 6")]
354355
[TestCase("StringEvents", "0 1 2 3 4")]
355356
[TestCase("BinaryDataEvents", "0 1 2 3 4")]
357+
[TestCase("ByteArrayEvents", "0 1 2 3 4")]
356358
[TestCase("JObjectEvents", "0 1 2 3 4")]
357359
public async Task OutputCloudEventBindingParamsTests(string functionName, string expectedCollection)
358360
{
@@ -697,6 +699,22 @@ public void BinaryDataEvents([EventGrid(TopicEndpointUri = "eventgridUri", Topic
697699
}
698700
}
699701

702+
public void ByteArrayEvents([EventGrid(TopicEndpointUri = "eventgridUri", TopicKeySetting = "eventgridKey")] out byte[][] data)
703+
{
704+
data = new byte[5][];
705+
for (int i = 0; i < 5; i++)
706+
{
707+
data[i] = new BinaryData($@"
708+
{{
709+
""id"" : ""{i}"",
710+
""data"" : ""{i}"",
711+
""eventType"" : ""custom"",
712+
""subject"" : ""custom"",
713+
""dataVersion"" : ""1""
714+
}}").ToArray();
715+
}
716+
}
717+
700718
public void JObjectEvents([EventGrid(TopicEndpointUri = "eventgridUri", TopicKeySetting = "eventgridKey")] out JObject[] jobjects)
701719
{
702720
jobjects = new JObject[5];
@@ -828,6 +846,22 @@ public void BinaryDataEvents([EventGrid(TopicEndpointUri = "eventgridUri", Topic
828846
}
829847
}
830848

849+
public void ByteArrayEvents([EventGrid(TopicEndpointUri = "eventgridUri", TopicKeySetting = "eventgridKey")] out byte[][] data)
850+
{
851+
data = new byte[5][];
852+
for (int i = 0; i < 5; i++)
853+
{
854+
data[i] = new BinaryData($@"
855+
{{
856+
""id"" : ""{i}"",
857+
""data"" : ""{i}"",
858+
""source"" : ""custom"",
859+
""type"" : ""custom"",
860+
""specversion"" : ""1.0""
861+
}}").ToArray();
862+
}
863+
}
864+
831865
// assume converter is applied correctly with other output binding types
832866
public void JObjectEvents([EventGrid(TopicEndpointUri = "eventgridUri", TopicKeySetting = "eventgridKey")] out JObject[] jobjects)
833867
{

0 commit comments

Comments
 (0)