Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Conversation

@random-things
Copy link

Provides a new backend--RedisPydanticStreamBackend--derived from RedisStreamBackend. Instead of a message: str being passed to publish, it takes a Pydantic BaseModel. On the other side, it deserializes the object back into a Pydantic model. Now that I'm reviewing this, I think it'd be beneficial to add the following to publish:

    async def publish(self: typing.Self, channel: str, message: BaseModel) -> None:
        """Publish a message to a channel."""
        msg_type: str = message.__class__.__name__
+
+        if msg_type not in self._module_cache:
+            self._module_cache[msg_type] = message.__class__
+
        message_json: str = message.model_dump_json()
        await self._producer.xadd(channel, {"msg_type": msg_type, "message": message_json})

But it would only be useful in rare cases (dynamically created models or models at lower-than-module scope) or specifically in the test case, dropping the import and class declaration into the test itself.

@alex-oleshkevich
Copy link
Contributor

We do not accept any new backends. Please make it as a gist or separate package and link it in the readme.
#118

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants