Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions crates/factor-outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,66 @@ impl InstanceState {

impl SelfInstanceBuilder for InstanceState {}

/// Helper module for acquiring permits from the outbound connections semaphore.
///
/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
mod concurrent_outbound_connections {
use super::*;

/// Acquires a semaphore permit for the given interface, if a semaphore is configured.
pub async fn acquire_semaphore<'a>(
interface: &str,
semaphore: &'a Option<Arc<Semaphore>>,
) -> Option<tokio::sync::SemaphorePermit<'a>> {
let s = semaphore.as_ref()?;
acquire(interface, || s.try_acquire(), async || s.acquire().await).await
}

/// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
pub async fn acquire_owned_semaphore(
interface: &str,
semaphore: &Option<Arc<Semaphore>>,
) -> Option<tokio::sync::OwnedSemaphorePermit> {
let s = semaphore.as_ref()?;
acquire(
interface,
|| s.clone().try_acquire_owned(),
async || s.clone().acquire_owned().await,
)
.await
}

/// Helper function to acquire a semaphore permit, either immediately or by waiting.
///
/// Allows getting either a borrowed or owned permit.
async fn acquire<T>(
interface: &str,
try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
) -> Option<T> {
// Try to acquire a permit without waiting first
// Keep track of whether we had to wait for metrics purposes.
let mut waited = false;
let permit = match try_acquire() {
Ok(p) => Ok(p),
// No available permits right now; wait for one
Err(tokio::sync::TryAcquireError::NoPermits) => {
waited = true;
acquire().await.map_err(|_| ())
}
Err(_) => Err(()),
};
if permit.is_ok() {
spin_telemetry::monotonic_counter!(
outbound_http.concurrent_connection_permits_acquired = 1,
interface = interface,
waited = waited
);
}
permit.ok()
}
}

pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;

Expand Down
9 changes: 5 additions & 4 deletions crates/factor-outbound-http/src/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ impl spin_http::Host for crate::InstanceState {
// If we're limiting concurrent outbound requests, acquire a permit
// Note: since we don't have access to the underlying connection, we can only
// limit the number of concurrent requests, not connections.
let permit = match &self.concurrent_outbound_connections_semaphore {
Some(s) => s.acquire().await.ok(),
None => None,
};
let permit = crate::concurrent_outbound_connections::acquire_semaphore(
"spin",
&self.concurrent_outbound_connections_semaphore,
)
.await;
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
drop(permit);

Expand Down
10 changes: 6 additions & 4 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,12 @@ impl ConnectOptions {
}

// If we're limiting concurrent outbound requests, acquire a permit
let permit = match &self.concurrent_outbound_connections_semaphore {
Some(s) => s.clone().acquire_owned().await.ok(),
None => None,
};

let permit = crate::concurrent_outbound_connections::acquire_owned_semaphore(
"wasi",
&self.concurrent_outbound_connections_semaphore,
)
.await;

let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs))
.await
Expand Down