@@ -418,7 +418,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
418418 _ = active_execution_gc_interval. tick( ) => {
419419 active_executions = mem:: take( & mut active_executions)
420420 . into_iter( )
421- . filter( |( _id, ( _, tx) ) | !tx. is_closed( ) )
421+ . filter( |( _id, ( _, tx) ) | tx . as_ref ( ) . map_or ( false , |tx| !tx. is_closed( ) ) )
422422 . collect( ) ;
423423 } ,
424424
@@ -489,7 +489,7 @@ async fn handle_msg(
489489 txt : String ,
490490 tx : & ResponseTx ,
491491 manager : & mut CoordinatorManager ,
492- active_executions : & mut BTreeMap < i64 , ( CancellationToken , mpsc:: Sender < String > ) > ,
492+ active_executions : & mut BTreeMap < i64 , ( CancellationToken , Option < mpsc:: Sender < String > > ) > ,
493493) {
494494 use WSMessageRequest :: * ;
495495
@@ -500,7 +500,7 @@ async fn handle_msg(
500500 let token = CancellationToken :: new ( ) ;
501501 let ( execution_tx, execution_rx) = mpsc:: channel ( 8 ) ;
502502
503- active_executions. insert ( meta. sequence_number , ( token. clone ( ) , execution_tx) ) ;
503+ active_executions. insert ( meta. sequence_number , ( token. clone ( ) , Some ( execution_tx) ) ) ;
504504
505505 // TODO: Should a single execute / build / etc. session have a timeout of some kind?
506506 let spawned = manager
@@ -520,7 +520,7 @@ async fn handle_msg(
520520 }
521521
522522 Ok ( ExecuteStdin { payload, meta } ) => {
523- let Some ( ( _, execution_tx) ) = active_executions. get ( & meta. sequence_number ) else {
523+ let Some ( ( _, Some ( execution_tx) ) ) = active_executions. get ( & meta. sequence_number ) else {
524524 warn ! ( "Received stdin for an execution that is no longer active" ) ;
525525 return ;
526526 } ;
@@ -536,14 +536,20 @@ async fn handle_msg(
536536 }
537537
538538 Ok ( ExecuteStdinClose { meta } ) => {
539- let execution_tx = active_executions. remove ( & meta. sequence_number ) ;
540- drop ( execution_tx) ; // Signal closed
539+ let Some ( ( _, execution_tx) ) = active_executions. get_mut ( & meta. sequence_number ) else {
540+ warn ! ( "Received stdin close for an execution that is no longer active" ) ;
541+ return ;
542+ } ;
543+
544+ * execution_tx = None ; // Drop to signal closed
541545 }
542546
543547 Ok ( ExecuteKill { meta } ) => {
544- if let Some ( ( token, _) ) = active_executions. remove ( & meta. sequence_number ) {
545- token. cancel ( ) ;
546- }
548+ let Some ( ( token, _) ) = active_executions. get ( & meta. sequence_number ) else {
549+ warn ! ( "Received kill for an execution that is no longer active" ) ;
550+ return ;
551+ } ;
552+ token. cancel ( ) ;
547553 }
548554
549555 Err ( e) => {
0 commit comments