Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions libs/opsqueue_python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import subprocess
import uuid
import os
from libs.opsqueue_python.tests.util import wait_for_server
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import statement uses an absolute path starting with libs.opsqueue_python.tests.util which might not work correctly depending on how the Python path is configured. Consider using a relative import like from .util import wait_for_server or from tests.util import wait_for_server instead, which would be more robust to different project structures and import contexts.

Suggested change
from libs.opsqueue_python.tests.util import wait_for_server
from .util import wait_for_server

Copilot uses AI. Check for mistakes.
import psutil
import pytest
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -52,13 +54,10 @@ def opsqueue() -> Generator[OpsqueueProcess, None, None]:

@contextmanager
def opsqueue_service(
*, port: int | None = None
*, port: int = 0,
) -> Generator[OpsqueueProcess, None, None]:
global test_opsqueue_port_offset
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global variable test_opsqueue_port_offset is declared but never actually used in this function. This appears to be leftover from the previous implementation. Consider removing this line to avoid confusion.

Suggested change
global test_opsqueue_port_offset

Copilot uses AI. Check for mistakes.

if port is None:
port = random_free_port()

temp_dbname = f"/tmp/opsqueue_tests-{uuid.uuid4()}.db"

command = [
Expand All @@ -72,7 +71,8 @@ def opsqueue_service(
if env.get("RUST_LOG") is None:
env["RUST_LOG"] = "off"

with subprocess.Popen(command, cwd=PROJECT_ROOT, env=env) as process:
with psutil.Popen(command, cwd=PROJECT_ROOT, env=env) as process:
_host, port = wait_for_server(process)
try:
wrapper = OpsqueueProcess(port=port, process=process)
Comment on lines +74 to 77
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The process type has been changed from subprocess.Popen to psutil.Popen, but the OpsqueueProcess dataclass still expects subprocess.Popen[bytes]. This type mismatch should be addressed by updating the dataclass type annotation to match. Note: psutil.Popen is a subclass of subprocess.Popen, so the code will work at runtime, but type checkers may report errors.

Copilot uses AI. Check for mistakes.
yield wrapper
Expand Down
57 changes: 57 additions & 0 deletions libs/opsqueue_python/tests/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging

import psutil
from opnieuw import retry
from psutil._common import pconn
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Importing from psutil._common accesses a private/internal module (indicated by the underscore prefix). This could break in future versions of psutil as internal APIs are not guaranteed to be stable. Consider using public psutil APIs instead, or be aware that this might require changes when psutil is updated.

Copilot uses AI. Check for mistakes.

LOGGER = logging.getLogger(__name__)


@retry(
retry_on_exceptions=ValueError,
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RuntimeError exceptions raised for unexpected process states (lines 43, 45-47) and for multiple listening ports (line 57) are not configured to be retried by the @retry decorator, which only retries on ValueError. If a process temporarily has access issues before it's fully started, or if there's a race condition where multiple ports are briefly detected, these will immediately fail without retry. Consider whether these error conditions should also be retried or if the retry logic should be adjusted.

Suggested change
retry_on_exceptions=ValueError,
retry_on_exceptions=(ValueError, RuntimeError),

Copilot uses AI. Check for mistakes.
max_calls_total=5,
retry_window_after_first_call_in_seconds=5,
)
def wait_for_server(proc: psutil.Popen) -> tuple[str, int]:
"""
Wait for a process to be listening on a single port.
If the process is listening on no ports, a ValueError is thrown and this is retried.
If multiple ports are listening, a RuntimeError is thrown.
Comment on lines +17 to +19
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring states "If multiple ports are listening, a RuntimeError is thrown" but doesn't explain why this is considered an error condition. Since the function is designed to wait for servers that should listen on a single port, it would be helpful to document this assumption more clearly, such as: "This function expects the process to listen on exactly one port. If multiple ports are listening, a RuntimeError is raised as this indicates an unexpected server configuration."

Suggested change
Wait for a process to be listening on a single port.
If the process is listening on no ports, a ValueError is thrown and this is retried.
If multiple ports are listening, a RuntimeError is thrown.
Wait for a process to be listening on exactly one port and return that address.
This function expects the given process to listen on a single port only. If the
process is listening on no ports, a ValueError is raised and the check is retried.
If multiple ports are listening, a RuntimeError is raised as this indicates an
unexpected server configuration.

Copilot uses AI. Check for mistakes.
"""
if not proc.is_running():
raise ValueError(f"Process {proc} is not running")

try:
# Try to get the connections of the main process first, if that fails try the children.
# Processes wrapped with `timeout` do not have connections themselves.
connections: list[pconn] = (
proc.net_connections()
or [
child_conn
for child in proc.children(recursive=False)
for child_conn in child.net_connections()
]
or [
child_conn
for child in proc.children(recursive=True)
for child_conn in child.net_connections()
]
)
except psutil.AccessDenied as e:
match proc.status():
case psutil.STATUS_ZOMBIE | psutil.STATUS_DEAD | psutil.STATUS_STOPPED:
raise RuntimeError(f"Process {proc} has exited unexpectedly") from e
case _:
raise RuntimeError(
f"Could not get `net_connections` for process {proc}, access denied "
) from e

ports = [x for x in connections if x.status == psutil.CONN_LISTEN]
listen_count = len(ports)

if listen_count == 0:
raise ValueError(f"Process {proc} is not listening on any ports")
if listen_count == 1:
return ports[0].laddr

raise RuntimeError(f"Process {proc} is listening on multiple ports")
Comment on lines +15 to +57
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new wait_for_server utility function lacks test coverage. Given that it contains complex logic with multiple error paths (process not running, access denied, zombie/dead status, no ports listening, multiple ports), this function should have dedicated unit tests to ensure reliability. The repository appears to use comprehensive testing (test files exist in the tests directory), so this new utility should follow the same pattern.

Copilot uses AI. Check for mistakes.
Loading