@@ -6,7 +6,7 @@ use crate::{
66} ;
77
88use axum:: extract:: ws:: { Message , WebSocket } ;
9- use futures:: { Future , FutureExt , StreamExt , TryFutureExt } ;
9+ use futures:: { future :: Fuse , Future , FutureExt , StreamExt , TryFutureExt } ;
1010use orchestrator:: {
1111 coordinator:: { self , Coordinator , DockerBackend } ,
1212 DropErrorDetailsExt ,
@@ -16,6 +16,7 @@ use std::{
1616 collections:: BTreeMap ,
1717 convert:: TryFrom ,
1818 mem,
19+ pin:: pin,
1920 sync:: {
2021 atomic:: { AtomicU64 , Ordering } ,
2122 Arc ,
@@ -351,9 +352,8 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
351352 }
352353
353354 let mut manager = CoordinatorManager :: new ( ) . await ;
354- tokio:: pin! {
355- let session_timeout = time:: sleep( CoordinatorManager :: SESSION_TIMEOUT ) ;
356- }
355+ let mut session_timeout = pin ! ( time:: sleep( CoordinatorManager :: SESSION_TIMEOUT ) ) ;
356+ let mut idle_timeout = pin ! ( Fuse :: terminated( ) ) ;
357357
358358 let mut active_executions = BTreeMap :: new ( ) ;
359359 let mut active_execution_gc_interval = time:: interval ( Duration :: from_secs ( 30 ) ) ;
@@ -394,6 +394,12 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
394394
395395 // We don't care if there are no running tasks
396396 Some ( task) = manager. join_next( ) => {
397+ // The last task has completed which means we are a
398+ // candidate for idling in a little while.
399+ if manager. is_empty( ) {
400+ idle_timeout. set( time:: sleep( CoordinatorManager :: IDLE_TIMEOUT ) . fuse( ) ) ;
401+ }
402+
397403 let ( error, meta) = match task {
398404 Ok ( Ok ( ( ) ) ) => continue ,
399405 Ok ( Err ( error) ) => error,
@@ -425,7 +431,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
425431 . collect( ) ;
426432 } ,
427433
428- _ = time :: sleep ( CoordinatorManager :: IDLE_TIMEOUT ) , if manager. is_empty( ) => {
434+ _ = & mut idle_timeout , if manager. is_empty( ) => {
429435 let idled = manager. idle( ) . await . context( StreamingCoordinatorIdleSnafu ) ;
430436
431437 let Err ( error) = idled else { continue } ;
0 commit comments