Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions deployment/infra/OlToPurviewMappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,19 @@
"qualifiedName": "mysql://{nameSpcBodyParts[0]}/{nameSpcBodyParts[2]}/{nameGroups[0]}",
"purviewDataType": "azure_mysql_table",
"purviewPrefix": "mysql"
},
{
"name": "azureCosmos",
"parserConditions": [
{
"op1": "prefix",
"compare": "=",
"op2": "azurecosmos"
}
],
"qualifiedName": "https://{nameSpcBodyParts[0]}/{nameSpcBodyParts[1]}/{nameSpcBodyParts[2]}/{nameGroups[0]}",
"purviewDataType": "azure_cosmosdb_sqlapi_collection",
"purviewPrefix": "https"
}
]
}
2 changes: 1 addition & 1 deletion deployment/infra/newdeploymenttemp.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,26 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir
}
try
{
var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId)
if (olEvent.Inputs.Count > 0)
// Store inputs and env facet.
{
{ "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) }
};
var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId)
{
{ "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) },
{ "Inputs", JsonConvert.SerializeObject(olEvent.Inputs) }

};
await _tableClient.AddEntityAsync(entity);
}
else {
// Store only env facet.
var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId)
{
{ "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) }

await _tableClient.AddEntityAsync(entity);
};
await _tableClient.AddEntityAsync(entity);
}
}
catch (RequestFailedException ex)
{
Expand All @@ -159,6 +173,7 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir
_log.LogError(ex, $"OlMessageConsolodation-ProcessStartEvent: Error {ex.Message} when processing entity");
return false;
}

return true;
}

Expand All @@ -170,6 +185,7 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
}

TableEntity te;
TableEntity te_inputs;

// Processing time can sometimes cause complete events
int retryCount = 4;
Expand All @@ -195,6 +211,28 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
await Task.Delay(delay);
}

// Get inputs. Todo: Check if more efficient to get inputs within the same while loop above. Can we get 2 entities at the same time?
currentRetry = 0;
while (true)
{
try
{
_log.LogInformation("Trying to get inputs");
te_inputs = await _tableClient.GetEntityAsync<TableEntity>(TABLE_PARTITION, olEvent.Run.RunId, new string[] { "Inputs" });
break;
}
catch (RequestFailedException)
{
currentRetry++;
_log.LogWarning($"Start event was missing, retrying to consolidate message to get inputs. Retry count: {currentRetry}");
if (currentRetry > retryCount)
{
return false;
}
}
await Task.Delay(delay);
}

// Add Environment to event
var envFacet = JsonConvert.DeserializeObject<EnvironmentPropsParent>(te["EnvFacet"].ToString() ?? "");
if (envFacet is null)
Expand All @@ -204,15 +242,28 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
}
olEvent.Run.Facets.EnvironmentProperties = envFacet;

// clean up table over time
try
{
var delresp = await _tableClient.DeleteEntityAsync(TABLE_PARTITION, olEvent.Run.RunId);
}
catch (Exception ex)
{
_log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deleting entity");
}
// Add Inputs to event if not already there (will only be done for DataSourceV2 sources)
if (olEvent.Inputs.Count == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where there may be an input present initially and then more inputs later on?

What about a scenario where ABFSS is used and joined with a Data Source V2 source like Cosmos?

var inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");

if (inputs is null)
{
_log.LogWarning($"OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event");
return false;
}
olEvent.Inputs = inputs;

}

// clean up table over time.
try
{
var delresp = await _tableClient.DeleteEntityAsync(TABLE_PARTITION, olEvent.Run.RunId);
}
catch (Exception ex)
{
_log.LogError(ex, $"OlMessageConsolodation-JoinEventData: Error {ex.Message} when deleting entity");
}

return true;
}
Expand All @@ -228,11 +279,32 @@ private bool IsStartEventEnvironment(Event olEvent)
return false;
}

/// <summary>
/// Helper function to determine if the event is one of
/// the data source v2 ones which need to aggregate data
/// from the start and complete events
/// </summary>
private bool isDataSourceV2Event(Event olEvent) {
string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we should definitely make this part of the configuration instead. Maybe something like openLineageDataSourceV2Prefixes as the parameter and have it separated by semicolon (;)

Similar to the Spark_Entities config. Is there ever a case where "://" would not be part of the prefix?

// Don't need to process START events here as they have both inputs and outputs
if (olEvent.EventType == "START") return false;

foreach (var outp in olEvent.Outputs)
{
foreach (var source in special_cases)
{
if (outp.NameSpace.StartsWith(source)) return true;
}
}
return false;
}

private bool IsJoinEvent(Event olEvent)
{
string[] special_cases = {"cosmos", "iceberg"};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this get used in here?

if (olEvent.EventType == COMPLETE_EVENT)
{
if (olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0)
if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,39 @@ public ValidateOlEvent(ILoggerFactory loggerFactory)
_log = loggerFactory.CreateLogger<ValidateOlEvent>();
}

/// <summary>
/// Helper function to determine if the event is one of
/// the data source v2 ones which need to aggregate data
/// from the start and complete events
/// </summary>
private bool isDataSourceV2Event(Event olEvent) {
string[] special_cases = {"azurecosmos://", "iceberg://"}; // todo: make this configurable?
// Don't need to process START events here as they have both inputs and outputs
if (olEvent.EventType == "START") return false;

foreach (var outp in olEvent.Outputs)
{
foreach (var source in special_cases)
{
if (outp.NameSpace.StartsWith(source)) return true;
}
}
return false;
}

/// <summary>
/// Performs initial validation of OpenLineage input
/// The tested criteria include:
/// 1. Events have both inputs and outputs
/// a. Except for special cases covered in isDataSourceV2Event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add why this is necessary? Such as "OpenLineage does not emit a start event or complete event with all of the necessary information and so must be consolidated later on"?

/// 2. Events do not have the same input and output
/// 3. EventType is START or COMPLETE
/// 4. If EventType is START, there is a Environment Facet
/// </summary>
/// <param name="olEvent">OpenLineage Event message</param>
/// <returns>true if input is valid, false if not</returns>
public bool Validate(Event olEvent){
if (olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0)
if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))
{
// Need to rework for multiple inputs and outputs in one packet - possibly combine and then hash
if (InOutEqual(olEvent))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ public DatabricksProcess GetDatabricksProcess(string taskQn)
{
var databricksProcess = new DatabricksProcess();
//var ColumnAttributes = new ColumnLevelAttributes();

var inputs = new List<InputOutput>();
foreach (IInputsOutputs input in _eEvent.OlEvent!.Inputs)
{
Expand All @@ -292,10 +291,9 @@ public DatabricksProcess GetDatabricksProcess(string taskQn)
{
outputs.Add(GetInputOutputs(output));
}

databricksProcess.Attributes = GetProcAttributes(taskQn, inputs,outputs,_eEvent.OlEvent);
//databricksProcess.Attributes.ColumnMapping = JsonConvert.SerializeObject(_colParser.GetColIdentifiers());
databricksProcess.RelationshipAttributes.Task.QualifiedName = taskQn;
databricksProcess.RelationshipAttributes.Task.QualifiedName = taskQn;
return databricksProcess;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public PurviewIdentifier GetIdentifiers(string nameSpace, string name)
// Break the name and nameSpace values into their individual / referencable parts
var olParts = new OlParts(nameSpace, name);


// Get a dictionary assigning the configuration string keys to each of the olParts
var olDynParts = olParts.GetDynamicPairs(JSON_KEY_NAMES);

Expand All @@ -75,7 +76,6 @@ public PurviewIdentifier GetIdentifiers(string nameSpace, string name)

// Use the relevant configuration mapping and the olParts to construct the PurviewIdentifier
purviewIdentifier = GetPurviewIdentifier(mapping, olDynParts);

return purviewIdentifier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ public OlConsolodateEnrich(

var validateOlEvent = new ValidateOlEvent(_loggerFactory);
var olMessageConsolodation = new OlMessageConsolodation(_loggerFactory, _configuration);

var olEnrichMessage = new OlMessageEnrichment(_loggerFactory, _configuration);


// Validate the event
if (!validateOlEvent.Validate(_event))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@ public OlToPurviewParsingService(ILoggerFactory loggerFactory, IConfiguration co
_logger.LogWarning($"OlToPurviewParsingService-GetPurviewFromOlEventAsync: Event data is not valid - eventData: {JsonConvert.SerializeObject(eventData)}");
return null;
}

IDatabricksToPurviewParser parser = new DatabricksToPurviewParser(_loggerFactory, _config, eventData);

if (eventData.IsInteractiveNotebook)
{
return ParseInteractiveNotebook(parser);


}

else if (parser.GetJobType() == JobType.JobNotebook)
{
return ParseJobNotebook(parser);
}

else
{
return ParseJobTask(parser);
Expand All @@ -71,7 +74,6 @@ private string ParseInteractiveNotebook(IDatabricksToPurviewParser parser)
var databricksWorkspace = parser.GetDatabricksWorkspace();
var databricksNotebook = parser.GetDatabricksNotebook(databricksWorkspace.Attributes.QualifiedName, true);
var databricksProcess = parser.GetDatabricksProcess(databricksNotebook.Attributes.QualifiedName);

var databricksWorkspaceStr = JsonConvert.SerializeObject(databricksWorkspace);
var databricksNotebookStr = JsonConvert.SerializeObject(databricksNotebook);
var databricksProcessStr = JsonConvert.SerializeObject(databricksProcess);
Expand Down
1 change: 1 addition & 0 deletions function-app/adb-to-purview/src/Functions/PurviewOut.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public async Task<string> Run(
_logger.LogInformation($"Start event, duplicate event, or no context found - eventData: {input}");
return "";
}

var purviewEvent = _olToPurviewParsingService.GetPurviewFromOlEvent(enrichedEvent);
if (purviewEvent == null)
{
Expand Down