@dwskoog dwskoog commented Dec 19, 2025

Creates a new map_async API on Stream and updates the documentation around async work to note it.

I tried map with a coroutine and it failed spectacularly:

```
    @gen_test()
    def test_map_async_tornado():
        @gen.coroutine
        def add_tor(x=0, y=0):
            return x + y

        source = Stream(asynchronous=True)
        L = source.map(add_tor, y=1).map(add_tor, y=2).sink_to_list()

        yield source.emit(0)

        yield gen.moment  # yield to the event loop to ensure it finished
>       assert L == [3]
E       assert [<Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>] == [3]
E
E         At index 0 diff: <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")> != 3
E
E         Full diff:
E           [
E         -     3,
E         +     <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>,
E           ]
```

So I made a new map_async that uses native asyncio plumbing to await the coroutine before feeding it downstream.
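For context, calling a coroutine function does not run it; it returns an awaitable, which is why a plain map hands a coroutine/Future object to the next stage. A minimal stdlib illustration, independent of streamz:

```python
import asyncio

async def add(x, y):
    return x + y

obj = add(1, 2)
# Calling a coroutine function returns a coroutine object, not 3.
print(type(obj).__name__)  # coroutine
obj.close()  # suppress the "coroutine was never awaited" warning

# A plain map would pass such an object downstream, so the next stage's
# `x + y` becomes `<awaitable> + int` and raises TypeError, matching
# the trace above.  Awaiting it yields the actual value:
print(asyncio.run(add(1, 2)))  # 3
```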

The distinction between map and map_async was inspired by prior work with mapAsync from Akka Streams; the use of an asyncio.Queue and a callback running on the loop to drain it seems like a decent amount of overhead to avoid when we do not need it.

I'm still trying to figure out whether inspect could be used to adapt dynamically to either a native async def or a Tornado gen.coroutine. If so, diverging the API would not be needed.
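One possible shape for that dynamic adaptation, sketched with only the stdlib: call the function first, then await only if the result is awaitable. In principle this also covers gen.coroutine functions, since calling them returns a Future, which is awaitable. `call_maybe_async` is a hypothetical helper, not streamz API:

```python
import asyncio
import inspect

def plain_add(x, y):
    return x + y

async def native_add(x, y):
    return x + y

async def call_maybe_async(func, *args, **kwargs):
    # Call first, then await only if the result is awaitable.  This
    # handles plain functions, native coroutines, and (in principle)
    # Tornado gen.coroutine functions, which return awaitable Futures.
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result

print(asyncio.run(call_maybe_async(plain_add, 1, 2)))   # 3
print(asyncio.run(call_maybe_async(native_add, 1, 2)))  # 3
```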

Comment on lines 129 to 155
```
@gen_test()
def test_map_async_tornado():
    @gen.coroutine
    def add_tor(x=0, y=0):
        return x + y

    async def add_native(x=0, y=0):
        return x + y

    source = Stream(asynchronous=True)
    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()

    yield source.emit(0)

    yield gen.moment  # Must yield to the event loop to ensure it finished
    assert L == [3]


@pytest.mark.asyncio
async def test_map_async():
```
dwskoog commented:
I'm not a Tornado user, so I still do not really understand the implications of using it vs native asyncio; I wrote the test twice to show that either harness will run coroutines from the other.

```
assert_eq(pd.concat(L), expected)


@flaky(max_runs=3, min_passes=1)
```
dwskoog commented:
I noticed this one leaving an upstream around on occasion. It might be related to GC changes in 3.13+.

@dwskoog dwskoog force-pushed the map_async branch 2 times, most recently from 781e691 to c102bbd on December 19, 2025 16:42
@martindurant

I believe there are other places in the code where iscoroutinefunction (or isawaitable?) is used to decide what to do? It could be argued that, from the current state of the package, all nodes should be asynchronous...


dwskoog commented Dec 19, 2025

I believe there are other places in the code where iscoroutinefunction (or isawaitable?) is used to decide what to do? It could be argued that, from the current state of the package, all nodes should be asynchronous...

Things I learned about Tornado and asyncio:

  • gen.isawaitable is inspect.isawaitable
  • gen.is_coroutine_function and inspect.iscoroutinefunction are completely different
  • gen.sleep and asyncio.sleep would induce wildly different sleep times in total
  • gen.coroutine was turning work into asyncio.Future results fairly aggressively, so creating Task objects from native coroutines and Tornado coroutines takes some care.

I ended up figuring out a way to mimic Akka Streams's parallelism parameter for mapAsync, which warrants keeping map and map_async distinct: the user is now making a meaningful design decision about how wide to open up the parallel evaluation when mapping the stream elements.

I have definitely run into problems in stream processing when the mapping function is heavyweight and the system creates a strong happens-before relationship between mapping successive elements, so I wanted to offer an option to weaken that condition rather than forcing a total ordering on each await.
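The difference is easy to demonstrate in plain asyncio: awaiting each element in turn imposes a total order (total latency grows with N), while scheduling the awaits concurrently weakens that to overlapping execution. A stdlib sketch, where `enrich` is a stand-in for a heavyweight mapped function:

```python
import asyncio
import time

async def enrich(x):
    await asyncio.sleep(0.05)  # stand-in for a slow mapped function
    return x

async def sequential(items):
    # Strong happens-before: element i+1 is not started until i finishes.
    return [await enrich(x) for x in items]

async def concurrent(items):
    # Weaker condition: all awaits overlap; gather still preserves order.
    return await asyncio.gather(*(enrich(x) for x in items))

async def main():
    t0 = time.perf_counter()
    assert await sequential(range(5)) == list(range(5))
    seq = time.perf_counter() - t0

    t0 = time.perf_counter()
    assert await concurrent(range(5)) == list(range(5))
    con = time.perf_counter() - t0
    print(seq > con)  # sequential takes roughly 5x longer here

asyncio.run(main())
```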

@martindurant

Akka Streams's parallelism parameter for mapAsync which warrants keeping map and map_async distinct as now the user is making a meaningful design decision

You mean the use of gather?

I am wondering whether we should be adding any further tornado-specific functionality at this point. If people make their async functions with gen, maybe that's just wrong in 2025...


dwskoog commented Dec 22, 2025

Akka Streams's parallelism parameter for mapAsync which warrants keeping map and map_async distinct as now the user is making a meaningful design decision

You mean the use of gather?

In Akka Streams, the API is designed such that map immediately transforms the element and passes it downstream. There is no design consideration around the cost of the transform (JVM threads and concurrency primitives ensure that if the function takes a substantial amount of time, backpressure will slow the upstream). The mapAsync API, on the other hand, requires the caller to specify a parallelism parameter, Stream.mapAsync(parallelism: int)(f: (Out) => Future[T]), to account for how slowly the mapped function runs. The transform guarantees that the departure order of the mapped f(x) elements corresponds exactly to the arrival order of the input elements x, and that up to parallelism applications of f run in parallel while awaiting the resulting Futures.
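That ordering guarantee can be mimicked in asyncio: start the calls concurrently and collect with gather, whose result order follows argument order rather than completion order. This is a sketch of the contract only, not Akka's or streamz's implementation:

```python
import asyncio

async def lookup(x):
    # Later elements finish sooner, simulating variable service latency.
    await asyncio.sleep(0.04 - 0.01 * x)
    return x * 10

async def main():
    # gather returns results in argument (arrival) order, regardless of
    # which call completed first -- the mapAsync departure-order contract.
    return await asyncio.gather(*(lookup(x) for x in range(4)))

print(asyncio.run(main()))  # [0, 10, 20, 30]
```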

In a prior job, we had a problem where one of our enrichment functions used mapAsync and the lookup service we called had a substantial slow down in response time. We had a few days where the entire dataflow was afflicted with that because of the backpressure it induced. When we finally tracked it down, solving it was a matter of making the parallelism of that one mapAsync wider generally, and then scaling the processing of that step dynamically based on the health of that lookup service (Akka has a mechanism for creating multiple copies of a transform while ensuring the order of elements across all copies using the underlying actors).

I am wondering whether we should be adding any further tornado-specific functionality at this point. If people make their async functions with gen, maybe that's just wrong in 2025...

In this case, I was able to support Tornado coroutines without making them a primary design decision. I made the docstring examples for map_async entirely native asyncio as well as the example in the async.rst file. Given the prevalence of Tornado through the rest of the existing plumbing and documentation, it seemed advisable to tolerate it here.

The background task obviously can't return if we want the stream to continue operating. An asyncio.Queue of the tasks ensures that the arrival and departure order of elements match; back pressure is asserted when a new value arrives via update but the work queue is full.

Because asyncio.Queue cannot peek, the parallelism factor is not precise: the worker callback can hold either zero or one task, but it must free up a slot in the queue to do so. Under pressure, the effective parallelism will generally be `(parallelism + 1)` rather than the `parallelism` given in `__init__`, as one Future is being awaited in the worker callback while the queue fills up from update calls.
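The mechanism described above can be sketched in isolation: a bounded asyncio.Queue of in-flight tasks supplies both the back pressure and the parallelism bound, and the consumer must dequeue (freeing a slot) before awaiting, hence the effective `(parallelism + 1)`. A standalone sketch under those assumptions, not the streamz code; `bounded_ordered_map` is a hypothetical helper:

```python
import asyncio

async def bounded_ordered_map(items, func, parallelism):
    # A bounded queue of in-flight tasks: put() blocks when full (back
    # pressure on the producer), and FIFO order makes departure order
    # match arrival order.
    queue = asyncio.Queue(maxsize=parallelism)
    results = []

    async def producer():
        for x in items:
            await queue.put(asyncio.ensure_future(func(x)))
        await queue.put(None)  # sentinel: no more work

    async def consumer():
        while True:
            task = await queue.get()
            if task is None:
                break
            # The consumer holds one task *outside* the queue while
            # awaiting it, so up to parallelism + 1 calls run at once.
            results.append(await task)

    await asyncio.gather(producer(), consumer())
    return results

async def slow_double(x):
    await asyncio.sleep(0.01)
    return 2 * x

print(asyncio.run(bounded_ordered_map(range(5), slow_double, parallelism=2)))
# [0, 2, 4, 6, 8]
```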