SQLAlchemy middleware #104

@danielknell

Description

Add a banshee.extra.sqlalchemy package containing a transaction-handling middleware that rolls back the changes made by all handlers if any of them fails while handling subsequent events.

Motivation

Ideally, a system should be designed so that any error while handling a request dispatched by a handler leaves the system in a working state, and so that each handler holds minimal locks on the underlying data source.

def add_widget(
    cmd: AddWidget, 
    uow: UnitOfWork, 
    bus: Bus,
    repo: WidgetRepository
) -> None:
    with uow:
        repo.add(Widget(name=cmd.name))

        uow.commit()

    bus.handle(WidgetAdded)

However, with a relational database it can be far simpler to wrap the initial dispatch of a request in a database transaction, so that any error rolls back the actions of all handlers, and to use savepoints within the handlers instead of separate transactions.

with session.begin():
    bus.handle(AddWidget(name="foobar"))
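
To make the rollback behaviour concrete, here is a minimal sketch that uses the standard-library sqlite3 module rather than SQLAlchemy or banshee, so it runs without extra dependencies. The dispatch function and both handlers are hypothetical stand-ins for a bus and its handlers; the outer BEGIN plays the role of session.begin(), and the SAVEPOINT shows what a handler-level savepoint might look like.

```python
import sqlite3


def add_widget(conn: sqlite3.Connection, name: str) -> None:
    # Hypothetical handler: does its work inside a savepoint rather than
    # opening (and committing) a transaction of its own.
    conn.execute("SAVEPOINT add_widget")
    try:
        conn.execute("INSERT INTO widgets (name) VALUES (?)", (name,))
    except Exception:
        conn.execute("ROLLBACK TO SAVEPOINT add_widget")
        conn.execute("RELEASE SAVEPOINT add_widget")
        raise
    conn.execute("RELEASE SAVEPOINT add_widget")


def notify_widget_added(conn: sqlite3.Connection, name: str) -> None:
    # Hypothetical follow-up handler that fails after add_widget succeeded.
    raise RuntimeError("downstream handler failed")


def dispatch(conn: sqlite3.Connection, name: str) -> None:
    # Stand-in for wrapping the initial bus.handle(...) call in a transaction.
    conn.execute("BEGIN")
    try:
        add_widget(conn, name)
        notify_widget_added(conn, name)
    except Exception:
        conn.execute("ROLLBACK")  # undoes the work of every handler
        raise
    conn.execute("COMMIT")


# isolation_level=None disables implicit transactions so BEGIN is explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE widgets (name TEXT)")

try:
    dispatch(conn, "foobar")
except RuntimeError:
    pass

remaining = conn.execute("SELECT COUNT(*) FROM widgets").fetchone()[0]
```

Because the second handler raises, the outer ROLLBACK also discards the row inserted by the first handler, and the widgets table ends up empty.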

Each style has its drawbacks: the first creates a more complex event stream and needs explicit handling of any errors, while the second needs care to ensure that no external system the handlers may have called falls out of sync with the state stored in the database.

bus = (
    banshee.Builder()
    .with_locator(registry)
    .with_middleware(...)
    .build()
)

bus.handle(AddWidget(name="foobar"))

The second option can be abstracted further using a middleware, and given the prevalence of SQLAlchemy in Python applications, it would be nice to have a standard, well-tested implementation of it within the library.

Possible implementation

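A middleware that wraps subsequent middleware in a transaction, keeping track of whether a transaction is already active so that it does not try to open another one when a handler itself dispatches a request. Optionally, such nested dispatches can be wrapped in a savepoint instead.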

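import contextvars
from typing import Callable, TypeVar

import banshee
# AsyncSession is assumed here because the body below uses "async with".
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")


class SQLAlchemyMiddleware:
    def __init__(
        self,
        get_session: Callable[[], AsyncSession],
        nested: bool = False,
    ) -> None:
        self.get_session = get_session
        self.nested = nested

        # Tracks whether this middleware has already opened a transaction
        # in the current context. ContextVar takes its name positionally.
        self._is_transaction = contextvars.ContextVar(
            "_is_active",
            default=False,
        )

    async def __call__(
        self,
        message: banshee.Message[T],
        handle: banshee.HandleMessage,
    ) -> banshee.Message[T]:
        session = self.get_session()

        if self._is_transaction.get():
            # Already inside a transaction: a handler dispatched this message.
            if not self.nested:
                return await handle(message)

            # Isolate the nested dispatch in a savepoint.
            async with session.begin_nested():
                return await handle(message)

        token = self._is_transaction.set(True)

        try:
            # Commits on success, rolls back if handling raises.
            async with session.begin():
                return await handle(message)
        finally:
            self._is_transaction.reset(token)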
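
The re-entrancy tracking can be exercised in isolation. The following is a rough sketch, not the proposed implementation: FakeSession and TransactionGuard are hypothetical names, the fake session only records events instead of talking to a database, and neither banshee nor SQLAlchemy is imported. It shows that when a handler dispatches a follow-up message through the same middleware, only one transaction is opened.

```python
import asyncio
import contextvars
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for a session; records events only."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @asynccontextmanager
    async def begin(self):
        self.events.append("begin")
        try:
            yield
        except Exception:
            self.events.append("rollback")
            raise
        else:
            self.events.append("commit")


class TransactionGuard:
    """Hypothetical middleware: opens a transaction only at the outermost call."""

    def __init__(self, session: FakeSession) -> None:
        self.session = session
        self._active = contextvars.ContextVar("_active", default=False)

    async def __call__(self, message, handle):
        if self._active.get():
            # A handler dispatched this message; reuse the open transaction.
            return await handle(message)

        token = self._active.set(True)
        try:
            async with self.session.begin():
                return await handle(message)
        finally:
            self._active.reset(token)


async def demo() -> list[str]:
    session = FakeSession()
    guard = TransactionGuard(session)

    async def on_widget_added(message):
        session.events.append(f"handled {message}")
        return message

    async def on_add_widget(message):
        session.events.append(f"handled {message}")
        # The handler dispatches a follow-up message via the same middleware.
        await guard("WidgetAdded", on_widget_added)
        return message

    await guard("AddWidget", on_add_widget)
    return session.events


events = asyncio.run(demo())
```

Using a ContextVar rather than a plain attribute keeps the flag local to the current asyncio task, so concurrent top-level dispatches each get their own transaction.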
