Description
Add a banshee.extra.sqlalchemy package containing a transaction-handling middleware that rolls back the changes of all handlers should one fail while handling subsequent events.
Motivation
Ideally, a system is designed so that any error while handling a request dispatched by a handler leaves the system in a working state, and each handler holds minimal locks on the underlying data source.

    def add_widget(
        cmd: AddWidget,
        uow: UnitOfWork,
        bus: Bus,
        repo: WidgetRepository,
    ) -> None:
        with uow:
            repo.add(Widget(name=cmd.name))
            uow.commit()

        # Dispatch the follow-up event once the unit of work has committed.
        # Assumption: WidgetAdded carries the widget name.
        bus.handle(WidgetAdded(name=cmd.name))

However, with a relational database it can be far simpler to wrap the initial dispatch of a request in a database transaction, so that any error rolls back the actions of all handlers, and to use savepoints within the handlers instead.


    with session.begin():
        bus.handle(AddWidget(name="foobar"))

Each style has its drawbacks: the first creates a more complex event stream and needs explicit handling of any errors, while the second needs care to ensure that no external system the handlers may have called becomes out of sync with the state stored in the database.


    bus = (
        banshee.Builder()
        .with_locator(registry)
        .with_middleware(...)
        .build()
    )

    bus.handle(AddWidget(name="foobar"))

The second option can be abstracted further using a middleware, and given the prevalence of SQLAlchemy in Python applications, it would be nice to have a standard, well-tested implementation of it within the library.
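
As a rough illustration of the second style described above (one outer transaction, with savepoints inside the handlers), here is a minimal sketch using only the standard library's sqlite3 module rather than SQLAlchemy. The `dispatch` function, the `widget` table, and the `fail_at_end` flag are hypothetical names invented for this example; each loop iteration stands in for one handler.

```python
import sqlite3


def dispatch(names, fail_at_end=False):
    """Insert widgets inside one outer transaction, one savepoint per step."""
    # isolation_level=None puts the connection in autocommit mode, so the
    # BEGIN / SAVEPOINT / COMMIT statements below are issued explicitly.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE widget (name TEXT NOT NULL UNIQUE)")

    try:
        conn.execute("BEGIN")
        for name in names:
            conn.execute("SAVEPOINT handler")
            try:
                conn.execute("INSERT INTO widget (name) VALUES (?)", (name,))
            except sqlite3.IntegrityError:
                # Undo only this step's work; the outer transaction survives.
                conn.execute("ROLLBACK TO SAVEPOINT handler")
            conn.execute("RELEASE SAVEPOINT handler")
        if fail_at_end:
            raise RuntimeError("simulated failure in a later handler")
        conn.execute("COMMIT")
    except RuntimeError:
        # An unhandled error rolls back the work of every step.
        conn.execute("ROLLBACK")

    rows = conn.execute("SELECT name FROM widget ORDER BY name").fetchall()
    conn.close()
    return [row[0] for row in rows]
```

A duplicate name only undoes its own savepoint, while a failure that escapes the loop discards everything written during the dispatch. Note that neither rollback can undo calls to external systems, which is the drawback mentioned above.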
Possible implementation
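A middleware that wraps subsequent middleware in a transaction and keeps track of whether one is already open, so that it does not try to start a second transaction when a handler itself dispatches a request (or uses a savepoint instead, if nested is enabled).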

    import contextvars
    from typing import Callable, TypeVar

    import banshee
    from sqlalchemy.ext.asyncio import AsyncSession

    T = TypeVar("T")


    class SQLAlchemyMiddleware:
        def __init__(
            self,
            get_session: Callable[[], AsyncSession],
            nested: bool = False,
        ) -> None:
            self.get_session = get_session
            self.nested = nested
            # True while an outer transaction is open in the current context.
            self._is_transaction = contextvars.ContextVar(
                "_is_active",
                default=False,
            )

        async def __call__(
            self,
            message: banshee.Message[T],
            handle: banshee.HandleMessage,
        ) -> banshee.Message[T]:
            session = self.get_session()

            if self._is_transaction.get():
                # A handler dispatched this request inside an open transaction.
                if not self.nested:
                    return await handle(message)

                # Use a savepoint so only this request is rolled back on error.
                async with session.begin_nested():
                    return await handle(message)

            token = self._is_transaction.set(True)
            try:
                async with session.begin():
                    return await handle(message)
            finally:
                self._is_transaction.reset(token)