Commit 12d171e

Address flaky execute_kill test
The primary loop in `spawn_cargo_task` works by getting messages from the worker and then handing them off to the stdout / stderr / status channels. If one channel is full, then everything will queue behind it, resulting in all work stopping. The test was waiting on the stdout channel and had dropped the status channel, but nothing was processing the stderr channel. This means that if enough[^1] stderr packets had been delivered before any stdout, the test would fail.

To reproduce this, you can hack the loop in `spawn_cargo_task` so that it delivers every `stderr` packet multiple times, forcing the channel to be filled immediately.

[^1]: Over eight with the current setup.
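A minimal, self-contained sketch of that failure mode (hypothetical names and channel sizes, using tokio's bounded mpsc channels rather than the orchestrator's actual code) looks roughly like this: a forwarder loop routes a single worker channel into bounded per-kind channels, the consumer only ever reads stdout, and the undrained stderr channel eventually blocks the forwarder so stdout never arrives.

```rust
// Sketch only: hypothetical names, not the Playground's real code.
use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};

enum WorkerMsg {
    Stdout(String),
    Stderr(String),
}

#[tokio::main]
async fn main() {
    let (worker_tx, mut worker_rx) = mpsc::channel(64);
    let (stdout_tx, mut stdout_rx) = mpsc::channel::<String>(8);
    // Kept alive but never read from, mirroring the test's
    // unprocessed stderr channel.
    let (stderr_tx, _stderr_rx) = mpsc::channel::<String>(8);

    // Worker: produces plenty of stderr before any stdout.
    tokio::spawn(async move {
        for i in 0..32 {
            worker_tx
                .send(WorkerMsg::Stderr(format!("warning {i}")))
                .await
                .unwrap();
        }
        worker_tx
            .send(WorkerMsg::Stdout("Before".into()))
            .await
            .unwrap();
    });

    // Forwarder: the analogue of the loop in `spawn_cargo_task`. Once the
    // bounded stderr channel is full, `send().await` blocks and nothing
    // else is forwarded -- including the pending stdout message.
    tokio::spawn(async move {
        while let Some(msg) = worker_rx.recv().await {
            match msg {
                WorkerMsg::Stdout(s) => stdout_tx.send(s).await.unwrap(),
                WorkerMsg::Stderr(s) => stderr_tx.send(s).await.unwrap(),
            }
        }
    });

    // Consumer: waits on stdout alone, like the flaky test did.
    let got = timeout(Duration::from_secs(1), stdout_rx.recv()).await;
    println!("stdout within 1s: {got:?}"); // Err(..) -- it never arrives
}
```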
1 parent 9c067cd commit 12d171e

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 37 additions & 10 deletions
@@ -1,7 +1,7 @@
 use futures::{
     future::{BoxFuture, OptionFuture},
     stream::BoxStream,
-    Future, FutureExt, StreamExt,
+    Future, FutureExt, Stream, StreamExt,
 };
 use once_cell::sync::Lazy;
 use serde::Deserialize;
@@ -764,8 +764,24 @@ impl<T> WithOutput<T> {
     where
         F: Future<Output = Result<T, E>>,
     {
-        let stdout = ReceiverStream::new(stdout_rx).collect();
-        let stderr = ReceiverStream::new(stderr_rx).collect();
+        Self::try_absorb_stream(
+            task,
+            ReceiverStream::new(stdout_rx),
+            ReceiverStream::new(stderr_rx),
+        )
+        .await
+    }
+
+    async fn try_absorb_stream<F, E>(
+        task: F,
+        stdout_rx: impl Stream<Item = String>,
+        stderr_rx: impl Stream<Item = String>,
+    ) -> Result<WithOutput<T>, E>
+    where
+        F: Future<Output = Result<T, E>>,
+    {
+        let stdout = stdout_rx.collect();
+        let stderr = stderr_rx.collect();

         let (result, stdout, stderr) = join!(task, stdout, stderr);
         let response = result?;
@@ -815,6 +831,15 @@ pub struct Coordinator<B> {
     token: CancellationToken,
 }

+/// Runs things.
+///
+/// # Liveness concerns
+///
+/// If you use one of the streaming versions (e.g. `begin_execute`),
+/// you need to make sure that the stdout / stderr / status channels
+/// are continuously read from or dropped completely. If not, one
+/// channel can fill up, preventing the other channels from receiving
+/// data as well.
 impl<B> Coordinator<B>
 where
     B: Backend,
@@ -3188,33 +3213,35 @@ mod tests {
         let ActiveExecution {
             task,
             stdin_tx: _,
-            mut stdout_rx,
+            stdout_rx,
             stderr_rx,
             status_rx: _,
         } = coordinator
             .begin_execute(token.clone(), request)
             .await
             .unwrap();

-        // Wait for some output before killing
-        let early_stdout = stdout_rx.recv().with_timeout().await.unwrap();
+        let stdout_rx = ReceiverStream::new(stdout_rx);
+        let stderr_rx = ReceiverStream::new(stderr_rx);

-        token.cancel();
+        // We (a) want to wait for some output before we try to
+        // kill the process and (b) need to keep pumping stdout /
+        // stderr / status to avoid locking up the output.
+        let stdout_rx = stdout_rx.inspect(|_| token.cancel());

         let WithOutput {
             response,
             stdout,
             stderr,
-        } = WithOutput::try_absorb(task, stdout_rx, stderr_rx)
+        } = WithOutput::try_absorb_stream(task, stdout_rx, stderr_rx)
             .with_timeout()
             .await
             .unwrap();

         assert!(!response.success, "{stderr}");
         assert_contains!(response.exit_detail, "kill");

-        assert_contains!(early_stdout, "Before");
-        assert_not_contains!(stdout, "Before");
+        assert_contains!(stdout, "Before");
         assert_not_contains!(stdout, "After");

         coordinator.shutdown().await?;
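For callers of the streaming API, the consumption pattern the new doc comment and test rely on can be sketched in isolation roughly like this (illustrative channels and names, not the actual `Coordinator` API; it only assumes the crates the file already uses: futures, tokio, tokio-stream, tokio-util): wrap each receiver in a `ReceiverStream`, hang the cancellation off the first stdout item with `inspect`, and collect both streams concurrently so neither channel can back up.

```rust
// Rough sketch of the pattern above, outside the orchestrator
// (illustrative channels only, not the Coordinator API).
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let (stdout_tx, stdout_rx) = mpsc::channel::<String>(8);
    let (stderr_tx, stderr_rx) = mpsc::channel::<String>(8);
    let token = CancellationToken::new();

    // Stand-in for the running process: emits some output, then waits
    // to be "killed" via the cancellation token.
    let worker = {
        let token = token.clone();
        tokio::spawn(async move {
            stdout_tx.send("Before".into()).await.unwrap();
            stderr_tx.send("some warning".into()).await.unwrap();
            token.cancelled().await;
            // Output produced after the kill ("After") would go here.
        })
    };

    // Cancel as a side effect of seeing the first stdout line, while
    // still pumping both streams so neither channel fills up.
    let stdout_rx = ReceiverStream::new(stdout_rx).inspect(|_| token.cancel());
    let stderr_rx = ReceiverStream::new(stderr_rx);

    let (stdout, stderr): (Vec<String>, Vec<String>) =
        tokio::join!(stdout_rx.collect(), stderr_rx.collect());

    worker.await.unwrap();
    assert_eq!(stdout, ["Before"]);
    assert_eq!(stderr, ["some warning"]);
}
```

Both collections finish only once the sender halves are dropped, which is exactly why every channel has to be either drained or dropped: an unread bounded channel would stall the producer instead.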
