|
| 1 | +# Asynchronous streams for Rust |
| 2 | + |
| 3 | +Asynchronous stream of elements. |
| 4 | + |
| 5 | +Provides two macros, `stream!` and `try_stream!`, allowing the caller to |
| 6 | +define asynchronous streams of elements. These are implemented using `async` |
| 7 | +& `await` notation. The `stream!` macro works using only |
| 8 | +`#[feature(async_await)]`. |
| 9 | + |
| 10 | +The `stream!` macro returns an anonymous type implementing the [`Stream`] |
| 11 | +trait. The `Item` associated type is the type of the values yielded from the |
| 12 | +stream. The `try_stream!` also returns an anonymous type implementing the |
| 13 | +[`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The |
| 14 | +`try_stream!` macro supports using `?` notiation as part of the |
| 15 | +implementation. |
| 16 | + |
| 17 | +## Usage |
| 18 | + |
| 19 | +A basic stream yielding numbers. Values are yielded using the `yield` |
| 20 | +keyword. The stream block must return `()`. |
| 21 | + |
| 22 | +```rust |
| 23 | +#![feature(async_await)] |
| 24 | + |
| 25 | +use tokio::prelude::*; |
| 26 | + |
| 27 | +use async_stream::stream; |
| 28 | +use futures_util::pin_mut; |
| 29 | + |
| 30 | +#[tokio::main] |
| 31 | +async fn main() { |
| 32 | + let s = stream! { |
| 33 | + for i in 0..3 { |
| 34 | + yield i; |
| 35 | + } |
| 36 | + }; |
| 37 | + |
| 38 | + pin_mut!(s); // needed for iteration |
| 39 | + |
| 40 | + while let Some(value) = s.next().await { |
| 41 | + println!("got {}", value); |
| 42 | + } |
| 43 | +} |
| 44 | +``` |
| 45 | + |
| 46 | +Streams may be returned by using `impl Stream<Item = T>`: |
| 47 | + |
| 48 | +```rust |
| 49 | +#![feature(async_await)] |
| 50 | + |
| 51 | +use tokio::prelude::*; |
| 52 | + |
| 53 | +use async_stream::stream; |
| 54 | +use futures_util::pin_mut; |
| 55 | + |
| 56 | +fn zero_to_three() -> impl Stream<Item = u32> { |
| 57 | + stream! { |
| 58 | + for i in 0..3 { |
| 59 | + yield i; |
| 60 | + } |
| 61 | + } |
| 62 | +} |
| 63 | + |
| 64 | +#[tokio::main] |
| 65 | +async fn main() { |
| 66 | + let s = zero_to_three(); |
| 67 | + pin_mut!(s); // needed for iteration |
| 68 | + |
| 69 | + while let Some(value) = s.next().await { |
| 70 | + println!("got {}", value); |
| 71 | + } |
| 72 | +} |
| 73 | +``` |
| 74 | + |
| 75 | +Streams may be implemented in terms of other streams: |
| 76 | + |
| 77 | +```rust |
| 78 | +#![feature(async_await)] |
| 79 | + |
| 80 | +use tokio::prelude::*; |
| 81 | + |
| 82 | +use async_stream::stream; |
| 83 | +use futures_util::pin_mut; |
| 84 | + |
| 85 | +fn zero_to_three() -> impl Stream<Item = u32> { |
| 86 | + stream! { |
| 87 | + for i in 0..3 { |
| 88 | + yield i; |
| 89 | + } |
| 90 | + } |
| 91 | +} |
| 92 | + |
| 93 | +fn double<S: Stream<Item = u32>>(input: S) |
| 94 | + -> impl Stream<Item = u32> |
| 95 | +{ |
| 96 | + stream! { |
| 97 | + pin_mut!(input); |
| 98 | + while let Some(value) = input.next().await { |
| 99 | + yield value * 2; |
| 100 | + } |
| 101 | + } |
| 102 | +} |
| 103 | + |
| 104 | +#[tokio::main] |
| 105 | +async fn main() { |
| 106 | + let s = double(zero_to_three()); |
| 107 | + pin_mut!(s); // needed for iteration |
| 108 | + |
| 109 | + while let Some(value) = s.next().await { |
| 110 | + println!("got {}", value); |
| 111 | + } |
| 112 | +} |
| 113 | +``` |
| 114 | + |
| 115 | +Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` |
| 116 | +of the returned stream is `Result` with `Ok` being the value yielded and |
| 117 | +`Err` the error type returned by `?`. |
| 118 | + |
| 119 | +```rust |
| 120 | +#![feature(async_await)] |
| 121 | + |
| 122 | +use tokio::net::{TcpListener, TcpStream}; |
| 123 | +use tokio::prelude::*; |
| 124 | + |
| 125 | +use async_stream::try_stream; |
| 126 | +use std::io; |
| 127 | +use std::net::SocketAddr; |
| 128 | + |
| 129 | +fn bind_and_accept(addr: SocketAddr) |
| 130 | + -> impl Stream<Item = io::Result<TcpStream>> |
| 131 | +{ |
| 132 | + try_stream! { |
| 133 | + let mut listener = TcpListener::bind(&addr)?; |
| 134 | + |
| 135 | + loop { |
| 136 | + let (stream, addr) = listener.accept().await?; |
| 137 | + println!("received on {:?}", addr); |
| 138 | + yield stream; |
| 139 | + } |
| 140 | + } |
| 141 | +} |
| 142 | +``` |
| 143 | + |
| 144 | +## Implementation |
| 145 | + |
| 146 | +The `stream!` and `try_stream!` macros are implemented using proc macros. |
| 147 | +Given that proc macros in expression position are not supported on stable |
| 148 | +rust, a hack similar to the one provided by the [`proc-macro-hack`] crate is |
| 149 | +used. The macro searches the syntax tree for instances of `sender.send($expr)` and |
| 150 | +transforms them into `sender.send($expr).await`. |
| 151 | + |
| 152 | +The stream uses a lightweight sender to send values from the stream |
| 153 | +implementation to the caller. When entering the stream, an `Option<T>` is |
| 154 | +stored on the stack. A pointer to the cell is stored in a thread local and |
| 155 | +`poll` is called on the async block. When `poll` returns. |
| 156 | +`sender.send(value)` stores the value that cell and yields back to the |
| 157 | +caller. |
| 158 | + |
| 159 | +## Limitations |
| 160 | + |
| 161 | +`async-stream` suffers from the same limitations as the [`proc-macro-hack`] |
| 162 | +crate. Primarily, nesting support must be implemented using a `TT-muncher`. |
| 163 | +If large `stream!` blocks are used, the caller will be required to add |
| 164 | +`#![recursion_limit = "..."]` to their crate. |
| 165 | + |
| 166 | +A `stream!` macro may only contain up to 64 macro invocations. |
| 167 | + |
| 168 | +[`Stream`]: https://docs.rs/futures-core-preview/*/futures_core/stream/trait.Stream.html |
| 169 | +[`proc-macro-hack`]: https://github.com/dtolnay/proc-macro-hack/ |
| 170 | + |
| 171 | +## License |
| 172 | + |
| 173 | +This project is licensed under the [MIT license](LICENSE). |
| 174 | + |
| 175 | +### Contribution |
| 176 | + |
| 177 | +Unless you explicitly state otherwise, any contribution intentionally submitted |
| 178 | +for inclusion in `async-stream` by you, shall be licensed as MIT, without any |
| 179 | +additional terms or conditions. |
0 commit comments