Skip to content

Commit 386eb18

Browse files
authored
Merge pull request #1029 from rust-lang/limit-output
Track WebSocket meta information for errors
2 parents 02674fc + c5d5f3a commit 386eb18

File tree

10 files changed

+248
-51
lines changed

10 files changed

+248
-51
lines changed

compiler/base/orchestrator/Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compiler/base/orchestrator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ once_cell = { version = "1.18.0", default-features = false, features = ["std"] }
1515
serde = { version = "1.0", default-features = false, features = ["derive"] }
1616
serde_json = { version = "1.0.108", default-features = false, features = ["std"] }
1717
snafu = { version = "0.8.0", default-features = false, features = ["futures", "std"] }
18+
strum_macros = { version = "0.25.3", default-features = false }
1819
tokio = { version = "1.28", default-features = false, features = ["fs", "io-std", "io-util", "macros", "process", "rt", "time", "sync"] }
1920
tokio-stream = { version = "0.1.14", default-features = false }
2021
tokio-util = { version = "0.7.8", default-features = false, features = ["io", "io-util"] }

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
message::{
3434
CommandStatistics, CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest,
3535
ExecuteCommandResponse, JobId, Multiplexed, OneToOneResponse, ReadFileRequest,
36-
ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest,
36+
ReadFileResponse, SerializedError2, WorkerMessage, WriteFileRequest,
3737
},
3838
DropErrorDetailsExt,
3939
};
@@ -1754,16 +1754,29 @@ impl Container {
17541754
WorkerMessage::ExecuteCommand(resp) => {
17551755
return Ok(resp);
17561756
}
1757+
17571758
WorkerMessage::StdoutPacket(packet) => {
17581759
stdout_tx.send(packet).await.ok(/* Receiver gone, that's OK */);
17591760
}
1761+
17601762
WorkerMessage::StderrPacket(packet) => {
17611763
stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */);
17621764
}
1765+
17631766
WorkerMessage::CommandStatistics(stats) => {
17641767
status_tx.send(stats).await.ok(/* Receiver gone, that's OK */);
17651768
}
1766-
_ => return UnexpectedMessageSnafu.fail(),
1769+
1770+
WorkerMessage::Error(e) =>
1771+
return Err(SerializedError2::adapt(e)).context(WorkerSnafu),
1772+
1773+
WorkerMessage::Error2(e) =>
1774+
return Err(e).context(WorkerSnafu),
1775+
1776+
_ => {
1777+
let message = container_msg.as_ref();
1778+
return UnexpectedMessageSnafu { message }.fail()
1779+
},
17671780
}
17681781
},
17691782

@@ -2084,8 +2097,11 @@ pub enum SpawnCargoError {
20842097
#[snafu(display("Could not start Cargo"))]
20852098
CouldNotStartCargo { source: CommanderError },
20862099

2087-
#[snafu(display("Received an unexpected message"))]
2088-
UnexpectedMessage,
2100+
#[snafu(display("The worker operation failed"))]
2101+
Worker { source: SerializedError2 },
2102+
2103+
#[snafu(display("Received the unexpected message `{message}`"))]
2104+
UnexpectedMessage { message: String },
20892105

20902106
#[snafu(display("There are no more messages"))]
20912107
UnexpectedEndOfMessages,
@@ -2328,7 +2344,7 @@ impl Commander {
23282344
where
23292345
M: Into<CoordinatorMessage>,
23302346
M: OneToOneResponse,
2331-
Result<M::Response, SerializedError>: TryFrom<WorkerMessage>,
2347+
Result<M::Response, SerializedError2>: TryFrom<WorkerMessage>,
23322348
{
23332349
use commander_error::*;
23342350

@@ -2346,9 +2362,8 @@ impl Commander {
23462362
.await
23472363
.context(UnableToReceiveFromDemultiplexerSnafu)?;
23482364

2349-
match msg.try_into() {
2350-
Ok(Ok(v)) => Ok(v),
2351-
Ok(Err(e)) => WorkerOperationFailedSnafu { text: e.0 }.fail(),
2365+
match <Result<_, _>>::try_from(msg) {
2366+
Ok(v) => v.context(WorkerOperationFailedSnafu),
23522367
Err(_) => UnexpectedResponseTypeSnafu.fail(),
23532368
}
23542369
}
@@ -2401,8 +2416,8 @@ pub enum CommanderError {
24012416
#[snafu(display("Did not receive the expected response type from the worker"))]
24022417
UnexpectedResponseType,
24032418

2404-
#[snafu(display("The worker operation failed: {text}"))]
2405-
WorkerOperationFailed { text: String },
2419+
#[snafu(display("The worker operation failed"))]
2420+
WorkerOperationFailed { source: SerializedError2 },
24062421
}
24072422

24082423
pub trait Backend {
@@ -3431,6 +3446,7 @@ mod tests {
34313446

34323447
coordinator.shutdown().await?;
34333448
}
3449+
34343450
Ok(())
34353451
}
34363452

@@ -3624,6 +3640,8 @@ mod tests {
36243640
let lines = response.code.lines().collect::<Vec<_>>();
36253641
assert_eq!(ARBITRARY_FORMAT_OUTPUT, lines);
36263642

3643+
coordinator.shutdown().await?;
3644+
36273645
Ok(())
36283646
}
36293647

@@ -3644,6 +3662,8 @@ mod tests {
36443662
assert!(response.success, "stderr: {}", response.stderr);
36453663
let lines = response.code.lines().collect::<Vec<_>>();
36463664
assert_eq!(ARBITRARY_FORMAT_OUTPUT, lines);
3665+
3666+
coordinator.shutdown().await?;
36473667
}
36483668

36493669
Ok(())
@@ -3706,6 +3726,8 @@ mod tests {
37063726
assert_contains!(response.stderr, "deny(clippy::eq_op)");
37073727
assert_contains!(response.stderr, "warn(clippy::zero_divided_by_zero)");
37083728

3729+
coordinator.shutdown().await?;
3730+
37093731
Ok(())
37103732
}
37113733

@@ -3737,6 +3759,8 @@ mod tests {
37373759
response.stderr
37383760
);
37393761

3762+
coordinator.shutdown().await?;
3763+
37403764
Ok(())
37413765
},
37423766
)
@@ -3781,6 +3805,8 @@ mod tests {
37813805
assert_contains!(response.stderr, "is out-of-bounds");
37823806
assert_contains!(response.stderr, "has size 0");
37833807

3808+
coordinator.shutdown().await?;
3809+
37843810
Ok(())
37853811
}
37863812

@@ -3817,6 +3843,8 @@ mod tests {
38173843
assert_contains!(response.stdout, "impl ::core::fmt::Debug for Dummy");
38183844
assert_contains!(response.stdout, "Formatter::write_str");
38193845

3846+
coordinator.shutdown().await?;
3847+
38203848
Ok(())
38213849
}
38223850

@@ -3897,6 +3925,8 @@ mod tests {
38973925
let res = coordinator.execute(req).await.unwrap();
38983926
assert_eq!(res.stdout, "hello\n");
38993927

3928+
coordinator.shutdown().await?;
3929+
39003930
Ok(())
39013931
}
39023932

@@ -3960,6 +3990,8 @@ mod tests {
39603990

39613991
assert_contains!(res.stdout, "Failed to connect");
39623992

3993+
coordinator.shutdown().await?;
3994+
39633995
Ok(())
39643996
}
39653997

@@ -3987,6 +4019,8 @@ mod tests {
39874019
// TODO: We need to actually inform the user about this somehow. The UI is blank.
39884020
// assert_contains!(res.stdout, "Killed");
39894021

4022+
coordinator.shutdown().await?;
4023+
39904024
Ok(())
39914025
}
39924026

@@ -4016,6 +4050,41 @@ mod tests {
40164050

40174051
assert_contains!(res.stderr, "Cannot fork");
40184052

4053+
coordinator.shutdown().await?;
4054+
4055+
Ok(())
4056+
}
4057+
4058+
#[tokio::test]
4059+
#[snafu::report]
4060+
async fn amount_of_output_is_limited() -> Result<()> {
4061+
// The limits are only applied to the container
4062+
let coordinator = new_coordinator_docker().await;
4063+
4064+
let req = ExecuteRequest {
4065+
code: r##"
4066+
use std::io::Write;
4067+
4068+
fn main() {
4069+
let a = "a".repeat(1024);
4070+
let out = std::io::stdout();
4071+
let mut out = out.lock();
4072+
loop {//for _ in 0..1_000_000 {
4073+
let _ = out.write_all(a.as_bytes());
4074+
let _ = out.write_all(b"\n");
4075+
}
4076+
}
4077+
"##
4078+
.into(),
4079+
..new_execution_limited_request()
4080+
};
4081+
4082+
let err = coordinator.execute(req).with_timeout().await.unwrap_err();
4083+
let err = snafu::ChainCompat::new(&err).last().unwrap();
4084+
assert_contains!(err.to_string(), "bytes of output, exiting");
4085+
4086+
coordinator.shutdown().await?;
4087+
40194088
Ok(())
40204089
}
40214090

compiler/base/orchestrator/src/message.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::{Deserialize, Serialize};
2-
use std::collections::HashMap;
2+
use std::{
3+
collections::{HashMap, VecDeque},
4+
fmt,
5+
};
36

47
pub type JobId = u64;
58
pub type Path = String;
@@ -38,7 +41,7 @@ impl_narrow_to_broad!(
3841
ExecuteCommand => ExecuteCommandRequest,
3942
);
4043

41-
#[derive(Debug, Serialize, Deserialize)]
44+
#[derive(Debug, Serialize, Deserialize, strum_macros::AsRefStr)]
4245
pub enum WorkerMessage {
4346
WriteFile(WriteFileResponse),
4447
DeleteFile(DeleteFileResponse),
@@ -47,19 +50,22 @@ pub enum WorkerMessage {
4750
StdoutPacket(String),
4851
StderrPacket(String),
4952
CommandStatistics(CommandStatistics),
53+
/// Vestigial; remove after a while
5054
Error(SerializedError),
55+
Error2(SerializedError2),
5156
}
5257

5358
macro_rules! impl_broad_to_narrow_with_error {
5459
($enum_type:ident, $($variant_name:ident => $variant_type:ty),* $(,)?) => {
5560
$(
56-
impl TryFrom<$enum_type> for Result<$variant_type, SerializedError> {
61+
impl TryFrom<$enum_type> for Result<$variant_type, SerializedError2> {
5762
type Error = $enum_type;
5863

5964
fn try_from(other: $enum_type) -> Result<Self, Self::Error> {
6065
match other {
6166
$enum_type::$variant_name(x) => Ok(Ok(x)),
62-
$enum_type::Error(e) => Ok(Err(e)),
67+
$enum_type::Error(e) => Ok(Err(SerializedError2::adapt(e))),
68+
$enum_type::Error2(e) => Ok(Err(e)),
6369
o => Err(o)
6470
}
6571
}
@@ -153,6 +159,49 @@ impl SerializedError {
153159
}
154160
}
155161

162+
#[derive(Debug, Serialize, Deserialize)]
163+
pub struct SerializedError2 {
164+
pub message: String,
165+
pub source: Option<Box<Self>>,
166+
}
167+
168+
impl SerializedError2 {
169+
pub fn new(e: impl snafu::Error) -> Self {
170+
let mut messages = snafu::CleanedErrorText::new(&e)
171+
.map(|(_, t, _)| t)
172+
.filter(|t| !t.is_empty())
173+
.collect::<VecDeque<_>>();
174+
175+
let message = messages
176+
.pop_front()
177+
.unwrap_or_else(|| "No error message".into());
178+
let source = messages.into_iter().rev().fold(None, |source, message| {
179+
Some(Box::new(Self { source, message }))
180+
});
181+
182+
Self { message, source }
183+
}
184+
185+
pub fn adapt(other: SerializedError) -> Self {
186+
Self {
187+
message: other.0,
188+
source: None,
189+
}
190+
}
191+
}
192+
193+
impl snafu::Error for SerializedError2 {
194+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
195+
self.source.as_ref().map(|x| &**x as &dyn snafu::Error)
196+
}
197+
}
198+
199+
impl fmt::Display for SerializedError2 {
200+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201+
self.message.fmt(f)
202+
}
203+
}
204+
156205
pub trait OneToOneResponse {
157206
type Response;
158207
}

0 commit comments

Comments
 (0)