Skip to content

Commit f586be1

Browse files
authored
cm-async: Start to fill out {Future,Stream}Any (#12142)
* cm-async: Start to fill out `{Future,Stream}Any` This commit is the first step down the road of filling out the preexisting, but empty/buggy, `FutureAny` and `StreamAny` types. These are intended to behave similarly to `ResourceAny` where the embedder doesn't have static knowledge ahead of time about the type of the future/stream in use. Changes made here are: * `ComponentType for {Stream,Future}Reader<T>` now correctly typecheck the `T`. * Conversion to/from `*Any` types now properly typechecks the payload type against the expected type. * `{Future,Stream}Any` now live in their own file with the matrix of conversions to the typed variants. * A `close` method was added to `*Any` types. These types are not currently directly constructible but this will likely be relaxed in the future. Additionally the host can't actually use these values without knowing the type, which is another restriction that will be relaxed in the future (aka implemented). cc #11161 * Fix tests * Skip a test on miri
1 parent 2fafa35 commit f586be1

File tree

10 files changed

+1154
-257
lines changed

10 files changed

+1154
-257
lines changed

crates/misc/component-async-tests/tests/scenario/transmit.rs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ pub trait TransmitTest {
552552
) -> impl Future<Output = Result<Self::Result>> + Send + 'a;
553553

554554
fn into_params(
555+
store: impl AsContextMut<Data = Ctx>,
555556
control: StreamReader<Control>,
556557
caller_stream: StreamReader<String>,
557558
caller_future1: FutureReader<String>,
@@ -603,6 +604,7 @@ impl TransmitTest for StaticTransmitTest {
603604
}
604605

605606
fn into_params(
607+
_store: impl AsContextMut<Data = Ctx>,
606608
control: StreamReader<Control>,
607609
caller_stream: StreamReader<String>,
608610
caller_future1: FutureReader<String>,
@@ -663,21 +665,31 @@ impl TransmitTest for DynamicTransmitTest {
663665
}
664666

665667
fn into_params(
668+
mut store: impl AsContextMut<Data = Ctx>,
666669
control: StreamReader<Control>,
667670
caller_stream: StreamReader<String>,
668671
caller_future1: FutureReader<String>,
669672
caller_future2: FutureReader<String>,
670673
) -> Self::Params {
671674
vec![
672-
control.into_val(),
673-
caller_stream.into_val(),
674-
caller_future1.into_val(),
675-
caller_future2.into_val(),
675+
control.try_into_stream_any(&mut store).unwrap().into(),
676+
caller_stream
677+
.try_into_stream_any(&mut store)
678+
.unwrap()
679+
.into(),
680+
caller_future1
681+
.try_into_future_any(&mut store)
682+
.unwrap()
683+
.into(),
684+
caller_future2
685+
.try_into_future_any(&mut store)
686+
.unwrap()
687+
.into(),
676688
]
677689
}
678690

679691
fn from_result(
680-
mut store: impl AsContextMut<Data = Ctx>,
692+
_store: impl AsContextMut<Data = Ctx>,
681693
result: Self::Result,
682694
) -> Result<(
683695
StreamReader<String>,
@@ -687,9 +699,19 @@ impl TransmitTest for DynamicTransmitTest {
687699
let Val::Tuple(fields) = result else {
688700
unreachable!()
689701
};
690-
let stream = StreamReader::from_val(store.as_context_mut(), &fields[0])?;
691-
let future1 = FutureReader::from_val(store.as_context_mut(), &fields[1])?;
692-
let future2 = FutureReader::from_val(store.as_context_mut(), &fields[2])?;
702+
let mut fields = fields.into_iter();
703+
let Val::Stream(stream) = fields.next().unwrap() else {
704+
unreachable!()
705+
};
706+
let Val::Future(future1) = fields.next().unwrap() else {
707+
unreachable!()
708+
};
709+
let Val::Future(future2) = fields.next().unwrap() else {
710+
unreachable!()
711+
};
712+
let stream = StreamReader::try_from_stream_any(stream).unwrap();
713+
let future1 = FutureReader::try_from_future_any(future1).unwrap();
714+
let future2 = FutureReader::try_from_future_any(future2).unwrap();
693715
Ok((stream, future1, future2))
694716
}
695717
}
@@ -770,19 +792,20 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
770792
.boxed(),
771793
);
772794

773-
futures.push(
774-
Test::call(
775-
accessor,
776-
&test,
777-
Test::into_params(
778-
control_rx,
779-
caller_stream_rx,
780-
caller_future1_rx,
781-
caller_future2_rx,
782-
),
795+
let params = accessor.with(|s| {
796+
Test::into_params(
797+
s,
798+
control_rx,
799+
caller_stream_rx,
800+
caller_future1_rx,
801+
caller_future2_rx,
783802
)
784-
.map(|v| v.map(Event::Result))
785-
.boxed(),
803+
});
804+
805+
futures.push(
806+
Test::call(accessor, &test, params)
807+
.map(|v| v.map(Event::Result))
808+
.boxed(),
786809
);
787810

788811
while let Some(event) = futures.try_next().await? {

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,17 @@ use wasmtime_environ::component::{
9090
};
9191

9292
pub use abort::JoinHandle;
93+
pub use future_stream_any::{FutureAny, StreamAny};
9394
pub use futures_and_streams::{
9495
Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
9596
FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
9697
StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
9798
};
98-
pub(crate) use futures_and_streams::{
99-
ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100-
};
99+
pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
101100

102101
mod abort;
103102
mod error_contexts;
103+
mod future_stream_any;
104104
mod futures_and_streams;
105105
pub(crate) mod table;
106106
pub(crate) mod tls;

0 commit comments

Comments
 (0)