Skip to content

Commit 82c92af

Browse files
committed
Allow killing the process after closing stdin
Removing it from the map means we can never do anything else with it. Oops.
1 parent c65c693 commit 82c92af

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

tests/spec/features/streaming_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,28 @@
108108
end
109109
end
110110

111+
scenario "The process can be killed after stdin is closed" do
112+
editor.set <<~EOF
113+
fn main() {
114+
loop {
115+
std::thread::sleep(std::time::Duration::from_secs(1));
116+
}
117+
}
118+
EOF
119+
120+
click_on("Run")
121+
122+
within(:stdin) do
123+
click_on 'Execution control'
124+
click_on 'Close stdin'
125+
click_on 'Execution control'
126+
click_on 'Kill process'
127+
end
128+
129+
within(:output, :error) do
130+
expect(page).to have_content 'SIGKILL'
131+
end
132+
end
111133

112134
def editor
113135
Editor.new(page)

ui/src/server_axum/websocket.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
418418
_ = active_execution_gc_interval.tick() => {
419419
active_executions = mem::take(&mut active_executions)
420420
.into_iter()
421-
.filter(|(_id, (_, tx))| !tx.is_closed())
421+
.filter(|(_id, (_, tx))| tx.as_ref().map_or(false, |tx| !tx.is_closed()))
422422
.collect();
423423
},
424424

@@ -489,7 +489,7 @@ async fn handle_msg(
489489
txt: String,
490490
tx: &ResponseTx,
491491
manager: &mut CoordinatorManager,
492-
active_executions: &mut BTreeMap<i64, (CancellationToken, mpsc::Sender<String>)>,
492+
active_executions: &mut BTreeMap<i64, (CancellationToken, Option<mpsc::Sender<String>>)>,
493493
) {
494494
use WSMessageRequest::*;
495495

@@ -500,7 +500,7 @@ async fn handle_msg(
500500
let token = CancellationToken::new();
501501
let (execution_tx, execution_rx) = mpsc::channel(8);
502502

503-
active_executions.insert(meta.sequence_number, (token.clone(), execution_tx));
503+
active_executions.insert(meta.sequence_number, (token.clone(), Some(execution_tx)));
504504

505505
// TODO: Should a single execute / build / etc. session have a timeout of some kind?
506506
let spawned = manager
@@ -520,7 +520,7 @@ async fn handle_msg(
520520
}
521521

522522
Ok(ExecuteStdin { payload, meta }) => {
523-
let Some((_, execution_tx)) = active_executions.get(&meta.sequence_number) else {
523+
let Some((_, Some(execution_tx))) = active_executions.get(&meta.sequence_number) else {
524524
warn!("Received stdin for an execution that is no longer active");
525525
return;
526526
};
@@ -536,14 +536,20 @@ async fn handle_msg(
536536
}
537537

538538
Ok(ExecuteStdinClose { meta }) => {
539-
let execution_tx = active_executions.remove(&meta.sequence_number);
540-
drop(execution_tx); // Signal closed
539+
let Some((_, execution_tx)) = active_executions.get_mut(&meta.sequence_number) else {
540+
warn!("Received stdin close for an execution that is no longer active");
541+
return;
542+
};
543+
544+
*execution_tx = None; // Drop to signal closed
541545
}
542546

543547
Ok(ExecuteKill { meta }) => {
544-
if let Some((token, _)) = active_executions.remove(&meta.sequence_number) {
545-
token.cancel();
546-
}
548+
let Some((token, _)) = active_executions.get(&meta.sequence_number) else {
549+
warn!("Received kill for an execution that is no longer active");
550+
return;
551+
};
552+
token.cancel();
547553
}
548554

549555
Err(e) => {

0 commit comments

Comments
 (0)