From 05ddfe2cd925e5c08e01c541e92bf6083937ad89 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 09:58:13 +0300 Subject: [PATCH 1/8] Unuse ARTEMIS_HOST variable --- examples/consumer.py | 8 +------- examples/producer.py | 8 +------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 3eaba086..5bde0554 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -1,14 +1,8 @@ import asyncio -import os import stompman -server = stompman.ConnectionParameters( - host=os.environ.get("ARTEMIS_HOST", "0.0.0.0"), # noqa: S104 - port=61616, - login="admin", - passcode=":=123", -) +server = stompman.ConnectionParameters(host="0.0.0.0", port=61616, login="admin", passcode=":=123") # noqa: S104 async def handle_message(message_frame: stompman.MessageFrame) -> None: diff --git a/examples/producer.py b/examples/producer.py index 8fd1533d..05eb1f8e 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,14 +1,8 @@ import asyncio -import os import stompman -server = stompman.ConnectionParameters( - host=os.environ.get("ARTEMIS_HOST", "0.0.0.0"), # noqa: S104 - port=61616, - login="admin", - passcode=":=123", -) +server = stompman.ConnectionParameters(host="0.0.0.0", port=61616, login="admin", passcode=":=123") # noqa: S104 async def main() -> None: From 969ca15492cdc39d792e0d22d948bd3beddcd612 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 09:58:26 +0300 Subject: [PATCH 2/8] Add activemq-classic image --- docker-compose.yml | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 45a82225..d48878b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,12 +5,10 @@ services: args: PYTHON_IMAGE: ${PYTHON_IMAGE:-python:3.13-slim-bullseye} depends_on: - artemis: - condition: service_started - environment: - ARTEMIS_HOST: artemis + - activemq-artemis + - activemq-classic - artemis: + activemq-artemis: image: apache/activemq-artemis:2.37.0-alpine environment: ARTEMIS_USER: admin @@ -18,3 +16,13 @@ services: ports: - 8161:8161 - 61616:61616 + + activemq-classic: + image: apache/activemq-classic:6.1.2 + environment: + ACTIVEMQ_CONNECTION_USER: admin + ACTIVEMQ_CONNECTION_PASSWORD: ":=123" + # command: activemq console --help + # ports: + # - 8161:8161 + # - 61616:61616 From e05de2449e2ae4c7c2e8d5dcc10b2a374158362d Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:01:07 +0300 Subject: [PATCH 3/8] Test for both --- tests/integration.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/tests/integration.py b/tests/integration.py index 06092625..670b8a63 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -1,9 +1,8 @@ import asyncio -import os from collections.abc import AsyncGenerator, Callable from contextlib import asynccontextmanager from itertools import starmap -from typing import Final +from typing import Final, cast from uuid import uuid4 import pytest @@ -21,23 +20,29 @@ parse_header, ) -pytestmark = pytest.mark.anyio +DESTINATION: Final = "DLQ" + -CONNECTION_PARAMETERS: Final = stompman.ConnectionParameters( - host=os.environ["ARTEMIS_HOST"], port=61616, login="admin", passcode=":=123" +@pytest.fixture( + params=[ + stompman.ConnectionParameters(host="activemq-artemis", port=61616, login="admin", passcode=":=123"), + stompman.ConnectionParameters(host="activemq-classic", port=61613, login="admin", passcode=":=123"), + ] ) -DESTINATION: Final = "DLQ" +def connection_parameters(request: pytest.FixtureRequest) -> stompman.ConnectionParameters: + return cast(stompman.ConnectionParameters, request.param) @asynccontextmanager -async def create_client() -> AsyncGenerator[stompman.Client, None]: +async def create_client(connection_parameters: stompman.ConnectionParameters) -> AsyncGenerator[stompman.Client, None]: async with stompman.Client( - servers=[CONNECTION_PARAMETERS], read_timeout=10, connection_confirmation_timeout=10 + servers=[connection_parameters], read_timeout=10, connection_confirmation_timeout=10 ) as client: yield client -async def test_ok() -> None: +@pytest.mark.anyio +async def test_ok(connection_parameters: stompman.ConnectionParameters) -> None: async def produce() -> None: for message in messages[200:]: await producer.send(body=message, destination=DESTINATION, headers={"hello": "from outside transaction"}) @@ -65,7 +70,11 @@ async def handle_message(frame: stompman.MessageFrame) -> None: # noqa: RUF029 messages = [str(uuid4()).encode() for _ in range(10000)] - async with create_client() as consumer, create_client() as producer, asyncio.TaskGroup() as task_group: + async with ( + create_client(connection_parameters) as consumer, + create_client(connection_parameters) as producer, + asyncio.TaskGroup() as task_group, + ): task_group.create_task(consume()) task_group.create_task(produce()) From 4c037820dbd219fab34cd4914e4ae0013ba4b80b Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:13:37 +0300 Subject: [PATCH 4/8] Use `ack`, not `message-id` for acking/nacking to support activemq classic --- docker-compose.yml | 10 ++++------ stompman/subscription.py | 18 ++++++++++++------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d48878b9..b64fd933 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,8 +5,10 @@ services: args: PYTHON_IMAGE: ${PYTHON_IMAGE:-python:3.13-slim-bullseye} depends_on: - - activemq-artemis - - activemq-classic + activemq-artemis: + condition: service_started + activemq-classic: + condition: service_started activemq-artemis: image: apache/activemq-artemis:2.37.0-alpine @@ -22,7 +24,3 @@ services: environment: ACTIVEMQ_CONNECTION_USER: admin ACTIVEMQ_CONNECTION_PASSWORD: ":=123" - # command: activemq console --help - # ports: - # - 8161:8161 - # - 61616:61616 diff --git a/stompman/subscription.py b/stompman/subscription.py index c6b3efbc..bd4e8431 100644 --- a/stompman/subscription.py +++ b/stompman/subscription.py @@ -49,18 +49,24 @@ async def _run_handler(self, *, frame: MessageFrame) -> None: try: await self.handler(frame) except self.suppressed_exception_classes as exception: - if self._should_handle_ack_nack and self.id in self._active_subscriptions: + if ( + self._should_handle_ack_nack + and self.id in self._active_subscriptions + and (ack_id := frame.headers["ack"]) + ): await self._connection_manager.maybe_write_frame( - NackFrame( - headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]} - ) + NackFrame(headers={"id": ack_id, "subscription": frame.headers["subscription"]}) ) self.on_suppressed_exception(exception, frame) else: - if self._should_handle_ack_nack and self.id in self._active_subscriptions: + if ( + self._should_handle_ack_nack + and self.id in self._active_subscriptions + and (ack_id := frame.headers["ack"]) + ): await self._connection_manager.maybe_write_frame( AckFrame( - headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]}, + headers={"id": ack_id, "subscription": frame.headers["subscription"]}, ) ) From 80b02c4ed209ea0f206bb2ae650bc9a7ee9e1969 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:14:31 +0300 Subject: [PATCH 5/8] Fix tests --- tests/test_subscription.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_subscription.py b/tests/test_subscription.py index 0867557a..87ab1e64 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -223,11 +223,11 @@ async def test_client_listen_unsubscribe_before_ack_or_nack( async def test_client_listen_ack_nack_sent( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, ack: AckMode, *, ok: bool ) -> None: - subscription_id, destination, message_id = faker.pystr(), faker.pystr(), faker.pystr() + subscription_id, destination, ack_id = faker.pystr(), faker.pystr(), faker.pystr() monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) message_frame = build_dataclass( - MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id} + MessageFrame, headers={"destination": destination, "ack": ack_id, "subscription": subscription_id} ) connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame])) message_handler = mock.AsyncMock(side_effect=None if ok else SomeError) @@ -244,9 +244,9 @@ async def test_client_listen_ack_nack_sent( assert collected_frames == enrich_expected_frames( SubscribeFrame(headers={"id": subscription_id, "destination": destination, "ack": ack}), message_frame, - AckFrame(headers={"id": message_id, "subscription": subscription_id}) + AckFrame(headers={"id": ack_id, "subscription": subscription_id}) if ok - else NackFrame(headers={"id": message_id, "subscription": subscription_id}), + else NackFrame(headers={"id": ack_id, "subscription": subscription_id}), UnsubscribeFrame(headers={"id": subscription_id}), ) From d984db091a4b3a9f8d8858c6c16458ea21c43009 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:15:40 +0300 Subject: [PATCH 6/8] Update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 911cf9df..590873d8 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ stompman takes care of cleaning up resources automatically. When you leave the c Also, I want to pointed out that: - Protocol parsing is inspired by [aiostomp](https://github.com/pedrokiefer/aiostomp/blob/3449dcb53f43e5956ccc7662bb5b7d76bc6ef36b/aiostomp/protocol.py) (meaning: consumed by me and refactored from). -- stompman is tested and used with [Artemis ActiveMQ](https://activemq.apache.org/components/artemis/). +- stompman is tested and used with [ActiveMQ Artemis](https://activemq.apache.org/components/artemis/) and [ActiveMQ Classic](https://activemq.apache.org/components/classic/). - Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming). ### Examples From 6a7bfbb3c2f0d785be5c02c07eece4c171292623 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:16:15 +0300 Subject: [PATCH 7/8] Revert "Fix tests" This reverts commit 80b02c4ed209ea0f206bb2ae650bc9a7ee9e1969. --- tests/test_subscription.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_subscription.py b/tests/test_subscription.py index 87ab1e64..0867557a 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -223,11 +223,11 @@ async def test_client_listen_unsubscribe_before_ack_or_nack( async def test_client_listen_ack_nack_sent( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, ack: AckMode, *, ok: bool ) -> None: - subscription_id, destination, ack_id = faker.pystr(), faker.pystr(), faker.pystr() + subscription_id, destination, message_id = faker.pystr(), faker.pystr(), faker.pystr() monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) message_frame = build_dataclass( - MessageFrame, headers={"destination": destination, "ack": ack_id, "subscription": subscription_id} + MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id} ) connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame])) message_handler = mock.AsyncMock(side_effect=None if ok else SomeError) @@ -244,9 +244,9 @@ async def test_client_listen_ack_nack_sent( assert collected_frames == enrich_expected_frames( SubscribeFrame(headers={"id": subscription_id, "destination": destination, "ack": ack}), message_frame, - AckFrame(headers={"id": ack_id, "subscription": subscription_id}) + AckFrame(headers={"id": message_id, "subscription": subscription_id}) if ok - else NackFrame(headers={"id": ack_id, "subscription": subscription_id}), + else NackFrame(headers={"id": message_id, "subscription": subscription_id}), UnsubscribeFrame(headers={"id": subscription_id}), ) From 15f737369a6545f11eacb9e168d33390b01f37a9 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 5 Nov 2024 10:16:27 +0300 Subject: [PATCH 8/8] Revert "Use `ack`, not `message-id` for acking/nacking to support activemq classic" This reverts commit 4c037820dbd219fab34cd4914e4ae0013ba4b80b. --- stompman/subscription.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/stompman/subscription.py b/stompman/subscription.py index bd4e8431..c6b3efbc 100644 --- a/stompman/subscription.py +++ b/stompman/subscription.py @@ -49,24 +49,18 @@ async def _run_handler(self, *, frame: MessageFrame) -> None: try: await self.handler(frame) except self.suppressed_exception_classes as exception: - if ( - self._should_handle_ack_nack - and self.id in self._active_subscriptions - and (ack_id := frame.headers["ack"]) - ): + if self._should_handle_ack_nack and self.id in self._active_subscriptions: await self._connection_manager.maybe_write_frame( - NackFrame(headers={"id": ack_id, "subscription": frame.headers["subscription"]}) + NackFrame( + headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]} + ) ) self.on_suppressed_exception(exception, frame) else: - if ( - self._should_handle_ack_nack - and self.id in self._active_subscriptions - and (ack_id := frame.headers["ack"]) - ): + if self._should_handle_ack_nack and self.id in self._active_subscriptions: await self._connection_manager.maybe_write_frame( AckFrame( - headers={"id": ack_id, "subscription": frame.headers["subscription"]}, + headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]}, ) )