65 changes: 57 additions & 8 deletions Lib/multiprocessing/popen_fork.py
@@ -1,6 +1,7 @@
import atexit
import os
import signal
import threading

from . import util

@@ -17,23 +18,71 @@ def __init__(self, process_obj):
util._flush_std_streams()
self.returncode = None
self.finalizer = None
self._exit_condition = threading.Condition()
self._exit_blockers = 0
self._launch(process_obj)

def duplicate_for_child(self, fd):
return fd

def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
pid, sts = os.waitpid(self.pid, flag)
except OSError:
# Child process not yet created. See #1731717
# e.errno == errno.ECHILD == 10
return None
with self._exit_condition:
if self.returncode is not None:
return self.returncode
elif flag & os.WNOHANG == os.WNOHANG:
return self._nonblocking_poll(flag)
else:
self._exit_blockers += 1

# We have released the lock, so may be racing with blocking &
# non-blocking calls at this point...
pid = None
try:
pid, sts = os.waitpid(self.pid, flag)
except OSError:
# Child process doesn't exist because it hasn't started yet (see
# bpo-1731717) or has already been awaited on a racing thread (see
# gh-130895)
pass

with self._exit_condition:
self._exit_blockers -= 1
if pid == self.pid:
Contributor

What do you think about adding the following test, just between the decrement and the pid test:

            if self.returncode is not None:
                return self.returncode

I suggest this change because while the condition RLock was released, the self.returncode attribute could have been updated.

Contributor Author

I don't think so, but it is very possible I've missed something; this is all (way too) subtle...

If the thread has won the race and os.waitpid returned the exit code then the returncode cannot have been set: only one thread will get a valid exit code, so if this one has it no other one can. In that case the if would always fail.

If the thread did not win the race then the winner might have set it, but in that case it isn't enough to just exit immediately. We need to notify in case there was another blocked thread which didn't see the returncode and is waiting.

I.e. consider the following scenario:

  1. Threads 1 & 2 are blocked doing an os.waitpid in poll.
  2. Thread 3 does a non-blocking poll which "wins" and gets the status code (but has not yet set it!)
  3. Thread 1 wakes up. It has lost the race, returncode hasn't yet been set, and there is another blocker, so it goes into the while loop and waits.
  4. Thread 3 sets the returncode.
  5. Thread 2 wakes up. If we add the if it will exit without notifying thread 1, which will then be blocked indefinitely.
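
The protocol under discussion can be modelled without real processes. The following is a minimal sketch, not the patch itself: `StatusSource`, `Waiter` and `run_waiters` are hypothetical names, and `StatusSource.take()` is only a stand-in for a blocking `os.waitpid` that hands the status to exactly one caller. It shows why the last blocker notifies even when it has no result.

```python
import threading


class StatusSource:
    """Hypothetical stand-in for os.waitpid: blocks until the 'child' exits,
    then gives the status to exactly one caller; later callers get None."""

    def __init__(self):
        self._exited = threading.Event()
        self._lock = threading.Lock()
        self._status = None
        self._taken = False

    def exit(self, status):
        self._status = status
        self._exited.set()

    def take(self):
        self._exited.wait()
        with self._lock:
            if self._taken:
                return None  # someone else already consumed the status
            self._taken = True
            return self._status


class Waiter:
    """Simplified model of the blocking path of poll() in the patch."""

    def __init__(self, source):
        self._source = source
        self._cond = threading.Condition()
        self._blockers = 0
        self.returncode = None

    def wait(self):
        with self._cond:
            if self.returncode is not None:
                return self.returncode
            self._blockers += 1

        # Lock released: we may be racing with other callers here.
        status = self._source.take()

        with self._cond:
            self._blockers -= 1
            if status is not None:
                self.returncode = status
                self._cond.notify_all()
            elif self._blockers == 0:
                # Backup notification: nobody is left who could set a result.
                self._cond.notify_all()

            # Wait for a definitive result, or until no racing caller remains.
            while self.returncode is None and self._blockers > 0:
                self._cond.wait()
            return self.returncode


def run_waiters(waiter, n, before_join=None):
    """Start n threads calling waiter.wait() and collect what they return."""
    results = []
    threads = [threading.Thread(target=lambda: results.append(waiter.wait()),
                                daemon=True)
               for _ in range(n)]
    for t in threads:
        t.start()
    if before_join is not None:
        before_join()
    for t in threads:
        t.join(timeout=5)
    return results
```

In this model, removing the `elif self._blockers == 0` branch leaves waiters stuck whenever the status was consumed outside `Waiter`, which is the situation the backup notification guards against.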

Contributor @YvesDup, Nov 27, 2025

> If the thread has won the race and os.waitpid returned the exit code then the returncode cannot have been set:

Are you describing the blocking poll case?
In the non-blocking poll case, the self._set_returncode method is always called inside a block protected by the condition's RLock. The set and notify operations are always executed together in self._set_returncode.

> only one thread will get a valid exit code, so if this one has it no other one can.

I agree

> In that case the if would always fail.

Yes, it would fail, but only for the current thread, which would continue to run. For subsequent threads it should not fail, provided self.returncode has been set correctly.

> If the thread did not win the race then the winner might have set it, but in that case it isn't enough to just exit immediately. We need to notify in case there was another blocked thread which didn't see the returncode and is waiting.

IMO the notification was already sent by the last call to the self._set_returncode method. I have my doubts, but if a new notification really is necessary (when self._exit_blockers is zero?), then the new if is no longer a relevant option.

> 2. Thread 3 does a non-blocking poll which "wins" and gets the status code (but has not yet set it!)

As in my first comment, I guess you are talking about a blocking poll.

When I ran your test_racing_joins test with the new if, everything seemed okay; I never noticed any problem with missing notifications. We can check together if you wish.

Your fix seems okay to me. I was just wondering if it was possible to avoid executing the second protected block entirely.
Thank you for your time.

Contributor Author

> If the thread did not win the race then the winner might have set it, but in that case it isn't enough to just exit immediately. We need to notify in case there was another blocked thread which didn't see the returncode and is waiting.

> IMO the notification was already sent by the last call to the self._set_returncode method. I have my doubts, but if a new notification really is necessary (when self._exit_blockers is zero?), then the new if is no longer a relevant option.

You make a good point, and I made a mistake in my analysis above. The early exit you suggest is indeed safe if another racing poll has won and set the return code, since they will have also notified. The problematic case is the pathological one, where some other code entirely called os.waitpid and won the race. In that case _set_returncode is not called and hence we need that backup notification to avoid a hang.

In that scenario the Popen object is broken, since it has no way to get the child's exit code. Methods like is_alive() and mp mechanisms downstream of Popen will give the wrong result (there are a bunch of open issues that boil down to this). It is a "user error" situation, but we should still try and make everything fail as gracefully as possible (i.e. not hang 😉).
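
As a minimal sketch of what "stealing" the status means at the OS level (assuming a POSIX system where `os.fork` is available; `reap_twice` is a hypothetical helper, not part of the patch): a child's exit status can be collected only once, and any later `os.waitpid` on the same pid fails with `ECHILD`, which Python raises as `ChildProcessError`.

```python
import os


def reap_twice():
    """Fork a child that exits at once, then try to wait for it twice."""
    pid = os.fork()
    if pid == 0:
        os._exit(0)  # child: exit immediately, skipping cleanup handlers

    first = os.waitpid(pid, 0)  # (pid, status): this call consumes the status
    try:
        os.waitpid(pid, 0)
        second = "unexpectedly succeeded"
    except ChildProcessError:  # OSError subclass raised for ECHILD
        second = "ECHILD"
    return pid, first, second
```

The second call is the position every racing `poll` is in once some other code has already reaped the child: it gets an `OSError` and no exit code.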

So, we could put the early exit in. Ultimately I think the code will behave correctly with or without it. Personally, I think the code reads slightly simpler without it, just in basic terms of fewer conditionals/less cyclomatic complexity, but YMMV on that. If there is a general consensus otherwise I'll happily add it.

> When I ran your test_racing_joins test with the new if, everything seemed okay; I never noticed any problem with missing notifications. We can check together if you wish.

Thanks, I would love to collaborate on this! The existing test case does not test the pathological behaviour, so it will indeed miss this. Here is a new test script that does; it hangs reliably for me if the last blocker notification is removed:

import multiprocessing as mp
import os
import threading

N=8

def wait(barrier, p):
    barrier.wait()
    p.join()

    # WRONG due to losing the child's status
    assert p.exitcode is None
    assert p.is_alive()

def race():
    pbarrier = mp.Barrier(2)

    # Ensure child has completed
    # No args: passing the barrier again would bind it to wait()'s timeout
    p = mp.Process(target=pbarrier.wait)
    p.start()
    pbarrier.wait()

    # Steal the child's exit code
    pid, sts = os.waitpid(p.pid, 0)
    assert pid == p.pid
    assert sts == 0

    # Since the child is dead, these should not block...
    tbarrier = threading.Barrier(N)
    threads = [threading.Thread(target=wait, args=(tbarrier, p)) for _ in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == "__main__":
    mp.set_start_method('fork')
    race()

Does that reproduce for you? I guess it should be tidied up and turned into a unit test...

> Your fix seems okay to me. I was just wondering if it was possible to avoid executing the second protected block entirely. Thank you for your time.

Thanks! For the record, I very much appreciate your review, and am grateful for your feedback and intelligent questions. I really dislike the sort of overly-complex synchronisation I'm doing here, so I'm particularly grateful for it in this case. I would love to simplify this!

self.returncode = os.waitstatus_to_exitcode(sts)
self._set_returncode(sts)
elif self._exit_blockers == 0:
self._exit_condition.notify_all()

# Wait until we get a definitive result, or we know there are no
# racing calls that might be about to set it
while self.returncode is None and self._exit_blockers > 0:
self._exit_condition.wait()

return self.returncode

def _nonblocking_poll(self, flag):
assert self._exit_condition._is_owned()
assert self.returncode is None
assert flag & os.WNOHANG == os.WNOHANG
try:
pid, sts = os.waitpid(self.pid, flag)
if pid == self.pid:
self._set_returncode(sts)
except OSError:
# See comments in the poll(...) except clause above
pass

# We may be racing with a blocking wait call, in which case (if we lose
# the race) it is arbitrary whether this returns None or the exit code
# (if there is one): calling code must always be prepared to handle a
# situation where this method returns None but the process has ended.
return self.returncode

def _set_returncode(self, sts):
assert self._exit_condition._is_owned()
assert self.returncode is None
self.returncode = os.waitstatus_to_exitcode(sts)
self._exit_condition.notify_all()

def wait(self, timeout=None):
if self.returncode is None:
if timeout is not None:
17 changes: 11 additions & 6 deletions Lib/multiprocessing/popen_forkserver.py
@@ -1,5 +1,6 @@
import io
import os
import threading

from .context import reduction, set_spawning_popen
if not reduction.HAVE_SEND_HANDLE:
@@ -32,6 +33,7 @@ class Popen(popen_fork.Popen):

def __init__(self, process_obj):
self._fds = []
self._lock = threading.Lock()
Contributor @YvesDup, Nov 28, 2025

What should we do here about the private _exit_condition and _exit_blockers attributes inherited from the popen_fork.Popen class? I suggest a private method in the parent class that defines its own synchronization attributes and is called from __init__.

Contributor Author

Good question. I had intended to just ignore them. We are very fortunate the locking is entirely self-contained within each class's poll implementation, so this works just fine, but it does leave the child with unused instance variables inherited from the parent.

If I understand your suggestion (please correct me if not), you are suggesting a polymorphic method to initialise locking, so each class only creates the attributes it requires. I agree, that sounds like an improvement, thanks! I'll update the patch shortly to add it.

BTW, I think all this would be simpler and more robust if popen_forkserver.Popen did not directly inherit from popen_fork.Popen. They should probably both inherit from an abstract base class, instead. Presumably it would be impractical, if not impossible, to change that now due to backward compat concerns, though.
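
A rough sketch of the initialisation hook being suggested, with the launch logic omitted. All names here (`ForkPopenSketch`, `ForkserverPopenSketch`, `_init_sync`) are hypothetical and chosen only for illustration; the updated patch may well look different.

```python
import threading


class ForkPopenSketch:
    """Stand-in for popen_fork.Popen, reduced to its synchronisation setup."""

    def __init__(self):
        self.returncode = None
        self._init_sync()  # each class creates only the state it needs

    def _init_sync(self):
        self._exit_condition = threading.Condition()
        self._exit_blockers = 0


class ForkserverPopenSketch(ForkPopenSketch):
    """Stand-in for popen_forkserver.Popen: overrides the hook entirely."""

    def _init_sync(self):
        self._lock = threading.Lock()
```

With this shape the subclass no longer carries the parent's unused condition and counter, at the cost of one more overridable method in the parent's interface.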

super().__init__(process_obj)

def duplicate_for_child(self, fd):
@@ -64,11 +66,14 @@ def poll(self, flag=os.WNOHANG):
timeout = 0 if flag == os.WNOHANG else None
if not wait([self.sentinel], timeout):
return None
try:
self.returncode = forkserver.read_signed(self.sentinel)
except (OSError, EOFError):
# This should not happen usually, but perhaps the forkserver
# process itself got killed
self.returncode = 255

with self._lock:
if self.returncode is None:
try:
self.returncode = forkserver.read_signed(self.sentinel)
except (OSError, EOFError):
# This should not happen usually, but perhaps the
# forkserver process itself got killed
self.returncode = 255

return self.returncode
38 changes: 38 additions & 0 deletions Lib/test/_test_multiprocessing.py
@@ -953,6 +953,44 @@ def test_forkserver_without_auth_fails(self):
proc.start()
proc.join()

@staticmethod
def _wait_for_barrier(barrier):
barrier.wait()

def _wait_on_proc(self, barrier, proc, errs):
barrier.wait()
proc.join()
if proc.is_alive():
errs.append("process alive after join")
if proc.exitcode != 0:
errs.append("process reported non-zero exit code")

def test_racing_joins(self):
if self.TYPE == "threads":
self.skipTest(f"test not appropriate for {self.TYPE}")

N = 5
ITERATIONS = 10
for _ in range(ITERATIONS):
barrier = self.Barrier(N+1)
proc = self.Process(target=self._wait_for_barrier, args=(barrier,))

errs = []
threads = [threading.Thread(target=self._wait_on_proc,
args=(barrier, proc, errs))
for _ in range(N)]
for t in threads:
t.start()

proc.start()
for t in threads:
t.join()

# On failure(s), report the first since they are likely the same
# error reported from multiple threads
if errs:
raise AssertionError(errs[0])

#
#
#
10 changes: 1 addition & 9 deletions Lib/test/test_concurrent_futures/test_process_pool.py
@@ -327,15 +327,7 @@ def test_force_shutdown_workers_stops_pool(self, function_name):
# error since the process would be alive immediately after the
# test run.. and die a moment later.
worker_process.join(support.SHORT_TIMEOUT)

# Oddly enough, even though join completes, sometimes it takes a
# moment for the process to actually be marked as dead.
# ... that seems a bit buggy.
# We need it dead before ending the test to ensure it doesn't
# get marked as an ENV CHANGE due to living child process.
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if not worker_process.is_alive():
break
self.assertFalse(worker_process.is_alive())


create_executor_tests(globals(), ProcessPoolExecutorTest,
@@ -0,0 +1 @@
Fix race with ``poll``/``wait``/``join`` in :mod:`multiprocessing`.``Process``.