|
| 1 | +// Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | +// Licensed under the MIT License. |
| 3 | + |
| 4 | +using System; |
| 5 | +using System.Collections.Generic; |
| 6 | +using System.Linq; |
| 7 | +using System.Net; |
| 8 | +using System.Net.Http; |
| 9 | +using System.Threading; |
| 10 | +using System.Threading.Tasks; |
| 11 | +using System.Web; |
| 12 | +using Azure.Messaging.EventGrid; |
| 13 | +using Microsoft.Azure.WebJobs.Description; |
| 14 | +using Microsoft.Azure.WebJobs.Host.Bindings; |
| 15 | +using Microsoft.Azure.WebJobs.Host.Config; |
| 16 | +using Microsoft.Azure.WebJobs.Host.Executors; |
| 17 | +using Microsoft.Azure.WebJobs.Logging; |
| 18 | +using Microsoft.Extensions.Logging; |
| 19 | +using Newtonsoft.Json; |
| 20 | +using Newtonsoft.Json.Linq; |
| 21 | + |
| 22 | +namespace Microsoft.Azure.WebJobs.Extensions.EventGrid |
| 23 | +{ |
| 24 | + /// <summary> |
| 25 | + /// Defines the configuration options for the EventGrid binding. |
| 26 | + /// </summary> |
| 27 | + [Extension("EventGrid", "EventGrid")] |
| 28 | + internal class EventGridExtensionConfigProvider : IExtensionConfigProvider, |
| 29 | + IAsyncConverter<HttpRequestMessage, HttpResponseMessage> |
| 30 | + { |
| 31 | + private ILogger _logger; |
| 32 | + private readonly ILoggerFactory _loggerFactory; |
| 33 | + private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter; |
| 34 | + |
| 35 | + // for end to end testing |
| 36 | + internal EventGridExtensionConfigProvider(Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter, ILoggerFactory loggerFactory) |
| 37 | + { |
| 38 | + _converter = converter; |
| 39 | + _loggerFactory = loggerFactory; |
| 40 | + } |
| 41 | + |
| 42 | + // default constructor |
| 43 | + public EventGridExtensionConfigProvider(ILoggerFactory loggerFactory) |
| 44 | + { |
| 45 | + _converter = (attr => new EventGridAsyncCollector(new EventGridPublisherClient(new Uri(attr.TopicEndpointUri), new EventGridSharedAccessSignatureCredential(attr.TopicKeySetting)))); |
| 46 | + _loggerFactory = loggerFactory; |
| 47 | + } |
| 48 | + |
| 49 | + public void Initialize(ExtensionConfigContext context) |
| 50 | + { |
| 51 | + if (context == null) |
| 52 | + { |
| 53 | + throw new ArgumentNullException(nameof(context)); |
| 54 | + } |
| 55 | + |
| 56 | + _logger = _loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("EventGrid")); |
| 57 | + |
| 58 | +#pragma warning disable 618 |
| 59 | + Uri url = context.GetWebhookHandler(); |
| 60 | +#pragma warning restore 618 |
| 61 | + _logger.LogInformation($"registered EventGrid Endpoint = {url?.GetLeftPart(UriPartial.Path)}"); |
| 62 | + |
| 63 | + // Register our extension binding providers |
| 64 | + // use converterManager as a hashTable |
| 65 | + // also take benefit of identity converter |
| 66 | + context |
| 67 | + .AddBindingRule<EventGridTriggerAttribute>() // following converters are for EventGridTriggerAttribute only |
| 68 | + .AddConverter<JToken, string>((jtoken) => jtoken.ToString(Formatting.Indented)) |
| 69 | + .AddConverter<JToken, string[]>((jarray) => jarray.Select(ar => ar.ToString(Formatting.Indented)).ToArray()) |
| 70 | + .AddConverter<JToken, DirectInvokeString>((jtoken) => new DirectInvokeString(null)) |
| 71 | + .AddConverter<JToken, EventGridEvent>((jobject) => jobject.ToObject<EventGridEvent>()) // surface the type to function runtime |
| 72 | + .AddConverter<JToken, EventGridEvent[]>((jobject) => jobject.ToObject<EventGridEvent[]>()) // surface the type to function runtime |
| 73 | + .AddOpenConverter<JToken, OpenType.Poco>(typeof(JTokenToPocoConverter<>)) |
| 74 | + .AddOpenConverter<JToken, OpenType.Poco[]>(typeof(JTokenToPocoConverter<>)) |
| 75 | + .BindToTrigger<JToken>(new EventGridTriggerAttributeBindingProvider(this)); |
| 76 | + |
| 77 | + |
| 78 | + // Register the output binding |
| 79 | + var rule = context |
| 80 | + .AddBindingRule<EventGridAttribute>() |
| 81 | + .AddConverter<string, EventGridEvent>((str) => EventGridEvent.Parse(str).Single()) |
| 82 | + .AddConverter<JObject, EventGridEvent>((jobject) => EventGridEvent.Parse(jobject.ToString()).Single()); |
| 83 | + rule.BindToCollector(_converter); |
| 84 | + rule.AddValidator((a, t) => |
| 85 | + { |
| 86 | + // if app setting is missing, it will be caught by runtime |
| 87 | + // this logic tries to validate the practicality of attribute properties |
| 88 | + if (string.IsNullOrWhiteSpace(a.TopicKeySetting)) |
| 89 | + { |
| 90 | + throw new InvalidOperationException($"The '{nameof(EventGridAttribute.TopicKeySetting)}' property must be the name of an application setting containing the Topic Key"); |
| 91 | + } |
| 92 | + |
| 93 | + if (!Uri.IsWellFormedUriString(a.TopicEndpointUri, UriKind.Absolute)) |
| 94 | + { |
| 95 | + throw new InvalidOperationException($"The '{nameof(EventGridAttribute.TopicEndpointUri)}' property must be a valid absolute Uri"); |
| 96 | + } |
| 97 | + }); |
| 98 | + } |
| 99 | + |
| 100 | + private Dictionary<string, EventGridListener> _listeners = new Dictionary<string, EventGridListener>(); |
| 101 | + |
| 102 | + internal void AddListener(string key, EventGridListener listener) |
| 103 | + { |
| 104 | + _listeners.Add(key, listener); |
| 105 | + } |
| 106 | + |
| 107 | + public async Task<HttpResponseMessage> ConvertAsync(HttpRequestMessage input, CancellationToken cancellationToken) |
| 108 | + { |
| 109 | + var response = ProcessAsync(input); |
| 110 | + return await response.ConfigureAwait(false); |
| 111 | + } |
| 112 | + |
| 113 | + private async Task<HttpResponseMessage> ProcessAsync(HttpRequestMessage req) |
| 114 | + { |
| 115 | + // webjobs.script uses req.GetQueryNameValuePairs(); |
| 116 | + // which requires webapi.core...but this does not work for .netframework2.0 |
| 117 | + // TODO change this once webjobs.script is migrated |
| 118 | + var functionName = HttpUtility.ParseQueryString(req.RequestUri.Query)["functionName"]; |
| 119 | + if (String.IsNullOrEmpty(functionName) || !_listeners.ContainsKey(functionName)) |
| 120 | + { |
| 121 | + _logger.LogInformation($"cannot find function: '{functionName}', available function names: [{string.Join(", ", _listeners.Keys.ToArray())}]"); |
| 122 | + return new HttpResponseMessage(HttpStatusCode.NotFound) { Content = new StringContent($"cannot find function: '{functionName}'") }; |
| 123 | + } |
| 124 | + |
| 125 | + IEnumerable<string> eventTypeHeaders = null; |
| 126 | + string eventTypeHeader = null; |
| 127 | + if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders)) |
| 128 | + { |
| 129 | + eventTypeHeader = eventTypeHeaders.First(); |
| 130 | + } |
| 131 | + |
| 132 | + if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase)) |
| 133 | + { |
| 134 | + string jsonArray = await req.Content.ReadAsStringAsync().ConfigureAwait(false); |
| 135 | + SubscriptionValidationEvent validationEvent = null; |
| 136 | + List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray); |
| 137 | + // TODO remove unnecessary serialization |
| 138 | + validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>(); |
| 139 | + SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode }; |
| 140 | + var returnMessage = new HttpResponseMessage(HttpStatusCode.OK); |
| 141 | + returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse)); |
| 142 | + _logger.LogInformation($"perform handshake with eventGrid for function: {functionName}"); |
| 143 | + return returnMessage; |
| 144 | + } |
| 145 | + else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase)) |
| 146 | + { |
| 147 | + JArray events = null; |
| 148 | + string requestContent = await req.Content.ReadAsStringAsync().ConfigureAwait(false); |
| 149 | + var token = JToken.Parse(requestContent); |
| 150 | + if (token.Type == JTokenType.Array) |
| 151 | + { |
| 152 | + // eventgrid schema |
| 153 | + events = (JArray)token; |
| 154 | + } |
| 155 | + else if (token.Type == JTokenType.Object) |
| 156 | + { |
| 157 | + // cloudevent schema |
| 158 | + events = new JArray |
| 159 | + { |
| 160 | + token |
| 161 | + }; |
| 162 | + } |
| 163 | + |
| 164 | + List<Task<FunctionResult>> executions = new List<Task<FunctionResult>>(); |
| 165 | + |
| 166 | + // Single Dispatch |
| 167 | + if (_listeners[functionName].SingleDispatch) |
| 168 | + { |
| 169 | + foreach (var ev in events) |
| 170 | + { |
| 171 | + // assume each event is a JObject |
| 172 | + TriggeredFunctionData triggerData = new TriggeredFunctionData |
| 173 | + { |
| 174 | + TriggerValue = ev |
| 175 | + }; |
| 176 | + executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None)); |
| 177 | + } |
| 178 | + await Task.WhenAll(executions).ConfigureAwait(false); |
| 179 | + } |
| 180 | + // Batch Dispatch |
| 181 | + else |
| 182 | + { |
| 183 | + TriggeredFunctionData triggerData = new TriggeredFunctionData |
| 184 | + { |
| 185 | + TriggerValue = events |
| 186 | + }; |
| 187 | + executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None)); |
| 188 | + } |
| 189 | + |
| 190 | + // FIXME without internal queuing, we are going to process all events in parallel |
| 191 | + // and return 500 if there's at least one failure...which will cause EventGrid to resend the entire payload |
| 192 | + foreach (var execution in executions) |
| 193 | + { |
| 194 | + if (!execution.Result.Succeeded) |
| 195 | + { |
| 196 | + return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(execution.Result.Exception.Message) }; |
| 197 | + } |
| 198 | + } |
| 199 | + |
| 200 | + return new HttpResponseMessage(HttpStatusCode.Accepted); |
| 201 | + } |
| 202 | + else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase)) |
| 203 | + { |
| 204 | + // TODO disable function? |
| 205 | + return new HttpResponseMessage(HttpStatusCode.Accepted); |
| 206 | + } |
| 207 | + |
| 208 | + return new HttpResponseMessage(HttpStatusCode.BadRequest); |
| 209 | + |
| 210 | + } |
| 211 | + |
| 212 | + private class JTokenToPocoConverter<T> : IConverter<JToken, T> |
| 213 | + { |
| 214 | + public T Convert(JToken input) |
| 215 | + { |
| 216 | + return input.ToObject<T>(); |
| 217 | + } |
| 218 | + } |
| 219 | + } |
| 220 | +} |
0 commit comments