Skip to content

Commit

Permalink
gate: Detect disconnected inner services in readiness (#2491)
Browse files Browse the repository at this point in the history
If `Gate` becomes ready, it assumes the inner service remains ready
indefinitely.

Load balancers rely on lazy and redudant readiness checking to avoid
disconnected endpoints.

This change fixes the Gate to ensure that the inner service is always
polled whenever the gate is polled.
  • Loading branch information
olix0r authored Oct 25, 2023
1 parent 986d458 commit 4f68425
Showing 1 changed file with 46 additions and 3 deletions.
49 changes: 46 additions & 3 deletions linkerd/stack/src/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ where
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.permit.is_ready() {
return Poll::Ready(Ok(()));
}
// If we previously polled to ready and acquired a permit, clear it so
// we can reestablish readiness without holding it.
self.permit = Poll::Pending;
let permit = ready!(self.poll_acquire(cx));
ready!(self.inner.poll_ready(cx))?;
tracing::trace!("Acquired permit");
self.permit = Poll::Ready(permit);
Poll::Ready(Ok(()))
}
Expand Down Expand Up @@ -227,6 +228,7 @@ impl<S> Gate<S> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicBool;
use tokio_test::{assert_pending, assert_ready, task};

#[tokio::test]
Expand Down Expand Up @@ -262,6 +264,47 @@ mod tests {
assert_ready!(gate.poll_ready()).expect("ok");
}

#[tokio::test]
async fn gate_repolls_back_to_pending() {
let (tx, rx) = channel();
let pending = Arc::new(AtomicBool::new(false));
let (mut gate, mut handle) = {
struct Svc<S>(S, Arc<AtomicBool>);
impl<Req, S: Service<Req>> Service<Req> for Svc<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
if self.1.load(std::sync::atomic::Ordering::Relaxed) {
return Poll::Pending;
}
self.0.poll_ready(cx)
}
fn call(&mut self, req: Req) -> Self::Future {
self.0.call(req)
}
}

let pending = pending.clone();
tower_test::mock::spawn_with::<(), (), _, _>(move |inner| {
Gate::new(rx.clone(), Svc(inner, pending.clone()))
})
};

tx.open();
handle.allow(1);
assert_ready!(gate.poll_ready()).expect("ok");

pending.store(true, std::sync::atomic::Ordering::Relaxed);
assert_pending!(gate.poll_ready());

pending.store(false, std::sync::atomic::Ordering::Relaxed);
assert_ready!(gate.poll_ready()).expect("ok");
}

#[tokio::test]
async fn notifies_on_open() {
let (tx, rx) = channel();
Expand Down

0 comments on commit 4f68425

Please sign in to comment.