gh-141860: Add on_error= keyword arg to multiprocessing.set_forkserver_preload
#141859
Open
gpshead wants to merge 17 commits into python:main from gpshead:claude/forkserver-raise-exceptions-019rrqWeqQGL2zKWava8nshj
Commits (17, all by gpshead):
8b7c3ee Add raise_exceptions parameter to multiprocessing.set_forkserver_preload
e8836d5 Skip forkserver preload tests on platforms without fork support
5ce91ba Skip all forkserver tests on platforms without fork support
75495cc Refactor set_forkserver_preload to use on_error parameter
84c9e5b Fix unused import and make __notes__ test more robust
a399218 Fix warn mode to work when warnings are configured as errors
9c3ba84 Change __main__ warning message from 'import' to 'preload'
70c05d8 Refactor set_forkserver_preload to use _handle_preload helper
045be92 Simplify temporary file handling in tests
6d4c521 Remove obvious comments and improve import style in tests
30c2cf8 Fix warn mode to work when warnings are configured as errors
bad9691 Add comments explaining exception catching strategy
9d8125f Use double quotes for string values in documentation
2f8edb8 Fix warn mode to work when warnings are configured as errors
622345d Add Gregory P. Smith to NEWS entry contributors
42e8eb1 Simplify comments and exception note message
64ca5a0 Update _send_value docstring to explain pickling requirement
Lib/multiprocessing/forkserver.py
@@ -42,6 +42,7 @@ def __init__(self):
         self._inherited_fds = None
         self._lock = threading.Lock()
         self._preload_modules = ['__main__']
+        self._preload_on_error = 'ignore'

     def _stop(self):
         # Method used by unit tests to stop the server
@@ -64,11 +65,22 @@ def _stop_unlocked(self):
         self._forkserver_address = None
         self._forkserver_authkey = None

-    def set_forkserver_preload(self, modules_names):
-        '''Set list of module names to try to load in forkserver process.'''
+    def set_forkserver_preload(self, modules_names, *, on_error='ignore'):
+        '''Set list of module names to try to load in forkserver process.
+
+        The on_error parameter controls how import failures are handled:
+        "ignore" (default) silently ignores failures, "warn" emits warnings,
+        and "fail" raises exceptions breaking the forkserver context.
+        '''
         if not all(type(mod) is str for mod in modules_names):
             raise TypeError('module_names must be a list of strings')
+        if on_error not in ('ignore', 'warn', 'fail'):
+            raise ValueError(
+                f"on_error must be 'ignore', 'warn', or 'fail', "
+                f"not {on_error!r}"
+            )
         self._preload_modules = modules_names
+        self._preload_on_error = on_error

     def get_inherited_fds(self):
         '''Return list of fds inherited from parent process.
@@ -107,6 +119,14 @@ def connect_to_new_process(self, fds):
                             wrapped_client, self._forkserver_authkey)
                     connection.deliver_challenge(
                             wrapped_client, self._forkserver_authkey)
+                except (EOFError, ConnectionError, BrokenPipeError) as exc:
+                    if (self._preload_modules and
+                        self._preload_on_error == 'fail'):
+                        exc.add_note(
+                            "Forkserver process may have crashed during module "
+                            "preloading. Check stderr."
+                        )
+                    raise
                 finally:
                     wrapped_client._detach()
                     del wrapped_client
@@ -152,6 +172,8 @@ def ensure_running(self):
                     main_kws['sys_path'] = data['sys_path']
                 if 'init_main_from_path' in data:
                     main_kws['main_path'] = data['init_main_from_path']
+                if self._preload_on_error != 'ignore':
+                    main_kws['on_error'] = self._preload_on_error

             with socket.socket(socket.AF_UNIX) as listener:
                 address = connection.arbitrary_address('AF_UNIX')
@@ -196,8 +218,68 @@ def ensure_running(self):
 #
 #

+def _handle_preload(preload, main_path=None, sys_path=None, on_error='ignore'):
+    """Handle module preloading with configurable error handling.
+
+    Args:
+        preload: List of module names to preload.
+        main_path: Path to __main__ module if '__main__' is in preload.
+        sys_path: sys.path to use for imports (None means use current).
+        on_error: How to handle import errors ("ignore", "warn", or "fail").
+    """
+    if not preload:
+        return
+
+    if sys_path is not None:
+        sys.path[:] = sys_path
+
+    if '__main__' in preload and main_path is not None:
+        process.current_process()._inheriting = True
+        try:
+            spawn.import_main_path(main_path)
+        except Exception as e:
+            # Catch broad Exception because import_main_path() uses
+            # runpy.run_path() which executes the script and can raise
+            # any exception, not just ImportError
+            match on_error:
Contributor
A very minor suggestion, feel free to ignore, but this and the section below could be factored out into a common function to reduce code duplication slightly:
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 7d9033415c0..ede3b32786c 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -218,6 +218,21 @@ def ensure_running(self):
 #
 #

+def _handle_import_error(on_error, modinfo, exc):
+    match on_error:
+        case 'fail':
+            raise
+        case 'warn':
+            import warnings
+            warnings.warn(
+                f"Failed to preload {modinfo}: {exc}",
+                ImportWarning,
+                stacklevel=3
+            )
+        case 'ignore':
+            pass
+
+
 def _handle_preload(preload, main_path=None, sys_path=None, on_error='ignore'):
     """Handle module preloading with configurable error handling.

@@ -241,18 +256,7 @@ def _handle_preload(preload, main_path=None, sys_path=None, on_error='ignore'):
             # Catch broad Exception because import_main_path() uses
             # runpy.run_path() which executes the script and can raise
             # any exception, not just ImportError
-            match on_error:
-                case 'fail':
-                    raise
-                case 'warn':
-                    import warnings
-                    warnings.warn(
-                        f"Failed to preload __main__ from {main_path!r}: {e}",
-                        ImportWarning,
-                        stacklevel=2
-                    )
-                case 'ignore':
-                    pass
+            _handle_import_error(on_error, f"__main__ from {main_path!r}", e)
         finally:
             del process.current_process()._inheriting

@@ -260,18 +264,7 @@ def _handle_preload(preload, main_path=None, sys_path=None, on_error='ignore'):
         try:
             __import__(modname)
         except ImportError as e:
-            match on_error:
-                case 'fail':
-                    raise
-                case 'warn':
-                    import warnings
-                    warnings.warn(
-                        f"Failed to preload module {modname!r}: {e}",
-                        ImportWarning,
-                        stacklevel=2
-                    )
-                case 'ignore':
-                    pass
+            _handle_import_error(on_error, f"module {modname!r}", e)

     # gh-135335: flush stdout/stderr in case any of the preloaded modules
     # wrote to them, otherwise children might inherit buffered data
+                case 'fail':
+                    raise
+                case 'warn':
+                    import warnings
+                    warnings.warn(
+                        f"Failed to preload __main__ from {main_path!r}: {e}",
+                        ImportWarning,
+                        stacklevel=2
+                    )
+                case 'ignore':
+                    pass
+        finally:
+            del process.current_process()._inheriting
+
+    for modname in preload:
+        try:
+            __import__(modname)
+        except ImportError as e:
+            match on_error:
+                case 'fail':
+                    raise
+                case 'warn':
+                    import warnings
+                    warnings.warn(
+                        f"Failed to preload module {modname!r}: {e}",
+                        ImportWarning,
+                        stacklevel=2
+                    )
+                case 'ignore':
+                    pass
+
+    # gh-135335: flush stdout/stderr in case any of the preloaded modules
+    # wrote to them, otherwise children might inherit buffered data
+    util._flush_std_streams()
+
+
 def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
-         *, authkey_r=None):
+         *, authkey_r=None, on_error='ignore'):
     """Run forkserver."""
     if authkey_r is not None:
         try:
@@ -208,24 +290,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
     else:
         authkey = b''

-    if preload:
-        if sys_path is not None:
-            sys.path[:] = sys_path
-        if '__main__' in preload and main_path is not None:
-            process.current_process()._inheriting = True
-            try:
-                spawn.import_main_path(main_path)
-            finally:
-                del process.current_process()._inheriting
-        for modname in preload:
-            try:
-                __import__(modname)
-            except ImportError:
-                pass
-
-        # gh-135335: flush stdout/stderr in case any of the preloaded modules
-        # wrote to them, otherwise children might inherit buffered data
-        util._flush_std_streams()
+    _handle_preload(preload, main_path, sys_path, on_error)

     util._close_stdin()

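For readers who want to try the three `on_error` policies outside the forkserver, here is a minimal standalone sketch. It is not the PR's code: `preload_modules` is a hypothetical name, and it uses `importlib.import_module` in the current process rather than the forkserver internals. It only mirrors the ignore/warn/fail behaviour described in the diff above.

```python
import importlib
import warnings


def preload_modules(names, on_error="ignore"):
    """Try to import each name, handling failures according to on_error.

    Hypothetical helper for illustration only; returns the names that
    imported successfully.
    """
    if on_error not in ("ignore", "warn", "fail"):
        raise ValueError(
            f"on_error must be 'ignore', 'warn', or 'fail', not {on_error!r}"
        )
    loaded = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError as exc:
            if on_error == "fail":
                # Re-raise the original exception unchanged.
                raise
            if on_error == "warn":
                warnings.warn(
                    f"Failed to preload module {name!r}: {exc}",
                    ImportWarning,
                    stacklevel=2,
                )
            # "ignore": fall through silently.
        else:
            loaded.append(name)
    return loaded
```

With `on_error="warn"`, whether the warning is actually shown depends on the active warning filters, since `ImportWarning` is ignored by default.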
179 changes: 179 additions & 0 deletions
Lib/test/test_multiprocessing_forkserver/test_preload.py
@@ -0,0 +1,179 @@
+"""Tests for forkserver preload functionality."""
+
+import multiprocessing
+import sys
+import tempfile
+import unittest
+from multiprocessing import forkserver
+
+
+class TestForkserverPreload(unittest.TestCase):
+    """Tests for forkserver preload functionality."""
+
+    def setUp(self):
+        self._saved_warnoptions = sys.warnoptions.copy()
+        # Remove warning options that would convert ImportWarning to errors:
+        # - 'error' converts all warnings to errors
+        # - 'error::ImportWarning' specifically converts ImportWarning
+        # Keep other specific options like 'error::BytesWarning' that
+        # subprocess's _args_from_interpreter_flags() expects to remove
+        sys.warnoptions[:] = [
+            opt for opt in sys.warnoptions
+            if opt not in ('error', 'error::ImportWarning')
+        ]
+        self.ctx = multiprocessing.get_context('forkserver')
+        forkserver._forkserver._stop()
+
+    def tearDown(self):
+        sys.warnoptions[:] = self._saved_warnoptions
+        forkserver._forkserver._stop()
+
+    @staticmethod
+    def _send_value(conn, value):
+        """Send value through connection. Static method to be picklable as Process target."""
+        conn.send(value)
+
+    def test_preload_on_error_ignore_default(self):
+        """Test that invalid modules are silently ignored by default."""
+        self.ctx.set_forkserver_preload(['nonexistent_module_xyz'])
+
+        r, w = self.ctx.Pipe(duplex=False)
+        p = self.ctx.Process(target=self._send_value, args=(w, 42))
+        p.start()
+        w.close()
+        result = r.recv()
+        r.close()
+        p.join()
+
+        self.assertEqual(result, 42)
+        self.assertEqual(p.exitcode, 0)
+
+    def test_preload_on_error_ignore_explicit(self):
+        """Test that invalid modules are silently ignored with on_error='ignore'."""
+        self.ctx.set_forkserver_preload(['nonexistent_module_xyz'], on_error='ignore')
+
+        r, w = self.ctx.Pipe(duplex=False)
+        p = self.ctx.Process(target=self._send_value, args=(w, 99))
+        p.start()
+        w.close()
+        result = r.recv()
+        r.close()
+        p.join()
+
+        self.assertEqual(result, 99)
+        self.assertEqual(p.exitcode, 0)
+
+    def test_preload_on_error_warn(self):
+        """Test that invalid modules emit warnings with on_error='warn'."""
+        self.ctx.set_forkserver_preload(['nonexistent_module_xyz'], on_error='warn')
+
+        r, w = self.ctx.Pipe(duplex=False)
+        p = self.ctx.Process(target=self._send_value, args=(w, 123))
+        p.start()
+        w.close()
+        result = r.recv()
+        r.close()
+        p.join()
+
+        self.assertEqual(result, 123)
+        self.assertEqual(p.exitcode, 0)
+
+    def test_preload_on_error_fail_breaks_context(self):
+        """Test that invalid modules with on_error='fail' breaks the forkserver."""
+        self.ctx.set_forkserver_preload(['nonexistent_module_xyz'], on_error='fail')
+
+        r, w = self.ctx.Pipe(duplex=False)
+        try:
+            p = self.ctx.Process(target=self._send_value, args=(w, 42))
+            with self.assertRaises((EOFError, ConnectionError, BrokenPipeError)) as cm:
+                p.start()
+            notes = getattr(cm.exception, '__notes__', [])
+            self.assertTrue(notes, "Expected exception to have __notes__")
+            self.assertIn('Forkserver process may have crashed', notes[0])
+        finally:
+            w.close()
+            r.close()
+
+    def test_preload_valid_modules_with_on_error_fail(self):
+        """Test that valid modules work fine with on_error='fail'."""
+        self.ctx.set_forkserver_preload(['os', 'sys'], on_error='fail')
+
+        r, w = self.ctx.Pipe(duplex=False)
+        p = self.ctx.Process(target=self._send_value, args=(w, 'success'))
+        p.start()
+        w.close()
+        result = r.recv()
+        r.close()
+        p.join()
+
+        self.assertEqual(result, 'success')
+        self.assertEqual(p.exitcode, 0)
+
+    def test_preload_invalid_on_error_value(self):
+        """Test that invalid on_error values raise ValueError."""
+        with self.assertRaises(ValueError) as cm:
+            self.ctx.set_forkserver_preload(['os'], on_error='invalid')
+        self.assertIn("on_error must be 'ignore', 'warn', or 'fail'", str(cm.exception))
+
+
+class TestHandlePreload(unittest.TestCase):
+    """Unit tests for _handle_preload() function."""
+
+    def test_handle_preload_main_on_error_fail(self):
+        """Test that __main__ import failures raise with on_error='fail'."""
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
+            f.write('raise RuntimeError("test error in __main__")\n')
+            f.flush()
+            with self.assertRaises(RuntimeError) as cm:
+                forkserver._handle_preload(['__main__'], main_path=f.name, on_error='fail')
+            self.assertIn("test error in __main__", str(cm.exception))
+
+    def test_handle_preload_main_on_error_warn(self):
+        """Test that __main__ import failures warn with on_error='warn'."""
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
+            f.write('raise ImportError("test import error")\n')
+            f.flush()
+            with self.assertWarns(ImportWarning) as cm:
+                forkserver._handle_preload(['__main__'], main_path=f.name, on_error='warn')
+            self.assertIn("Failed to preload __main__", str(cm.warning))
+            self.assertIn("test import error", str(cm.warning))
+
+    def test_handle_preload_main_on_error_ignore(self):
+        """Test that __main__ import failures are ignored with on_error='ignore'."""
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
+            f.write('raise ImportError("test import error")\n')
+            f.flush()
+            forkserver._handle_preload(['__main__'], main_path=f.name, on_error='ignore')
+
+    def test_handle_preload_main_valid(self):
+        """Test that valid __main__ preload works."""
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
+            f.write('test_var = 42\n')
+            f.flush()
+            forkserver._handle_preload(['__main__'], main_path=f.name, on_error='fail')
+
+    def test_handle_preload_module_on_error_fail(self):
+        """Test that module import failures raise with on_error='fail'."""
+        with self.assertRaises(ModuleNotFoundError):
+            forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='fail')
+
+    def test_handle_preload_module_on_error_warn(self):
+        """Test that module import failures warn with on_error='warn'."""
+        with self.assertWarns(ImportWarning) as cm:
+            forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='warn')
+        self.assertIn("Failed to preload module", str(cm.warning))
+
+    def test_handle_preload_module_on_error_ignore(self):
+        """Test that module import failures are ignored with on_error='ignore'."""
+        forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='ignore')
+
+    def test_handle_preload_combined(self):
+        """Test preloading both __main__ and modules."""
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
+            f.write('import sys\n')
+            f.flush()
+            forkserver._handle_preload(['__main__', 'os', 'sys'], main_path=f.name, on_error='fail')
+
+
+if __name__ == '__main__':
+    unittest.main()
@duaneg - any thoughts on what the right thing to do is for exceptions thrown from forkserver preloading of `__main__`? We were not catching and ignoring them before. By adding a way to control what we do with exceptions, this PR is currently oriented towards being consistent across all preloads. Q: Should `__main__` be treated differently?
My intuition is to be consistent, but I've not got enough real-world experience with its use to back that up with anything more. I might know someone who does, though: I just came back from Kiwi PyCon, where there was a great talk by a dev using `forkserver` at scale in production. I'll drop them a note and see if they have an opinion.
FWIW I'm looking through the patch set at the moment and I like the look of the changes. A couple of things I've noticed so far: some succeeding tests are dumping warnings and errors to stderr, which isn't ideal, but I'm not sure how practical it would be to suppress them. I guess it would require the tests to redirect `stderr` when spawning the `forkserver`, but I think the functionality to do that would have to be added to the production code and it looks like it would get quite messy. How much do we care about noisy tests, in general?
I also tried to run a ref leak check, but the `TestForkserverPreload` tests are failing on the second run due to missing generated `__main__` modules. I'm looking into what is going wrong there. The `TestHandlePreload` tests are leaking references, but perhaps that is inevitable given they are doing imports?
I'll try and do a more thorough review over the next couple of days.
I think the noise could be addressed, but I'll let that sit for a bit while I think about it. We know when the forkserver is going to start, so temporarily dup'ing the stderr fd that it will inherit to a pipe we control and capture is feasible. Sometimes complication like that is just complication that makes life more difficult, though.
Ah right, so we can set up fd 2 to be a pipe in the test code before starting the fork server, and restore the original afterwards, of course. That makes sense, and there is no need to hack the production code after all. It is a bit of a faff, but if it is contained to the test code perhaps it isn't so bad.
I just tested this, and it does indeed seem to work! This needs cleaning up and finishing off, but it looks promising. I'm not sure what the best way to show a proposed change here is, but here is the diff:
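The diff itself is not reproduced on this page. Purely to illustrate the fd 2 redirection idea discussed in this thread, and not as the reviewer's actual change, a sketch might look like the following. `capture_fd` is a hypothetical helper name; it assumes the writer produces less than a pipe buffer's worth of output, and that any child process which inherited the pipe has exited before the block ends.

```python
import contextlib
import os


@contextlib.contextmanager
def capture_fd(fd=2):
    """Temporarily point `fd` at a pipe and collect what is written to it.

    Yields a list; after the block exits normally it holds a single
    bytes object with everything written to `fd` in the meantime.
    Error handling is deliberately simple: if a dup2 call fails,
    some descriptors may be left open.
    """
    captured = []
    saved = os.dup(fd)                  # remember where fd pointed
    read_end, write_end = os.pipe()
    try:
        os.dup2(write_end, fd)          # fd now writes into the pipe
        os.close(write_end)             # fd is the only write end left here
        try:
            yield captured
        finally:
            os.dup2(saved, fd)          # restore fd, dropping the write end
            os.close(saved)
        # Every write end in this process is closed, so reads reach EOF
        # (unless a still-running child inherited the pipe).
        chunks = []
        while chunk := os.read(read_end, 4096):
            chunks.append(chunk)
        captured.append(b"".join(chunks))
    finally:
        os.close(read_end)
```

In a test this would wrap the code that starts the forkserver, with the forkserver stopped before the block exits so that the final read does not block.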
Cleaned up the stderr redirect. Error handling is still not perfect (e.g. if an `os.dup2` call fails some descriptors won't be closed), but it probably doesn't matter.
The errors when running a reference leak check are because `_handle_preload` calls `spawn.import_main_path`, which calls `_fixup_main_from_path`, which replaces the `__main__` module in `sys.modules` with a new `__mp_main__` one. This change remains after the test finishes, and subsequent preload tests think they need to load `__main__` from the generated test file it refers to. This can be fixed by storing the current module in `setUp` and restoring it in `tearDown`.
The reference leaks are because when the `__main__` module is replaced the old one is appended to a list (`spawn.old_main_modules`). They can be fixed by clearing the list in `tearDown`, but as the list is never used (and as far as I can tell has never been used since it was introduced) perhaps we can just get rid of it?
Updated diff:
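The updated diff is likewise not reproduced on this page. As a rough illustration of the fix described above (save `__main__` in `setUp`, restore it and clear `spawn.old_main_modules` in `tearDown`), and not the reviewer's actual patch, a test mixin could be sketched like this. `RestoreMainMixin` is a hypothetical name, and `getattr` is used because the comment also proposes removing the list altogether.

```python
import sys
import unittest
from multiprocessing import spawn


class RestoreMainMixin:
    """Hypothetical mixin: undo __main__ replacement done during a test."""

    def setUp(self):
        super().setUp()
        # Remember the module currently registered as __main__.
        self._saved_main = sys.modules['__main__']

    def tearDown(self):
        # Put the original __main__ back so later tests do not try to
        # reload it from a temporary file that no longer exists.
        sys.modules['__main__'] = self._saved_main
        # Drop references to replaced modules kept by multiprocessing.spawn;
        # tolerate the list not existing.
        getattr(spawn, 'old_main_modules', []).clear()
        super().tearDown()
```

The mixin has to come before `unittest.TestCase` in the base class list so that its `setUp` and `tearDown` run and then delegate via `super()`.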
Following up, FWIW the power user I talked to about this agreed it should be treated consistently.