From 9fc1d36f43ef172a0cb9926529c7b5a8bf2a7c7e Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:11:25 +0000 Subject: [PATCH 1/8] feat: add Jupyter notebook detection and stderr helpers - Add _is_jupyter_notebook() to detect Jupyter/IPython environments - Add _print_stderr() to handle stderr output with IPython display support --- src/mcp/client/stdio/__init__.py | 100 +++++++++++++++++++++++++- src/mcp/os/win32/utilities.py | 41 ++++++++--- tests/client/test_stdio.py | 119 ++++++++++++++++++++++++++++++- 3 files changed, 247 insertions(+), 13 deletions(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index 0d76bb958b..61f2e435d2 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -1,5 +1,6 @@ import logging import os +import subprocess import sys from contextlib import asynccontextmanager from pathlib import Path @@ -48,6 +49,47 @@ PROCESS_TERMINATION_TIMEOUT = 2.0 +def _is_jupyter_notebook() -> bool: + """ + Detect if running in a Jupyter notebook or IPython environment. + + Returns: + bool: True if running in Jupyter/IPython, False otherwise + """ + try: + from IPython import get_ipython + + ipython = get_ipython() + return ipython is not None and ipython.__class__.__name__ in ("ZMQInteractiveShell", "TerminalInteractiveShell") + except ImportError: + return False + + +def _print_stderr(line: str, errlog: TextIO) -> None: + """ + Print stderr output, using IPython's display system if in Jupyter notebook. + + Args: + line: The line to print + errlog: The fallback TextIO stream (used when not in Jupyter) + """ + if _is_jupyter_notebook(): + try: + from IPython.display import HTML, display + + # Use IPython's display system with red color for stderr + # This ensures proper rendering in Jupyter notebooks + display(HTML(f'
{line}
')) + except Exception: + # If IPython display fails, fall back to regular print + # Log the error but continue (non-critical) + logger.debug("Failed to use IPython display for stderr, falling back to print", exc_info=True) + print(line, file=errlog) + else: + # Not in Jupyter, use standard stderr redirection + print(line, file=errlog) + + def get_default_environment() -> dict[str, str]: """ Returns a default environment object including only environment variables deemed @@ -107,6 +149,18 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder """ Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout. + + This function automatically handles stderr output in a way that is compatible + with Jupyter notebook environments. When running in Jupyter, stderr output + is displayed using IPython's display system with red color formatting. + When not in Jupyter, stderr is redirected to the provided errlog stream + (defaults to sys.stderr). + + Args: + server: Parameters for the server process to spawn + errlog: TextIO stream for stderr output when not in Jupyter (defaults to sys.stderr). + This parameter is kept for backward compatibility but may be ignored + when running in Jupyter notebook environments. """ read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] @@ -179,12 +233,49 @@ async def stdin_writer(): except anyio.ClosedResourceError: # pragma: no cover await anyio.lowlevel.checkpoint() + async def stderr_reader(): + """Read stderr from the process and display it appropriately.""" + if not process.stderr: + return + + try: + buffer = "" + async for chunk in TextReceiveStream( + process.stderr, + encoding=server.encoding, + errors=server.encoding_error_handler, + ): + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + + for line in lines: + if line.strip(): # Only print non-empty lines + try: + _print_stderr(line, errlog) + except Exception: + # Log errors but continue (non-critical) + logger.debug("Failed to print stderr line", exc_info=True) + + # Print any remaining buffer content + if buffer.strip(): + try: + _print_stderr(buffer, errlog) + except Exception: + logger.debug("Failed to print final stderr buffer", exc_info=True) + except anyio.ClosedResourceError: # pragma: no cover + await anyio.lowlevel.checkpoint() + except Exception: + # Log errors but continue (non-critical) + logger.debug("Error reading stderr", exc_info=True) + async with ( anyio.create_task_group() as tg, process, ): tg.start_soon(stdout_reader) tg.start_soon(stdin_writer) + if process.stderr: + tg.start_soon(stderr_reader) try: yield read_stream, write_stream finally: @@ -244,14 +335,19 @@ async def _create_platform_compatible_process( Unix: Creates process in a new session/process group for killpg support Windows: Creates process in a Job Object for reliable child termination + + Note: stderr is piped (not redirected) to allow async reading for Jupyter + notebook compatibility. The errlog parameter is kept for backward compatibility + but is only used when not in Jupyter environments. """ if sys.platform == "win32": # pragma: no cover - process = await create_windows_process(command, args, env, errlog, cwd) + process = await create_windows_process(command, args, env, errlog, cwd, pipe_stderr=True) else: + # Pipe stderr instead of redirecting to allow async reading process = await anyio.open_process( [command, *args], env=env, - stderr=errlog, + stderr=subprocess.PIPE, cwd=cwd, start_new_session=True, ) # pragma: no cover diff --git a/src/mcp/os/win32/utilities.py b/src/mcp/os/win32/utilities.py index 962be0229b..d61d9475ea 100644 --- a/src/mcp/os/win32/utilities.py +++ b/src/mcp/os/win32/utilities.py @@ -70,7 +70,7 @@ class FallbackProcess: A fallback process wrapper for Windows to handle async I/O when using subprocess.Popen, which provides sync-only FileIO objects. - This wraps stdin and stdout into async-compatible + This wraps stdin, stdout, and stderr into async-compatible streams (FileReadStream, FileWriteStream), so that MCP clients expecting async streams can work properly. """ @@ -79,10 +79,12 @@ def __init__(self, popen_obj: subprocess.Popen[bytes]): self.popen: subprocess.Popen[bytes] = popen_obj self.stdin_raw = popen_obj.stdin # type: ignore[assignment] self.stdout_raw = popen_obj.stdout # type: ignore[assignment] - self.stderr = popen_obj.stderr # type: ignore[assignment] + self.stderr_raw = popen_obj.stderr # type: ignore[assignment] self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None + # Wrap stderr in async stream if it's piped (for Jupyter compatibility) + self.stderr = FileReadStream(cast(BinaryIO, self.stderr_raw)) if self.stderr_raw else None async def __aenter__(self): """Support async context manager entry.""" @@ -103,12 +105,14 @@ async def __aexit__( await self.stdin.aclose() if self.stdout: await self.stdout.aclose() + if self.stderr: + await self.stderr.aclose() if self.stdin_raw: self.stdin_raw.close() if self.stdout_raw: self.stdout_raw.close() - if self.stderr: - self.stderr.close() + if self.stderr_raw: + self.stderr_raw.close() async def wait(self): """Async wait for process completion.""" @@ -139,6 +143,7 @@ async def create_windows_process( env: dict[str, str] | None = None, errlog: TextIO | None = sys.stderr, cwd: Path | str | None = None, + pipe_stderr: bool = False, ) -> Process | FallbackProcess: """ Creates a subprocess in a Windows-compatible way with Job Object support. @@ -155,8 +160,11 @@ async def create_windows_process( command (str): The executable to run args (list[str]): List of command line arguments env (dict[str, str] | None): Environment variables - errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr) + errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr). + Only used when pipe_stderr is False. cwd (Path | str | None): Working directory for the subprocess + pipe_stderr (bool): If True, pipe stderr instead of redirecting to errlog. + This allows async reading of stderr for Jupyter compatibility. Returns: Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams @@ -164,6 +172,8 @@ async def create_windows_process( job = _create_job_object() process = None + stderr_target = subprocess.PIPE if pipe_stderr else errlog + try: # First try using anyio with Windows-specific flags to hide console window process = await anyio.open_process( @@ -173,18 +183,18 @@ async def create_windows_process( creationflags=subprocess.CREATE_NO_WINDOW # type: ignore if hasattr(subprocess, "CREATE_NO_WINDOW") else 0, - stderr=errlog, + stderr=stderr_target, cwd=cwd, ) except NotImplementedError: # If Windows doesn't support async subprocess creation, use fallback - process = await _create_windows_fallback_process(command, args, env, errlog, cwd) + process = await _create_windows_fallback_process(command, args, env, errlog, cwd, pipe_stderr=pipe_stderr) except Exception: # Try again without creation flags process = await anyio.open_process( [command, *args], env=env, - stderr=errlog, + stderr=stderr_target, cwd=cwd, ) @@ -198,19 +208,30 @@ async def _create_windows_fallback_process( env: dict[str, str] | None = None, errlog: TextIO | None = sys.stderr, cwd: Path | str | None = None, + pipe_stderr: bool = False, ) -> FallbackProcess: """ Create a subprocess using subprocess.Popen as a fallback when anyio fails. This function wraps the sync subprocess.Popen in an async-compatible interface. + + Args: + command: The executable to run + args: List of command line arguments + env: Environment variables + errlog: Where to send stderr output (only used when pipe_stderr is False) + cwd: Working directory for the subprocess + pipe_stderr: If True, pipe stderr instead of redirecting to errlog """ + stderr_target = subprocess.PIPE if pipe_stderr else errlog + try: # Try launching with creationflags to avoid opening a new console window popen_obj = subprocess.Popen( [command, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=errlog, + stderr=stderr_target, env=env, cwd=cwd, bufsize=0, # Unbuffered output @@ -222,7 +243,7 @@ async def _create_windows_fallback_process( [command, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=errlog, + stderr=stderr_target, env=env, cwd=cwd, bufsize=0, diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ce6c85962d..ed7071858b 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,16 +1,24 @@ import errno +import io import os import shutil import sys import tempfile import textwrap import time +from unittest.mock import MagicMock, patch import anyio import pytest from mcp.client.session import ClientSession -from mcp.client.stdio import StdioServerParameters, _create_platform_compatible_process, stdio_client +from mcp.client.stdio import ( + StdioServerParameters, + _create_platform_compatible_process, + _is_jupyter_notebook, + _print_stderr, + stdio_client, +) from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -630,3 +638,112 @@ def sigterm_handler(signum, frame): f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. " f"Expected between 2-4 seconds (2s stdin timeout + termination time)." ) + + +@pytest.mark.anyio +async def test_stderr_capture(): + """Test that stderr output from the server process is captured and displayed.""" + # Create a Python script that writes to stderr + script_content = textwrap.dedent( + """ + import sys + import time + + # Write to stderr + print("starting echo server", file=sys.stderr, flush=True) + time.sleep(0.1) + print("another stderr line", file=sys.stderr, flush=True) + + # Keep running to read stdin + try: + while True: + line = sys.stdin.readline() + if not line: + break + except: + pass + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", script_content], + ) + + # Capture stderr output + stderr_capture = io.StringIO() + + async with stdio_client(server_params, errlog=stderr_capture) as (_, _): + # Give the process time to write to stderr + await anyio.sleep(0.3) + + # Check that stderr was captured + stderr_output = stderr_capture.getvalue() + assert "starting echo server" in stderr_output or "another stderr line" in stderr_output + + +@pytest.mark.anyio +async def test_stderr_piped_in_process(): + """Test that stderr is piped (not redirected) when creating processes.""" + # Create a script that writes to stderr + script_content = textwrap.dedent( + """ + import sys + print("stderr output", file=sys.stderr, flush=True) + sys.exit(0) + """ + ) + + process = await _create_platform_compatible_process( + sys.executable, + ["-c", script_content], + ) + + # Verify stderr is piped (process.stderr should exist) + assert process.stderr is not None, "stderr should be piped, not redirected" + + # Clean up + await process.wait() + + +def test_is_jupyter_notebook_detection(): + """Test Jupyter notebook detection.""" + # When not in Jupyter, should return False + # (This test verifies the function doesn't crash when IPython is not available) + result = _is_jupyter_notebook() + # In test environment, IPython is likely not available, so should be False + assert isinstance(result, bool) + + +def test_print_stderr_non_jupyter(): + """Test stderr printing when not in Jupyter.""" + stderr_capture = io.StringIO() + _print_stderr("test error message", stderr_capture) + + assert "test error message" in stderr_capture.getvalue() + + +def test_print_stderr_jupyter(): + """Test stderr printing when in Jupyter using IPython display.""" + # Mock the Jupyter detection and IPython display + with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( + "IPython.display.display" + ) as mock_display, patch("IPython.display.HTML") as mock_html: + _print_stderr("test error message", sys.stderr) + + # Verify IPython display was called + mock_html.assert_called_once() + mock_display.assert_called_once() + + +def test_print_stderr_jupyter_fallback(): + """Test stderr printing falls back to regular print if IPython display fails.""" + stderr_capture = io.StringIO() + + with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( + "IPython.display.display", side_effect=Exception("Display failed") + ): + _print_stderr("test error message", stderr_capture) + + # Should fall back to regular print + assert "test error message" in stderr_capture.getvalue() From 54b3817a409d0b293d4e8cf8a35a666a7221091a Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:24:31 +0000 Subject: [PATCH 2/8] test: fix IPython import mocking in Jupyter stderr tests - Mock builtins.__import__ instead of patching IPython.display directly - Handle 'from IPython.display import HTML, display' import pattern - Tests now work without IPython installed in test environment --- tests/client/test_stdio.py | 56 ++++++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ed7071858b..72e518e89a 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -726,22 +726,68 @@ def test_print_stderr_non_jupyter(): def test_print_stderr_jupyter(): """Test stderr printing when in Jupyter using IPython display.""" # Mock the Jupyter detection and IPython display + # We need to mock the import inside the function since IPython may not be installed + mock_html_class = MagicMock() + mock_display_func = MagicMock() + + # Create a mock module structure that matches "from IPython.display import HTML, display" + mock_display_module = MagicMock() + mock_display_module.HTML = mock_html_class + mock_display_module.display = mock_display_func + + # Create mock IPython module with display submodule + mock_ipython_module = MagicMock() + mock_ipython_module.display = mock_display_module + + original_import = __import__ + + def mock_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "IPython.display": + return mock_display_module + if name == "IPython": + return mock_ipython_module + # For other imports, use real import + return original_import(name, globals, locals, fromlist, level) + with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( - "IPython.display.display" - ) as mock_display, patch("IPython.display.HTML") as mock_html: + "builtins.__import__", side_effect=mock_import + ): _print_stderr("test error message", sys.stderr) # Verify IPython display was called - mock_html.assert_called_once() - mock_display.assert_called_once() + mock_html_class.assert_called_once() + mock_display_func.assert_called_once() def test_print_stderr_jupyter_fallback(): """Test stderr printing falls back to regular print if IPython display fails.""" stderr_capture = io.StringIO() + + # Mock IPython import to raise exception on display + mock_html_class = MagicMock() + mock_display_func = MagicMock(side_effect=Exception("Display failed")) + + # Create a mock module structure that matches "from IPython.display import HTML, display" + mock_display_module = MagicMock() + mock_display_module.HTML = mock_html_class + mock_display_module.display = mock_display_func + + # Create mock IPython module with display submodule + mock_ipython_module = MagicMock() + mock_ipython_module.display = mock_display_module + + original_import = __import__ + + def mock_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "IPython.display": + return mock_display_module + if name == "IPython": + return mock_ipython_module + # For other imports, use real import + return original_import(name, globals, locals, fromlist, level) with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( - "IPython.display.display", side_effect=Exception("Display failed") + "builtins.__import__", side_effect=mock_import ): _print_stderr("test error message", stderr_capture) From 5bdaf056775aa95719204ded9b64a51537f69e9f Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:29:19 +0000 Subject: [PATCH 3/8] test: add coverage for missing lines in stderr handling - Add test for _is_jupyter_notebook with valid IPython instances - Add test for stderr_reader exception handling - Improve test coverage to reach 100% --- tests/client/test_stdio.py | 100 +++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 72e518e89a..34dd31baf1 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -714,6 +714,48 @@ def test_is_jupyter_notebook_detection(): # In test environment, IPython is likely not available, so should be False assert isinstance(result, bool) + # Test when IPython is available and returns ZMQInteractiveShell + mock_ipython = MagicMock() + mock_ipython.__class__.__name__ = "ZMQInteractiveShell" + + # Mock the import inside the function + def mock_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "IPython": + mock_ipython_module = MagicMock() + mock_ipython_module.get_ipython = MagicMock(return_value=mock_ipython) + return mock_ipython_module + # For other imports, use real import + import builtins + return builtins.__import__(name, globals, locals, fromlist, level) + + with patch("builtins.__import__", side_effect=mock_import): + # Re-import to get fresh function that will use the mocked import + import importlib + import mcp.client.stdio + + importlib.reload(mcp.client.stdio) + assert mcp.client.stdio._is_jupyter_notebook() + + # Test when IPython is available and returns TerminalInteractiveShell + mock_ipython = MagicMock() + mock_ipython.__class__.__name__ = "TerminalInteractiveShell" + + def mock_import2(name, globals=None, locals=None, fromlist=(), level=0): + if name == "IPython": + mock_ipython_module = MagicMock() + mock_ipython_module.get_ipython = MagicMock(return_value=mock_ipython) + return mock_ipython_module + # For other imports, use real import + import builtins + return builtins.__import__(name, globals, locals, fromlist, level) + + with patch("builtins.__import__", side_effect=mock_import2): + import importlib + import mcp.client.stdio + + importlib.reload(mcp.client.stdio) + assert mcp.client.stdio._is_jupyter_notebook() + def test_print_stderr_non_jupyter(): """Test stderr printing when not in Jupyter.""" @@ -793,3 +835,61 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): # Should fall back to regular print assert "test error message" in stderr_capture.getvalue() + + +@pytest.mark.anyio +async def test_stderr_reader_no_stderr(): + """Test stderr_reader when process has no stderr stream.""" + # Create a process without stderr + script_content = textwrap.dedent( + """ + import sys + print("stdout only", flush=True) + sys.exit(0) + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", script_content], + ) + + # The stderr_reader should handle None stderr gracefully + async with stdio_client(server_params) as (_, _): + # Give it a moment to start + await anyio.sleep(0.1) + + +@pytest.mark.anyio +async def test_stderr_reader_exception_handling(): + """Test stderr_reader handles exceptions gracefully.""" + # Create a script that writes to stderr + script_content = textwrap.dedent( + """ + import sys + import time + print("stderr line 1", file=sys.stderr, flush=True) + time.sleep(0.1) + print("stderr line 2", file=sys.stderr, flush=True) + # Keep running + try: + while True: + line = sys.stdin.readline() + if not line: + break + except: + pass + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", script_content], + ) + + # Mock _print_stderr to raise an exception to test error handling + with patch("mcp.client.stdio._print_stderr", side_effect=Exception("Print failed")): + async with stdio_client(server_params) as (_, _): + # Give it time to process stderr + await anyio.sleep(0.3) + # Should not crash, just log the error From 06e21f2e70060258f5ef4261ffd8ff66d9892f47 Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:33:33 +0000 Subject: [PATCH 4/8] test: fix recursion error and invalid JSON in tests - Store original __import__ before patching to avoid recursion - Fix test_stderr_reader_no_stderr to not print to stdout - Apply ruff formatting fixes --- tests/client/test_stdio.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 34dd31baf1..570aafa12e 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -840,12 +840,19 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): @pytest.mark.anyio async def test_stderr_reader_no_stderr(): """Test stderr_reader when process has no stderr stream.""" - # Create a process without stderr + # Create a process that just exits without printing anything to stdout + # (to avoid breaking JSON parsing in stdout_reader) script_content = textwrap.dedent( """ import sys - print("stdout only", flush=True) - sys.exit(0) + import time + # Write to stderr only + print("stderr message", file=sys.stderr, flush=True) + # Wait for stdin to close + try: + sys.stdin.read() + except: + pass """ ) @@ -854,7 +861,7 @@ async def test_stderr_reader_no_stderr(): args=["-c", script_content], ) - # The stderr_reader should handle None stderr gracefully + # The stderr_reader should handle stderr gracefully async with stdio_client(server_params) as (_, _): # Give it a moment to start await anyio.sleep(0.1) From c83b5ed38dc2a891d27c8d4983e6c41b40c1e3d4 Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:34:51 +0000 Subject: [PATCH 5/8] test: fix recursion in _is_jupyter_notebook test --- tests/client/test_stdio.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 570aafa12e..247fbcbe22 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -715,9 +715,12 @@ def test_is_jupyter_notebook_detection(): assert isinstance(result, bool) # Test when IPython is available and returns ZMQInteractiveShell + # Store the original import before patching to avoid recursion + original_import = __import__ + mock_ipython = MagicMock() mock_ipython.__class__.__name__ = "ZMQInteractiveShell" - + # Mock the import inside the function def mock_import(name, globals=None, locals=None, fromlist=(), level=0): if name == "IPython": @@ -725,9 +728,8 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): mock_ipython_module.get_ipython = MagicMock(return_value=mock_ipython) return mock_ipython_module # For other imports, use real import - import builtins - return builtins.__import__(name, globals, locals, fromlist, level) - + return original_import(name, globals, locals, fromlist, level) + with patch("builtins.__import__", side_effect=mock_import): # Re-import to get fresh function that will use the mocked import import importlib @@ -739,16 +741,15 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): # Test when IPython is available and returns TerminalInteractiveShell mock_ipython = MagicMock() mock_ipython.__class__.__name__ = "TerminalInteractiveShell" - + def mock_import2(name, globals=None, locals=None, fromlist=(), level=0): if name == "IPython": mock_ipython_module = MagicMock() mock_ipython_module.get_ipython = MagicMock(return_value=mock_ipython) return mock_ipython_module # For other imports, use real import - import builtins - return builtins.__import__(name, globals, locals, fromlist, level) - + return original_import(name, globals, locals, fromlist, level) + with patch("builtins.__import__", side_effect=mock_import2): import importlib import mcp.client.stdio From 850fba28366819a89c4132e7403a05d459f4f18d Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:47:11 +0000 Subject: [PATCH 6/8] style: fix import sorting in test_stdio.py --- tests/client/test_stdio.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 247fbcbe22..9aa25a749d 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -733,6 +733,7 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): with patch("builtins.__import__", side_effect=mock_import): # Re-import to get fresh function that will use the mocked import import importlib + import mcp.client.stdio importlib.reload(mcp.client.stdio) @@ -752,6 +753,7 @@ def mock_import2(name, globals=None, locals=None, fromlist=(), level=0): with patch("builtins.__import__", side_effect=mock_import2): import importlib + import mcp.client.stdio importlib.reload(mcp.client.stdio) @@ -772,18 +774,18 @@ def test_print_stderr_jupyter(): # We need to mock the import inside the function since IPython may not be installed mock_html_class = MagicMock() mock_display_func = MagicMock() - + # Create a mock module structure that matches "from IPython.display import HTML, display" mock_display_module = MagicMock() mock_display_module.HTML = mock_html_class mock_display_module.display = mock_display_func - + # Create mock IPython module with display submodule mock_ipython_module = MagicMock() mock_ipython_module.display = mock_display_module - + original_import = __import__ - + def mock_import(name, globals=None, locals=None, fromlist=(), level=0): if name == "IPython.display": return mock_display_module @@ -791,9 +793,10 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): return mock_ipython_module # For other imports, use real import return original_import(name, globals, locals, fromlist, level) - - with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( - "builtins.__import__", side_effect=mock_import + + with ( + patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), + patch("builtins.__import__", side_effect=mock_import), ): _print_stderr("test error message", sys.stderr) @@ -805,22 +808,22 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): def test_print_stderr_jupyter_fallback(): """Test stderr printing falls back to regular print if IPython display fails.""" stderr_capture = io.StringIO() - + # Mock IPython import to raise exception on display mock_html_class = MagicMock() mock_display_func = MagicMock(side_effect=Exception("Display failed")) - + # Create a mock module structure that matches "from IPython.display import HTML, display" mock_display_module = MagicMock() mock_display_module.HTML = mock_html_class mock_display_module.display = mock_display_func - + # Create mock IPython module with display submodule mock_ipython_module = MagicMock() mock_ipython_module.display = mock_display_module - + original_import = __import__ - + def mock_import(name, globals=None, locals=None, fromlist=(), level=0): if name == "IPython.display": return mock_display_module @@ -829,8 +832,9 @@ def mock_import(name, globals=None, locals=None, fromlist=(), level=0): # For other imports, use real import return original_import(name, globals, locals, fromlist, level) - with patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), patch( - "builtins.__import__", side_effect=mock_import + with ( + patch("mcp.client.stdio._is_jupyter_notebook", return_value=True), + patch("builtins.__import__", side_effect=mock_import), ): _print_stderr("test error message", stderr_capture) From c17e2cd8014f819c5ee122d4bdd52236c9f1aadc Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:54:37 +0000 Subject: [PATCH 7/8] refactor: extract helper functions to reduce stdio_client complexity --- src/mcp/client/stdio/__init__.py | 182 +++++++++++++++++-------------- 1 file changed, 101 insertions(+), 81 deletions(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index 61f2e435d2..fc490381d2 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -144,6 +144,104 @@ class StdioServerParameters(BaseModel): """ +async def _stdout_reader( + process: Process | FallbackProcess, + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception], + encoding: str, + encoding_error_handler: str, +): + """Read stdout from the process and parse JSONRPC messages.""" + assert process.stdout, "Opened process is missing stdout" + + try: + async with read_stream_writer: + buffer = "" + async for chunk in TextReceiveStream( + process.stdout, + encoding=encoding, + errors=encoding_error_handler, + ): + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + + for line in lines: + try: + message = types.JSONRPCMessage.model_validate_json(line) + except Exception as exc: # pragma: no cover + logger.exception("Failed to parse JSONRPC message from server") + await read_stream_writer.send(exc) + continue + + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) + except anyio.ClosedResourceError: # pragma: no cover + await anyio.lowlevel.checkpoint() + + +async def _stdin_writer( + process: Process | FallbackProcess, + write_stream_reader: MemoryObjectReceiveStream[SessionMessage], + encoding: str, + encoding_error_handler: str, +): + """Write session messages to the process stdin.""" + assert process.stdin, "Opened process is missing stdin" + + try: + async with write_stream_reader: + async for session_message in write_stream_reader: + json = session_message.message.model_dump_json(by_alias=True, exclude_none=True) + await process.stdin.send( + (json + "\n").encode( + encoding=encoding, + errors=encoding_error_handler, + ) + ) + except anyio.ClosedResourceError: # pragma: no cover + await anyio.lowlevel.checkpoint() + + +async def _stderr_reader( + process: Process | FallbackProcess, + errlog: TextIO, + encoding: str, + encoding_error_handler: str, +): + """Read stderr from the process and display it appropriately.""" + if not process.stderr: + return + + try: + buffer = "" + async for chunk in TextReceiveStream( + process.stderr, + encoding=encoding, + errors=encoding_error_handler, + ): + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + + for line in lines: + if line.strip(): # Only print non-empty lines + try: + _print_stderr(line, errlog) + except Exception: + # Log errors but continue (non-critical) + logger.debug("Failed to print stderr line", exc_info=True) + + # Print any remaining buffer content + if buffer.strip(): + try: + _print_stderr(buffer, errlog) + except Exception: + logger.debug("Failed to print final stderr buffer", exc_info=True) + except anyio.ClosedResourceError: # pragma: no cover + await anyio.lowlevel.checkpoint() + except Exception: + # Log errors but continue (non-critical) + logger.debug("Error reading stderr", exc_info=True) + + @asynccontextmanager async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr): """ @@ -190,92 +288,14 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder await write_stream_reader.aclose() raise - async def stdout_reader(): - assert process.stdout, "Opened process is missing stdout" - - try: - async with read_stream_writer: - buffer = "" - async for chunk in TextReceiveStream( - process.stdout, - encoding=server.encoding, - errors=server.encoding_error_handler, - ): - lines = (buffer + chunk).split("\n") - buffer = lines.pop() - - for line in lines: - try: - message = types.JSONRPCMessage.model_validate_json(line) - except Exception as exc: # pragma: no cover - logger.exception("Failed to parse JSONRPC message from server") - await read_stream_writer.send(exc) - continue - - session_message = SessionMessage(message) - await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: no cover - await anyio.lowlevel.checkpoint() - - async def stdin_writer(): - assert process.stdin, "Opened process is missing stdin" - - try: - async with write_stream_reader: - async for session_message in write_stream_reader: - json = session_message.message.model_dump_json(by_alias=True, exclude_none=True) - await process.stdin.send( - (json + "\n").encode( - encoding=server.encoding, - errors=server.encoding_error_handler, - ) - ) - except anyio.ClosedResourceError: # pragma: no cover - await anyio.lowlevel.checkpoint() - - async def stderr_reader(): - """Read stderr from the process and display it appropriately.""" - if not process.stderr: - return - - try: - buffer = "" - async for chunk in TextReceiveStream( - process.stderr, - encoding=server.encoding, - errors=server.encoding_error_handler, - ): - lines = (buffer + chunk).split("\n") - buffer = lines.pop() - - for line in lines: - if line.strip(): # Only print non-empty lines - try: - _print_stderr(line, errlog) - except Exception: - # Log errors but continue (non-critical) - logger.debug("Failed to print stderr line", exc_info=True) - - # Print any remaining buffer content - if buffer.strip(): - try: - _print_stderr(buffer, errlog) - except Exception: - logger.debug("Failed to print final stderr buffer", exc_info=True) - except anyio.ClosedResourceError: # pragma: no cover - await anyio.lowlevel.checkpoint() - except Exception: - # Log errors but continue (non-critical) - logger.debug("Error reading stderr", exc_info=True) - async with ( anyio.create_task_group() as tg, process, ): - tg.start_soon(stdout_reader) - tg.start_soon(stdin_writer) + tg.start_soon(_stdout_reader, process, read_stream_writer, server.encoding, server.encoding_error_handler) + tg.start_soon(_stdin_writer, process, write_stream_reader, server.encoding, server.encoding_error_handler) if process.stderr: - tg.start_soon(stderr_reader) + tg.start_soon(_stderr_reader, process, errlog, server.encoding, server.encoding_error_handler) try: yield read_stream, write_stream finally: From 7a6d762210ed653048060f3f07c3753406925643 Mon Sep 17 00:00:00 2001 From: dandrsantos Date: Sun, 30 Nov 2025 07:59:03 +0000 Subject: [PATCH 8/8] test: add coverage for stderr exception handlers and empty lines --- tests/client/test_stdio.py | 71 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 9aa25a749d..abc66bfc93 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -905,3 +905,74 @@ async def test_stderr_reader_exception_handling(): # Give it time to process stderr await anyio.sleep(0.3) # Should not crash, just log the error + + +@pytest.mark.anyio +async def test_stderr_reader_final_buffer_exception(): + """Test stderr reader handles exception in final buffer flush.""" + # Write stderr without trailing newline to trigger final buffer path + script_content = textwrap.dedent( + """ + import sys + sys.stderr.write("no newline") + sys.stderr.flush() + # Keep running briefly + import time + time.sleep(0.1) + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", script_content], + ) + + # Mock _print_stderr to raise an exception only on final buffer call + call_count = 0 + + def mock_print_stderr_side_effect(line, errlog): + nonlocal call_count + call_count += 1 + if call_count > 1: # Raise on subsequent calls (final buffer) + raise Exception("Print failed on final buffer") + + with patch("mcp.client.stdio._print_stderr", side_effect=mock_print_stderr_side_effect): + async with stdio_client(server_params) as (_, _): + await anyio.sleep(0.3) + # Should not crash, just log the error + + +@pytest.mark.anyio +async def test_stderr_with_empty_lines(): + """Test that empty stderr lines are skipped.""" + script_content = textwrap.dedent( + """ + import sys + print("line1", file=sys.stderr, flush=True) + print("", file=sys.stderr, flush=True) # Empty line + print(" ", file=sys.stderr, flush=True) # Whitespace only + print("line2", file=sys.stderr, flush=True) + # Keep running + try: + while True: + line = sys.stdin.readline() + if not line: + break + except: + pass + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", script_content], + ) + + stderr_capture = io.StringIO() + async with stdio_client(server_params, errlog=stderr_capture) as (_, _): + await anyio.sleep(0.3) + + stderr_output = stderr_capture.getvalue() + # Should have line1 and line2, but not empty lines + assert "line1" in stderr_output + assert "line2" in stderr_output