Skip to content

Commit 25a4d85

Browse files
committed
Track WebSocket meta information for errors
This will allow us to connect an error to the event that caused it.
1 parent 02674fc commit 25a4d85

File tree

1 file changed

+21
-16
lines changed

1 file changed

+21
-16
lines changed

ui/src/server_axum/websocket.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
};
77

88
use axum::extract::ws::{Message, WebSocket};
9-
use futures::{Future, FutureExt, StreamExt};
9+
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
1010
use orchestrator::{
1111
coordinator::{self, Coordinator, DockerBackend},
1212
DropErrorDetailsExt,
@@ -213,7 +213,8 @@ pub(crate) async fn handle(socket: WebSocket, feature_flags: FeatureFlags) {
213213
metrics::DURATION_WS.observe(elapsed.as_secs_f64());
214214
}
215215

216-
type ResponseTx = mpsc::Sender<Result<MessageResponse>>;
216+
type TaggedError = (Error, Option<Meta>);
217+
type ResponseTx = mpsc::Sender<Result<MessageResponse, TaggedError>>;
217218
type SharedCoordinator = Arc<Coordinator<DockerBackend>>;
218219

219220
/// Manages a limited amount of access to the `Coordinator`.
@@ -228,7 +229,7 @@ type SharedCoordinator = Arc<Coordinator<DockerBackend>>;
228229
/// - Allows limited parallelism between jobs of different types.
229230
struct CoordinatorManager {
230231
coordinator: SharedCoordinator,
231-
tasks: JoinSet<Result<()>>,
232+
tasks: JoinSet<Result<(), TaggedError>>,
232233
semaphore: Arc<Semaphore>,
233234
abort_handles: [Option<AbortHandle>; Self::N_KINDS],
234235
}
@@ -255,15 +256,17 @@ impl CoordinatorManager {
255256
self.tasks.is_empty()
256257
}
257258

258-
async fn join_next(&mut self) -> Option<Result<Result<()>, tokio::task::JoinError>> {
259+
async fn join_next(
260+
&mut self,
261+
) -> Option<Result<Result<(), TaggedError>, tokio::task::JoinError>> {
259262
self.tasks.join_next().await
260263
}
261264

262265
async fn spawn<F, Fut>(&mut self, handler: F) -> CoordinatorManagerResult<()>
263266
where
264267
F: FnOnce(SharedCoordinator) -> Fut,
265268
F: 'static + Send,
266-
Fut: Future<Output = Result<()>>,
269+
Fut: Future<Output = Result<(), TaggedError>>,
267270
Fut: 'static + Send,
268271
{
269272
let coordinator = self.coordinator.clone();
@@ -391,7 +394,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
391394

392395
// We don't care if there are no running tasks
393396
Some(task) = manager.join_next() => {
394-
let error = match task {
397+
let (error, meta) = match task {
395398
Ok(Ok(())) => continue,
396399
Ok(Err(error)) => error,
397400
Err(error) => {
@@ -405,11 +408,11 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
405408
_ => "An unknown panic occurred".into(),
406409
}
407410
};
408-
WebSocketTaskPanicSnafu { text }.build()
411+
(WebSocketTaskPanicSnafu { text }.build(), None)
409412
}
410413
};
411414

412-
if tx.send(Err(error)).await.is_err() {
415+
if tx.send(Err((error, meta))).await.is_err() {
413416
// We can't send a response
414417
break;
415418
}
@@ -427,7 +430,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
427430

428431
let Err(error) = idled else { continue };
429432

430-
if tx.send(Err(error)).await.is_err() {
433+
if tx.send(Err((error, None))).await.is_err() {
431434
// We can't send a response
432435
break;
433436
}
@@ -466,14 +469,14 @@ fn create_server_meta() -> Meta {
466469
})
467470
}
468471

469-
fn error_to_response(error: Error) -> MessageResponse {
472+
fn error_to_response((error, meta): TaggedError) -> MessageResponse {
470473
let error = snafu::CleanedErrorText::new(&error)
471474
.map(|(_, t, _)| t)
472475
.reduce(|e, t| e + ": " + &t)
473476
.unwrap_or_default();
474477
let payload = WSError { error };
475-
// TODO: thread through the Meta from the originating request
476-
let meta = create_server_meta();
478+
479+
let meta = meta.unwrap_or_else(create_server_meta);
477480

478481
MessageResponse::Error { payload, meta }
479482
}
@@ -506,16 +509,18 @@ async fn handle_msg(
506509
let spawned = manager
507510
.spawn({
508511
let tx = tx.clone();
512+
let meta = meta.clone();
509513
|coordinator| {
510-
handle_execute(token, execution_rx, tx, coordinator, payload, meta)
514+
handle_execute(token, execution_rx, tx, coordinator, payload, meta.clone())
511515
.context(StreamingExecuteSnafu)
516+
.map_err(|e| (e, Some(meta)))
512517
}
513518
})
514519
.await
515520
.context(StreamingCoordinatorSpawnSnafu);
516521

517522
if let Err(e) = spawned {
518-
tx.send(Err(e)).await.ok(/* We don't care if the channel is closed */);
523+
tx.send(Err((e, Some(meta)))).await.ok(/* We don't care if the channel is closed */);
519524
}
520525
}
521526

@@ -531,7 +536,7 @@ async fn handle_msg(
531536
.context(StreamingCoordinatorExecuteStdinSnafu);
532537

533538
if let Err(e) = sent {
534-
tx.send(Err(e)).await.ok(/* We don't care if the channel is closed */);
539+
tx.send(Err((e, Some(meta)))).await.ok(/* We don't care if the channel is closed */);
535540
}
536541
}
537542

@@ -553,7 +558,7 @@ async fn handle_msg(
553558
}
554559

555560
Err(e) => {
556-
tx.send(Err(e)).await.ok(/* We don't care if the channel is closed */);
561+
tx.send(Err((e, None))).await.ok(/* We don't care if the channel is closed */);
557562
}
558563
}
559564
}

0 commit comments

Comments
 (0)