Skip to content

Commit cc4a440

Browse files
fix(runtime): keep concurrency settings internal
- drop Config.max_concurrency to avoid semver breakage
- add Runtime.concurrency_limit derived from AWS_LAMBDA_MAX_CONCURRENCY
- size the API client pool to match the up-front worker allocation
- update tests and Runtime literals accordingly
1 parent f4a7612 commit cc4a440

File tree

2 files changed

+20
-74
lines changed

2 files changed

+20
-74
lines changed

lambda-runtime/src/lib.rs

Lines changed: 0 additions & 12 deletions
Original file line number · Diff line number · Diff line change
@@ -59,9 +59,6 @@ pub struct Config {
5959
pub log_stream: String,
6060
/// The name of the Amazon CloudWatch Logs group for the function.
6161
pub log_group: String,
62-
/// Maximum concurrent invocations for Lambda managed-concurrency environments.
63-
/// Populated from `AWS_LAMBDA_MAX_CONCURRENCY` when present.
64-
pub max_concurrency: Option<u32>,
6562
}
6663

6764
type RefConfig = Arc<Config>;
@@ -78,17 +75,8 @@ impl Config {
7875
version: env::var("AWS_LAMBDA_FUNCTION_VERSION").expect("Missing AWS_LAMBDA_FUNCTION_VERSION env var"),
7976
log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_default(),
8077
log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_default(),
81-
max_concurrency: env::var("AWS_LAMBDA_MAX_CONCURRENCY")
82-
.ok()
83-
.and_then(|v| v.parse::<u32>().ok())
84-
.filter(|&c| c > 0),
8578
}
8679
}
87-
88-
/// Returns true if concurrent runtime mode should be enabled.
89-
pub fn is_concurrent(&self) -> bool {
90-
self.max_concurrency.map(|c| c > 1).unwrap_or(false)
91-
}
9280
}
9381

9482
/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments.

lambda-runtime/src/runtime.rs

Lines changed: 20 additions & 62 deletions
Original file line number · Diff line number · Diff line change
@@ -60,6 +60,7 @@ pub struct Runtime<S> {
6060
service: S,
6161
config: Arc<Config>,
6262
client: Arc<ApiClient>,
63+
concurrency_limit: u32,
6364
}
6465

6566
/// One-time marker to log X-Ray behavior in concurrent mode.
@@ -102,7 +103,9 @@ where
102103
pub fn new(handler: F) -> Self {
103104
trace!("Loading config from env");
104105
let config = Arc::new(Config::from_env());
105-
let pool_size = config.max_concurrency.unwrap_or(1).max(1) as usize;
106+
let concurrency_limit = max_concurrency_from_env().unwrap_or(1).max(1);
107+
// Strategy: allocate all worker tasks up-front, so size the client pool to match.
108+
let pool_size = concurrency_limit as usize;
106109
let client = Arc::new(
107110
ApiClient::builder()
108111
.with_pool_size(pool_size)
@@ -113,6 +116,7 @@ where
113116
service: wrap_handler(handler, client.clone()),
114117
config,
115118
client,
119+
concurrency_limit,
116120
}
117121
}
118122
}
@@ -149,6 +153,7 @@ impl<S> Runtime<S> {
149153
client: self.client,
150154
config: self.config,
151155
service: layer.layer(self.service),
156+
concurrency_limit: self.concurrency_limit,
152157
}
153158
}
154159
}
@@ -164,9 +169,8 @@ where
164169
/// sequential `run_with_incoming` loop so that the same handler can run on both
165170
/// classic Lambda and Lambda Managed Instances.
166171
pub async fn run_concurrent(self) -> Result<(), BoxError> {
167-
if self.config.is_concurrent() {
168-
let max_concurrency = self.config.max_concurrency.unwrap_or(1);
169-
Self::run_concurrent_inner(self.service, self.config, self.client, max_concurrency).await
172+
if self.concurrency_limit > 1 {
173+
Self::run_concurrent_inner(self.service, self.config, self.client, self.concurrency_limit).await
170174
} else {
171175
let incoming = incoming(&self.client);
172176
Self::run_with_incoming(self.service, self.config, incoming).await
@@ -178,9 +182,9 @@ where
178182
service: S,
179183
config: Arc<Config>,
180184
client: Arc<ApiClient>,
181-
max_concurrency: u32,
185+
concurrency_limit: u32,
182186
) -> Result<(), BoxError> {
183-
let limit = max_concurrency as usize;
187+
let limit = concurrency_limit as usize;
184188

185189
let mut workers = FuturesUnordered::new();
186190
for _ in 1..limit {
@@ -321,6 +325,13 @@ async fn next_event_future(client: Arc<ApiClient>) -> Result<http::Response<hype
321325
client.call(req).await
322326
}
323327

328+
fn max_concurrency_from_env() -> Option<u32> {
329+
env::var("AWS_LAMBDA_MAX_CONCURRENCY")
330+
.ok()
331+
.and_then(|v| v.parse::<u32>().ok())
332+
.filter(|&c| c > 0)
333+
}
334+
324335
async fn concurrent_worker_loop<S>(mut service: S, config: Arc<Config>, client: Arc<ApiClient>) -> Result<(), BoxError>
325336
where
326337
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
@@ -573,6 +584,7 @@ mod endpoint_tests {
573584
client: client.clone(),
574585
config: Arc::new(config),
575586
service: wrap_handler(f, client),
587+
concurrency_limit: 1,
576588
};
577589
let client = &runtime.client;
578590
let incoming = incoming(client).take(1);
@@ -620,14 +632,14 @@ mod endpoint_tests {
620632
version: "1".to_string(),
621633
log_stream: "test_stream".to_string(),
622634
log_group: "test_log".to_string(),
623-
max_concurrency: None,
624635
});
625636

626637
let client = Arc::new(client);
627638
let runtime = Runtime {
628639
client: client.clone(),
629640
config,
630641
service: wrap_handler(f, client),
642+
concurrency_limit: 1,
631643
};
632644
let client = &runtime.client;
633645
let incoming = incoming(client).take(1);
@@ -651,60 +663,6 @@ mod endpoint_tests {
651663
.await
652664
}
653665

654-
#[test]
655-
fn config_parses_max_concurrency() {
656-
// Preserve existing env values
657-
let prev_fn = env::var("AWS_LAMBDA_FUNCTION_NAME").ok();
658-
let prev_mem = env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE").ok();
659-
let prev_ver = env::var("AWS_LAMBDA_FUNCTION_VERSION").ok();
660-
let prev_log_stream = env::var("AWS_LAMBDA_LOG_STREAM_NAME").ok();
661-
let prev_log_group = env::var("AWS_LAMBDA_LOG_GROUP_NAME").ok();
662-
let prev_max = env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok();
663-
664-
env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test_fn");
665-
env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128");
666-
env::set_var("AWS_LAMBDA_FUNCTION_VERSION", "1");
667-
env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", "test_stream");
668-
env::set_var("AWS_LAMBDA_LOG_GROUP_NAME", "test_log");
669-
env::set_var("AWS_LAMBDA_MAX_CONCURRENCY", "4");
670-
671-
let cfg = Config::from_env();
672-
assert_eq!(cfg.max_concurrency, Some(4));
673-
assert!(cfg.is_concurrent());
674-
675-
// Restore env
676-
if let Some(v) = prev_fn {
677-
env::set_var("AWS_LAMBDA_FUNCTION_NAME", v);
678-
} else {
679-
env::remove_var("AWS_LAMBDA_FUNCTION_NAME");
680-
}
681-
if let Some(v) = prev_mem {
682-
env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", v);
683-
} else {
684-
env::remove_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
685-
}
686-
if let Some(v) = prev_ver {
687-
env::set_var("AWS_LAMBDA_FUNCTION_VERSION", v);
688-
} else {
689-
env::remove_var("AWS_LAMBDA_FUNCTION_VERSION");
690-
}
691-
if let Some(v) = prev_log_stream {
692-
env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", v);
693-
} else {
694-
env::remove_var("AWS_LAMBDA_LOG_STREAM_NAME");
695-
}
696-
if let Some(v) = prev_log_group {
697-
env::set_var("AWS_LAMBDA_LOG_GROUP_NAME", v);
698-
} else {
699-
env::remove_var("AWS_LAMBDA_LOG_GROUP_NAME");
700-
}
701-
if let Some(v) = prev_max {
702-
env::set_var("AWS_LAMBDA_MAX_CONCURRENCY", v);
703-
} else {
704-
env::remove_var("AWS_LAMBDA_MAX_CONCURRENCY");
705-
}
706-
}
707-
708666
#[tokio::test]
709667
async fn concurrent_worker_crash_does_not_stop_other_workers() -> Result<(), Error> {
710668
let next_calls = Arc::new(AtomicUsize::new(0));
@@ -831,9 +789,9 @@ mod endpoint_tests {
831789
version: "1".to_string(),
832790
log_stream: "test_stream".to_string(),
833791
log_group: "test_log".to_string(),
834-
max_concurrency: Some(2),
835792
}),
836793
service: wrap_handler(handler, client),
794+
concurrency_limit: 2,
837795
};
838796

839797
let res = tokio::time::timeout(Duration::from_secs(2), runtime.run_concurrent()).await;

0 commit comments

Comments (0)