22// Licensed under the MIT License.
33
44using System ;
5+ using System . Linq ;
56using System . Threading . Tasks ;
67using System . Threading ;
78using System . Collections . Generic ;
@@ -143,15 +144,12 @@ private async Task<bool> ProcessStartEvent(Event olEvent, string jobRunId, Envir
143144 }
144145 try
145146 {
146- if ( olEvent . Inputs . Count > 0 )
147+ if ( isDataSourceV2Event ( olEvent ) )
147148 // Store inputs and env facet.
148149 {
149150 var entity = new TableEntity ( TABLE_PARTITION , olEvent . Run . RunId )
150151 {
151152 { "EnvFacet" , JsonConvert . SerializeObject ( olEvent . Run . Facets . EnvironmentProperties ) } ,
152- // TODO: Add logic here to only save DataSourceV2 event inputs. Old logic of checking the outputs
153- // for datasourceV2 events is not right because the issue is with inputs, and we may miss them if
154- // the output was not a datasource v2 event.
155153 { "Inputs" , JsonConvert . SerializeObject ( olEvent . Inputs ) }
156154
157155 } ;
@@ -186,7 +184,7 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
186184 {
187185 return false ;
188186 }
189-
187+
190188 TableEntity te ;
191189 TableEntity te_inputs ;
192190
@@ -253,28 +251,15 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
253251 return false ;
254252 }
255253
256- // Check inputs saved against inputs captured in this COMPLETE event and combine, then remove any duplicates.
257- // Checking for duplicates may be overkill because we observed that DataSource V2 COMPLETE events do not show up in the inputs
258- // of the COMPLETE event, so in theory we would only have saved DataSource V2 inputs from the START event, and they would
259- // not show up in the COMPLETE event
254+ // Check inputs saved against inputs captured in this COMPLETE event and combine while removing any duplicates.
255+ // Checking for duplicates is needed since we save all the inputs captured from the START event. Perhaps it would be better to
256+ // only save the DataSourceV2 inputs?
260257 var inputs = new List < Inputs > ( saved_inputs . Count + olEvent . Inputs . Count ) ;
261-
262258 inputs . AddRange ( saved_inputs ) ;
263259 inputs . AddRange ( olEvent . Inputs ) ;
264-
265- // TODO: Come back to this
266-
260+ var unique_inputs = inputs . Distinct ( ) ;
261+ olEvent . Inputs = unique_inputs . ToList ( ) ;
267262 }
268- // if (olEvent.Inputs.Count == 0) {
269- // var inputs = JsonConvert.DeserializeObject<List<Inputs>>(te_inputs["Inputs"].ToString() ?? "");
270-
271- // if (inputs is null)
272- // {
273- // _log.LogWarning($"OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event");
274- // return false;
275- // }
276- olEvent . Inputs = inputs ;
277- // }
278263
279264 // clean up table over time.
280265 try
@@ -286,6 +271,12 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
286271 _log . LogError ( ex , $ "OlMessageConsolodation-JoinEventData: Error { ex . Message } when deleting entity") ;
287272 }
288273
274+ // If no inputs were saved from the start event, then we need to make sure we're only processing this COMPLETE event
275+ // if it has both inputs and outputs (reflects original logic, prior to supporting DataSourceV2 events)
276+ if ( te_inputs is null && ! ( olEvent . Inputs . Count > 0 && olEvent . Outputs . Count > 0 ) ) {
277+ return false ;
278+ }
279+
289280 return true ;
290281 }
291282
@@ -302,13 +293,13 @@ private bool IsStartEventEnvironment(Event olEvent)
302293
303294 /// <summary>
304295 /// Helper function to determine if the event is one of
305- /// the data source v2 ones which need to aggregate data
306- /// from the start and complete events
296+ /// the data source v2 ones which need us to save the
297+ /// inputs from the start event
307298 /// </summary>
308299 private bool isDataSourceV2Event ( Event olEvent ) {
309300 string [ ] special_cases = { "azurecosmos://" , "iceberg://" } ; // todo: make this configurable?
310- // Don't need to process START events here as they have both inputs and outputs
311- if ( olEvent . EventType == "START " ) return false ;
301+ // // Don't need to process START events here as they have both inputs and outputs
302+ // if (olEvent.EventType == "COMPLETE ") return false;
312303
313304 foreach ( var outp in olEvent . Outputs )
314305 {
@@ -324,7 +315,7 @@ private bool IsJoinEvent(Event olEvent)
324315 {
325316 if ( olEvent . EventType == COMPLETE_EVENT )
326317 {
327- if ( ( olEvent . Inputs . Count > 0 && olEvent . Outputs . Count > 0 ) || ( olEvent . Outputs . Count > 0 && isDataSourceV2Event ( olEvent ) ) )
318+ if ( olEvent . Outputs . Count > 0 )
328319 {
329320 return true ;
330321 }
0 commit comments