Skip to content

Commit 523f482

Browse files
committed
Merge main
2 parents 30e31c0 + 7b588b4 commit 523f482

File tree

14 files changed

+187
-120
lines changed

14 files changed

+187
-120
lines changed

.github/dependabot.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,16 @@ updates:
44
directory: /
55
schedule:
66
interval: monthly
7+
groups:
8+
github-actions:
9+
patterns:
10+
- "*"
711

812
- package-ecosystem: pip
913
directory: /
1014
schedule:
1115
interval: monthly
16+
groups:
17+
pip:
18+
patterns:
19+
- "*"

.github/workflows/publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
publish:
1010
runs-on: ubuntu-latest
1111
steps:
12-
- uses: actions/checkout@v4
12+
- uses: actions/checkout@v5
1313
- uses: extractions/setup-just@v3
1414
- uses: astral-sh/setup-uv@v6
1515
with:

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
check-types:
1616
runs-on: ubuntu-latest
1717
steps:
18-
- uses: actions/checkout@v4
18+
- uses: actions/checkout@v5
1919
- uses: extractions/setup-just@v3
2020
- uses: astral-sh/setup-uv@v6
2121
with:
@@ -25,7 +25,7 @@ jobs:
2525
lint:
2626
runs-on: ubuntu-latest
2727
steps:
28-
- uses: actions/checkout@v4
28+
- uses: actions/checkout@v5
2929
- uses: extractions/setup-just@v3
3030
- uses: astral-sh/setup-uv@v6
3131
with:
@@ -42,7 +42,7 @@ jobs:
4242
- "3.12"
4343
- "3.13"
4444
steps:
45-
- uses: actions/checkout@v4
45+
- uses: actions/checkout@v5
4646
- uses: extractions/setup-just@v3
4747
- uses: astral-sh/setup-uv@v6
4848
with:

packages/stompman/stompman/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ReceiptFrame,
2121
SendFrame,
2222
)
23+
from stompman.logger import LOGGER
2324
from stompman.subscription import AckableMessageFrame, ActiveSubscriptions, AutoAckSubscription, ManualAckSubscription
2425
from stompman.transaction import Transaction
2526

@@ -29,7 +30,9 @@ class Client:
2930
PROTOCOL_VERSION: ClassVar = "1.2" # https://stomp.github.io/stomp-specification-1.2.html
3031

3132
servers: list[ConnectionParameters] = field(kw_only=False)
32-
on_error_frame: Callable[[ErrorFrame], Any] | None = None
33+
on_error_frame: Callable[[ErrorFrame], Any] | None = lambda error_frame: LOGGER.error(
34+
"received error frame: %s", error_frame
35+
)
3336

3437
heartbeat: Heartbeat = field(default=Heartbeat(1000, 1000))
3538
ssl: Literal[True] | SSLContext | None = None

packages/stompman/stompman/connection.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def _reraise_connection_lost(*causes: type[Exception]) -> Generator[None, None,
3737
try:
3838
yield
3939
except causes as exception:
40-
raise ConnectionLostError from exception
40+
raise ConnectionLostError(reason=exception) from exception
4141

4242

4343
@dataclass(kw_only=True, slots=True)
@@ -85,10 +85,8 @@ async def write_frame(self, frame: AnyClientFrame) -> None:
8585
await self.writer.drain()
8686

8787
async def _read_non_empty_bytes(self, max_chunk_size: int) -> bytes:
88-
while ( # noqa: ASYNC110
89-
chunk := await self.reader.read(max_chunk_size)
90-
) == b"": # pragma: no cover (it definitely happens)
91-
await asyncio.sleep(0)
88+
if (chunk := await self.reader.read(max_chunk_size)) == b"":
89+
raise ConnectionLostError(reason="eof")
9290
return chunk
9391

9492
async def read_frames(self) -> AsyncGenerator[AnyServerFrame, None]:

packages/stompman/stompman/connection_manager.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,18 @@ class ConnectionManager:
5555
_reconnect_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
5656
_task_group: asyncio.TaskGroup = field(init=False, default_factory=asyncio.TaskGroup)
5757
_send_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False)
58-
_check_server_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False)
5958

6059
async def __aenter__(self) -> Self:
6160
await self._task_group.__aenter__()
6261
self._send_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
63-
self._check_server_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
6462
self._active_connection_state = await self._get_active_connection_state(is_initial_call=True)
6563
return self
6664

6765
async def __aexit__(
6866
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None
6967
) -> None:
7068
self._send_heartbeat_task.cancel()
71-
self._check_server_heartbeat_task.cancel()
72-
await asyncio.wait([self._send_heartbeat_task, self._check_server_heartbeat_task])
69+
await asyncio.wait([self._send_heartbeat_task])
7370
await self._task_group.__aexit__(exc_type, exc_value, traceback)
7471

7572
if not self._active_connection_state:
@@ -82,29 +79,16 @@ async def __aexit__(
8279

8380
def _restart_heartbeat_tasks(self, server_heartbeat: Heartbeat) -> None:
8481
self._send_heartbeat_task.cancel()
85-
self._check_server_heartbeat_task.cancel()
8682
self._send_heartbeat_task = self._task_group.create_task(
8783
self._send_heartbeats_forever(server_heartbeat.want_to_receive_interval_ms)
8884
)
89-
self._check_server_heartbeat_task = self._task_group.create_task(
90-
self._check_server_heartbeat_forever(server_heartbeat.will_send_interval_ms)
91-
)
9285

9386
async def _send_heartbeats_forever(self, send_heartbeat_interval_ms: int) -> None:
9487
send_heartbeat_interval_seconds = send_heartbeat_interval_ms / 1000
9588
while True:
9689
await self.write_heartbeat_reconnecting()
9790
await asyncio.sleep(send_heartbeat_interval_seconds)
9891

99-
async def _check_server_heartbeat_forever(self, receive_heartbeat_interval_ms: int) -> None:
100-
receive_heartbeat_interval_seconds = receive_heartbeat_interval_ms / 1000
101-
while True:
102-
await asyncio.sleep(receive_heartbeat_interval_seconds * self.check_server_alive_interval_factor)
103-
if not self._active_connection_state:
104-
continue
105-
if not self._active_connection_state.is_alive(self.check_server_alive_interval_factor):
106-
self._clear_active_connection_state()
107-
10892
async def _create_connection_to_one_server(
10993
self, server: ConnectionParameters
11094
) -> tuple[AbstractConnection, ConnectionParameters] | None:
@@ -168,7 +152,7 @@ async def _get_active_connection_state(self, *, is_initial_call: bool = False) -
168152
self._active_connection_state = connection_result
169153
if not is_initial_call:
170154
LOGGER.warning(
171-
"reconnected after failure connection failure. connection_parameters: %s",
155+
"reconnected after connection failure. connection_parameters: %s",
172156
connection_result.lifespan.connection_parameters,
173157
)
174158
return connection_result
@@ -178,11 +162,12 @@ async def _get_active_connection_state(self, *, is_initial_call: bool = False) -
178162

179163
raise FailedAllConnectAttemptsError(retry_attempts=self.connect_retry_attempts, issues=connection_issues)
180164

181-
def _clear_active_connection_state(self) -> None:
165+
def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> None:
182166
if not self._active_connection_state:
183167
return
184168
LOGGER.warning(
185-
"connection lost. connection_parameters: %s",
169+
"connection lost. reason: %r, connection_parameters: %s",
170+
error_reason.reason,
186171
self._active_connection_state.lifespan.connection_parameters,
187172
)
188173
self._active_connection_state = None
@@ -192,8 +177,8 @@ async def write_heartbeat_reconnecting(self) -> None:
192177
connection_state = await self._get_active_connection_state()
193178
try:
194179
return connection_state.connection.write_heartbeat()
195-
except ConnectionLostError:
196-
self._clear_active_connection_state()
180+
except ConnectionLostError as error:
181+
self._clear_active_connection_state(error)
197182

198183
raise FailedAllWriteAttemptsError(retry_attempts=self.write_retry_attempts)
199184

@@ -202,8 +187,8 @@ async def write_frame_reconnecting(self, frame: AnyClientFrame) -> None:
202187
connection_state = await self._get_active_connection_state()
203188
try:
204189
return await connection_state.connection.write_frame(frame)
205-
except ConnectionLostError:
206-
self._clear_active_connection_state()
190+
except ConnectionLostError as error:
191+
self._clear_active_connection_state(error)
207192

208193
raise FailedAllWriteAttemptsError(retry_attempts=self.write_retry_attempts)
209194

@@ -213,15 +198,15 @@ async def read_frames_reconnecting(self) -> AsyncGenerator[AnyServerFrame, None]
213198
try:
214199
async for frame in connection_state.connection.read_frames():
215200
yield frame
216-
except ConnectionLostError:
217-
self._clear_active_connection_state()
201+
except ConnectionLostError as error:
202+
self._clear_active_connection_state(error)
218203

219204
async def maybe_write_frame(self, frame: AnyClientFrame) -> bool:
220205
if not self._active_connection_state:
221206
return False
222207
try:
223208
await self._active_connection_state.connection.write_frame(frame)
224-
except ConnectionLostError:
225-
self._clear_active_connection_state()
209+
except ConnectionLostError as error:
210+
self._clear_active_connection_state(error)
226211
return False
227212
return True

packages/stompman/stompman/errors.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def __str__(self) -> str:
1414
class ConnectionLostError(Error):
1515
"""Raised in stompman.AbstractConnection—and handled in stompman.ConnectionManager, therefore is private."""
1616

17+
reason: Exception | str
18+
1719

1820
@dataclass(frozen=True, kw_only=True, slots=True)
1921
class ConnectionConfirmationTimeout:

packages/stompman/stompman/serde.py

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import struct
2-
from collections import deque
32
from collections.abc import Iterator
43
from contextlib import suppress
5-
from dataclasses import dataclass, field
4+
from dataclasses import dataclass
65
from typing import Any, Final, cast
76

87
from stompman.frames import (
@@ -141,53 +140,83 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte
141140
return frame_type(headers=headers_, body=body) if frame_type in FRAMES_WITH_BODY else frame_type(headers=headers_) # type: ignore[call-arg]
142141

143142

144-
def parse_lines_into_frame(lines: deque[bytearray]) -> AnyClientFrame | AnyServerFrame:
145-
command = bytes(lines.popleft())
146-
headers = {}
147-
148-
while line := lines.popleft():
149-
header = parse_header(line)
150-
if header and header[0] not in headers:
151-
headers[header[0]] = header[1]
152-
body = bytes(lines.popleft()) if lines else b""
153-
return make_frame_from_parts(command=command, headers=headers, body=body)
154-
155-
156-
@dataclass(kw_only=True, slots=True)
143+
@dataclass(kw_only=True, slots=True, init=False)
157144
class FrameParser:
158-
_lines: deque[bytearray] = field(default_factory=deque, init=False)
159-
_current_line: bytearray = field(default_factory=bytearray, init=False)
160-
_previous_byte: bytes = field(default=b"", init=False)
161-
_headers_processed: bool = field(default=False, init=False)
145+
_current_buf: bytearray
146+
_previous_byte: bytes | None
147+
_headers_processed: bool
148+
_command: bytes | None
149+
_headers: dict[str, str]
150+
_content_length: int | None
151+
152+
def __init__(self) -> None:
153+
self._previous_byte = None
154+
self._reset()
162155

163156
def _reset(self) -> None:
157+
self._current_buf = bytearray()
164158
self._headers_processed = False
165-
self._lines.clear()
166-
self._current_line = bytearray()
159+
self._command = None
160+
self._headers = {}
161+
self._content_length = None
162+
163+
def _handle_null_byte(self) -> Iterator[AnyClientFrame | AnyServerFrame]:
164+
if not self._command or not self._headers_processed:
165+
self._reset()
166+
return
167+
if self._content_length is not None and self._content_length != len(self._current_buf):
168+
self._current_buf += NULL
169+
return
170+
yield make_frame_from_parts(command=self._command, headers=self._headers, body=bytes(self._current_buf))
171+
self._reset()
172+
173+
def _handle_newline_byte(self) -> Iterator[HeartbeatFrame]:
174+
if not self._current_buf and not self._command:
175+
yield HeartbeatFrame()
176+
return
177+
if self._previous_byte == CARRIAGE:
178+
self._current_buf.pop()
179+
self._headers_processed = not self._current_buf # extra empty line after headers
180+
181+
if self._command:
182+
self._process_header()
183+
else:
184+
self._process_command()
185+
186+
def _process_command(self) -> None:
187+
current_buf_bytes = bytes(self._current_buf)
188+
if current_buf_bytes not in COMMANDS_TO_FRAMES:
189+
self._reset()
190+
else:
191+
self._command = current_buf_bytes
192+
self._current_buf = bytearray()
193+
194+
def _process_header(self) -> None:
195+
header = parse_header(self._current_buf)
196+
if not header:
197+
self._current_buf = bytearray()
198+
return
199+
header_key, header_value = header
200+
if header_key not in self._headers:
201+
self._headers[header_key] = header_value
202+
if header_key.lower() == "content-length":
203+
with suppress(ValueError):
204+
self._content_length = int(header_value)
205+
self._current_buf = bytearray()
206+
207+
def _handle_body_byte(self, byte: bytes) -> None:
208+
if self._content_length is None or self._content_length != len(self._current_buf):
209+
self._current_buf += byte
167210

168211
def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]:
169212
for byte in iter_bytes(chunk):
170213
if byte == NULL:
171-
if self._headers_processed:
172-
self._lines.append(self._current_line)
173-
yield parse_lines_into_frame(self._lines)
174-
self._reset()
175-
176-
elif not self._headers_processed and byte == NEWLINE:
177-
if self._current_line or self._lines:
178-
if self._previous_byte == CARRIAGE:
179-
self._current_line.pop()
180-
self._headers_processed = not self._current_line # extra empty line after headers
181-
182-
if not self._lines and bytes(self._current_line) not in COMMANDS_TO_FRAMES:
183-
self._reset()
184-
else:
185-
self._lines.append(self._current_line)
186-
self._current_line = bytearray()
187-
else:
188-
yield HeartbeatFrame()
189-
214+
yield from self._handle_null_byte()
215+
elif self._headers_processed:
216+
self._handle_body_byte(byte)
217+
elif byte == NEWLINE:
218+
yield from self._handle_newline_byte()
190219
else:
191-
self._current_line += byte
220+
self._current_buf += byte
192221

193222
self._previous_byte = byte

packages/stompman/test_stompman/test_connection_lifespan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async def mock_sleep(delay: float) -> None:
148148
async with EnrichedClient(connection_class=connection_class):
149149
await real_sleep(0)
150150

151-
assert sleep_calls == [0, 0, 1, 3, 1, 3, 1, 3]
151+
assert sleep_calls == [0, 1, 1, 1]
152152
assert write_heartbeat_mock.mock_calls == [mock.call(), mock.call(), mock.call(), mock.call()]
153153

154154

0 commit comments

Comments
 (0)