Skip to content

Commit c5d5f3a

Browse files
committed
Prevent massive amounts of outgoing stderr / stdout
We've noticed some bad resource usage patterns in production, which manifest as 100% CPU usage, split evenly between system and user. My hypothesis is that people have written some infinite loop that prints output. We are able to transfer that data from the process to the worker to the coordinator/UI to nginx to the user. However, the user can't actually consume 10+ MiB of output. There's also a possibility that Docker effectively has an infinite buffer for stdout/stderr. In that case, we might just be bloating up memory somewhere. This is intended as a first step, to find people who are legitimately trying to produce a bunch of output. From there, we could potentially tweak the code to allow people to opt in to generating even more data.
1 parent 2ba73e4 commit c5d5f3a

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4055,6 +4055,39 @@ mod tests {
40554055
Ok(())
40564056
}
40574057

4058+
/// Checks that a program which writes to stdout forever is stopped with the
/// "bytes of output, exiting" error instead of completing successfully.
#[tokio::test]
4059+
#[snafu::report]
4060+
async fn amount_of_output_is_limited() -> Result<()> {
4061+
// The limits are only applied to the container
4062+
let coordinator = new_coordinator_docker().await;
4063+
4064+
// The submitted program loops forever, writing 1024 `a` bytes plus a
// newline to a locked stdout on every iteration and ignoring write errors.
let req = ExecuteRequest {
4065+
code: r##"
4066+
use std::io::Write;
4067+
4068+
fn main() {
4069+
let a = "a".repeat(1024);
4070+
let out = std::io::stdout();
4071+
let mut out = out.lock();
4072+
loop {//for _ in 0..1_000_000 {
4073+
let _ = out.write_all(a.as_bytes());
4074+
let _ = out.write_all(b"\n");
4075+
}
4076+
}
4077+
"##
4078+
.into(),
4079+
..new_execution_limited_request()
4080+
};
4081+
4082+
// Execution is expected to fail; `unwrap_err` panics if it succeeds.
let err = coordinator.execute(req).with_timeout().await.unwrap_err();
4083+
// Walk the error's source chain and keep only its last entry, which is
// where the output-limit message is expected to appear.
let err = snafu::ChainCompat::new(&err).last().unwrap();
4084+
assert_contains!(err.to_string(), "bytes of output, exiting");
4085+
4086+
coordinator.shutdown().await?;
4087+
4088+
Ok(())
4089+
}
4090+
40584091
static TIMEOUT: Lazy<Duration> = Lazy::new(|| {
40594092
let millis = env::var("TESTS_TIMEOUT_MS")
40604093
.ok()

compiler/base/orchestrator/src/worker.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -612,24 +612,43 @@ async fn process_end(
612612
) -> Result<ExecuteCommandResponse, ProcessError> {
613613
use process_error::*;
614614

615-
select! {
616-
() = token.cancelled() => child.kill().await.context(KillChildSnafu)?,
617-
_ = child.wait() => {},
615+
let mut killed = false;
616+
617+
let status = loop {
618+
select! {
619+
// The user requested that the process be killed
620+
() = token.cancelled(), if !killed => {
621+
child.kill().await.context(KillChildSnafu)?;
622+
killed = true;
623+
},
624+
625+
// The process exited normally
626+
status = child.wait() => break status,
627+
628+
// One of our tasks exited unexpectedly
629+
// TODO: dedupe errors or fully split them
630+
Some(task) = task_set.join_next() => {
631+
task.context(StdioTaskPanickedSnafu)?
632+
.context(StdioTaskFailedSnafu)?;
633+
},
634+
};
618635
};
619636

620-
let status = child.wait().await.context(WaitChildSnafu)?;
637+
let status = status.context(WaitChildSnafu)?;
621638

622639
stdin_shutdown_tx
623640
.send(job_id)
624641
.await
625642
.drop_error_details()
626643
.context(UnableToSendStdinShutdownSnafu)?;
627644

645+
// Check any remaining tasks to see if they had an error
628646
while let Some(task) = task_set.join_next().await {
629647
task.context(StdioTaskPanickedSnafu)?
630648
.context(StdioTaskFailedSnafu)?;
631649
}
632650

651+
// TODO: check this for death earlier?
633652
statistics_task
634653
.await
635654
.context(StatisticsTaskPanickedSnafu)?
@@ -1213,6 +1232,8 @@ mod test {
12131232
}
12141233
}
12151234

1235+
/// Total number of bytes (640 KiB) of child output that one `copy_child_output`
/// call tolerates; once its running total exceeds this, it returns a
/// `TooManyBytes` error.
const OUTPUT_BYTE_LIMIT: usize = 640 * 1024;
1236+
12161237
async fn copy_child_output(
12171238
output: impl AsyncRead + Unpin,
12181239
coordinator_tx: MultiplexingSender,
@@ -1221,17 +1242,28 @@ async fn copy_child_output(
12211242
use copy_child_output_error::*;
12221243

12231244
let mut buf = Utf8BufReader::new(output);
1245+
let mut n_total_bytes: usize = 0;
12241246

12251247
while let Some(buffer) = buf.next().await.context(UnableToReadSnafu)? {
1248+
let n_bytes = buffer.len();
1249+
12261250
coordinator_tx
12271251
.send_ok(xform(buffer))
12281252
.await
12291253
.context(UnableToSendSnafu)?;
1254+
1255+
n_total_bytes = n_total_bytes.saturating_add(n_bytes);
1256+
ensure!(
1257+
n_total_bytes <= OUTPUT_BYTE_LIMIT,
1258+
TooManyBytesSnafu { n_total_bytes }
1259+
);
12301260
}
12311261

12321262
Ok(())
12331263
}
12341264

1265+
/// Discussion link embedded in the `TooManyBytes` error message, pointing users
/// who exceeded the output limit on purpose to a place to describe their use case.
const BYTE_LIMIT_URL: &str = "https://github.com/rust-lang/rust-playground/discussions/1027";
1266+
12351267
#[derive(Debug, Snafu)]
12361268
#[snafu(module)]
12371269
pub enum CopyChildOutputError {
@@ -1240,6 +1272,11 @@ pub enum CopyChildOutputError {
12401272

12411273
#[snafu(display("Failed to send output packet"))]
12421274
UnableToSend { source: MultiplexingSenderError },
1275+
1276+
#[snafu(display(
1277+
"Generated {n_total_bytes} bytes of output, exiting (640K ought to be enough for anybody). If this was not an accident, tell us more at {BYTE_LIMIT_URL}"
1278+
))]
1279+
TooManyBytes { n_total_bytes: usize },
12431280
}
12441281

12451282
// stdin/out <--> messages.

0 commit comments

Comments
 (0)