Skip to content

Commit 6b98606

Browse files
committed
fix: remove thread local
1 parent d382efa commit 6b98606

File tree

5 files changed

+86
-75
lines changed

5 files changed

+86
-75
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,3 @@ tokio = { version = "1", features = [ "full", "macros" ] }
1717
futures = "0.3"
1818

1919
[features]
20-
unstable-thread-local = []

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ async fn main() {
2121
```
2222

2323
## `#![no_std]` support
24-
`async-stream-lite` supports `#![no_std]` on nightly Rust (due to the usage of [the unstable `#[thread_local]` attribute](https://doc.rust-lang.org/beta/unstable-book/language-features/thread-local.html)). To enable `#![no_std]` support, enable the `unstable-thread-local` feature.
24+
`async-stream-lite` supports `#![no_std]`, but requires `alloc`.

src/lib.rs

Lines changed: 57 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
#![allow(clippy::tabs_in_doc_comments)]
2-
#![cfg_attr(feature = "unstable-thread-local", feature(thread_local))]
3-
#![cfg_attr(all(not(test), feature = "unstable-thread-local"), no_std)]
2+
#![cfg_attr(not(test), no_std)]
43

4+
extern crate alloc;
55
extern crate core;
66

7+
use alloc::sync::{Arc, Weak};
78
use core::{
89
cell::Cell,
910
future::Future,
1011
marker::PhantomData,
1112
pin::Pin,
12-
ptr,
13+
sync::atomic::{AtomicBool, Ordering},
1314
task::{Context, Poll}
1415
};
1516

@@ -19,21 +20,49 @@ use futures_core::stream::{FusedStream, Stream};
1920
mod tests;
2021
mod r#try;
2122

22-
#[cfg(not(feature = "unstable-thread-local"))]
23-
thread_local! {
24-
static STORE: Cell<*mut ()> = const { Cell::new(ptr::null_mut()) };
23+
pub(crate) struct SharedStore<T> {
24+
entered: AtomicBool,
25+
cell: Cell<Option<T>>
2526
}
26-
#[cfg(feature = "unstable-thread-local")]
27-
#[thread_local]
28-
static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut());
27+
28+
impl<T> Default for SharedStore<T> {
29+
fn default() -> Self {
30+
Self {
31+
entered: AtomicBool::new(false),
32+
cell: Cell::new(None)
33+
}
34+
}
35+
}
36+
37+
impl<T> SharedStore<T> {
38+
pub fn has_value(&self) -> bool {
39+
unsafe { &*self.cell.as_ptr() }.is_some()
40+
}
41+
}
42+
43+
unsafe impl<T> Sync for SharedStore<T> {}
2944

3045
pub struct Yielder<T> {
31-
_p: PhantomData<T>
46+
pub(crate) store: Weak<SharedStore<T>>
3247
}
3348

3449
impl<T> Yielder<T> {
3550
pub fn r#yield(&self, value: T) -> YieldFut<'_, T> {
36-
YieldFut { value: Some(value), _p: PhantomData }
51+
#[cold]
52+
fn invalid_usage() -> ! {
53+
panic!("attempted to use async_stream_lite yielder outside of stream context or across threads")
54+
}
55+
56+
let Some(store) = self.store.upgrade() else {
57+
invalid_usage();
58+
};
59+
if !store.entered.load(Ordering::Relaxed) {
60+
invalid_usage();
61+
}
62+
63+
store.cell.replace(Some(value));
64+
65+
YieldFut { store, _p: PhantomData }
3766
}
3867
}
3968

@@ -42,7 +71,7 @@ impl<T> Yielder<T> {
4271
/// This future must be `.await`ed inside the generator in order for the item to be yielded by the stream.
4372
#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
4473
pub struct YieldFut<'y, T> {
45-
value: Option<T>,
74+
store: Arc<SharedStore<T>>,
4675
_p: PhantomData<&'y ()>
4776
}
4877

@@ -51,66 +80,36 @@ impl<T> Unpin for YieldFut<'_, T> {}
5180
impl<T> Future for YieldFut<'_, T> {
5281
type Output = ();
5382

54-
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
55-
if self.value.is_none() {
83+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
84+
if !self.store.has_value() {
5685
return Poll::Ready(());
5786
}
5887

59-
fn op<T>(cell: &Cell<*mut ()>, value: &mut Option<T>) {
60-
let ptr = cell.get().cast::<Option<T>>();
61-
let option_ref = unsafe { ptr.as_mut() }.expect("attempted to use async_stream yielder outside of stream context or across threads");
62-
if option_ref.is_none() {
63-
*option_ref = value.take();
64-
}
65-
}
66-
67-
#[cfg(not(feature = "unstable-thread-local"))]
68-
return STORE.with(|cell| {
69-
op(cell, &mut self.value);
70-
Poll::Pending
71-
});
72-
#[cfg(feature = "unstable-thread-local")]
73-
{
74-
op(&STORE, &mut self.value);
75-
Poll::Pending
76-
}
88+
Poll::Pending
7789
}
7890
}
7991

80-
struct Enter<'a, T> {
81-
_p: PhantomData<&'a T>,
82-
prev: *mut ()
92+
struct Enter<'s, T> {
93+
store: &'s SharedStore<T>
8394
}
8495

85-
fn enter<T>(dst: &mut Option<T>) -> Enter<'_, T> {
86-
fn op<T>(cell: &Cell<*mut ()>, dst: &mut Option<T>) -> *mut () {
87-
let prev = cell.get();
88-
cell.set((dst as *mut Option<T>).cast::<()>());
89-
prev
90-
}
91-
#[cfg(not(feature = "unstable-thread-local"))]
92-
let prev = STORE.with(|cell| op(cell, dst));
93-
#[cfg(feature = "unstable-thread-local")]
94-
let prev = op(&STORE, dst);
95-
Enter { _p: PhantomData, prev }
96+
fn enter<T>(store: &SharedStore<T>) -> Enter<'_, T> {
97+
store.entered.store(true, Ordering::Relaxed);
98+
Enter { store }
9699
}
97100

98101
impl<T> Drop for Enter<'_, T> {
99102
fn drop(&mut self) {
100-
#[cfg(not(feature = "unstable-thread-local"))]
101-
STORE.with(|cell| cell.set(self.prev));
102-
#[cfg(feature = "unstable-thread-local")]
103-
STORE.set(self.prev);
103+
self.store.entered.store(false, Ordering::Relaxed);
104104
}
105105
}
106106

107107
pin_project_lite::pin_project! {
108108
/// A [`Stream`] created from an asynchronous generator-like function.
109109
///
110110
/// To create an [`AsyncStream`], use the [`async_stream`] function.
111-
#[derive(Debug)]
112111
pub struct AsyncStream<T, U> {
113-
_p: PhantomData<T>,
112+
store: Arc<SharedStore<T>>,
114113
done: bool,
115114
#[pin]
116115
generator: U
@@ -138,16 +137,15 @@ where
138137
return Poll::Ready(None);
139138
}
140139

141-
let mut dst = None;
142140
let res = {
143-
let _enter = enter(&mut dst);
141+
let _enter = enter(&me.store);
144142
me.generator.poll(cx)
145143
};
146144

147145
*me.done = res.is_ready();
148146

149-
if dst.is_some() {
150-
return Poll::Ready(dst.take());
147+
if me.store.has_value() {
148+
return Poll::Ready(me.store.cell.take());
151149
}
152150

153151
if *me.done { Poll::Ready(None) } else { Poll::Pending }
@@ -271,12 +269,9 @@ where
271269
F: FnOnce(Yielder<T>) -> U,
272270
U: Future<Output = ()>
273271
{
274-
let generator = generator(Yielder { _p: PhantomData });
275-
AsyncStream {
276-
_p: PhantomData,
277-
done: false,
278-
generator
279-
}
272+
let store = Arc::new(SharedStore::default());
273+
let generator = generator(Yielder { store: Arc::downgrade(&store) });
274+
AsyncStream { store, done: false, generator }
280275
}
281276

282277
pub use self::r#try::{TryAsyncStream, try_async_stream};

src/tests.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,18 @@ async fn multithreaded() {
310310
}
311311
join_all(futures).await;
312312
}
313+
314+
#[tokio::test]
315+
#[should_panic = "attempted to use async_stream_lite yielder outside of stream context or across threads"]
316+
async fn test_move_yielder() {
317+
let mut slot = None;
318+
let s = async_stream(|yielder: Yielder<()>| async {
319+
slot.replace(yielder);
320+
});
321+
pin_mut!(s);
322+
323+
let _ = s.next().await;
324+
drop(s);
325+
326+
slot.take().unwrap().r#yield(()).await;
327+
}

src/try.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use alloc::sync::Arc;
12
use core::{
23
future::Future,
34
marker::PhantomData,
@@ -7,18 +8,18 @@ use core::{
78

89
use futures_core::stream::{FusedStream, Stream};
910

10-
use crate::{Yielder, enter};
11+
use crate::{SharedStore, Yielder, enter};
1112

1213
pin_project_lite::pin_project! {
1314
/// A [`Stream`] created from a fallible, asynchronous generator-like function.
1415
///
1516
/// To create a [`TryAsyncStream`], use the [`try_async_stream`] function. See also [`crate::AsyncStream`].
16-
#[derive(Debug)]
1717
pub struct TryAsyncStream<T, E, U> {
18-
_p: PhantomData<(T, E)>,
18+
store: Arc<SharedStore<T>>,
1919
done: bool,
2020
#[pin]
21-
generator: U
21+
generator: U,
22+
_p: PhantomData<E>
2223
}
2324
}
2425

@@ -43,18 +44,17 @@ where
4344
return Poll::Ready(None);
4445
}
4546

46-
let mut dst = None;
4747
let res = {
48-
let _enter = enter(&mut dst);
48+
let _enter = enter(&me.store);
4949
me.generator.poll(cx)
5050
};
5151

5252
*me.done = res.is_ready();
5353

5454
if let Poll::Ready(Err(e)) = res {
5555
return Poll::Ready(Some(Err(e)));
56-
} else if dst.is_some() {
57-
return Poll::Ready(dst.take().map(Ok));
56+
} else if me.store.has_value() {
57+
return Poll::Ready(me.store.cell.take().map(Ok));
5858
}
5959

6060
if *me.done { Poll::Ready(None) } else { Poll::Pending }
@@ -96,11 +96,13 @@ where
9696
F: FnOnce(Yielder<T>) -> U,
9797
U: Future<Output = Result<(), E>>
9898
{
99-
let generator = generator(Yielder { _p: PhantomData });
99+
let store = Arc::new(SharedStore::default());
100+
let generator = generator(Yielder { store: Arc::downgrade(&store) });
100101
TryAsyncStream {
101-
_p: PhantomData,
102+
store,
102103
done: false,
103-
generator
104+
generator,
105+
_p: PhantomData
104106
}
105107
}
106108

0 commit comments

Comments
 (0)