@@ -185,15 +185,17 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
185185 return false ;
186186 }
187187
188- TableEntity te ;
189- TableEntity te_inputs ;
188+ TableEntity te = null ;
189+ TableEntity te_inputs = null ;
190+
191+ bool ret_val = true ;
190192
191193 // Processing time can sometimes cause complete events
192194 int retryCount = 4 ;
193195 int currentRetry = 0 ;
194196 TimeSpan delay = TimeSpan . FromSeconds ( 1 ) ;
195197
196- while ( true )
198+ while ( ret_val )
197199 {
198200 try
199201 {
@@ -206,15 +208,16 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
206208 _log . LogWarning ( $ "Start event was missing, retrying to consolidate message. Retry count: { currentRetry } ") ;
207209 if ( currentRetry > retryCount )
208210 {
209- return false ;
211+ ret_val = false ;
212+ break ;
210213 }
211214 }
212215 await Task . Delay ( delay ) ;
213216 }
214217
215218 // Get inputs. Todo: Check if more efficient to get inputs within the same while loop above. Can we get 2 entities at the same time?
216219 currentRetry = 0 ;
217- while ( true )
220+ while ( ret_val ) // ret_val instead of just true, because if didn't have the env_facet then don't bother getting inputs either
218221 {
219222 try
220223 {
@@ -227,7 +230,8 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
227230 _log . LogWarning ( $ "Start event was missing, retrying to consolidate message to get inputs. Retry count: { currentRetry } ") ;
228231 if ( currentRetry > retryCount )
229232 {
230- return false ;
233+ ret_val = false ;
234+ break ;
231235 }
232236 }
233237 await Task . Delay ( delay ) ;
@@ -238,28 +242,53 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
238242 if ( envFacet is null )
239243 {
240244 _log . LogWarning ( $ "OlMessageConsolodation-JoinEventData: Warning environment facet for COMPLETE event is null") ;
241- return false ;
245+ ret_val = false ;
242246 }
243247 olEvent . Run . Facets . EnvironmentProperties = envFacet ;
244248
245249 // Check if saved any inputs from the START event (will only be done for events containing DataSourceV2 sources)
246250 if ( te_inputs is not null ) {
247251 // TODO: Possible source of error.
248- var saved_inputs = JsonConvert . DeserializeObject < List < Inputs > > ( te_inputs [ "Inputs" ] . ToString ( ) ?? "" ) ;
252+ if ( te_inputs . ContainsKey ( "Inputs" ) ) {
253+ _log . LogInformation ( $ "New Code #1") ;
254+ try {
255+ var saved_inputs = JsonConvert . DeserializeObject < List < Inputs > > ( te_inputs [ "Inputs" ] . ToString ( ) ?? "" ) ;
256+ _log . LogInformation ( $ "New Code #1.1") ;
257+ _log . LogInformation ( "Inputs in dictionary? " , te_inputs . ContainsKey ( "Inputs" ) . ToString ( ) ) ;
258+
259+ if ( saved_inputs is null ) {
260+ // Unecessary check?
261+ _log . LogInformation ( $ "OlMessageConsolodation-JoinEventData: No inputs found for COMPLETE event") ;
262+ //ret_val = false;
263+ }
264+
265+ else {
266+ // Check inputs saved against inputs captured in this COMPLETE event and combine while removing any duplicates.
267+ // Checking for duplicates needed since we save all the inputs captured from the START event. Perhaps it may be better to
268+ // only save the DataSourceV2 inputs?
269+ var inputs = new List < Inputs > ( saved_inputs . Count + olEvent . Inputs . Count ) ;
270+ inputs . AddRange ( saved_inputs ) ;
271+ inputs . AddRange ( olEvent . Inputs ) ;
272+ var unique_inputs = inputs . Distinct ( ) ;
273+ olEvent . Inputs = unique_inputs . ToList ( ) ;
274+ _log . LogInformation ( $ "OlMessageConsolodation-JoinEventData: Captured inputs for COMPLETE event") ;
275+ }
276+ }
277+ catch ( System . Exception ex ) {
278+ _log . LogError ( ex , $ "OlMessageConsolodation-JoinEventData: Error { ex . Message } when deserializing inputs") ;
279+ ret_val = false ;
280+ }
281+
249282
250- if ( saved_inputs is null ) {
251- _log . LogWarning ( $ "OlMessageConsolodation-JoinEventData: Warning: no inputs found for datasource v2 COMPLETE event") ;
252- return false ;
283+
253284 }
254285
255- // Check inputs saved against inputs captured in this COMPLETE event and combine while removing any duplicates.
256- // Checking for duplicates needed since we save all the inputs captured from the START event. Perhaps it may be better to
257- // only save the DataSourceV2 inputs?
258- var inputs = new List < Inputs > ( saved_inputs . Count + olEvent . Inputs . Count ) ;
259- inputs . AddRange ( saved_inputs ) ;
260- inputs . AddRange ( olEvent . Inputs ) ;
261- var unique_inputs = inputs . Distinct ( ) ;
262- olEvent . Inputs = unique_inputs . ToList ( ) ;
286+ else {
287+ _log . LogInformation ( $ "New Code #2") ;
288+ _log . LogInformation ( $ "OlMessageConsolodation-JoinEventData: No inputs found for COMPLETE event") ;
289+ //ret_val = false;
290+ }
291+
263292 }
264293
265294 // clean up table over time.
@@ -275,10 +304,10 @@ private async Task<bool> JoinEventData(Event olEvent, string jobRunId)
275304 // If no inputs were saved from the start event, then we need to make sure we're only processing this COMPLETE event
276305 // if it has both inputs and outputs (reflects original logic, prior to supporting DataSourceV2 events)
277306 if ( te_inputs is null && ! ( olEvent . Inputs . Count > 0 && olEvent . Outputs . Count > 0 ) ) {
278- return false ;
307+ ret_val = false ;
279308 }
280309
281- return true ;
310+ return ret_val ;
282311 }
283312
284313 // Returns true if olEvent is of type START and has the environment facet
0 commit comments