-
Notifications
You must be signed in to change notification settings - Fork 59
Feature/support cosmos #130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ee938d1
512e59d
7db1ea3
cb94a7a
92b6fb7
5baee87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -143,12 +143,26 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir | |
| } | ||
| try | ||
| { | ||
| var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId) | ||
| if (olEvent.Inputs.Count > 0) | ||
| // Store inputs and env facet. | ||
| { | ||
| { "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) } | ||
| }; | ||
| var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId) | ||
| { | ||
| { "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) }, | ||
| { "Inputs", JsonConvert.SerializeObject(olEvent.Inputs) } | ||
|
|
||
| }; | ||
| await _tableClient.AddEntityAsync(entity); | ||
| } | ||
| else { | ||
| // Store only env facet. | ||
| var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId) | ||
| { | ||
| { "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) } | ||
|
|
||
| await _tableClient.AddEntityAsync(entity); | ||
| }; | ||
| await _tableClient.AddEntityAsync(entity); | ||
| } | ||
| } | ||
| catch (RequestFailedException ex) | ||
| { | ||
|
|
@@ -159,6 +173,7 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir | |
| _log.LogError(ex, $"OlMessageConsolodation-ProcessStartEvent: Error {ex.Message} when processing entity"); | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
|
|
@@ -170,6 +185,7 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId) | |
| } | ||
|
|
||
| TableEntity te; | ||
| TableEntity te_inputs; | ||
|
|
||
| // Processing time can sometimes cause complete events | ||
| int retryCount = 4; | ||
|
|
@@ -195,6 +211,28 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId) | |
| await Task.Delay(delay); | ||
| } | ||
|
|
||
| // Get inputs. Todo: Check if more efficient to get inputs within the same while loop above. Can we get 2 entities at the same time? | ||
| currentRetry = 0; | ||
| while (true) | ||
| { | ||
| try | ||
| { | ||
| _log.LogInformation("Trying to get inputs"); | ||
| te_inputs = await _tableClient.GetEntityAsync<TableEntity>(TABLE_PARTITION, olEvent.Run.RunId, new string[] { "Inputs" }); | ||
| break; | ||
| } | ||
| catch (RequestFailedException) | ||
| { | ||
| currentRetry++; | ||
| _log.LogWarning($"Start event was missing, retrying to consolidate message to get inputs. Retry count: {currentRetry}"); | ||
| if (currentRetry > retryCount) | ||
| { | ||
| return false; | ||
| } | ||
| } | ||
| await Task.Delay(delay); | ||
| } | ||
|
|
||
| // Add Environment to event | ||
| var envFacet = JsonConvert.DeserializeObject<EnvironmentPropsParent>(te["EnvFacet"].ToString() ?? ""); | ||
| if (envFacet is null) | ||
|
|
@@ -204,15 +242,28 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId) | |
| } | ||
| olEvent.Run.Facets.EnvironmentProperties = envFacet; | ||
|
|
||
| // clean up table over time | ||
| try | ||
| { | ||
| var delresp = await _tableClient.DeleteEntityAsync(TABLE_PARTITION, olEvent.Run.RunId); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deleting entity"); | ||
| } | ||
| // Add Inputs to event if not already there (will only be done for DataSourceV2 sources) | ||
| if (olEvent.Inputs.Count == 0) { | ||
| var inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? ""); | ||
|
|
||
| if (inputs is null) | ||
| { | ||
| _log.LogWarning($"OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event"); | ||
| return false; | ||
| } | ||
| olEvent.Inputs = inputs; | ||
|
|
||
| } | ||
|
|
||
| // clean up table over time. | ||
| try | ||
| { | ||
| var delresp = await _tableClient.DeleteEntityAsync(TABLE_PARTITION, olEvent.Run.RunId); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deleting entity"); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
@@ -228,11 +279,32 @@ private bool IsStartEventEnvironment(Event olEvent) | |
| return false; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Helper function to determine if the event is one of | ||
| /// the data source v2 ones which need to aggregate data | ||
| /// from the start and complete events | ||
| /// </summary> | ||
| private bool isDataSourceV2Event(Event olEvent) { | ||
| string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, we should definitely make this part of the configuration instead. Maybe something like openLineageDataSourceV2Prefixes as the parameter and have it separated by semicolon (;) Similar to the Spark_Entities config. Is there ever a case where "://" would not be part of the prefix? |
||
| // Don't need to process START events here as they have both inputs and outputs | ||
| if (olEvent.EventType == "START") return false; | ||
|
|
||
| foreach (var outp in olEvent.Outputs) | ||
| { | ||
| foreach (var source in special_cases) | ||
| { | ||
| if (outp.NameSpace.StartsWith(source)) return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private bool IsJoinEvent(Event olEvent) | ||
| { | ||
| string[] special_cases = {"cosmos", "iceberg"}; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this get used in here? |
||
| if (olEvent.EventType == COMPLETE_EVENT) | ||
| { | ||
| if (olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) | ||
| if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent))) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,18 +29,39 @@ public ValidateOlEvent(ILoggerFactory loggerFactory) | |
| _log = loggerFactory.CreateLogger<ValidateOlEvent>(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Helper function to determine if the event is one of | ||
| /// the data source v2 ones which need to aggregate data | ||
| /// from the start and complete events | ||
| /// </summary> | ||
| private bool isDataSourceV2Event(Event olEvent) { | ||
| string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable? | ||
| // Don't need to process START events here as they have both inputs and outputs | ||
| if (olEvent.EventType == "START") return false; | ||
|
|
||
| foreach (var outp in olEvent.Outputs) | ||
| { | ||
| foreach (var source in special_cases) | ||
| { | ||
| if (outp.NameSpace.StartsWith(source)) return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Performs initial validation of OpenLineage input | ||
| /// The tested criteria include: | ||
| /// 1. Events have both inputs and outputs | ||
| /// a. Except for special cases covered in isDataSourceV2Event | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add why this is necessary? Such as "OpenLineage does not emit a start event or complete event with all of the necessary information and so must be consolidated later on"? |
||
| /// 2. Events do not have the same input and output | ||
| /// 3. EventType is START or COMPLETE | ||
| /// 4. If EventType is START, there is a Environment Facet | ||
| /// </summary> | ||
| /// <param name="olEvent">OpenLineage Event message</param> | ||
| /// <returns>true if input is valid, false if not</returns> | ||
| public bool Validate(Event olEvent){ | ||
| if (olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) | ||
| if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent))) | ||
| { | ||
| // Need to rework for multiple inputs and outputs in one packet - possibly combine and then hash | ||
| if (InOutEqual(olEvent)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a case where there may be an input present initially and then more inputs later on?
What about a scenario where ABFSS is used and joined with a Data Source V2 source like Cosmos?