Make async functions mappable #493
Conversation
```
@gen_test()
def test_map_async_tornado():
    @gen.coroutine
    def add_tor(x=0, y=0):
        return x + y

    async def add_native(x=0, y=0):
        return x + y

    source = Stream(asynchronous=True)
    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()

    yield source.emit(0)

    yield gen.moment  # Must yield to the event loop to ensure it finished
    assert L == [3]


@pytest.mark.asyncio
async def test_map_async():
```
I'm not a Tornado user, so I still don't really understand the implications of using it versus native asyncio. I wrote the test twice to show that either harness will run coroutines from the other.
```
    assert_eq(pd.concat(L), expected)


@flaky(max_runs=3, min_passes=1)
```
I noticed this one leaving an upstream around on occasion. It might be related to GC changes in 3.13+.
Force-pushed from 781e691 to c102bbd
I believe there are other places in the code where `iscoroutinefunction` (or `isawaitable`?) is used to decide what to do. It could be argued that, from the current state of the package, all nodes should be asynchronous...
Things I learned about Tornado and asyncio:
I ended up figuring out a way to mimic Akka Streams's parallelism parameter for mapAsync, which warrants keeping. I have definitely run into problems in stream processing where the mapping function is heavyweight and the system created a strong happens-before relationship between mapping successive elements, so I wanted to offer an option to make that a weaker condition than forcing a total ordering on each await.
You mean the use of …? I am wondering whether we should be adding any further Tornado-specific functionality at this point. If people make their async functions with …
In Akka Streams, the API is designed such that … In a prior job, we had a problem where one of our enrichment functions used …
In this case, I was able to support Tornado coroutines without making them a primary design decision. I made the docstring examples for …
I tried map with a coroutine and it failed spectacularly:
```
@gen_test()
def test_map_async_tornado():
@gen.coroutine
def add_tor(x=0, y=0):
return x + y
source = Stream(asynchronous=True)
L = source.map(add_tor, y=1).map(add_tor, y=2).sink_to_list()
yield source.emit(0)
yield gen.moment # yield to the event loop to ensure it finished
> assert L == [3]
E assert [<Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>] == [3]
E
E At index 0 diff: <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")> != 3
E
E Full diff:
E [
E - 3,
E + <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>,
E ]
```
So I made a new `map_async` that uses native asyncio plumbing to await
the coroutine before feeding it downstream.
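For anyone puzzled by that trace, the failure reduces to calling an async function without awaiting it. A minimal repro with no streamz involved (this uses a native coroutine; the Tornado decorator returns a `Future` instead of a coroutine object, but the symptom is the same shape):

```python
import asyncio

async def add(x=0, y=0):
    return x + y

# Calling a coroutine function does not run it: it returns a coroutine object.
result = add(0, 1)
print(type(result).__name__)  # coroutine

# Feeding that object downstream fails just like the trace above:
try:
    result + 2
except TypeError as exc:
    print(exc)  # unsupported operand type(s) for +: 'coroutine' and 'int'

result.close()  # avoid the "coroutine was never awaited" warning

# The value only exists once something awaits the coroutine:
print(asyncio.run(add(0, 1)))  # 1
```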
Obviously, the background task can't return if we want the stream to continue operating.
Use an `asyncio.Queue` of the tasks to ensure that the arrival and departure order of elements match. Back pressure is asserted when a new value arrives via update but the work queue is full. Because `asyncio.Queue` cannot peek, the parallelism factor is not precise: the worker callback can have either zero or one task in hand, but it must free up a slot in the queue to do so. Under pressure, the effective parallelism will generally be `(parallelism + 1)` rather than the `parallelism` given in `__init__`, as one Future will be awaited in the worker callback while the queue fills up from update calls.
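A standalone sketch of that queue-and-worker design, with hypothetical names (`map_async_ordered` is not the streamz implementation): a bounded `asyncio.Queue` of in-flight tasks provides back pressure and preserves order, and the worker holding one task while the queue refills is exactly the `(parallelism + 1)` effect described above.

```python
import asyncio

async def map_async_ordered(source, func, parallelism=2):
    """Map async `func` over `source` in arrival order with bounded
    concurrency. Hypothetical sketch, not the streamz implementation."""
    queue = asyncio.Queue(maxsize=parallelism)  # in-flight tasks
    results = []

    async def worker():
        while True:
            task = await queue.get()
            if task is None:       # sentinel: producer is done
                return
            # Awaiting tasks in FIFO order makes departure order
            # match arrival order, regardless of completion order.
            results.append(await task)
            queue.task_done()

    w = asyncio.create_task(worker())
    for item in source:
        # put() blocks when the queue is full: back pressure on the
        # producer. With the task the worker already holds, up to
        # parallelism + 1 calls can be in flight at once.
        await queue.put(asyncio.create_task(func(item)))
    await queue.put(None)
    await w
    return results


async def slow_double(x):
    await asyncio.sleep(0.01)
    return 2 * x

print(asyncio.run(map_async_ordered(range(5), slow_double, parallelism=2)))
# [0, 2, 4, 6, 8]
```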
Creates a new `map_async` API on `Stream` and updates the documentation around async work to note it.
The distinguishing of `map` and `map_async` was inspired by prior work with mapAsync from Akka Streams, as the use of an `asyncio.Queue` and a callback running on the loop to drain it seems like a decent amount of overhead to avoid when we do not need it.

I'm still trying to figure out whether `inspect` would work to adapt dynamically given either a native `async def` or a Tornado `gen.coroutine`. If so, diverging the API would not be needed.
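One possible shape for that dynamic adaptation, sketched with a hypothetical helper (`ensure_async` is not part of streamz): `inspect.iscoroutinefunction` catches native coroutine functions up front, and `inspect.isawaitable` on the call result catches the Tornado case, since a `gen.coroutine` function returns a Future that is awaitable on the asyncio loop.

```python
import asyncio
import inspect

def ensure_async(func):
    """Wrap `func` so callers can always `await` it, whether it is a plain
    function, a native coroutine function, or something (like a Tornado
    @gen.coroutine) whose call returns an awaitable. Hypothetical sketch."""
    if inspect.iscoroutinefunction(func):
        return func  # already directly awaitable when called

    async def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        # A Tornado coroutine call lands here: it returns a Future,
        # which isawaitable() recognizes and we await.
        if inspect.isawaitable(result):
            result = await result
        return result

    return wrapper

def add_sync(x, y):
    return x + y

async def add_native(x, y):
    return x + y

async def demo():
    return (await ensure_async(add_sync)(1, 2),
            await ensure_async(add_native)(3, 4))

print(asyncio.run(demo()))  # (3, 7)
```

This keeps a single `map` API possible, at the cost of one `isawaitable` check per element for non-coroutine functions.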