Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/faststream-stomp/faststream_stomp/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(
description: str | None = None,
tags: Iterable[Tag | TagDict] = (),
) -> None:
fd_config = FastDependsConfig(use_fastdepends=apply_types)
broker_config = BrokerConfigWithStompClient(
broker_middlewares=middlewares, # type: ignore[arg-type]
broker_parser=parser,
Expand All @@ -115,11 +116,11 @@ def __init__(
log_level=log_level,
default_storage_cls=StompParamsStorage, # type: ignore[type-abstract]
),
fd_config=FastDependsConfig(use_fastdepends=apply_types),
fd_config=fd_config,
broker_dependencies=dependencies,
graceful_timeout=graceful_timeout,
extra_context={"broker": self},
producer=StompProducer(client),
producer=StompProducer(client=client, serializer=fd_config._serializer),
client=client,
)
specification = BrokerSpec(
Expand Down
6 changes: 4 additions & 2 deletions packages/faststream-stomp/faststream_stomp/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, NoReturn

import stompman
from fast_depends.library.serializer import SerializerProto
from faststream import PublishCommand, PublishType
from faststream._internal.basic_types import SendableMessage
from faststream._internal.configs import BrokerConfig
Expand All @@ -23,11 +24,12 @@ class StompProducer(ProducerProto[StompPublishCommand]):
_parser: AsyncCallable
_decoder: AsyncCallable

def __init__(self, client: stompman.Client) -> None:
def __init__(self, *, client: stompman.Client, serializer: SerializerProto | None) -> None:
self.client = client
self.serializer = serializer

async def publish(self, cmd: StompPublishCommand) -> None:
body, content_type = encode_message(cmd.body, serializer=None)
body, content_type = encode_message(cmd.body, serializer=self.serializer)
all_headers = cmd.headers.copy() if cmd.headers else {}
if cmd.correlation_id:
all_headers["correlation-id"] = cmd.correlation_id
Expand Down
2 changes: 1 addition & 1 deletion packages/faststream-stomp/faststream_stomp/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, broker: StompBroker) -> None:
self.broker = broker

async def publish(self, cmd: StompPublishCommand) -> None:
body, content_type = encode_message(cmd.body, serializer=None)
body, content_type = encode_message(cmd.body, serializer=self.broker.config.fd_config._serializer)
all_headers: MessageHeaders = (cmd.headers.copy() if cmd.headers else {}) | { # type: ignore[assignment]
"destination": cmd.destination,
"message-id": str(uuid.uuid4()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import faker
import faststream_stomp
import pydantic
import pytest
import stompman
from asgi_lifespan import LifespanManager
Expand All @@ -17,6 +18,7 @@
from faststream.message import gen_cor_id
from faststream_stomp.models import StompStreamMessage
from faststream_stomp.router import StompRoutePublisher
from polyfactory.factories.pydantic_factory import ModelFactory

if TYPE_CHECKING:
from faststream_stomp.broker import StompBroker
Expand Down Expand Up @@ -235,3 +237,11 @@ async def test_broker_connect_twice(broker: faststream_stomp.StompBroker) -> Non
app = AsgiFastStream(broker, on_startup=[broker.connect])
async with LifespanManager(app):
pass


async def test_publish_pydantic(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
class SomePydanticModel(pydantic.BaseModel):
foo: str

async with broker:
await broker.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr())
70 changes: 40 additions & 30 deletions packages/faststream-stomp/test_faststream_stomp/test_main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import faker
import faststream_stomp
import pydantic
import pytest
import stompman
from faststream import FastStream
Expand All @@ -8,6 +9,7 @@
from faststream_stomp.prometheus import StompPrometheusMiddleware
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from polyfactory.factories.pydantic_factory import ModelFactory
from prometheus_client import CollectorRegistry
from test_stompman.conftest import build_dataclass

Expand All @@ -24,36 +26,44 @@ def broker(fake_connection_params: stompman.ConnectionParameters) -> faststream_
return faststream_stomp.StompBroker(stompman.Client([fake_connection_params]))


async def test_testing(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
expected_body, first_destination, second_destination, third_destination, correlation_id = (
faker.pystr(),
faker.pystr(),
faker.pystr(),
faker.pystr(),
gen_cor_id(),
)
second_publisher = broker.publisher(second_destination)
third_publisher = broker.publisher(third_destination)

@broker.subscriber(first_destination)
@second_publisher
@third_publisher
def first_handle(body: str) -> str:
assert body == expected_body
return body

@broker.subscriber(second_destination)
def second_handle(body: str) -> None:
assert body == expected_body

async with faststream_stomp.TestStompBroker(broker) as br:
await br.publish(expected_body, first_destination, correlation_id=correlation_id)
assert first_handle.mock
first_handle.mock.assert_called_once_with(expected_body)
assert second_publisher.mock
second_publisher.mock.assert_called_once_with(expected_body)
assert third_publisher.mock
third_publisher.mock.assert_called_once_with(expected_body)
class TestTesting:
async def test_integration(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
expected_body, first_destination, second_destination, third_destination, correlation_id = (
faker.pystr(),
faker.pystr(),
faker.pystr(),
faker.pystr(),
gen_cor_id(),
)
second_publisher = broker.publisher(second_destination)
third_publisher = broker.publisher(third_destination)

@broker.subscriber(first_destination)
@second_publisher
@third_publisher
def first_handle(body: str) -> str:
assert body == expected_body
return body

@broker.subscriber(second_destination)
def second_handle(body: str) -> None:
assert body == expected_body

async with faststream_stomp.TestStompBroker(broker) as br:
await br.publish(expected_body, first_destination, correlation_id=correlation_id)
assert first_handle.mock
first_handle.mock.assert_called_once_with(expected_body)
assert second_publisher.mock
second_publisher.mock.assert_called_once_with(expected_body)
assert third_publisher.mock
third_publisher.mock.assert_called_once_with(expected_body)

async def test_publish_pydantic(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
class SomePydanticModel(pydantic.BaseModel):
foo: str

async with faststream_stomp.TestStompBroker(broker) as br:
await br.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr())


class TestNotImplemented:
Expand Down
Loading