Skip to content

Commit 50f3129

Browse files
committed
Cosmos WIP
1 parent b67bbe1 commit 50f3129

File tree

1 file changed

+12
-13
lines changed

1 file changed

+12
-13
lines changed

function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,39 +29,38 @@ public ValidateOlEvent(ILoggerFactory loggerFactory)
2929
_log = loggerFactory.CreateLogger<ValidateOlEvent>();
3030
}
3131

32-
/// <summary>
32+
/// <summary>
3333
/// Helper function to determine if the event is one of
34-
/// the data source v2 ones which need to aggregate data
35-
/// from the start and complete events
34+
/// the data source v2 ones which needs us to save the
35+
/// inputs from the start event
3636
/// </summary>
3737
private bool isDataSourceV2Event(Event olEvent) {
3838
string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable?
39-
// Don't need to process START events here as they have both inputs and outputs
40-
if (olEvent.EventType == "START") return false;
4139

4240
foreach (var outp in olEvent.Outputs)
4341
{
4442
foreach (var source in special_cases)
4543
{
4644
if (outp.NameSpace.StartsWith(source)) return true;
47-
}
45+
}
4846
}
4947
return false;
5048
}
5149

5250
/// <summary>
5351
/// Performs initial validation of OpenLineage input
5452
/// The tested criteria include:
55-
/// 1. Events have both inputs and outputs
56-
/// a. Except for special cases covered in isDataSourceV2Event
53+
/// 1. Events have both inputs and outputs (TODO: UPDATE)
5754
/// 2. Events do not have the same input and output
5855
/// 3. EventType is START or COMPLETE
5956
/// 4. If EventType is START, there is a Environment Facet
6057
/// </summary>
6158
/// <param name="olEvent">OpenLineage Event message</param>
6259
/// <returns>true if input is valid, false if not</returns>
6360
public bool Validate(Event olEvent){
64-
if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))
61+
// if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))
62+
if (olEvent.Outputs.Count > 0) // TODO: check if this breaks any logic down the line.
63+
// Want to save COMPLETE events even if they only have outputs for the cosmos case
6564
{
6665
// Need to rework for multiple inputs and outputs in one packet - possibly combine and then hash
6766
if (InOutEqual(olEvent))
@@ -70,14 +69,14 @@ public bool Validate(Event olEvent){
7069
}
7170
if (olEvent.EventType == "START")
7271
{
73-
if (olEvent.Run.Facets.EnvironmentProperties == null)
74-
{
72+
if (olEvent.Run.Facets.EnvironmentProperties == null || !(olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0))
73+
{ // START events should contain both inputs and outputs, as well as the EnvironmentProperties facet
7574
return false;
7675
}
7776
return true;
7877
}
79-
else if (olEvent.EventType == "COMPLETE")
80-
{
78+
else if (olEvent.EventType == "COMPLETE" && olEvent.Outputs.Count > 0)
79+
{ // COMPLETE events might not contain inputs, but should have at least one output.
8180
return true;
8281
}
8382
else

0 commit comments

Comments
 (0)