11use futures:: {
22 future:: { BoxFuture , OptionFuture } ,
3- Future , FutureExt ,
3+ stream:: BoxStream ,
4+ Future , FutureExt , StreamExt ,
45} ;
56use once_cell:: sync:: Lazy ;
67use serde:: Deserialize ;
@@ -23,7 +24,7 @@ use tokio::{
2324 task:: { JoinHandle , JoinSet } ,
2425 time:: { self , MissedTickBehavior } ,
2526} ;
26- use tokio_stream:: { wrappers:: ReceiverStream , StreamExt } ;
27+ use tokio_stream:: wrappers:: ReceiverStream ;
2728use tokio_util:: { io:: SyncIoBridge , sync:: CancellationToken } ;
2829use tracing:: { instrument, trace, trace_span, warn, Instrument } ;
2930
@@ -416,6 +417,25 @@ impl CargoTomlModifier for ExecuteRequest {
416417 }
417418}
418419
420+ #[ derive( Debug , Clone ) ]
421+ pub struct ExecuteStatus {
422+ pub resident_set_size_bytes : u64 ,
423+ pub total_time_secs : f64 ,
424+ }
425+
426+ impl From < CommandStatistics > for ExecuteStatus {
427+ fn from ( value : CommandStatistics ) -> Self {
428+ let CommandStatistics {
429+ total_time_secs,
430+ resident_set_size_bytes,
431+ } = value;
432+ Self {
433+ resident_set_size_bytes,
434+ total_time_secs,
435+ }
436+ }
437+ }
438+
419439#[ derive( Debug , Clone ) ]
420440pub struct ExecuteResponse {
421441 pub success : bool ,
@@ -1257,6 +1277,19 @@ impl Container {
12571277 }
12581278 . boxed ( ) ;
12591279
1280+ let status_rx = tokio_stream:: wrappers:: ReceiverStream :: new ( status_rx)
1281+ . map ( |s| {
1282+ let CommandStatistics {
1283+ total_time_secs,
1284+ resident_set_size_bytes,
1285+ } = s;
1286+ ExecuteStatus {
1287+ resident_set_size_bytes,
1288+ total_time_secs,
1289+ }
1290+ } )
1291+ . boxed ( ) ;
1292+
12601293 Ok ( ActiveExecution {
12611294 task,
12621295 stdin_tx,
@@ -1781,7 +1814,7 @@ pub struct ActiveExecution {
17811814 pub stdin_tx : mpsc:: Sender < String > ,
17821815 pub stdout_rx : mpsc:: Receiver < String > ,
17831816 pub stderr_rx : mpsc:: Receiver < String > ,
1784- pub status_rx : mpsc :: Receiver < CommandStatistics > ,
1817+ pub status_rx : BoxStream < ' static , ExecuteStatus > ,
17851818}
17861819
17871820impl fmt:: Debug for ActiveExecution {
@@ -3197,19 +3230,13 @@ mod tests {
31973230 stdin_tx : _,
31983231 stdout_rx,
31993232 stderr_rx,
3200- mut status_rx,
3233+ status_rx,
32013234 } = coordinator
32023235 . begin_execute ( token. clone ( ) , request)
32033236 . await
32043237 . unwrap ( ) ;
32053238
3206- let statuses = async {
3207- let mut statuses = Vec :: new ( ) ;
3208- while let Some ( s) = status_rx. recv ( ) . await {
3209- statuses. push ( s) ;
3210- }
3211- statuses
3212- } ;
3239+ let statuses = status_rx. collect :: < Vec < _ > > ( ) ;
32133240
32143241 let output = WithOutput :: try_absorb ( task, stdout_rx, stderr_rx) ;
32153242
0 commit comments