Skip to content

Commit 4b39622

Browse files
authored
faststream-stomp: Fix publishing Pydantic models (#164)
1 parent c808f4b commit 4b39622

File tree

5 files changed

+58
-35
lines changed

5 files changed

+58
-35
lines changed

packages/faststream-stomp/faststream_stomp/broker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def __init__(
106106
description: str | None = None,
107107
tags: Iterable[Tag | TagDict] = (),
108108
) -> None:
109+
fd_config = FastDependsConfig(use_fastdepends=apply_types)
109110
broker_config = BrokerConfigWithStompClient(
110111
broker_middlewares=middlewares, # type: ignore[arg-type]
111112
broker_parser=parser,
@@ -115,11 +116,11 @@ def __init__(
115116
log_level=log_level,
116117
default_storage_cls=StompParamsStorage, # type: ignore[type-abstract]
117118
),
118-
fd_config=FastDependsConfig(use_fastdepends=apply_types),
119+
fd_config=fd_config,
119120
broker_dependencies=dependencies,
120121
graceful_timeout=graceful_timeout,
121122
extra_context={"broker": self},
122-
producer=StompProducer(client),
123+
producer=StompProducer(client=client, serializer=fd_config._serializer),
123124
client=client,
124125
)
125126
specification = BrokerSpec(

packages/faststream-stomp/faststream_stomp/publisher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any, NoReturn
33

44
import stompman
5+
from fast_depends.library.serializer import SerializerProto
56
from faststream import PublishCommand, PublishType
67
from faststream._internal.basic_types import SendableMessage
78
from faststream._internal.configs import BrokerConfig
@@ -23,11 +24,12 @@ class StompProducer(ProducerProto[StompPublishCommand]):
2324
_parser: AsyncCallable
2425
_decoder: AsyncCallable
2526

26-
def __init__(self, client: stompman.Client) -> None:
27+
def __init__(self, *, client: stompman.Client, serializer: SerializerProto | None) -> None:
2728
self.client = client
29+
self.serializer = serializer
2830

2931
async def publish(self, cmd: StompPublishCommand) -> None:
30-
body, content_type = encode_message(cmd.body, serializer=None)
32+
body, content_type = encode_message(cmd.body, serializer=self.serializer)
3133
all_headers = cmd.headers.copy() if cmd.headers else {}
3234
if cmd.correlation_id:
3335
all_headers["correlation-id"] = cmd.correlation_id

packages/faststream-stomp/faststream_stomp/testing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def __init__(self, broker: StompBroker) -> None:
6262
self.broker = broker
6363

6464
async def publish(self, cmd: StompPublishCommand) -> None:
65-
body, content_type = encode_message(cmd.body, serializer=None)
65+
body, content_type = encode_message(cmd.body, serializer=self.broker.config.fd_config._serializer)
6666
all_headers: MessageHeaders = (cmd.headers.copy() if cmd.headers else {}) | { # type: ignore[assignment]
6767
"destination": cmd.destination,
6868
"message-id": str(uuid.uuid4()),

packages/faststream-stomp/test_faststream_stomp/test_integration.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import faker
1010
import faststream_stomp
11+
import pydantic
1112
import pytest
1213
import stompman
1314
from asgi_lifespan import LifespanManager
@@ -17,6 +18,7 @@
1718
from faststream.message import gen_cor_id
1819
from faststream_stomp.models import StompStreamMessage
1920
from faststream_stomp.router import StompRoutePublisher
21+
from polyfactory.factories.pydantic_factory import ModelFactory
2022

2123
if TYPE_CHECKING:
2224
from faststream_stomp.broker import StompBroker
@@ -235,3 +237,11 @@ async def test_broker_connect_twice(broker: faststream_stomp.StompBroker) -> Non
235237
app = AsgiFastStream(broker, on_startup=[broker.connect])
236238
async with LifespanManager(app):
237239
pass
240+
241+
242+
async def test_publish_pydantic(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
243+
class SomePydanticModel(pydantic.BaseModel):
244+
foo: str
245+
246+
async with broker:
247+
await broker.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr())

packages/faststream-stomp/test_faststream_stomp/test_main.py

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import faker
22
import faststream_stomp
3+
import pydantic
34
import pytest
45
import stompman
56
from faststream import FastStream
@@ -8,6 +9,7 @@
89
from faststream_stomp.prometheus import StompPrometheusMiddleware
910
from opentelemetry.sdk.metrics import MeterProvider
1011
from opentelemetry.sdk.trace import TracerProvider
12+
from polyfactory.factories.pydantic_factory import ModelFactory
1113
from prometheus_client import CollectorRegistry
1214
from test_stompman.conftest import build_dataclass
1315

@@ -24,36 +26,44 @@ def broker(fake_connection_params: stompman.ConnectionParameters) -> faststream_
2426
return faststream_stomp.StompBroker(stompman.Client([fake_connection_params]))
2527

2628

27-
async def test_testing(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
28-
expected_body, first_destination, second_destination, third_destination, correlation_id = (
29-
faker.pystr(),
30-
faker.pystr(),
31-
faker.pystr(),
32-
faker.pystr(),
33-
gen_cor_id(),
34-
)
35-
second_publisher = broker.publisher(second_destination)
36-
third_publisher = broker.publisher(third_destination)
37-
38-
@broker.subscriber(first_destination)
39-
@second_publisher
40-
@third_publisher
41-
def first_handle(body: str) -> str:
42-
assert body == expected_body
43-
return body
44-
45-
@broker.subscriber(second_destination)
46-
def second_handle(body: str) -> None:
47-
assert body == expected_body
48-
49-
async with faststream_stomp.TestStompBroker(broker) as br:
50-
await br.publish(expected_body, first_destination, correlation_id=correlation_id)
51-
assert first_handle.mock
52-
first_handle.mock.assert_called_once_with(expected_body)
53-
assert second_publisher.mock
54-
second_publisher.mock.assert_called_once_with(expected_body)
55-
assert third_publisher.mock
56-
third_publisher.mock.assert_called_once_with(expected_body)
29+
class TestTesting:
30+
async def test_integration(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
31+
expected_body, first_destination, second_destination, third_destination, correlation_id = (
32+
faker.pystr(),
33+
faker.pystr(),
34+
faker.pystr(),
35+
faker.pystr(),
36+
gen_cor_id(),
37+
)
38+
second_publisher = broker.publisher(second_destination)
39+
third_publisher = broker.publisher(third_destination)
40+
41+
@broker.subscriber(first_destination)
42+
@second_publisher
43+
@third_publisher
44+
def first_handle(body: str) -> str:
45+
assert body == expected_body
46+
return body
47+
48+
@broker.subscriber(second_destination)
49+
def second_handle(body: str) -> None:
50+
assert body == expected_body
51+
52+
async with faststream_stomp.TestStompBroker(broker) as br:
53+
await br.publish(expected_body, first_destination, correlation_id=correlation_id)
54+
assert first_handle.mock
55+
first_handle.mock.assert_called_once_with(expected_body)
56+
assert second_publisher.mock
57+
second_publisher.mock.assert_called_once_with(expected_body)
58+
assert third_publisher.mock
59+
third_publisher.mock.assert_called_once_with(expected_body)
60+
61+
async def test_publish_pydantic(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
62+
class SomePydanticModel(pydantic.BaseModel):
63+
foo: str
64+
65+
async with faststream_stomp.TestStompBroker(broker) as br:
66+
await br.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr())
5767

5868

5969
class TestNotImplemented:

0 commit comments

Comments
 (0)