Skip to content

Commit 7f24fa2

Browse files
committed
WIP - found flaws in DataSourceV2 events logic
1 parent ba6c4aa commit 7f24fa2

File tree

1 file changed

+29
-9
lines changed

1 file changed

+29
-9
lines changed

function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/OlMessageConsolodation.cs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir
149149
var entity = new TableEntity(TABLE_PARTITION, olEvent.Run.RunId)
150150
{
151151
{ "EnvFacet", JsonConvert.SerializeObject(olEvent.Run.Facets.EnvironmentProperties) },
152+
// TODO: Add logic here to save only DataSourceV2 event inputs. The old logic of checking the outputs
153+
// for DataSourceV2 events is wrong because the issue is with the inputs, and we may miss them if
154+
// the output was not a DataSourceV2 event.
152155
{ "Inputs", JsonConvert.SerializeObject(olEvent.Inputs) }
153156

154157
};
@@ -217,7 +220,6 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
217220
{
218221
try
219222
{
220-
_log.LogInformation("Trying to get inputs");
221223
te_inputs = await _tableClient.GetEntityAsync<TableEntity>(TABLE_PARTITION, olEvent.Run.RunId, new string[] { "Inputs" });
222224
break;
223225
}
@@ -242,18 +244,37 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
242244
}
243245
olEvent.Run.Facets.EnvironmentProperties = envFacet;
244246

245-
// Add Inputs to event if not already there (will only be done for DataSourceV2 sources)
246-
if (olEvent.Inputs.Count == 0) {
247-
var inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
247+
// Check whether any inputs were saved from the START event (this is only done for events containing DataSourceV2 sources)
248+
if (te_inputs is not null) {
249+
var saved_inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
248250

249-
if (inputs is null)
250-
{
251+
if (saved_inputs is null) {
251252
_log.LogWarning($"OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event");
252253
return false;
253-
}
254-
olEvent.Inputs = inputs;
254+
}
255+
256+
// Compare the saved inputs against the inputs captured in this COMPLETE event, combine them, then remove any duplicates.
257+
// Checking for duplicates may be overkill because we observed that DataSource V2 inputs do not show up in the inputs
258+
// of the COMPLETE event, so in theory we would only have saved DataSource V2 inputs from the START event, and they would
259+
// not show up in the COMPLETE event.
260+
var inputs = new List<Inputs>(saved_inputs.Count + olEvent.Inputs.Count);
261+
262+
inputs.AddRange(saved_inputs);
263+
inputs.AddRange(olEvent.Inputs);
264+
265+
// TODO: Come back to this
255266

256267
}
268+
// if (olEvent.Inputs.Count == 0) {
269+
// var inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
270+
271+
// if (inputs is null)
272+
// {
273+
// _log.LogWarning($"OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event");
274+
// return false;
275+
// }
276+
olEvent.Inputs = inputs;
277+
// }
257278

258279
// clean up table over time.
259280
try
@@ -301,7 +322,6 @@ private bool isDataSourceV2Event(Event olEvent) {
301322

302323
private bool IsJoinEvent(Event olEvent)
303324
{
304-
string[] special_cases = {"cosmos", "iceberg"};
305325
if (olEvent.EventType == COMPLETE_EVENT)
306326
{
307327
if ((olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0) || (olEvent.Outputs.Count > 0 && isDataSourceV2Event(olEvent)))

0 commit comments

Comments
 (0)