Running enqueued/pending workflows on worker startup #534

@pmalic

Currently, implementing a cron-type worker requires writing a waiting loop, similar to the example in the Vercel integration docs (https://docs.dbos.dev/integrations/vercel):

import asyncio

from dbos import DBOS

async def main():
    DBOS.launch()
    # Poll every 5 minutes; exit once no workflows remain queued.
    while True:
        await asyncio.sleep(300)
        if not await DBOS.list_queued_workflows_async():
            break

if __name__ == "__main__":
    asyncio.run(main())

Have you considered adding a function (or an optional parameter to DBOS.launch) that would wait until all currently enqueued/pending workflows "dequeue-able" by the current worker are executed?

Using a wait loop with list_queued_workflows is error-prone: if a workflow is enqueued on a queue that is not defined in this worker, the worker never dequeues it, so the loop runs forever. I ran into this problem myself in Python and solved it by passing the queue_name parameter to list_queued_workflows (you could maybe adjust the Vercel example so other people don't run into the same issue :).
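For illustration, here is a rough sketch of the queue-scoped wait loop described above. The helper name wait_until_drained and its parameters are hypothetical (they are not part of DBOS); the listing function is passed in as an argument, and the only assumption about it is that it accepts a queue_name keyword, as list_queued_workflows does.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Sequence


async def wait_until_drained(
    list_queued: Callable[..., Awaitable[Sequence[object]]],
    queue_names: Iterable[str],
    poll_interval: float = 300.0,
) -> None:
    """Return once none of the given queues has queued workflows.

    Hypothetical helper, not a DBOS API. `list_queued` is assumed to be an
    async callable that accepts a `queue_name` keyword argument and returns
    the workflows currently queued on that queue.
    """
    names = list(queue_names)
    while True:
        pending = 0
        # Only count queues this worker actually serves, so workflows
        # sitting on someone else's queue cannot keep the loop alive.
        for name in names:
            pending += len(await list_queued(queue_name=name))
        if pending == 0:
            return
        await asyncio.sleep(poll_interval)
```

In a worker this could be called after DBOS.launch() as wait_until_drained(DBOS.list_queued_workflows_async, ["my_queue"]), where "my_queue" stands in for whichever queues the worker defines. Unlike the docs example, this sketch checks before the first sleep, so a worker with nothing to do exits immediately.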

Or maybe you could extend the Python docs to include an example of this worker "pattern". Curiously, the LLM prompt page has an example of something related (await asyncio.Event().wait()), but this isn't mentioned in the "human-intended" docs, i.e., the case when you have only your workflow code and nothing else.
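For reference, the await asyncio.Event().wait() pattern amounts to something like the sketch below. The serve_until_stopped name and its launch/stop parameters are made up for this example; in a real worker, launch would be DBOS.launch and stop would usually be left unset so the process runs until it is killed.

```python
import asyncio
from typing import Callable, Optional


async def serve_until_stopped(
    launch: Callable[[], None],
    stop: Optional[asyncio.Event] = None,
) -> None:
    """Start the worker, then block until `stop` is set.

    Hypothetical helper, not a DBOS API. With no `stop` event supplied,
    a fresh event that nobody ever sets is awaited, which keeps the
    process alive indefinitely (the `asyncio.Event().wait()` pattern).
    """
    launch()
    if stop is None:
        stop = asyncio.Event()
    await stop.wait()
```

Passing an explicit event is optional; it just gives tests or a signal handler a way to shut the worker down cleanly.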
