Skip to content

Commit 636b13e

Browse files
committed
Fix null error when accessing inputs from table storage, and clean up code
1 parent e91f145 commit 636b13e

File tree

3 files changed

+39
-71
lines changed

3 files changed

+39
-71
lines changed

function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/OlMessageConsolodation.cs

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,9 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
215215
await Task.Delay(delay);
216216
}
217217

218-
// Get inputs. Todo: Check if more efficient to get inputs within the same while loop above. Can we get 2 entities at the same time?
218+
// Get inputs. TODO: Check if more efficient to get inputs within the same while loop above. Can we get 2 entities at the same time?
219219
currentRetry = 0;
220-
while (ret_val) // ret_val instead of just true, because if didn't have the env_facet then don't bother getting inputs either
220+
while (ret_val) // use a variable instead of just true, because if we didn't have the env_facet then we don't need to get inputs
221221
{
222222
try
223223
{
@@ -248,45 +248,36 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
248248

249249
// Check if saved any inputs from the START event (will only be done for events containing DataSourceV2 sources)
250250
if (te_inputs is not null) {
251-
// TODO: Possible source of error.
252-
if (te_inputs.ContainsKey("Inputs")){
253-
_log.LogInformation($"New Code #1");
251+
252+
if (te_inputs.ContainsKey("Inputs")) {
254253
try {
255-
var saved_inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
256-
_log.LogInformation($"New Code #1.1");
257-
_log.LogInformation("Inputs in dictionary? ", te_inputs.ContainsKey("Inputs").ToString());
258-
259-
if (saved_inputs is null) {
260-
// Unecessary check?
261-
_log.LogInformation($"OlMessageConsolodation-JoinEventData: No inputs found for COMPLETE event");
262-
//ret_val = false;
263-
}
264-
265-
else {
266-
// Check inputs saved against inputs captured in this COMPLETE event and combine while removing any duplicates.
267-
// Checking for duplicates needed since we save all the inputs captured from the START event. Perhaps it may be better to
268-
// only save the DataSourceV2 inputs?
269-
var inputs = new List<Inputs>(saved_inputs.Count + olEvent.Inputs.Count);
270-
inputs.AddRange(saved_inputs);
271-
inputs.AddRange(olEvent.Inputs);
272-
var unique_inputs = inputs.Distinct();
273-
olEvent.Inputs = unique_inputs.ToList();
274-
_log.LogInformation($"OlMessageConsolodation-JoinEventData: Captured inputs for COMPLETE event");
254+
//TODO: Find out why inputs might be null. Technically inputs are only added to the table if they exist. This is also not an issue when running locally.
255+
if (te_inputs["Inputs"] != null) {
256+
257+
var saved_inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
258+
259+
if (saved_inputs is null) {
260+
_log.LogInformation($"OlMessageConsolodation-JoinEventData: No inputs found for COMPLETE event");
261+
}
262+
263+
else {
264+
// Check inputs saved against inputs captured in this COMPLETE event and combine while removing any duplicates.
265+
// Checking for duplicates needed since we save all the inputs captured from the START event. Perhaps it may be better to
266+
// only save the DataSourceV2 inputs?
267+
var inputs = new List<Inputs>(saved_inputs.Count + olEvent.Inputs.Count);
268+
inputs.AddRange(saved_inputs);
269+
inputs.AddRange(olEvent.Inputs);
270+
var unique_inputs = inputs.Distinct();
271+
olEvent.Inputs = unique_inputs.ToList();
272+
_log.LogInformation($"OlMessageConsolodation-JoinEventData: Captured inputs for COMPLETE event");
273+
}
275274
}
276275
}
277276
catch (System.Exception ex) {
278277
_log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deserializing inputs");
279278
ret_val = false;
280279
}
281280

282-
283-
284-
}
285-
286-
else {
287-
_log.LogInformation($"New Code #2");
288-
_log.LogInformation($"OlMessageConsolodation-JoinEventData: No inputs found for COMPLETE event");
289-
//ret_val = false;
290281
}
291282

292283
}
@@ -301,9 +292,9 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
301292
_log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deleting entity");
302293
}
303294

304-
// If no inputs were saved from the start event, then we need to make sure we're only processing this COMPLETE event
305-
// if it has both inputs and outputs (reflects original logic, prior to supporting DataSourceV2 events)
306-
if (te_inputs is null && !(olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0)) {
295+
// Need to make sure we're only processing this COMPLETE event if it has both
296+
// inputs and outputs (reflects original logic, prior to supporting DataSourceV2 events)
297+
if (!(olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0)) {
307298
ret_val = false;
308299
}
309300

@@ -314,9 +305,9 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
314305
private bool IsStartEventEnvironment(Event olEvent)
315306
{
316307
if (olEvent.EventType == START_EVENT && olEvent.Run.Facets.EnvironmentProperties != null)
317-
{
318-
return true;
319-
}
308+
{
309+
return true;
310+
}
320311

321312
return false;
322313
}

function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,54 +29,37 @@ public ValidateOlEvent(ILoggerFactory loggerFactory)
2929
_log = loggerFactory.CreateLogger<ValidateOlEvent>();
3030
}
3131

32-
/// <summary>
33-
/// Helper function to determine if the event is one of
34-
/// the data source v2 ones which needs us to save the
35-
/// inputs from the start event
36-
/// </summary>
37-
// private bool isDataSourceV2Event(Event olEvent) {
38-
// string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable?
39-
40-
// foreach (var outp in olEvent.Outputs)
41-
// {
42-
// foreach (var source in special_cases)
43-
// {
44-
// if (outp.NameSpace.StartsWith(source)) return true;
45-
// }
46-
// }
47-
// return false;
48-
// }
49-
5032
/// <summary>
5133
/// Performs initial validation of OpenLineage input
5234
/// The tested criteria include:
53-
/// 1. Events have both inputs and outputs (TODO: UPDATE)
35+
/// 1. Events have outputs (not both inputs and outputs, because in the case of DataSourceV2 events, the COMPLETE event will not have inputs)
5436
/// 2. Events do not have the same input and output
5537
/// 3. EventType is START or COMPLETE
5638
/// 4. If EventType is START, there is a Environment Facet
5739
/// </summary>
5840
/// <param name="olEvent">OpenLineage Event message</param>
5941
/// <returns>true if input is valid, false if not</returns>
6042
public bool Validate(Event olEvent){
61-
// if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))
62-
if (olEvent.Outputs.Count > 0) // TODO: check if this breaks any logic down the line.
63-
// Want to save COMPLETE events even if they only have outputs for the cosmos case
43+
if (olEvent.Outputs.Count > 0)
44+
// Want to save COMPLETE events even if they only have outputs, to deal with cosmos
6445
{
6546
// Need to rework for multiple inputs and outputs in one packet - possibly combine and then hash
6647
if (InOutEqual(olEvent))
67-
{
48+
{
6849
return false;
6950
}
7051
if (olEvent.EventType == "START")
7152
{
53+
// START events should contain both inputs and outputs, as well as the EnvironmentProperties facet
7254
if (olEvent.Run.Facets.EnvironmentProperties == null || !(olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0))
73-
{ // START events should contain both inputs and outputs, as well as the EnvironmentProperties facet
55+
{
7456
return false;
7557
}
7658
return true;
7759
}
60+
// COMPLETE events might not contain inputs, but should have at least one output.
7861
else if (olEvent.EventType == "COMPLETE" && olEvent.Outputs.Count > 0)
79-
{ // COMPLETE events might not contain inputs, but should have at least one output.
62+
{
8063
return true;
8164
}
8265
else

function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,26 +93,20 @@ public OlConsolodateEnrich(
9393
// consolidate and enrich the complete event if possible
9494
else if (_event.EventType == COMPLETE_EVENT_TYPE)
9595
{
96-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #1");
9796
var consolodatedEvent = await olMessageConsolodation.ConsolodateCompleteEvent(_event, _event.Run.RunId);
9897
if (consolodatedEvent == null)
9998
{
100-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #2");
10199
return null;
102100
}
103101
else
104102
{
105-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #3");
106103
var enrichedEvent = await olEnrichMessage.GetEnrichedEvent(consolodatedEvent);
107-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #4");
108104

109105
if (enrichedEvent == null)
110106
{
111-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #5");
112107
return null;
113108
}
114-
_logger.LogInformation("is COMPLETE_EVENT_TYPE #6");
115-
109+
116110
return enrichedEvent;
117111
}
118112
}

0 commit comments

Comments
 (0)