Skip to content

Commit e93e84e

Browse files
authored
Merge pull request #2262 from GitoxideLabs/copilot/fix-non-blocking-piped-data
Fix deadlock in single-file filter drivers with large inputs
2 parents 896cb4d + 1aaa6fa commit e93e84e

File tree

4 files changed

+181
-9
lines changed

4 files changed

+181
-9
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gix-filter/src/driver/apply.rs

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl State {
7777
///
7878
/// ### Deviation
7979
///
80-
/// If a long running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
80+
/// If a long-running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
8181
/// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
8282
/// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
8383
/// after reading the filtered result.
@@ -97,9 +97,9 @@ impl State {
9797
}
9898
}
9999

100-
/// Like [`apply()]`[Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
100+
/// Like [`apply()`](Self::apply()), but use `delay` to determine if the filter result may be delayed or not.
101101
///
102-
/// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
102+
/// Poll [`list_delayed_paths()`](Self::list_delayed_paths()) until it is empty and query the available paths again.
103103
/// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
104104
pub fn apply_delayed<'a>(
105105
&'a mut self,
@@ -111,10 +111,24 @@ impl State {
111111
) -> Result<Option<MaybeDelayed<'a>>, Error> {
112112
match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
113113
Some(Process::SingleFile { mut child, command }) => {
114-
std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
114+
// To avoid deadlock when the filter immediately echoes input to output (like `cat`),
115+
// we need to write to stdin and read from stdout concurrently. If we write all data
116+
// to stdin before reading from stdout, and the pipe buffer fills up, both processes
117+
// will block: the filter blocks writing to stdout (buffer full), and we block writing
118+
// to stdin (waiting for the filter to consume data).
119+
//
120+
// Solution: Read all data into a buffer, then spawn a thread to write it to stdin
121+
// while we can immediately read from stdout.
122+
let mut input_data = Vec::new();
123+
std::io::copy(src, &mut input_data)?;
124+
125+
let stdin = child.stdin.take().expect("configured");
126+
let write_thread = WriterThread::write_all_in_background(input_data, stdin)?;
127+
115128
Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
116129
inner: child.stdout.take(),
117130
child: driver.required.then_some((child, command)),
131+
write_thread: Some(write_thread),
118132
}))))
119133
}
120134
Some(Process::MultiFile { client, key }) => {
@@ -202,11 +216,62 @@ pub enum MaybeDelayed<'a> {
202216
Immediate(Box<dyn std::io::Read + 'a>),
203217
}
204218

219+
/// A helper to manage writing to stdin on a separate thread to avoid deadlock.
220+
struct WriterThread {
221+
handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
222+
}
223+
224+
impl WriterThread {
225+
/// Spawn a thread that will write all data from `data` to `stdin`.
226+
fn write_all_in_background(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
227+
let handle = std::thread::Builder::new()
228+
.name("gix-filter-stdin-writer".into())
229+
.stack_size(128 * 1024)
230+
.spawn(move || {
231+
use std::io::Write;
232+
stdin.write_all(&data)?;
233+
// Explicitly drop stdin to close the pipe and signal EOF to the child
234+
drop(stdin);
235+
Ok(())
236+
})?;
237+
238+
Ok(Self { handle: Some(handle) })
239+
}
240+
241+
/// Wait for the writer thread to complete and return any error that occurred.
242+
fn join(&mut self) -> std::io::Result<()> {
243+
let Some(handle) = self.handle.take() else {
244+
return Ok(());
245+
};
246+
handle.join().map_err(|panic_info| {
247+
let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
248+
format!("Writer thread panicked: {s}")
249+
} else if let Some(s) = panic_info.downcast_ref::<&str>() {
250+
format!("Writer thread panicked: {s}")
251+
} else {
252+
"Writer thread panicked while writing to filter stdin".to_string()
253+
};
254+
std::io::Error::other(msg)
255+
})?
256+
}
257+
}
258+
259+
impl Drop for WriterThread {
260+
fn drop(&mut self) {
261+
// Best effort join on drop.
262+
if let Err(_err) = self.join() {
263+
gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
264+
}
265+
}
266+
}
267+
205268
/// A utility type to facilitate streaming the output of a filter process.
206269
struct ReadFilterOutput {
207270
inner: Option<std::process::ChildStdout>,
208271
/// The child is present if we need its exit code to be positive.
209272
child: Option<(std::process::Child, std::process::Command)>,
273+
/// The thread writing to stdin, if any. Must be joined when reading is done.
274+
write_thread: Option<WriterThread>,
210275
}
211276

212277
pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
@@ -222,9 +287,31 @@ impl std::io::Read for ReadFilterOutput {
222287
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
223288
match self.inner.as_mut() {
224289
Some(inner) => {
225-
let num_read = inner.read(buf)?;
290+
let num_read = match inner.read(buf) {
291+
Ok(n) => n,
292+
Err(e) => {
293+
// On read error, ensure we join the writer thread before propagating the error.
294+
// This is expected to finish with failure as well as it should fail to write
295+
// to the process which now fails to produce output (that we try to read).
296+
if let Some(mut write_thread) = self.write_thread.take() {
297+
// Try to join but prioritize the original read error
298+
if let Err(_thread_err) = write_thread.join() {
299+
gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "write to stdin error during failed read");
300+
}
301+
}
302+
return Err(e);
303+
}
304+
};
305+
226306
if num_read == 0 {
227307
self.inner.take();
308+
309+
// Join the writer thread first to ensure all data has been written
310+
// and that resources are freed now.
311+
if let Some(mut write_thread) = self.write_thread.take() {
312+
write_thread.join()?;
313+
}
314+
228315
if let Some((mut child, cmd)) = self.child.take() {
229316
let status = child.wait()?;
230317
if !status.success() {

gix-filter/tests/filter/driver.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,95 @@ pub(crate) mod apply {
371371
Ok(())
372372
}
373373

374+
#[serial]
375+
#[test]
376+
fn large_file_with_cat_filter_does_not_hang() -> crate::Result {
377+
// This test reproduces issue #2080 where using `cat` as a filter with a large file
378+
// causes a deadlock. The pipe buffer is typically 64KB on Linux, so we use files
379+
// larger than that to ensure the buffer fills up.
380+
381+
// Typical pipe buffer sizes on Unix systems
382+
const PIPE_BUFFER_SIZE: usize = 64 * 1024; // 64KB
383+
384+
let mut state = gix_filter::driver::State::default();
385+
386+
// Create a driver that uses `cat` command (which echoes input to output immediately)
387+
let driver = Driver {
388+
name: "cat".into(),
389+
clean: Some("cat".into()),
390+
smudge: Some("cat".into()),
391+
process: None,
392+
required: false,
393+
};
394+
395+
// Test with multiple sizes to ensure robustness
396+
for size in [
397+
PIPE_BUFFER_SIZE,
398+
2 * PIPE_BUFFER_SIZE,
399+
8 * PIPE_BUFFER_SIZE,
400+
16 * PIPE_BUFFER_SIZE,
401+
] {
402+
let input = vec![b'a'; size];
403+
404+
// Apply the filter - this should not hang
405+
let mut filtered = state
406+
.apply(
407+
&driver,
408+
&mut input.as_slice(),
409+
driver::Operation::Smudge,
410+
context_from_path("large-file.txt"),
411+
)?
412+
.expect("filter present");
413+
414+
let mut output = Vec::new();
415+
filtered.read_to_end(&mut output)?;
416+
417+
assert_eq!(
418+
input.len(),
419+
output.len(),
420+
"cat should pass through all data unchanged for {size} bytes"
421+
);
422+
assert_eq!(input, output, "cat should not modify the data");
423+
}
424+
Ok(())
425+
}
426+
427+
#[serial]
428+
#[test]
429+
fn large_file_with_cat_filter_early_drop() -> crate::Result {
430+
// Test that dropping the reader early doesn't cause issues (thread cleanup)
431+
let mut state = gix_filter::driver::State::default();
432+
433+
let driver = Driver {
434+
name: "cat".into(),
435+
clean: Some("cat".into()),
436+
smudge: Some("cat".into()),
437+
process: None,
438+
required: false,
439+
};
440+
441+
let input = vec![b'x'; 256 * 1024];
442+
443+
// Apply the filter but only read a small amount
444+
let mut filtered = state
445+
.apply(
446+
&driver,
447+
&mut input.as_slice(),
448+
driver::Operation::Clean,
449+
context_from_path("early-drop.txt"),
450+
)?
451+
.expect("filter present");
452+
453+
let mut output = vec![0u8; 100];
454+
filtered.read_exact(&mut output)?;
455+
assert_eq!(output, vec![b'x'; 100], "should read first 100 bytes");
456+
457+
// Drop the reader early - the thread should still clean up properly
458+
drop(filtered);
459+
460+
Ok(())
461+
}
462+
374463
pub(crate) fn extract_delayed_key(res: Option<apply::MaybeDelayed<'_>>) -> driver::Key {
375464
match res {
376465
Some(apply::MaybeDelayed::Immediate(_)) | None => {

gix-worktree-state/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ doctest = false
1919
gix-worktree = { version = "^0.43.0", path = "../gix-worktree", default-features = false, features = ["attributes"] }
2020
gix-index = { version = "^0.42.0", path = "../gix-index" }
2121
gix-fs = { version = "^0.17.0", path = "../gix-fs" }
22-
gix-hash = { version = "^0.20.0", path = "../gix-hash" }
2322
gix-object = { version = "^0.51.0", path = "../gix-object" }
24-
gix-glob = { version = "^0.22.0", path = "../gix-glob" }
2523
gix-path = { version = "^0.10.21", path = "../gix-path" }
2624
gix-features = { version = "^0.44.0", path = "../gix-features" }
2725
gix-filter = { version = "^0.21.0", path = "../gix-filter" }

0 commit comments

Comments
 (0)