Skip to content

Commit 7fa967b

Browse files
authored
Merge pull request #3313 from spinframework/outbound-http-permits-metrics
Outbound http permits metrics
2 parents faa7650 + 8cc9dab commit 7fa967b

File tree

4 files changed

+95
-8
lines changed

4 files changed

+95
-8
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,66 @@ impl InstanceState {
142142

143143
impl SelfInstanceBuilder for InstanceState {}
144144

145+
/// Helper module for acquiring permits from the outbound connections semaphore.
146+
///
147+
/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
148+
mod concurrent_outbound_connections {
149+
use super::*;
150+
151+
/// Acquires a semaphore permit for the given interface, if a semaphore is configured.
152+
pub async fn acquire_semaphore<'a>(
153+
interface: &str,
154+
semaphore: &'a Option<Arc<Semaphore>>,
155+
) -> Option<tokio::sync::SemaphorePermit<'a>> {
156+
let s = semaphore.as_ref()?;
157+
acquire(interface, || s.try_acquire(), async || s.acquire().await).await
158+
}
159+
160+
/// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
161+
pub async fn acquire_owned_semaphore(
162+
interface: &str,
163+
semaphore: &Option<Arc<Semaphore>>,
164+
) -> Option<tokio::sync::OwnedSemaphorePermit> {
165+
let s = semaphore.as_ref()?;
166+
acquire(
167+
interface,
168+
|| s.clone().try_acquire_owned(),
169+
async || s.clone().acquire_owned().await,
170+
)
171+
.await
172+
}
173+
174+
/// Helper function to acquire a semaphore permit, either immediately or by waiting.
175+
///
176+
/// Allows getting either a borrowed or owned permit.
177+
async fn acquire<T>(
178+
interface: &str,
179+
try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
180+
acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
181+
) -> Option<T> {
182+
// Try to acquire a permit without waiting first
183+
// Keep track of whether we had to wait for metrics purposes.
184+
let mut waited = false;
185+
let permit = match try_acquire() {
186+
Ok(p) => Ok(p),
187+
// No available permits right now; wait for one
188+
Err(tokio::sync::TryAcquireError::NoPermits) => {
189+
waited = true;
190+
acquire().await.map_err(|_| ())
191+
}
192+
Err(_) => Err(()),
193+
};
194+
if permit.is_ok() {
195+
spin_telemetry::monotonic_counter!(
196+
outbound_http.concurrent_connection_permits_acquired = 1,
197+
interface = interface,
198+
waited = waited
199+
);
200+
}
201+
permit.ok()
202+
}
203+
}
204+
145205
pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
146206
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
147207

crates/factor-outbound-http/src/spin.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,11 @@ impl spin_http::Host for crate::InstanceState {
104104
// If we're limiting concurrent outbound requests, acquire a permit
105105
// Note: since we don't have access to the underlying connection, we can only
106106
// limit the number of concurrent requests, not connections.
107-
let permit = match &self.concurrent_outbound_connections_semaphore {
108-
Some(s) => s.acquire().await.ok(),
109-
None => None,
110-
};
107+
let permit = crate::concurrent_outbound_connections::acquire_semaphore(
108+
"spin",
109+
&self.concurrent_outbound_connections_semaphore,
110+
)
111+
.await;
111112
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
112113
drop(permit);
113114

crates/factor-outbound-http/src/wasi.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,10 +590,12 @@ impl ConnectOptions {
590590
crate::remove_blocked_addrs(&self.blocked_networks, &mut socket_addrs)?;
591591

592592
// If we're limiting concurrent outbound requests, acquire a permit
593-
let permit = match &self.concurrent_outbound_connections_semaphore {
594-
Some(s) => s.clone().acquire_owned().await.ok(),
595-
None => None,
596-
};
593+
594+
let permit = crate::concurrent_outbound_connections::acquire_owned_semaphore(
595+
"wasi",
596+
&self.concurrent_outbound_connections_semaphore,
597+
)
598+
.await;
597599

598600
let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs))
599601
.await

crates/telemetry/src/metrics.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ pub(crate) fn otel_metrics_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
6161
///
6262
/// The increment may only be an i64 or f64. You must not mix types for the same metric.
6363
///
64+
/// Takes advantage of counter support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html).
65+
///
6466
/// ```no_run
6567
/// # use spin_telemetry::metrics::counter;
6668
/// counter!(spin.metric_name = 1, metric_attribute = "value");
@@ -76,6 +78,8 @@ macro_rules! counter {
7678
///
7779
/// The increment may only be an i64 or f64. You must not mix types for the same metric.
7880
///
81+
/// Takes advantage of histogram support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html).
82+
///
7983
/// ```no_run
8084
/// # use spin_telemetry::metrics::histogram;
8185
/// histogram!(spin.metric_name = 1.5, metric_attribute = "value");
@@ -91,6 +95,8 @@ macro_rules! histogram {
9195
///
9296
/// The increment may only be a positive i64 or f64. You must not mix types for the same metric.
9397
///
98+
/// Takes advantage of monotonic counter support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html).
99+
///
94100
/// ```no_run
95101
/// # use spin_telemetry::metrics::monotonic_counter;
96102
/// monotonic_counter!(spin.metric_name = 1, metric_attribute = "value");
@@ -101,6 +107,24 @@ macro_rules! monotonic_counter {
101107
}
102108
}
103109

110+
#[macro_export]
111+
/// Records an increment to the named monotonic counter with the given attributes.
112+
///
113+
/// The increment may only be a positive i64 or f64. You must not mix types for the same metric.
114+
///
115+
/// Takes advantage of gauge support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html).
116+
///
117+
/// ```no_run
118+
/// # use spin_telemetry::metrics::gauge;
119+
/// gauge!(spin.metric_name = 1, metric_attribute = "value");
120+
/// ```
121+
macro_rules! gauge {
122+
($metric:ident $(. $suffixes:ident)* = $metric_value:expr $(, $attrs:ident=$values:expr)*) => {
123+
tracing::trace!(gauge.$metric $(. $suffixes)* = $metric_value $(, $attrs=$values)*);
124+
}
125+
}
126+
104127
pub use counter;
128+
pub use gauge;
105129
pub use histogram;
106130
pub use monotonic_counter;

0 commit comments

Comments
 (0)