Skip to content

Commit 21c64b6

Browse files
committed
Send a linked list of errors from the worker to the coordinator
This will allow us to format them nicer at a higher level.
1 parent 0492d75 commit 21c64b6

File tree

3 files changed

+73
-12
lines changed

3 files changed

+73
-12
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
message::{
3434
CommandStatistics, CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest,
3535
ExecuteCommandResponse, JobId, Multiplexed, OneToOneResponse, ReadFileRequest,
36-
ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest,
36+
ReadFileResponse, SerializedError2, WorkerMessage, WriteFileRequest,
3737
},
3838
DropErrorDetailsExt,
3939
};
@@ -1754,15 +1754,25 @@ impl Container {
17541754
WorkerMessage::ExecuteCommand(resp) => {
17551755
return Ok(resp);
17561756
}
1757+
17571758
WorkerMessage::StdoutPacket(packet) => {
17581759
stdout_tx.send(packet).await.ok(/* Receiver gone, that's OK */);
17591760
}
1761+
17601762
WorkerMessage::StderrPacket(packet) => {
17611763
stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */);
17621764
}
1765+
17631766
WorkerMessage::CommandStatistics(stats) => {
17641767
status_tx.send(stats).await.ok(/* Receiver gone, that's OK */);
17651768
}
1769+
1770+
WorkerMessage::Error(e) =>
1771+
return Err(SerializedError2::adapt(e)).context(WorkerSnafu),
1772+
1773+
WorkerMessage::Error2(e) =>
1774+
return Err(e).context(WorkerSnafu),
1775+
17661776
_ => return UnexpectedMessageSnafu.fail(),
17671777
}
17681778
},
@@ -2084,6 +2094,9 @@ pub enum SpawnCargoError {
20842094
#[snafu(display("Could not start Cargo"))]
20852095
CouldNotStartCargo { source: CommanderError },
20862096

2097+
#[snafu(display("The worker operation failed"))]
2098+
Worker { source: SerializedError2 },
2099+
20872100
#[snafu(display("Received an unexpected message"))]
20882101
UnexpectedMessage,
20892102

@@ -2328,7 +2341,7 @@ impl Commander {
23282341
where
23292342
M: Into<CoordinatorMessage>,
23302343
M: OneToOneResponse,
2331-
Result<M::Response, SerializedError>: TryFrom<WorkerMessage>,
2344+
Result<M::Response, SerializedError2>: TryFrom<WorkerMessage>,
23322345
{
23332346
use commander_error::*;
23342347

@@ -2346,9 +2359,8 @@ impl Commander {
23462359
.await
23472360
.context(UnableToReceiveFromDemultiplexerSnafu)?;
23482361

2349-
match msg.try_into() {
2350-
Ok(Ok(v)) => Ok(v),
2351-
Ok(Err(e)) => WorkerOperationFailedSnafu { text: e.0 }.fail(),
2362+
match <Result<_, _>>::try_from(msg) {
2363+
Ok(v) => v.context(WorkerOperationFailedSnafu),
23522364
Err(_) => UnexpectedResponseTypeSnafu.fail(),
23532365
}
23542366
}
@@ -2401,8 +2413,8 @@ pub enum CommanderError {
24012413
#[snafu(display("Did not receive the expected response type from the worker"))]
24022414
UnexpectedResponseType,
24032415

2404-
#[snafu(display("The worker operation failed: {text}"))]
2405-
WorkerOperationFailed { text: String },
2416+
#[snafu(display("The worker operation failed"))]
2417+
WorkerOperationFailed { source: SerializedError2 },
24062418
}
24072419

24082420
pub trait Backend {

compiler/base/orchestrator/src/message.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::{Deserialize, Serialize};
2-
use std::collections::HashMap;
2+
use std::{
3+
collections::{HashMap, VecDeque},
4+
fmt,
5+
};
36

47
pub type JobId = u64;
58
pub type Path = String;
@@ -47,19 +50,22 @@ pub enum WorkerMessage {
4750
StdoutPacket(String),
4851
StderrPacket(String),
4952
CommandStatistics(CommandStatistics),
53+
/// Vestigial; remove after a while
5054
Error(SerializedError),
55+
Error2(SerializedError2),
5156
}
5257

5358
macro_rules! impl_broad_to_narrow_with_error {
5459
($enum_type:ident, $($variant_name:ident => $variant_type:ty),* $(,)?) => {
5560
$(
56-
impl TryFrom<$enum_type> for Result<$variant_type, SerializedError> {
61+
impl TryFrom<$enum_type> for Result<$variant_type, SerializedError2> {
5762
type Error = $enum_type;
5863

5964
fn try_from(other: $enum_type) -> Result<Self, Self::Error> {
6065
match other {
6166
$enum_type::$variant_name(x) => Ok(Ok(x)),
62-
$enum_type::Error(e) => Ok(Err(e)),
67+
$enum_type::Error(e) => Ok(Err(SerializedError2::adapt(e))),
68+
$enum_type::Error2(e) => Ok(Err(e)),
6369
o => Err(o)
6470
}
6571
}
@@ -153,6 +159,49 @@ impl SerializedError {
153159
}
154160
}
155161

162+
#[derive(Debug, Serialize, Deserialize)]
163+
pub struct SerializedError2 {
164+
pub message: String,
165+
pub source: Option<Box<Self>>,
166+
}
167+
168+
impl SerializedError2 {
169+
pub fn new(e: impl snafu::Error) -> Self {
170+
let mut messages = snafu::CleanedErrorText::new(&e)
171+
.map(|(_, t, _)| t)
172+
.filter(|t| !t.is_empty())
173+
.collect::<VecDeque<_>>();
174+
175+
let message = messages
176+
.pop_front()
177+
.unwrap_or_else(|| "No error message".into());
178+
let source = messages.into_iter().rev().fold(None, |source, message| {
179+
Some(Box::new(Self { source, message }))
180+
});
181+
182+
Self { message, source }
183+
}
184+
185+
pub fn adapt(other: SerializedError) -> Self {
186+
Self {
187+
message: other.0,
188+
source: None,
189+
}
190+
}
191+
}
192+
193+
impl snafu::Error for SerializedError2 {
194+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
195+
self.source.as_ref().map(|x| &**x as &dyn snafu::Error)
196+
}
197+
}
198+
199+
impl fmt::Display for SerializedError2 {
200+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201+
self.message.fmt(f)
202+
}
203+
}
204+
156205
pub trait OneToOneResponse {
157206
type Response;
158207
}

compiler/base/orchestrator/src/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::{
5353
message::{
5454
CoordinatorMessage, DeleteFileRequest, DeleteFileResponse, ExecuteCommandRequest,
5555
ExecuteCommandResponse, JobId, Multiplexed, ReadFileRequest, ReadFileResponse,
56-
SerializedError, WorkerMessage, WriteFileRequest, WriteFileResponse,
56+
SerializedError2, WorkerMessage, WriteFileRequest, WriteFileResponse,
5757
},
5858
DropErrorDetailsExt,
5959
};
@@ -268,7 +268,7 @@ impl MultiplexingSender {
268268
}
269269

270270
async fn send_err(&self, e: impl std::error::Error) -> Result<(), MultiplexingSenderError> {
271-
self.send_raw(WorkerMessage::Error(SerializedError::new(e)))
271+
self.send_raw(WorkerMessage::Error2(SerializedError2::new(e)))
272272
.await
273273
}
274274

0 commit comments

Comments
 (0)