Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ INTEG_API_INVOKE := RestApiUrl HttpApiUrl
INTEG_EXTENSIONS := extension-fn extension-trait logs-trait
# Using musl to run extensions on both AL1 and AL2
INTEG_ARCH := x86_64-unknown-linux-musl
RIE_MAX_CONCURRENCY ?= 4

define uppercase
$(shell sed -r 's/(^|-)(\w)/\U\2/g' <<< $(1))
Expand Down Expand Up @@ -111,4 +112,8 @@ fmt:
cargo +nightly fmt --all

test-rie:
./scripts/test-rie.sh $(EXAMPLE)
./scripts/test-rie.sh $(EXAMPLE)

# Run RIE in Lambda Managed Instance (LMI) mode with concurrent polling.
test-rie-lmi:
RIE_MAX_CONCURRENCY=$(RIE_MAX_CONCURRENCY) ./scripts/test-rie.sh $(EXAMPLE)
9 changes: 9 additions & 0 deletions examples/basic-lambda-concurrent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "basic-lambda-concurrent"
version = "0.1.0"
edition = "2021"

[dependencies]
lambda_runtime = { path = "../../lambda-runtime" }
serde = "1.0.219"
tokio = { version = "1", features = ["macros"] }
74 changes: 74 additions & 0 deletions examples/basic-lambda-concurrent/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// This example requires the following input to succeed:
// { "command": "do something" }

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use serde::{Deserialize, Serialize};

/// This is also a made-up example. Requests come into the runtime as unicode
/// strings in json format, which can map to any structure that implements `serde::Deserialize`
/// The runtime pays no attention to the contents of the request payload.
#[derive(Deserialize)]
struct Request {
command: String,
}

/// This is a made-up example of what a response structure may look like.
/// There is no restriction on what it can be. The runtime requires responses
/// to be serialized into json. The runtime pays no attention
/// to the contents of the response payload.
#[derive(Serialize)]
struct Response {
req_id: String,
msg: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing::init_default_subscriber();

let func = service_fn(my_handler);
if let Err(err) = lambda_runtime::run_concurrent(func).await {
eprintln!("run error: {:?}", err);
return Err(err);
}
Ok(())
}

pub(crate) async fn my_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// extract some useful info from the request
let command = event.payload.command;

// prepare the response
let resp = Response {
req_id: event.context.request_id,
msg: format!("Command {command} executed."),
};

// return `Response` (it will be serialized to JSON automatically by the runtime)
Ok(resp)
}

#[cfg(test)]
mod tests {
use crate::{my_handler, Request};
use lambda_runtime::{Context, LambdaEvent};

#[tokio::test]
async fn response_is_good_for_simple_input() {
let id = "ID";

let mut context = Context::default();
context.request_id = id.to_string();

let payload = Request {
command: "X".to_string(),
};
let event = LambdaEvent { payload, context };

let result = my_handler(event).await.unwrap();

assert_eq!(result.msg, "Command X executed.");
assert_eq!(result.req_id, id.to_string());
}
}
5 changes: 4 additions & 1 deletion examples/basic-lambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();

let func = service_fn(my_handler);
lambda_runtime::run(func).await?;
if let Err(err) = lambda_runtime::run(func).await {
eprintln!("run error: {:?}", err);
return Err(err);
}
Ok(())
}

Expand Down
32 changes: 31 additions & 1 deletion lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::{run_with_streaming_response, StreamAdapter};
pub use streaming::{run_with_streaming_response, run_with_streaming_response_concurrent, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
Expand Down Expand Up @@ -151,6 +151,18 @@ pub struct Adapter<'a, R, S> {
_phantom_data: PhantomData<&'a R>,
}

impl<'a, R, S> Clone for Adapter<'a, R, S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_phantom_data: PhantomData,
}
}
}

impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
where
S: Service<Request, Response = R, Error = E>,
Expand Down Expand Up @@ -203,6 +215,24 @@ where
lambda_runtime::run(Adapter::from(handler)).await
}

/// Starts the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
R: IntoResponse + Send + Sync + 'static,
E: std::fmt::Debug + Into<Diagnostic> + Send + 'static,
{
lambda_runtime::run_concurrent(Adapter::from(handler)).await
}

#[cfg(test)]
mod test_adapter {
use std::task::{Context, Poll};
Expand Down
52 changes: 47 additions & 5 deletions lambda-http/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::{
tower::{
util::{MapRequest, MapResponse},
util::{BoxCloneService, MapRequest, MapResponse},
ServiceBuilder, ServiceExt,
},
Diagnostic,
Expand Down Expand Up @@ -93,14 +93,33 @@ where
B::Error: Into<Error> + Send + Debug,
{
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
.service(handler)
.map_response(into_stream_response)
}

/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
#[allow(clippy::type_complexity)]
fn into_stream_service_boxed<S, B, E>(
handler: S,
) -> BoxCloneService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
.service(handler)
.map_response(into_stream_response);

BoxCloneService::new(svc)
}

/// Converts an `http::Response<B>` into a streaming Lambda response.
fn into_stream_response<B>(res: Response<B>) -> StreamResponse<BodyStream<B>>
where
Expand Down Expand Up @@ -128,6 +147,11 @@ where
}
}

fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
Expand All @@ -147,6 +171,24 @@ where
lambda_runtime::run(into_stream_service(handler)).await
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses, in a mode that is compatible with Lambda Managed Instances.
///
/// This uses a cloneable, boxed service internally so it can be driven by the
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
lambda_runtime::run_concurrent(into_stream_service_boxed(handler)).await
}

pin_project_lite::pin_project! {
#[non_exhaustive]
pub struct BodyStream<B> {
Expand Down
41 changes: 35 additions & 6 deletions lambda-runtime-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl Client {
ClientBuilder {
connector: HttpConnector::new(),
uri: None,
pool_size: None,
}
}
}
Expand All @@ -59,11 +60,16 @@ impl Client {
self.client.request(req).map_err(Into::into).boxed()
}

/// Create a new client with a given base URI and HTTP connector.
fn with(base: Uri, connector: HttpConnector) -> Self {
let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.http1_max_buf_size(1024 * 1024)
.build(connector);
/// Create a new client with a given base URI, HTTP connector, and optional pool size hint.
fn with(base: Uri, connector: HttpConnector, pool_size: Option<usize>) -> Self {
let mut builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
builder.http1_max_buf_size(1024 * 1024);

if let Some(size) = pool_size {
builder.pool_max_idle_per_host(size);
}

let client = builder.build(connector);
Self { base, client }
}

Expand Down Expand Up @@ -94,6 +100,7 @@ impl Client {
pub struct ClientBuilder {
connector: HttpConnector,
uri: Option<http::Uri>,
pool_size: Option<usize>,
}

impl ClientBuilder {
Expand All @@ -102,6 +109,7 @@ impl ClientBuilder {
ClientBuilder {
connector,
uri: self.uri,
pool_size: self.pool_size,
}
}

Expand All @@ -111,6 +119,14 @@ impl ClientBuilder {
Self { uri: Some(uri), ..self }
}

/// Provide a pool size hint for the underlying Hyper client.
pub fn with_pool_size(self, pool_size: usize) -> Self {
Self {
pool_size: Some(pool_size),
..self
}
}

/// Create the new client to interact with the Runtime API.
pub fn build(self) -> Result<Client, Error> {
let uri = match self.uri {
Expand All @@ -120,7 +136,7 @@ impl ClientBuilder {
uri.try_into().expect("Unable to convert to URL")
}
};
Ok(Client::with(uri, self.connector))
Ok(Client::with(uri, self.connector, self.pool_size))
}
}

Expand Down Expand Up @@ -182,4 +198,17 @@ mod tests {
&req.uri().to_string()
);
}

#[test]
fn builder_accepts_pool_size() {
let base = "http://localhost:9001";
let expected: Uri = base.parse().unwrap();
let client = Client::builder()
.with_pool_size(4)
.with_endpoint(base.parse().unwrap())
.build()
.unwrap();

assert_eq!(client.base, expected);
}
}
12 changes: 12 additions & 0 deletions lambda-runtime/src/layers/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ where
}
}

impl<S> Clone for RuntimeApiClientService<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
client: self.client.clone(),
}
}
}

#[pin_project(project = RuntimeApiClientFutureProj)]
pub enum RuntimeApiClientFuture<F> {
First(#[pin] F, Arc<Client>),
Expand Down
21 changes: 21 additions & 0 deletions lambda-runtime/src/layers/api_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,27 @@ impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem,
}
}

impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Clone
for RuntimeApiResponseService<
S,
EventPayload,
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: PhantomData,
}
}
}

impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Service<LambdaInvocation>
for RuntimeApiResponseService<
S,
Expand Down
1 change: 1 addition & 0 deletions lambda-runtime/src/layers/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl<S> Layer<S> for TracingLayer {
}

/// Tower service returned by [TracingLayer].
#[derive(Clone)]
pub struct TracingService<S> {
inner: S,
}
Expand Down
Loading