Skip to content

Commit dd14c10

Browse files
authored
Log connection lost reason (#142)
1 parent b47a2ed commit dd14c10

File tree

6 files changed

+51
-27
lines changed

6 files changed

+51
-27
lines changed

packages/stompman/stompman/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def _reraise_connection_lost(*causes: type[Exception]) -> Generator[None, None,
3737
try:
3838
yield
3939
except causes as exception:
40-
raise ConnectionLostError from exception
40+
raise ConnectionLostError(reason=exception) from exception
4141

4242

4343
@dataclass(kw_only=True, slots=True)
@@ -86,7 +86,7 @@ async def write_frame(self, frame: AnyClientFrame) -> None:
8686

8787
async def _read_non_empty_bytes(self, max_chunk_size: int) -> bytes:
8888
if (chunk := await self.reader.read(max_chunk_size)) == b"":
89-
raise ConnectionLostError
89+
raise ConnectionLostError(reason="eof")
9090
return chunk
9191

9292
async def read_frames(self) -> AsyncGenerator[AnyServerFrame, None]:

packages/stompman/stompman/connection_manager.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def _check_server_heartbeat_forever(self, receive_heartbeat_interval_ms: i
103103
if not self._active_connection_state:
104104
continue
105105
if not self._active_connection_state.is_alive(self.check_server_alive_interval_factor):
106-
self._clear_active_connection_state()
106+
self._clear_active_connection_state(ConnectionLostError(reason="server heartbeat timeout"))
107107

108108
async def _create_connection_to_one_server(
109109
self, server: ConnectionParameters
@@ -168,7 +168,7 @@ async def _get_active_connection_state(self, *, is_initial_call: bool = False) -
168168
self._active_connection_state = connection_result
169169
if not is_initial_call:
170170
LOGGER.warning(
171-
"reconnected after failure connection failure. connection_parameters: %s",
171+
"reconnected after connection failure. connection_parameters: %s",
172172
connection_result.lifespan.connection_parameters,
173173
)
174174
return connection_result
@@ -178,11 +178,12 @@ async def _get_active_connection_state(self, *, is_initial_call: bool = False) -
178178

179179
raise FailedAllConnectAttemptsError(retry_attempts=self.connect_retry_attempts, issues=connection_issues)
180180

181-
def _clear_active_connection_state(self) -> None:
181+
def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> None:
182182
if not self._active_connection_state:
183183
return
184184
LOGGER.warning(
185-
"connection lost. connection_parameters: %s",
185+
"connection lost. reason: %s, connection_parameters: %s",
186+
error_reason.reason,
186187
self._active_connection_state.lifespan.connection_parameters,
187188
)
188189
self._active_connection_state = None
@@ -192,8 +193,8 @@ async def write_heartbeat_reconnecting(self) -> None:
192193
connection_state = await self._get_active_connection_state()
193194
try:
194195
return connection_state.connection.write_heartbeat()
195-
except ConnectionLostError:
196-
self._clear_active_connection_state()
196+
except ConnectionLostError as error:
197+
self._clear_active_connection_state(error)
197198

198199
raise FailedAllWriteAttemptsError(retry_attempts=self.write_retry_attempts)
199200

@@ -202,8 +203,8 @@ async def write_frame_reconnecting(self, frame: AnyClientFrame) -> None:
202203
connection_state = await self._get_active_connection_state()
203204
try:
204205
return await connection_state.connection.write_frame(frame)
205-
except ConnectionLostError:
206-
self._clear_active_connection_state()
206+
except ConnectionLostError as error:
207+
self._clear_active_connection_state(error)
207208

208209
raise FailedAllWriteAttemptsError(retry_attempts=self.write_retry_attempts)
209210

@@ -213,15 +214,15 @@ async def read_frames_reconnecting(self) -> AsyncGenerator[AnyServerFrame, None]
213214
try:
214215
async for frame in connection_state.connection.read_frames():
215216
yield frame
216-
except ConnectionLostError:
217-
self._clear_active_connection_state()
217+
except ConnectionLostError as error:
218+
self._clear_active_connection_state(error)
218219

219220
async def maybe_write_frame(self, frame: AnyClientFrame) -> bool:
220221
if not self._active_connection_state:
221222
return False
222223
try:
223224
await self._active_connection_state.connection.write_frame(frame)
224-
except ConnectionLostError:
225-
self._clear_active_connection_state()
225+
except ConnectionLostError as error:
226+
self._clear_active_connection_state(error)
226227
return False
227228
return True

packages/stompman/stompman/errors.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def __str__(self) -> str:
1414
class ConnectionLostError(Error):
1515
"""Raised in stompman.AbstractConnection—and handled in stompman.ConnectionManager, therefore is private."""
1616

17+
reason: Exception | str
18+
1719

1820
@dataclass(frozen=True, kw_only=True, slots=True)
1921
class ConnectionConfirmationTimeout:

packages/stompman/test_stompman/test_connection_manager.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ class MockConnection(BaseMockConnection):
132132

133133

134134
async def test_get_active_connection_state_lifespan_flaky_ok() -> None:
135-
enter = mock.AsyncMock(side_effect=[ConnectionLostError, build_dataclass(EstablishedConnectionResult)])
135+
enter = mock.AsyncMock(
136+
side_effect=[build_dataclass(ConnectionLostError), build_dataclass(EstablishedConnectionResult)]
137+
)
136138
lifespan_factory = mock.Mock(return_value=mock.Mock(enter=enter))
137139
manager = EnrichedConnectionManager(lifespan_factory=lifespan_factory, connection_class=BaseMockConnection)
138140

@@ -154,7 +156,7 @@ async def test_get_active_connection_state_lifespan_flaky_ok() -> None:
154156

155157

156158
async def test_get_active_connection_state_lifespan_flaky_fails() -> None:
157-
enter = mock.AsyncMock(side_effect=ConnectionLostError)
159+
enter = mock.AsyncMock(side_effect=build_dataclass(ConnectionLostError))
158160
lifespan_factory = mock.Mock(return_value=mock.Mock(enter=enter))
159161
manager = EnrichedConnectionManager(lifespan_factory=lifespan_factory, connection_class=BaseMockConnection)
160162

@@ -209,16 +211,16 @@ async def test_get_active_connection_state_ok_concurrent() -> None:
209211

210212
async def test_connection_manager_context_connection_lost() -> None:
211213
async with EnrichedConnectionManager(connection_class=BaseMockConnection) as manager:
212-
manager._clear_active_connection_state()
213-
manager._clear_active_connection_state()
214+
manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
215+
manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
214216

215217

216218
async def test_connection_manager_context_lifespan_aexit_raises_connection_lost() -> None:
217219
async with EnrichedConnectionManager(
218220
lifespan_factory=mock.Mock(
219221
return_value=mock.Mock(
220222
enter=mock.AsyncMock(return_value=build_dataclass(EstablishedConnectionResult)),
221-
exit=mock.AsyncMock(side_effect=[ConnectionLostError]),
223+
exit=mock.AsyncMock(side_effect=[build_dataclass(ConnectionLostError)]),
222224
)
223225
),
224226
connection_class=BaseMockConnection,
@@ -248,7 +250,13 @@ class MockConnection(BaseMockConnection):
248250

249251

250252
async def test_write_heartbeat_reconnecting_raises() -> None:
251-
write_heartbeat_mock = mock.Mock(side_effect=[ConnectionLostError, ConnectionLostError, ConnectionLostError])
253+
write_heartbeat_mock = mock.Mock(
254+
side_effect=[
255+
build_dataclass(ConnectionLostError),
256+
build_dataclass(ConnectionLostError),
257+
build_dataclass(ConnectionLostError),
258+
]
259+
)
252260

253261
class MockConnection(BaseMockConnection):
254262
write_heartbeat = write_heartbeat_mock
@@ -260,7 +268,13 @@ class MockConnection(BaseMockConnection):
260268

261269

262270
async def test_write_frame_reconnecting_raises() -> None:
263-
write_frame_mock = mock.AsyncMock(side_effect=[ConnectionLostError, ConnectionLostError, ConnectionLostError])
271+
write_frame_mock = mock.AsyncMock(
272+
side_effect=[
273+
build_dataclass(ConnectionLostError),
274+
build_dataclass(ConnectionLostError),
275+
build_dataclass(ConnectionLostError),
276+
]
277+
)
264278

265279
class MockConnection(BaseMockConnection):
266280
write_frame = write_frame_mock
@@ -271,7 +285,11 @@ class MockConnection(BaseMockConnection):
271285
await manager.write_frame_reconnecting(build_dataclass(ConnectFrame))
272286

273287

274-
SIDE_EFFECTS = [(None,), (ConnectionLostError(), None), (ConnectionLostError(), ConnectionLostError(), None)]
288+
SIDE_EFFECTS = [
289+
(None,),
290+
(build_dataclass(ConnectionLostError), None),
291+
(build_dataclass(ConnectionLostError), build_dataclass(ConnectionLostError), None),
292+
]
275293

276294

277295
@pytest.mark.parametrize("side_effect", SIDE_EFFECTS)
@@ -318,7 +336,7 @@ async def read_frames() -> AsyncGenerator[AnyServerFrame, None]:
318336
attempt += 1
319337
current_effect = side_effect[attempt]
320338
if isinstance(current_effect, ConnectionLostError):
321-
raise ConnectionLostError
339+
raise current_effect
322340
for frame in frames:
323341
yield frame
324342

@@ -339,7 +357,7 @@ async def test_maybe_write_frame_connection_already_lost() -> None:
339357

340358
async def test_maybe_write_frame_connection_now_lost() -> None:
341359
class MockConnection(BaseMockConnection):
342-
write_frame = mock.AsyncMock(side_effect=[ConnectionLostError])
360+
write_frame = mock.AsyncMock(side_effect=[build_dataclass(ConnectionLostError)])
343361

344362
async with EnrichedConnectionManager(connection_class=MockConnection) as manager:
345363
assert not await manager.maybe_write_frame(build_dataclass(ConnectFrame))

packages/stompman/test_stompman/test_subscription.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
SubscribeFrame,
2121
UnsubscribeFrame,
2222
)
23+
from stompman.errors import ConnectionLostError
2324

2425
from test_stompman.conftest import (
2526
CONNECT_FRAME,
@@ -52,7 +53,7 @@ async def test_client_subscriptions_lifespan_resubscribe(ack: AckMode, faker: fa
5253
headers=sub_extra_headers,
5354
on_suppressed_exception=noop_error_handler,
5455
)
55-
client._connection_manager._clear_active_connection_state()
56+
client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
5657
await client.send(message_body, destination=message_destination)
5758
await subscription.unsubscribe()
5859
await asyncio.sleep(0)
@@ -352,7 +353,7 @@ async def test_client_listen_raises_on_aexit(monkeypatch: pytest.MonkeyPatch, fa
352353

353354
async def close_connection_soon(client: stompman.Client) -> None:
354355
await asyncio.sleep(0)
355-
client._connection_manager._clear_active_connection_state()
356+
client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
356357

357358
with pytest.raises(ExceptionGroup) as exc_info: # noqa: PT012
358359
async with asyncio.TaskGroup() as task_group, EnrichedClient(connection_class=connection_class) as client:

packages/stompman/test_stompman/test_transaction.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
CommitFrame,
1313
SendFrame,
1414
)
15+
from stompman.errors import ConnectionLostError
1516

1617
from test_stompman.conftest import (
1718
CONNECT_FRAME,
1819
CONNECTED_FRAME,
1920
EnrichedClient,
2021
SomeError,
22+
build_dataclass,
2123
create_spying_connection,
2224
enrich_expected_frames,
2325
get_read_frames_with_lifespan,
@@ -85,7 +87,7 @@ async def test_commit_pending_transactions(monkeypatch: pytest.MonkeyPatch, fake
8587
async with EnrichedClient(connection_class=connection_class) as client:
8688
async with client.begin() as first_transaction:
8789
await first_transaction.send(body, destination=destination)
88-
client._connection_manager._clear_active_connection_state()
90+
client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
8991
async with client.begin() as second_transaction:
9092
await second_transaction.send(body, destination=destination)
9193
await asyncio.sleep(0)

0 commit comments

Comments
 (0)