Skip to content

Commit 42fc6fd

Browse files
committed
StreamAdapter
1 parent ccd3585 commit 42fc6fd

File tree

2 files changed

+54
-54
lines changed

2 files changed

+54
-54
lines changed

lambda-http/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use std::{
102102
};
103103

104104
mod streaming;
105-
pub use streaming::{into_streaming_response, run_with_streaming_response};
105+
pub use streaming::{run_with_streaming_response, StreamAdapter};
106106

107107
/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
108108
pub type Request = http::Request<Body>;

lambda-http/src/streaming.rs

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,85 @@
1-
use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt};
1+
use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt};
22
use bytes::Bytes;
3-
pub use http::{self, Response};
4-
use http_body::Body;
5-
pub use lambda_runtime::{
6-
self,
7-
tower::{
8-
util::{MapRequest, MapResponse},
9-
ServiceExt,
10-
},
11-
Error, LambdaEvent, MetadataPrelude, Service, StreamResponse,
12-
};
13-
use lambda_runtime::{tower::util::BoxService, Diagnostic};
14-
use std::{
3+
use core::{
154
fmt::Debug,
5+
future::Future,
166
pin::Pin,
177
task::{Context, Poll},
188
};
9+
pub use http::{self, Response};
10+
use http_body::Body;
11+
use lambda_runtime::Diagnostic;
12+
pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
13+
use std::marker::PhantomData;
1914
use tokio_stream::Stream;
2015

21-
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
16+
/// An adapter that lifts a standard [`Service<Request>`] into a
17+
/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
2218
/// responses.
23-
pub fn into_streaming_response<'a, S, B, E>(
24-
handler: S,
25-
) -> BoxService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
19+
pub struct StreamAdapter<'a, S, B> {
20+
service: S,
21+
_phantom_data: PhantomData<&'a B>,
22+
}
23+
24+
impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
2625
where
27-
S: Service<Request, Response = Response<B>, Error = E> + Send + 'static,
26+
S: Service<Request, Response = Response<B>, Error = E>,
2827
S::Future: Send + 'a,
29-
E: Debug + Into<Diagnostic> + 'static,
28+
E: Debug + Into<Diagnostic>,
3029
B: Body + Unpin + Send + 'static,
3130
B::Data: Into<Bytes> + Send,
3231
B::Error: Into<Error> + Send + Debug,
3332
{
34-
into_streaming_response_inner::<S, B, E>(handler).boxed()
33+
fn from(service: S) -> Self {
34+
StreamAdapter {
35+
service,
36+
_phantom_data: PhantomData,
37+
}
38+
}
3539
}
3640

37-
#[allow(clippy::type_complexity)]
38-
fn into_streaming_response_inner<'a, S, B, E>(
39-
handler: S,
40-
) -> MapResponse<
41-
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
42-
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
43-
>
41+
impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
4442
where
4543
S: Service<Request, Response = Response<B>, Error = E>,
4644
S::Future: Send + 'a,
47-
E: Debug + Into<Diagnostic>,
48-
B: Body + Unpin + Send + 'static,
45+
B: Body + Send + 'static,
4946
B::Data: Into<Bytes> + Send,
5047
B::Error: Into<Error> + Send + Debug,
48+
E: Debug + Into<Diagnostic>,
5149
{
52-
ServiceBuilder::new()
53-
.map_request(|req: LambdaEvent<LambdaRequest>| {
54-
let event: Request = req.payload.into();
55-
event.with_lambda_context(req.context)
56-
})
57-
.service(handler)
58-
.map_response(|res: Response<B>| {
59-
let (parts, body) = res.into_parts();
50+
type Response = StreamResponse<BodyStream<B>>;
51+
type Error = E;
52+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, E>> + Send + 'a>>;
6053

61-
let mut prelude_headers = parts.headers;
54+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55+
self.service.poll_ready(cx)
56+
}
6257

63-
let cookies = prelude_headers
58+
fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
59+
let event: Request = req.payload.into();
60+
let fut = self.service.call(event.with_lambda_context(req.context));
61+
Box::pin(async move {
62+
let res = fut.await?;
63+
let (parts, body) = res.into_parts();
64+
65+
let mut headers = parts.headers;
66+
let cookies = headers
6467
.get_all(SET_COOKIE)
6568
.iter()
6669
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
67-
.collect::<Vec<String>>();
68-
69-
prelude_headers.remove(SET_COOKIE);
70+
.collect::<Vec<_>>();
71+
headers.remove(SET_COOKIE);
7072

71-
let metadata_prelude = MetadataPrelude {
72-
headers: prelude_headers,
73-
status_code: parts.status,
74-
cookies,
75-
};
76-
77-
StreamResponse {
78-
metadata_prelude,
73+
Ok(StreamResponse {
74+
metadata_prelude: MetadataPrelude {
75+
headers,
76+
status_code: parts.status,
77+
cookies,
78+
},
7979
stream: BodyStream { body },
80-
}
80+
})
8181
})
82+
}
8283
}
8384

8485
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
@@ -97,8 +98,7 @@ where
9798
B::Data: Into<Bytes> + Send,
9899
B::Error: Into<Error> + Send + Debug,
99100
{
100-
let svc = into_streaming_response_inner(handler);
101-
lambda_runtime::run(svc).await
101+
lambda_runtime::run(StreamAdapter::from(handler)).await
102102
}
103103

104104
pin_project_lite::pin_project! {

0 commit comments

Comments
 (0)