diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..395859d --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,172 @@ +# stompman Project Context + +## Project Overview + +stompman is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) messaging protocol. It provides a typed, modern, and comprehensible API for working with STOMP-compatible message brokers like ActiveMQ Artemis and ActiveMQ Classic. + +The project consists of two main packages: +1. `stompman` - The core STOMP client library +2. `faststream-stomp` - A FastStream broker implementation for STOMP + +## Key Features + +- Fully asynchronous implementation using Python's asyncio +- Modern, typed API with comprehensive type hints +- Automatic connection management with reconnection capabilities +- Support for transactions, subscriptions, and message acknowledgment +- Built-in heartbeat support for connection health monitoring +- Integration with FastStream for declarative message handling +- Compatible with STOMP 1.2 protocol specification +- Tested with ActiveMQ Artemis and ActiveMQ Classic + +## Project Structure + +``` +stompman/ +├── packages/ +│ ├── stompman/ # Core STOMP client library +│ │ ├── stompman/ # Main source code +│ │ └── test_stompman/ # Unit and integration tests +│ └── faststream-stomp/ # FastStream broker implementation +│ ├── faststream_stomp/ # Main source code +│ └── test_faststream_stomp/ # Unit and integration tests +├── examples/ # Usage examples +├── docker-compose.yml # Development environment with ActiveMQ containers +└── Justfile # Project commands and workflows +``` + +## Core Components (stompman package) + +### Main Classes + +- `Client` - The main entry point for interacting with STOMP servers +- `ConnectionParameters` - Configuration for connecting to STOMP servers +- `Heartbeat` - Configuration for connection heartbeats + +### Key Methods + +- `Client.send()` - Send messages to destinations +- `Client.subscribe()` - Subscribe to destinations with automatic ACK/NACK handling +- `Client.subscribe_with_manual_ack()` - Subscribe with manual ACK/NACK control +- `Client.begin()` - Start a transaction context manager +- `Client.is_alive()` - Check connection health + +### Error Handling + +- `FailedAllConnectAttemptsError` - Raised when all connection attempts fail +- `FailedAllWriteAttemptsError` - Raised when writes fail after all retries +- Various other specific error types for different failure scenarios + +## FastStream Integration (faststream-stomp package) + +Provides a FastStream broker implementation that allows using FastStream's declarative approach with STOMP: + +- `StompBroker` - Main broker class +- Decorators for subscribers and publishers +- Testing utilities with `TestStompBroker` + +## Development Environment + +The project uses Docker Compose to provide a development environment with: +- ActiveMQ Artemis on port 9000 +- ActiveMQ Classic on port 9001 + +## Building and Running + +### Prerequisites + +- Python 3.11 or newer +- uv (package manager) +- Docker and Docker Compose (for development environment) + +### Setup + +```bash +# Install dependencies +just install + +# Or manually: +uv lock --upgrade +uv sync --all-extras --all-packages --frozen +``` + +### Running Tests + +```bash +# Run fast tests (unit tests only) +just test-fast + +# Run all tests (including integration tests with Docker) +just test + +# Run tests with specific arguments +just test-fast -k "test_specific_feature" +``` + +### Code Quality + +```bash +# Run linters +just lint + +# Check types +just check-types + +# Format code +uv run ruff format . +``` + +### Running Examples + +```bash +# Start ActiveMQ Artemis +just run-artemis + +# Run consumer example +just run-consumer + +# Run producer example +just run-producer +``` + +## Development Conventions + +### Code Style + +- Strict adherence to type hints with mypy in strict mode +- Code formatting with ruff (line length 120) +- Comprehensive unit and integration tests +- Modern Python features (3.11+) encouraged + +### Testing + +- Unit tests in `test_stompman/` directory +- Integration tests that require Docker containers +- Property-based testing with hypothesis +- Test coverage reporting enabled + +### CI/CD + +- Automated testing on multiple platforms +- Type checking and linting in CI pipeline +- Automated publishing to PyPI + +## Common Development Tasks + +1. **Adding a new feature**: + - Implement in the appropriate module under `stompman/` + - Add unit tests in `test_stompman/` + - Update documentation in docstrings and README if needed + +2. **Fixing a bug**: + - Write a failing test that reproduces the issue + - Fix the implementation + - Verify the test now passes + +3. **Updating dependencies**: + - Modify `pyproject.toml` files + - Run `uv lock --upgrade` to update lock files + +4. **Running integration tests**: + - Ensure Docker is running + - Run `just test` to start containers and run tests diff --git a/packages/faststream-stomp/test_faststream_stomp/test_integration.py b/packages/faststream-stomp/test_faststream_stomp/test_integration.py index 8fc3ac6..2bda3a1 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_integration.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_integration.py @@ -226,7 +226,7 @@ def handle_destination(message_frame: Annotated[stompman.MessageFrame, Context(" extra = {"destination": destination, "message_id": message_id} assert log_mock.mock_calls[-3:] == [ mock.call("Received", extra=extra), - mock.call(message="MyError: ", extra=extra, exc_info=MyError()), + mock.call(message="MyError: ", extra=extra, exc_info=MyError(), log_level=logging.ERROR), mock.call(message="Processed", extra=extra), ] diff --git a/packages/stompman/stompman/connection_manager.py b/packages/stompman/stompman/connection_manager.py index 6f8ec25..1766c84 100644 --- a/packages/stompman/stompman/connection_manager.py +++ b/packages/stompman/stompman/connection_manager.py @@ -55,6 +55,7 @@ class ConnectionManager: _reconnect_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock) _task_group: asyncio.TaskGroup = field(init=False, default_factory=asyncio.TaskGroup) _send_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False) + _reconnection_count: int = field(default=0, init=False) async def __aenter__(self) -> Self: await self._task_group.__aenter__() @@ -171,6 +172,7 @@ def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> N self._active_connection_state.lifespan.connection_parameters, ) self._active_connection_state = None + self._reconnection_count += 1 async def write_heartbeat_reconnecting(self) -> None: for _ in range(self.write_retry_attempts): diff --git a/packages/stompman/stompman/subscription.py b/packages/stompman/stompman/subscription.py index 6d483d7..14c9268 100644 --- a/packages/stompman/stompman/subscription.py +++ b/packages/stompman/stompman/subscription.py @@ -59,6 +59,7 @@ class BaseSubscription: ack: AckMode _connection_manager: ConnectionManager _active_subscriptions: ActiveSubscriptions + _bound_reconnection_count: int = field(init=False) async def _subscribe(self) -> None: await self._connection_manager.write_frame_reconnecting( @@ -66,6 +67,7 @@ async def _subscribe(self) -> None: subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers ) ) + self._bound_reconnection_count = self._connection_manager._reconnection_count self._active_subscriptions.add(self) # type: ignore[arg-type] async def unsubscribe(self) -> None: @@ -91,6 +93,16 @@ async def _nack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return + if self._bound_reconnection_count != self._connection_manager._reconnection_count: + LOGGER.debug( + "skipping nack for message frame: connection changed since message was received. " + "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", + frame.headers["message-id"], + self.id, + self._bound_reconnection_count, + self._connection_manager._reconnection_count, + ) + return await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id})) async def _ack(self, frame: MessageFrame) -> None: @@ -112,6 +124,16 @@ async def _ack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return + if self._bound_reconnection_count != self._connection_manager._reconnection_count: + LOGGER.debug( + "skipping ack for message frame: connection changed since message was received. " + "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", + frame.headers["message-id"], + self.id, + self._bound_reconnection_count, + self._connection_manager._reconnection_count, + ) + return await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id})) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index ec4a133..488d114 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -1,4 +1,5 @@ import asyncio +import logging from functools import partial from typing import get_args from unittest import mock @@ -10,6 +11,7 @@ AckFrame, AckMode, ConnectedFrame, + ConnectionLostError, ErrorFrame, FailedAllConnectAttemptsError, HeartbeatFrame, @@ -20,7 +22,6 @@ SubscribeFrame, UnsubscribeFrame, ) -from stompman.errors import ConnectionLostError from test_stompman.conftest import ( CONNECT_FRAME, @@ -373,6 +374,43 @@ async def close_connection_soon(client: stompman.Client) -> None: assert isinstance(inner_inner_group.exceptions[0], FailedAllConnectAttemptsError) +async def test_subscription_skips_ack_nack_after_reconnection( + monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture +) -> None: + subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() + monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) + message_frame = build_dataclass( + MessageFrame, + headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, + ) + connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame])) + stored_message = None + + async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFrame) -> None: + nonlocal stored_message + stored_message = message + await asyncio.sleep(0) + + async with EnrichedClient(connection_class=connection_class) as client: + subscription = await client.subscribe_with_manual_ack(destination, track_ack_nack_frames) + await asyncio.sleep(0) + client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError)) + await asyncio.sleep(0) + + with caplog.at_level(logging.DEBUG, logger="stompman"): + assert stored_message + await stored_message.ack() + await stored_message.nack() + + await subscription.unsubscribe() + + assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, AckFrame)] + assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, NackFrame)] + assert any( + "connection changed since message was received" in one_message.lower() for one_message in caplog.messages + ) + + def test_make_subscription_id() -> None: stompman.subscription._make_subscription_id()