11use futures:: {
22 future:: { BoxFuture , OptionFuture } ,
33 stream:: BoxStream ,
4- Future , FutureExt , StreamExt ,
4+ Future , FutureExt , Stream , StreamExt ,
55} ;
66use once_cell:: sync:: Lazy ;
77use serde:: Deserialize ;
@@ -26,7 +26,7 @@ use tokio::{
2626} ;
2727use tokio_stream:: wrappers:: ReceiverStream ;
2828use tokio_util:: { io:: SyncIoBridge , sync:: CancellationToken } ;
29- use tracing:: { instrument, trace, trace_span, warn, Instrument } ;
29+ use tracing:: { info_span , instrument, trace, trace_span, warn, Instrument } ;
3030
3131use crate :: {
3232 bincode_input_closed,
@@ -764,8 +764,24 @@ impl<T> WithOutput<T> {
764764 where
765765 F : Future < Output = Result < T , E > > ,
766766 {
767- let stdout = ReceiverStream :: new ( stdout_rx) . collect ( ) ;
768- let stderr = ReceiverStream :: new ( stderr_rx) . collect ( ) ;
767+ Self :: try_absorb_stream (
768+ task,
769+ ReceiverStream :: new ( stdout_rx) ,
770+ ReceiverStream :: new ( stderr_rx) ,
771+ )
772+ . await
773+ }
774+
775+ async fn try_absorb_stream < F , E > (
776+ task : F ,
777+ stdout_rx : impl Stream < Item = String > ,
778+ stderr_rx : impl Stream < Item = String > ,
779+ ) -> Result < WithOutput < T > , E >
780+ where
781+ F : Future < Output = Result < T , E > > ,
782+ {
783+ let stdout = stdout_rx. collect ( ) ;
784+ let stderr = stderr_rx. collect ( ) ;
769785
770786 let ( result, stdout, stderr) = join ! ( task, stdout, stderr) ;
771787 let response = result?;
@@ -815,6 +831,15 @@ pub struct Coordinator<B> {
815831 token : CancellationToken ,
816832}
817833
834+ /// Runs things.
835+ ///
836+ /// # Liveness concerns
837+ ///
838+ /// If you use one of the streaming versions (e.g. `begin_execute`),
839+ /// you need to make sure that the stdout / stderr / status channels
840+ /// are continuously read from or dropped completely. If not, one
841+ /// channel can fill up, preventing the other channels from receiving
842+ /// data as well.
818843impl < B > Coordinator < B >
819844where
820845 B : Backend ,
@@ -2610,6 +2635,9 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
26102635
26112636 let ( tx, from_worker_rx) = mpsc:: channel ( 8 ) ;
26122637 tasks. spawn_blocking ( move || {
2638+ let span = info_span ! ( "child_io_queue::input" ) ;
2639+ let _span = span. enter ( ) ;
2640+
26132641 let stdout = SyncIoBridge :: new ( stdout) ;
26142642 let mut stdout = BufReader :: new ( stdout) ;
26152643
@@ -2632,6 +2660,9 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
26322660
26332661 let ( to_worker_tx, mut rx) = mpsc:: channel ( 8 ) ;
26342662 tasks. spawn_blocking ( move || {
2663+ let span = info_span ! ( "child_io_queue::output" ) ;
2664+ let _span = span. enter ( ) ;
2665+
26352666 let stdin = SyncIoBridge :: new ( stdin) ;
26362667 let mut stdin = BufWriter :: new ( stdin) ;
26372668
@@ -3182,33 +3213,35 @@ mod tests {
31823213 let ActiveExecution {
31833214 task,
31843215 stdin_tx : _,
3185- mut stdout_rx,
3216+ stdout_rx,
31863217 stderr_rx,
31873218 status_rx : _,
31883219 } = coordinator
31893220 . begin_execute ( token. clone ( ) , request)
31903221 . await
31913222 . unwrap ( ) ;
31923223
3193- // Wait for some output before killing
3194- let early_stdout = stdout_rx . recv ( ) . with_timeout ( ) . await . unwrap ( ) ;
3224+ let stdout_rx = ReceiverStream :: new ( stdout_rx ) ;
3225+ let stderr_rx = ReceiverStream :: new ( stderr_rx ) ;
31953226
3196- token. cancel ( ) ;
3227+ // We (a) want to wait for some output before we try to
3228+ // kill the process and (b) need to keep pumping stdout /
3229+ // stderr / status to avoid locking up the output.
3230+ let stdout_rx = stdout_rx. inspect ( |_| token. cancel ( ) ) ;
31973231
31983232 let WithOutput {
31993233 response,
32003234 stdout,
32013235 stderr,
3202- } = WithOutput :: try_absorb ( task, stdout_rx, stderr_rx)
3236+ } = WithOutput :: try_absorb_stream ( task, stdout_rx, stderr_rx)
32033237 . with_timeout ( )
32043238 . await
32053239 . unwrap ( ) ;
32063240
32073241 assert ! ( !response. success, "{stderr}" ) ;
32083242 assert_contains ! ( response. exit_detail, "kill" ) ;
32093243
3210- assert_contains ! ( early_stdout, "Before" ) ;
3211- assert_not_contains ! ( stdout, "Before" ) ;
3244+ assert_contains ! ( stdout, "Before" ) ;
32123245 assert_not_contains ! ( stdout, "After" ) ;
32133246
32143247 coordinator. shutdown ( ) . await ?;
0 commit comments