Skip to content

Commit 312cf11

Browse files
authored
Avoid sending ACK and NACK frames in new connection for messages received in old connection that was lost (#161)
1 parent 8053f15 commit 312cf11

File tree

5 files changed

+236
-2
lines changed

5 files changed

+236
-2
lines changed

AGENTS.md

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
# stompman Project Context
2+
3+
## Project Overview
4+
5+
stompman is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) messaging protocol. It provides a typed, modern, and comprehensible API for working with STOMP-compatible message brokers like ActiveMQ Artemis and ActiveMQ Classic.
6+
7+
The project consists of two main packages:
8+
1. `stompman` - The core STOMP client library
9+
2. `faststream-stomp` - A FastStream broker implementation for STOMP
10+
11+
## Key Features
12+
13+
- Fully asynchronous implementation using Python's asyncio
14+
- Modern, typed API with comprehensive type hints
15+
- Automatic connection management with reconnection capabilities
16+
- Support for transactions, subscriptions, and message acknowledgment
17+
- Built-in heartbeat support for connection health monitoring
18+
- Integration with FastStream for declarative message handling
19+
- Compatible with STOMP 1.2 protocol specification
20+
- Tested with ActiveMQ Artemis and ActiveMQ Classic
21+
22+
## Project Structure
23+
24+
```
25+
stompman/
26+
├── packages/
27+
│ ├── stompman/ # Core STOMP client library
28+
│ │ ├── stompman/ # Main source code
29+
│ │ └── test_stompman/ # Unit and integration tests
30+
│ └── faststream-stomp/ # FastStream broker implementation
31+
│ ├── faststream_stomp/ # Main source code
32+
│ └── test_faststream_stomp/ # Unit and integration tests
33+
├── examples/ # Usage examples
34+
├── docker-compose.yml # Development environment with ActiveMQ containers
35+
└── Justfile # Project commands and workflows
36+
```
37+
38+
## Core Components (stompman package)
39+
40+
### Main Classes
41+
42+
- `Client` - The main entry point for interacting with STOMP servers
43+
- `ConnectionParameters` - Configuration for connecting to STOMP servers
44+
- `Heartbeat` - Configuration for connection heartbeats
45+
46+
### Key Methods
47+
48+
- `Client.send()` - Send messages to destinations
49+
- `Client.subscribe()` - Subscribe to destinations with automatic ACK/NACK handling
50+
- `Client.subscribe_with_manual_ack()` - Subscribe with manual ACK/NACK control
51+
- `Client.begin()` - Start a transaction context manager
52+
- `Client.is_alive()` - Check connection health
53+
54+
### Error Handling
55+
56+
- `FailedAllConnectAttemptsError` - Raised when all connection attempts fail
57+
- `FailedAllWriteAttemptsError` - Raised when writes fail after all retries
58+
- Various other specific error types for different failure scenarios
59+
60+
## FastStream Integration (faststream-stomp package)
61+
62+
Provides a FastStream broker implementation that allows using FastStream's declarative approach with STOMP:
63+
64+
- `StompBroker` - Main broker class
65+
- Decorators for subscribers and publishers
66+
- Testing utilities with `TestStompBroker`
67+
68+
## Development Environment
69+
70+
The project uses Docker Compose to provide a development environment with:
71+
- ActiveMQ Artemis on port 9000
72+
- ActiveMQ Classic on port 9001
73+
74+
## Building and Running
75+
76+
### Prerequisites
77+
78+
- Python 3.11 or newer
79+
- uv (package manager)
80+
- Docker and Docker Compose (for development environment)
81+
82+
### Setup
83+
84+
```bash
85+
# Install dependencies
86+
just install
87+
88+
# Or manually:
89+
uv lock --upgrade
90+
uv sync --all-extras --all-packages --frozen
91+
```
92+
93+
### Running Tests
94+
95+
```bash
96+
# Run fast tests (unit tests only)
97+
just test-fast
98+
99+
# Run all tests (including integration tests with Docker)
100+
just test
101+
102+
# Run tests with specific arguments
103+
just test-fast -k "test_specific_feature"
104+
```
105+
106+
### Code Quality
107+
108+
```bash
109+
# Run linters
110+
just lint
111+
112+
# Check types
113+
just check-types
114+
115+
# Format code
116+
uv run ruff format .
117+
```
118+
119+
### Running Examples
120+
121+
```bash
122+
# Start ActiveMQ Artemis
123+
just run-artemis
124+
125+
# Run consumer example
126+
just run-consumer
127+
128+
# Run producer example
129+
just run-producer
130+
```
131+
132+
## Development Conventions
133+
134+
### Code Style
135+
136+
- Strict adherence to type hints with mypy in strict mode
137+
- Code formatting with ruff (line length 120)
138+
- Comprehensive unit and integration tests
139+
- Modern Python features (3.11+) encouraged
140+
141+
### Testing
142+
143+
- Unit tests in `test_stompman/` directory
144+
- Integration tests that require Docker containers
145+
- Property-based testing with hypothesis
146+
- Test coverage reporting enabled
147+
148+
### CI/CD
149+
150+
- Automated testing on multiple platforms
151+
- Type checking and linting in CI pipeline
152+
- Automated publishing to PyPI
153+
154+
## Common Development Tasks
155+
156+
1. **Adding a new feature**:
157+
- Implement in the appropriate module under `stompman/`
158+
- Add unit tests in `test_stompman/`
159+
- Update documentation in docstrings and README if needed
160+
161+
2. **Fixing a bug**:
162+
- Write a failing test that reproduces the issue
163+
- Fix the implementation
164+
- Verify the test now passes
165+
166+
3. **Updating dependencies**:
167+
- Modify `pyproject.toml` files
168+
- Run `uv lock --upgrade` to update lock files
169+
170+
4. **Running integration tests**:
171+
- Ensure Docker is running
172+
- Run `just test` to start containers and run tests

packages/faststream-stomp/test_faststream_stomp/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def handle_destination(message_frame: Annotated[stompman.MessageFrame, Context("
226226
extra = {"destination": destination, "message_id": message_id}
227227
assert log_mock.mock_calls[-3:] == [
228228
mock.call("Received", extra=extra),
229-
mock.call(message="MyError: ", extra=extra, exc_info=MyError()),
229+
mock.call(message="MyError: ", extra=extra, exc_info=MyError(), log_level=logging.ERROR),
230230
mock.call(message="Processed", extra=extra),
231231
]
232232

packages/stompman/stompman/connection_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class ConnectionManager:
5555
_reconnect_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
5656
_task_group: asyncio.TaskGroup = field(init=False, default_factory=asyncio.TaskGroup)
5757
_send_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False)
58+
_reconnection_count: int = field(default=0, init=False)
5859

5960
async def __aenter__(self) -> Self:
6061
await self._task_group.__aenter__()
@@ -171,6 +172,7 @@ def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> N
171172
self._active_connection_state.lifespan.connection_parameters,
172173
)
173174
self._active_connection_state = None
175+
self._reconnection_count += 1
174176

175177
async def write_heartbeat_reconnecting(self) -> None:
176178
for _ in range(self.write_retry_attempts):

packages/stompman/stompman/subscription.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,15 @@ class BaseSubscription:
5959
ack: AckMode
6060
_connection_manager: ConnectionManager
6161
_active_subscriptions: ActiveSubscriptions
62+
_bound_reconnection_count: int = field(init=False)
6263

6364
async def _subscribe(self) -> None:
6465
await self._connection_manager.write_frame_reconnecting(
6566
SubscribeFrame.build(
6667
subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers
6768
)
6869
)
70+
self._bound_reconnection_count = self._connection_manager._reconnection_count
6971
self._active_subscriptions.add(self) # type: ignore[arg-type]
7072

7173
async def unsubscribe(self) -> None:
@@ -91,6 +93,16 @@ async def _nack(self, frame: MessageFrame) -> None:
9193
frame.headers.keys(),
9294
)
9395
return
96+
if self._bound_reconnection_count != self._connection_manager._reconnection_count:
97+
LOGGER.debug(
98+
"skipping nack for message frame: connection changed since message was received. "
99+
"message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s",
100+
frame.headers["message-id"],
101+
self.id,
102+
self._bound_reconnection_count,
103+
self._connection_manager._reconnection_count,
104+
)
105+
return
94106
await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id}))
95107

96108
async def _ack(self, frame: MessageFrame) -> None:
@@ -112,6 +124,16 @@ async def _ack(self, frame: MessageFrame) -> None:
112124
frame.headers.keys(),
113125
)
114126
return
127+
if self._bound_reconnection_count != self._connection_manager._reconnection_count:
128+
LOGGER.debug(
129+
"skipping ack for message frame: connection changed since message was received. "
130+
"message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s",
131+
frame.headers["message-id"],
132+
self.id,
133+
self._bound_reconnection_count,
134+
self._connection_manager._reconnection_count,
135+
)
136+
return
115137
await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id}))
116138

117139

packages/stompman/test_stompman/test_subscription.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
from functools import partial
34
from typing import get_args
45
from unittest import mock
@@ -10,6 +11,7 @@
1011
AckFrame,
1112
AckMode,
1213
ConnectedFrame,
14+
ConnectionLostError,
1315
ErrorFrame,
1416
FailedAllConnectAttemptsError,
1517
HeartbeatFrame,
@@ -20,7 +22,6 @@
2022
SubscribeFrame,
2123
UnsubscribeFrame,
2224
)
23-
from stompman.errors import ConnectionLostError
2425

2526
from test_stompman.conftest import (
2627
CONNECT_FRAME,
@@ -373,6 +374,43 @@ async def close_connection_soon(client: stompman.Client) -> None:
373374
assert isinstance(inner_inner_group.exceptions[0], FailedAllConnectAttemptsError)
374375

375376

377+
async def test_subscription_skips_ack_nack_after_reconnection(
378+
monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture
379+
) -> None:
380+
subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr()
381+
monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id))
382+
message_frame = build_dataclass(
383+
MessageFrame,
384+
headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id},
385+
)
386+
connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame]))
387+
stored_message = None
388+
389+
async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFrame) -> None:
390+
nonlocal stored_message
391+
stored_message = message
392+
await asyncio.sleep(0)
393+
394+
async with EnrichedClient(connection_class=connection_class) as client:
395+
subscription = await client.subscribe_with_manual_ack(destination, track_ack_nack_frames)
396+
await asyncio.sleep(0)
397+
client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
398+
await asyncio.sleep(0)
399+
400+
with caplog.at_level(logging.DEBUG, logger="stompman"):
401+
assert stored_message
402+
await stored_message.ack()
403+
await stored_message.nack()
404+
405+
await subscription.unsubscribe()
406+
407+
assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, AckFrame)]
408+
assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, NackFrame)]
409+
assert any(
410+
"connection changed since message was received" in one_message.lower() for one_message in caplog.messages
411+
)
412+
413+
376414
def test_make_subscription_id() -> None:
377415
stompman.subscription._make_subscription_id()
378416

0 commit comments

Comments
 (0)