-
-
Notifications
You must be signed in to change notification settings - Fork 33.5k
gh-130895: fix multiprocessing.Process join/wait/poll races #131440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
a1408a4
ad102f4
87f391f
6ff7c04
5b4dbb0
e7f7144
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import atexit | ||
| import os | ||
| import signal | ||
| import threading | ||
|
|
||
| from . import util | ||
|
|
||
|
|
@@ -17,23 +18,71 @@ def __init__(self, process_obj): | |
| util._flush_std_streams() | ||
| self.returncode = None | ||
| self.finalizer = None | ||
| self._exit_condition = threading.Condition() | ||
| self._exit_blockers = 0 | ||
| self._launch(process_obj) | ||
|
|
||
| def duplicate_for_child(self, fd): | ||
| return fd | ||
|
|
||
| def poll(self, flag=os.WNOHANG): | ||
| if self.returncode is None: | ||
| try: | ||
| pid, sts = os.waitpid(self.pid, flag) | ||
| except OSError: | ||
| # Child process not yet created. See #1731717 | ||
| # e.errno == errno.ECHILD == 10 | ||
| return None | ||
| with self._exit_condition: | ||
| if self.returncode is not None: | ||
| return self.returncode | ||
| elif flag & os.WNOHANG == os.WNOHANG: | ||
| return self._nonblocking_poll(flag) | ||
| else: | ||
| self._exit_blockers += 1 | ||
|
|
||
| # We have released the lock, so may be racing with blocking & | ||
| # non-blocking calls at this point... | ||
| pid = None | ||
| try: | ||
| pid, sts = os.waitpid(self.pid, flag) | ||
| except OSError: | ||
| # Child process doesn't exist because it hasn't started yet (see | ||
| # bpo-1731717) or has already been awaited on a racing thread (see | ||
| # gh-130895) | ||
| pass | ||
|
|
||
| with self._exit_condition: | ||
| self._exit_blockers -= 1 | ||
| if pid == self.pid: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about addding the following test, just between the increment and the I suggest this change because while the condition
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so, but it is very possible I've missed something, this is all (way too) subtle... If the thread has won the race and If the thread did not win the race then the winner might have set it, but in that case it isn't enough to just exit immediately. We need to notify in case there was another blocked thread which didn't see the I.e. consider the following scenario:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Are you describe a case of blocking poll ?
I agree
Yes it would failed, but only for the current thread which will continue to run. And the next threads, should not be failed if
IMO the notification was already sent from the last call to the
Regarding my first command, I guess yout talk about a blocking poll. When I ran your Your fix seems okay to me. I was just wondering if it was possible to avoid executing the second protected block entirely.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You make a good point, and I made a mistake in my analysis above. The early exit you suggest is indeed safe if another racing In that scenario the So, we could put the early exit in. Ultimately I think the code will behave correctly with or without it. Personally, I think the code reads slightly simpler without it, just in basic terms of fewer conditionals/less cyclomatic complexity, but YMMV on that. If there is a general consensus otherwise I'll happily add it.
Thanks, I would love to collaborate on this! The existing test case does not test the pathological behaviour, so it will indeed miss this. Here is a new test script that does; it hangs reliably for me if the last blocker notification is removed: import multiprocessing as mp
import os
import threading
N=8
def wait(barrier, p):
barrier.wait()
p.join()
# WRONG due to losing the child's status
assert p.exitcode is None
assert p.is_alive()
def race():
pbarrier = mp.Barrier(2)
# Ensure child has completed
p = mp.Process(target=pbarrier.wait, args=(pbarrier,))
p.start()
pbarrier.wait()
# Steal the child's exit code
pid, sts = os.waitpid(p.pid, 0)
assert pid == p.pid
assert sts == 0
# Since the child is dead, these should not block...
tbarrier = threading.Barrier(N)
threads = [threading.Thread(target=wait, args=(tbarrier, p)) for _ in range(N)]
for t in threads:
t.start()
for t in threads:
t.join()
if __name__ == "__main__":
mp.set_start_method('fork')
race()Does that reproduce for you? I guess it should be tidied up and turned into a unit test...
Thanks! For the record, I very much appreciate your review, and am grateful for your feedback and intelligent questions. I really dislike the sort of overly-complex synchronisation I'm doing here, so I'm particularly grateful for it in this case. I would love to simplify this! |
||
| self.returncode = os.waitstatus_to_exitcode(sts) | ||
| self._set_returncode(sts) | ||
| elif self._exit_blockers == 0: | ||
| self._exit_condition.notify_all() | ||
|
|
||
| # Wait until we get a definitive result, or we know there are no | ||
| # racing calls that might be about to set it | ||
| while self.returncode is None and self._exit_blockers > 0: | ||
| self._exit_condition.wait() | ||
|
|
||
duaneg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return self.returncode | ||
|
|
||
| def _nonblocking_poll(self, flag): | ||
| assert self._exit_condition._is_owned() | ||
| assert self.returncode is None | ||
| assert flag & os.WNOHANG == os.WNOHANG | ||
| try: | ||
| pid, sts = os.waitpid(self.pid, flag) | ||
| if pid == self.pid: | ||
| self._set_returncode(sts) | ||
| except OSError: | ||
| # See comments in the poll(...) except clause above | ||
| pass | ||
|
|
||
| # We may be racing with a blocking wait call, in which case (if we lose | ||
| # the race) it is arbitrary whether this returns None or the exit code | ||
| # (if there is one): calling code must always be prepared to handle a | ||
| # situation where this method returns None but the process has ended. | ||
| return self.returncode | ||
|
|
||
| def _set_returncode(self, sts): | ||
| assert self._exit_condition._is_owned() | ||
| assert self.returncode is None | ||
| self.returncode = os.waitstatus_to_exitcode(sts) | ||
| self._exit_condition.notify_all() | ||
|
|
||
| def wait(self, timeout=None): | ||
| if self.returncode is None: | ||
| if timeout is not None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import io | ||
| import os | ||
| import threading | ||
|
|
||
| from .context import reduction, set_spawning_popen | ||
| if not reduction.HAVE_SEND_HANDLE: | ||
|
|
@@ -32,6 +33,7 @@ class Popen(popen_fork.Popen): | |
|
|
||
| def __init__(self, process_obj): | ||
| self._fds = [] | ||
| self._lock = threading.Lock() | ||
|
||
| super().__init__(process_obj) | ||
|
|
||
| def duplicate_for_child(self, fd): | ||
|
|
@@ -64,11 +66,14 @@ def poll(self, flag=os.WNOHANG): | |
| timeout = 0 if flag == os.WNOHANG else None | ||
| if not wait([self.sentinel], timeout): | ||
| return None | ||
duaneg marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try: | ||
| self.returncode = forkserver.read_signed(self.sentinel) | ||
| except (OSError, EOFError): | ||
| # This should not happen usually, but perhaps the forkserver | ||
| # process itself got killed | ||
| self.returncode = 255 | ||
|
|
||
| with self._lock: | ||
| if self.returncode is None: | ||
| try: | ||
| self.returncode = forkserver.read_signed(self.sentinel) | ||
| except (OSError, EOFError): | ||
| # This should not happen usually, but perhaps the | ||
| # forkserver process itself got killed | ||
| self.returncode = 255 | ||
|
|
||
| return self.returncode | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix race with ``poll``/``wait``/``join`` in :mod:`multiprocessing`.``Process``. |
Uh oh!
There was an error while loading. Please reload this page.