Skip to content

Commit cdd844a

Browse files
authored
EventGrid blob trigger support (Azure#17137)
1 parent 9b89fff commit cdd844a

30 files changed

+629
-155
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.30704.19
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj", "{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}"
7+
EndProject
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.EventGrid", "src\Microsoft.Azure.WebJobs.Extensions.EventGrid.csproj", "{9322A9CD-ADC3-4BF3-B3AA-063A66585113}"
9+
EndProject
10+
Global
11+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
12+
Debug|Any CPU = Debug|Any CPU
13+
Release|Any CPU = Release|Any CPU
14+
EndGlobalSection
15+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
16+
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
17+
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Debug|Any CPU.Build.0 = Debug|Any CPU
18+
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Release|Any CPU.ActiveCfg = Release|Any CPU
19+
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Release|Any CPU.Build.0 = Release|Any CPU
20+
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
21+
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Debug|Any CPU.Build.0 = Debug|Any CPU
22+
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Release|Any CPU.ActiveCfg = Release|Any CPU
23+
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Release|Any CPU.Build.0 = Release|Any CPU
24+
EndGlobalSection
25+
GlobalSection(SolutionProperties) = preSolution
26+
HideSolutionNode = FALSE
27+
EndGlobalSection
28+
GlobalSection(ExtensibilityGlobals) = postSolution
29+
SolutionGuid = {D8FAA1CE-4C73-49FE-A59D-CCEBDAA223CE}
30+
EndGlobalSection
31+
EndGlobal

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/src/EventGridExtensionConfigProvider.cs

Lines changed: 37 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,24 @@ internal class EventGridExtensionConfigProvider : IExtensionConfigProvider,
3232
private ILogger _logger;
3333
private readonly ILoggerFactory _loggerFactory;
3434
private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter;
35+
private readonly HttpRequestProcessor _httpRequestProcessor;
3536

3637
// for end to end testing
37-
internal EventGridExtensionConfigProvider(Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter, ILoggerFactory loggerFactory)
38+
internal EventGridExtensionConfigProvider(
39+
Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter,
40+
HttpRequestProcessor httpRequestProcessor,
41+
ILoggerFactory loggerFactory)
3842
{
3943
_converter = converter;
44+
_httpRequestProcessor = httpRequestProcessor;
4045
_loggerFactory = loggerFactory;
4146
}
4247

4348
// default constructor
44-
public EventGridExtensionConfigProvider(ILoggerFactory loggerFactory)
49+
public EventGridExtensionConfigProvider(HttpRequestProcessor httpRequestProcessor, ILoggerFactory loggerFactory)
4550
{
4651
_converter = (attr => new EventGridAsyncCollector(new EventGridPublisherClient(new Uri(attr.TopicEndpointUri), new AzureKeyCredential(attr.TopicKeySetting))));
52+
_httpRequestProcessor = httpRequestProcessor;
4753
_loggerFactory = loggerFactory;
4854
}
4955

@@ -122,90 +128,48 @@ private async Task<HttpResponseMessage> ProcessAsync(HttpRequestMessage req)
122128
return new HttpResponseMessage(HttpStatusCode.NotFound) { Content = new StringContent($"cannot find function: '{functionName}'") };
123129
}
124130

125-
IEnumerable<string> eventTypeHeaders = null;
126-
string eventTypeHeader = null;
127-
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders))
128-
{
129-
eventTypeHeader = eventTypeHeaders.First();
130-
}
131-
132-
if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
133-
{
134-
string jsonArray = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
135-
SubscriptionValidationEvent validationEvent = null;
136-
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray);
137-
// TODO remove unnecessary serialization
138-
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>();
139-
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode };
140-
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK);
141-
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse));
142-
_logger.LogInformation($"perform handshake with eventGrid for function: {functionName}");
143-
return returnMessage;
144-
}
145-
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
146-
{
147-
JArray events = null;
148-
string requestContent = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
149-
var token = JToken.Parse(requestContent);
150-
if (token.Type == JTokenType.Array)
151-
{
152-
// eventgrid schema
153-
events = (JArray)token;
154-
}
155-
else if (token.Type == JTokenType.Object)
156-
{
157-
// cloudevent schema
158-
events = new JArray
159-
{
160-
token
161-
};
162-
}
131+
return await _httpRequestProcessor.ProcessAsync(req, functionName, ProcessEventsAsync, CancellationToken.None).ConfigureAwait(false);
132+
}
163133

164-
List<Task<FunctionResult>> executions = new List<Task<FunctionResult>>();
134+
private async Task<HttpResponseMessage> ProcessEventsAsync(JArray events, string functionName, CancellationToken cancellationToken)
135+
{
136+
List<Task<FunctionResult>> executions = new List<Task<FunctionResult>>();
165137

166-
// Single Dispatch
167-
if (_listeners[functionName].SingleDispatch)
168-
{
169-
foreach (var ev in events)
170-
{
171-
// assume each event is a JObject
172-
TriggeredFunctionData triggerData = new TriggeredFunctionData
173-
{
174-
TriggerValue = ev
175-
};
176-
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
177-
}
178-
await Task.WhenAll(executions).ConfigureAwait(false);
179-
}
180-
// Batch Dispatch
181-
else
138+
// Single Dispatch
139+
if (_listeners[functionName].SingleDispatch)
140+
{
141+
foreach (var ev in events)
182142
{
143+
// assume each event is a JObject
183144
TriggeredFunctionData triggerData = new TriggeredFunctionData
184145
{
185-
TriggerValue = events
146+
TriggerValue = ev
186147
};
187148
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
188149
}
189-
190-
// FIXME without internal queuing, we are going to process all events in parallel
191-
// and return 500 if there's at least one failure...which will cause EventGrid to resend the entire payload
192-
foreach (var execution in executions)
150+
await Task.WhenAll(executions).ConfigureAwait(false);
151+
}
152+
// Batch Dispatch
153+
else
154+
{
155+
TriggeredFunctionData triggerData = new TriggeredFunctionData
193156
{
194-
if (!execution.Result.Succeeded)
195-
{
196-
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(execution.Result.Exception.Message) };
197-
}
198-
}
199-
200-
return new HttpResponseMessage(HttpStatusCode.Accepted);
157+
TriggerValue = events
158+
};
159+
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
201160
}
202-
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase))
161+
162+
// FIXME without internal queuing, we are going to process all events in parallel
163+
// and return 500 if there's at least one failure...which will cause EventGrid to resend the entire payload
164+
foreach (var execution in executions)
203165
{
204-
// TODO disable function?
205-
return new HttpResponseMessage(HttpStatusCode.Accepted);
166+
if (!execution.Result.Succeeded)
167+
{
168+
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(execution.Result.Exception.Message) };
169+
}
206170
}
207171

208-
return new HttpResponseMessage(HttpStatusCode.BadRequest);
172+
return new HttpResponseMessage(HttpStatusCode.Accepted);
209173
}
210174

211175
private class JTokenToPocoConverter<T> : IConverter<JToken, T>

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/src/EventGridWebJobsBuilderExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
56

67
namespace Microsoft.Azure.WebJobs.Extensions.EventGrid
78
{
@@ -21,6 +22,7 @@ public static IWebJobsBuilder AddEventGrid(this IWebJobsBuilder builder)
2122
throw new ArgumentNullException(nameof(builder));
2223
}
2324

25+
builder.Services.TryAddSingleton<HttpRequestProcessor>();
2426
builder.AddExtension<EventGridExtensionConfigProvider>();
2527
return builder;
2628
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Net;
8+
using System.Net.Http;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Microsoft.Extensions.Logging;
12+
using Newtonsoft.Json;
13+
using Newtonsoft.Json.Linq;
14+
15+
namespace Microsoft.Azure.WebJobs.Extensions.EventGrid
16+
{
17+
internal class HttpRequestProcessor
18+
{
19+
private readonly ILogger _logger;
20+
21+
public HttpRequestProcessor(ILogger<HttpRequestProcessor> logger)
22+
{
23+
_logger = logger;
24+
}
25+
26+
internal async Task<HttpResponseMessage> ProcessAsync(HttpRequestMessage req, string functionName, Func<JArray, string, CancellationToken, Task<HttpResponseMessage>> eventsFunc, CancellationToken cancellationToken)
27+
{
28+
IEnumerable<string> eventTypeHeaders = null;
29+
string eventTypeHeader = null;
30+
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders))
31+
{
32+
eventTypeHeader = eventTypeHeaders.First();
33+
}
34+
35+
if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
36+
{
37+
string jsonArray = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
38+
SubscriptionValidationEvent validationEvent = null;
39+
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray);
40+
// TODO remove unnecessary serialization
41+
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>();
42+
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode };
43+
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK);
44+
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse));
45+
_logger.LogInformation($"perform handshake with eventGrid for function: {functionName}");
46+
return returnMessage;
47+
}
48+
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
49+
{
50+
JArray events = null;
51+
string requestContent = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
52+
var token = JToken.Parse(requestContent);
53+
if (token.Type == JTokenType.Array)
54+
{
55+
// eventgrid schema
56+
events = (JArray)token;
57+
}
58+
else if (token.Type == JTokenType.Object)
59+
{
60+
// cloudevent schema
61+
events = new JArray
62+
{
63+
token
64+
};
65+
}
66+
67+
return await eventsFunc(events, functionName, cancellationToken).ConfigureAwait(false);
68+
}
69+
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase))
70+
{
71+
// TODO disable function?
72+
return new HttpResponseMessage(HttpStatusCode.Accepted);
73+
}
74+
75+
return new HttpResponseMessage(HttpStatusCode.BadRequest);
76+
}
77+
}
78+
}

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/JobhostEndToEnd.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Microsoft.Azure.WebJobs.Host.Indexers;
1515
using Microsoft.Azure.WebJobs.Host.TestCommon;
1616
using Microsoft.Extensions.Logging;
17+
using Microsoft.Extensions.Logging.Abstractions;
1718
using Moq;
1819
using Newtonsoft.Json.Linq;
1920
using NUnit.Framework;
@@ -205,7 +206,7 @@ public async Task OutputBindingParamsTests(string functionName, string expectedC
205206
ILoggerFactory loggerFactory = new LoggerFactory();
206207
loggerFactory.AddProvider(new TestLoggerProvider());
207208
// use moq eventgridclient for test extension
208-
var customExtension = new EventGridExtensionConfigProvider(customConverter, loggerFactory);
209+
var customExtension = new EventGridExtensionConfigProvider(customConverter, new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), loggerFactory);
209210

210211
var configuration = new Dictionary<string, string>
211212
{

sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/TestListener.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Newtonsoft.Json.Linq;
1313
using Microsoft.Extensions.Logging.Abstractions;
1414
using NUnit.Framework;
15+
using Microsoft.Extensions.Logging;
1516

1617
namespace Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests
1718
{
@@ -30,7 +31,7 @@ public void SetupTestListener()
3031
[Test]
3132
public async Task TestUnsubscribe()
3233
{
33-
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
34+
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
3435
var host = TestHelpers.NewHost<MyProg1>(ext);
3536
await host.StartAsync(); // add listener
3637

@@ -46,7 +47,7 @@ public async Task TestUnsubscribe()
4647
[Test]
4748
public async Task TestDispatch()
4849
{
49-
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
50+
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
5051
var host = TestHelpers.NewHost<MyProg1>(ext);
5152
await host.StartAsync(); // add listener
5253

@@ -70,7 +71,7 @@ public async Task TestDispatch()
7071
public async Task TestCloudEvent()
7172
{
7273
// individual elements
73-
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
74+
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
7475
var host = TestHelpers.NewHost<MyProg1>(ext);
7576
await host.StartAsync(); // add listener
7677

@@ -87,7 +88,7 @@ public async Task TestCloudEvent()
8788
[Test]
8889
public async Task WrongFunctionNameTest()
8990
{
90-
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
91+
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
9192
var host = TestHelpers.NewHost<MyProg2>(ext);
9293
await host.StartAsync(); // add listener
9394

@@ -103,7 +104,7 @@ public async Task WrongFunctionNameTest()
103104
[Test]
104105
public async Task ExecutionFailureTest()
105106
{
106-
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
107+
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
107108
var host = TestHelpers.NewHost<MyProg2>(ext);
108109
await host.StartAsync(); // add listener
109110

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/api/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.netstandard2.0.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ public sealed partial class BlobTriggerAttribute : System.Attribute, Microsoft.A
2222
public BlobTriggerAttribute(string blobPath) { }
2323
public string BlobPath { get { throw null; } }
2424
public string Connection { get { throw null; } set { } }
25+
public Microsoft.Azure.WebJobs.BlobTriggerSource Source { get { throw null; } set { } }
26+
}
27+
public enum BlobTriggerSource
28+
{
29+
LogsAndContainerScan = 0,
30+
EventGrid = 1,
2531
}
2632
}
2733
namespace Microsoft.Azure.WebJobs.Extensions.Storage

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerAttribute.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider
3838
{
3939
private readonly string _blobPath;
4040

41+
// LogsAndContainerScan is default kind as it does not require additional actions to set up a blob trigger
42+
private BlobTriggerSource _blobTriggerSource = BlobTriggerSource.LogsAndContainerScan;
43+
4144
/// <summary>
4245
/// Initializes a new instance of the <see cref="BlobTriggerAttribute"/> class.
4346
/// </summary>
@@ -65,5 +68,14 @@ public string BlobPath
6568
{
6669
get { return _blobPath; }
6770
}
71+
72+
/// <summary>
73+
/// Returns a bool value that indicates whether EventGrid is used.
74+
/// </summary>
75+
public BlobTriggerSource Source
76+
{
77+
get { return _blobTriggerSource; }
78+
set { _blobTriggerSource = value; }
79+
}
6880
}
6981
}

0 commit comments

Comments
 (0)