58 changes: 33 additions & 25 deletions src/pool.rs
@@ -281,29 +281,29 @@ impl<Conn: Connection> PoolInner<Conn> {
}
};

// Cancel safety: All branches of this select! statement are
// cancel-safe (mpsc::Receiver::recv, tokio::time::sleep_until,
// monitoring the tokio::sync::watch::Receivers)
//
// Futurelock safety: All select arms are queried concurrently. No
// awaiting happens outside this concurrent polling.
Comment on lines +288 to +289 (Member):

FWIW, this select is also futurelock-safe since it contains no arms where the future is an &mut future, so there is no un-dropped future that may have acquired a lock or similar resource. It might be worth mentioning that here as well; I think part of the value of these "safety" comments is that they enumerate all of the things which, if someone were to change them, might introduce a problem.
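
To make the hazard being ruled out concrete, here is a minimal standalone sketch (not part of this PR; the function and timings are invented for illustration) of how an &mut-future arm can produce futurelock: the select! can leave that future partially polled and un-dropped while it still holds a resource, and a later await on the same resource then never completes.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// A future that grabs the lock and then stays pending for a long time.
async fn hold_lock(lock: Arc<Mutex<u64>>) {
    let _guard = lock.lock().await;
    tokio::time::sleep(Duration::from_secs(60)).await;
}

#[tokio::main]
async fn main() {
    let lock = Arc::new(Mutex::new(0));
    let mut fut = Box::pin(hold_lock(Arc::clone(&lock)));

    tokio::select! {
        // This arm is an &mut future: if it is polled far enough to acquire
        // the lock but another arm wins, the future is NOT dropped, so the
        // guard inside it stays alive even though nothing polls it anymore.
        _ = &mut fut => {}
        _ = tokio::time::sleep(Duration::from_millis(1)) => {}
    }

    // Futurelock: this await needs the lock still held inside `fut`, which
    // will never be polled again. The program hangs here forever.
    let _guard = lock.lock().await;
}
```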

tokio::select! {
// Handle requests from clients
request = self.rx.recv() => {
match request {
Some(Request::Claim { id, tx }) => {
self.claim_or_enqueue(id, tx).await
self.claim_or_enqueue(id, tx)
}
// The caller has explicitly asked us to terminate, and
// we should respond to them once we've stopped doing
// work.
Some(Request::Terminate) => {
self.terminate().await;
return;
},
Some(Request::Terminate) => break,
// The caller has abandoned their connection to the pool.
//
// We stop handling new requests, but have no one to
// notify. Given that the caller no longer needs the
// pool, we choose to terminate to avoid leaks.
None => {
self.terminate().await;
return;
}
None => break,
}
}
// Timeout old requests from clients
@@ -326,28 +326,36 @@ impl<Conn: Connection> PoolInner<Conn> {
// Periodically rebalance the allocation of slots to backends
_ = rebalance_interval.tick() => {
event!(Level::INFO, "Rebalancing: timer tick");
self.rebalance().await;
self.rebalance();
}
// If any of the slots change state, update their allocations.
Some((name, status)) = &mut backend_status_stream.next(), if !backend_status_stream.is_empty() => {
Some((name, status)) = backend_status_stream.next(), if !backend_status_stream.is_empty() => {
event!(Level::INFO, name = ?name, status = ?status, "Rebalancing: Backend has new status");
rebalance_interval.reset();
self.rebalance().await;
self.rebalance();

if matches!(status, slot::SetState::Online { has_unclaimed_slots: true }) {
self.try_claim_from_queue().await;
self.try_claim_from_queue();
}
},
}
}

// Out of an abundance of caution, to avoid futurelock: drop all
// possible unpolled futures before invoking terminate.
drop(rebalance_interval);
drop(backend_status_stream);
drop(resolver_stream);
Comment on lines +344 to +348 (Member):

I don't think this is necessary. The Interval and Streams are not Futures; they are values with methods that return futures. Since we are calling those methods directly in the various select arms, the futures returned by those methods are already dropped. Dropping these eagerly feels like unnecessary ceremony that doesn't actually reduce the risk of potential futurelock.

Member (follow-up):

To be clear, I think we should probably remove this, since it doesn't actually get us any additional safety. My concern about this sort of thing is that someone else might come along and read this code, and get the impression that doing this will protect us from futurelock, when it doesn't actually. You and I have spent enough time debugging this that we understand the problem fairly well, but someone who doesn't might think that this ritual will protect them when it won't, and then we've inadvertently created a superstition.
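
As a standalone illustration of that point (a simplified sketch, not code from this PR): the thing select! polls is the future returned by tick() or recv(), built inside the select! and dropped when it completes, so dropping the Interval or channel afterwards releases nothing extra.

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(Duration::from_millis(10));
    let (_tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);

    for _ in 0..3 {
        tokio::select! {
            // `interval.tick()` constructs a brand-new future right here; the
            // select! owns it and drops it as soon as the select! finishes,
            // regardless of which arm won.
            _ = interval.tick() => println!("tick"),
            _ = rx.recv() => println!("message"),
        }
        // No future borrowed from `interval` or `rx` is alive at this point,
        // so an explicit `drop(interval)` before a later await adds no safety.
    }
}
```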


self.terminate().await;
}

async fn claim_or_enqueue(
fn claim_or_enqueue(
&mut self,
id: ClaimId,
tx: oneshot::Sender<Result<claim::Handle<Conn>, Error>>,
) {
let result = self.claim(id).await;
let result = self.claim(id);
if result.is_ok() {
let _ = tx.send(result);
return;
@@ -364,13 +372,13 @@ impl<Conn: Connection> PoolInner<Conn> {
});
}

async fn try_claim_from_queue(&mut self) {
fn try_claim_from_queue(&mut self) {
loop {
let Some(request) = self.request_queue.pop_front() else {
return;
};

let result = self.claim(request.id).await;
let result = self.claim(request.id);
if result.is_ok() {
let _ = request.tx.send(result);
} else {
@@ -394,16 +402,16 @@ impl<Conn: Connection> PoolInner<Conn> {
}

#[instrument(skip(self), name = "PoolInner::rebalance")]
async fn rebalance(&mut self) {
fn rebalance(&mut self) {
#[cfg(feature = "probes")]
probes::rebalance__start!(|| self.name.as_str());
self.rebalance_inner().await;
self.rebalance_inner();

#[cfg(feature = "probes")]
probes::rebalance__done!(|| self.name.as_str());
}

async fn rebalance_inner(&mut self) {
fn rebalance_inner(&mut self) {
let mut questionable_backend_count = 0;
let mut usable_backends = vec![];

@@ -412,7 +420,7 @@ impl<Conn: Connection> PoolInner<Conn> {
for (name, slot_set) in iter {
match slot_set.get_state() {
slot::SetState::Offline => {
let _ = slot_set.set_wanted_count(1).await;
let _ = slot_set.set_wanted_count(1);
questionable_backend_count += 1;
}
slot::SetState::Online { .. } => {
@@ -442,7 +450,7 @@ impl<Conn: Connection> PoolInner<Conn> {
let Some(slot_set) = self.slots.get_mut(&name) else {
continue;
};
let _ = slot_set.set_wanted_count(slots_wanted_per_backend).await;
let _ = slot_set.set_wanted_count(slots_wanted_per_backend);
}

let mut new_priority_list = PriorityList::new();
@@ -472,7 +480,7 @@ impl<Conn: Connection> PoolInner<Conn> {
self.priority_list = new_priority_list;
}

async fn claim(&mut self, id: ClaimId) -> Result<claim::Handle<Conn>, Error> {
fn claim(&mut self, id: ClaimId) -> Result<claim::Handle<Conn>, Error> {
let mut attempted_backend = vec![];
let mut result = Err(Error::NoBackends);

@@ -504,7 +512,7 @@ impl<Conn: Connection> PoolInner<Conn> {
//
// Either way, put this backend back in the priority list after
// we're done with it.
let Ok(claim) = set.claim(id).await else {
let Ok(claim) = set.claim(id) else {
event!(Level::DEBUG, "Failed to actually get claim for backend");
rebalancer::claimed_err(&mut weighted_backend);
attempted_backend.push(weighted_backend);
@@ -525,7 +533,7 @@ impl<Conn: Connection> PoolInner<Conn> {
Err(_) => probes::pool__claim__failed!(|| (self.name.as_str(), id.0)),
}

self.priority_list.extend(attempted_backend.into_iter());
self.priority_list.extend(attempted_backend);
result
}
}

67 changes: 44 additions & 23 deletions src/resolvers/dns.rs
@@ -15,6 +15,7 @@ use hickory_resolver::config::ResolverOpts;
use hickory_resolver::error::{ResolveError, ResolveErrorKind};
use hickory_resolver::TokioAsyncResolver;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -171,22 +172,40 @@ impl DnsResolverWorker {
.or_insert_with(|| Client::new(&self.config, address, failure_window));
}

// This function is cancel-safe.
async fn tick_and_query_dns(&mut self, query_interval: &mut tokio::time::Interval) {
// We want to wait for "query_interval"'s timeout to pass before
// starting to query DNS. However, if we're partway through "query_dns"
// and we are cancelled, we'd like to resume immediately.
//
// To accomplish this:
// - After we tick once, we "reset_immediately" so tick will fire
// again immediately if this future is dropped and re-created.
// - Once we finish "query_dns", we reset the query interval to
// actually respect the "tick period" of time.
query_interval.tick().await;
query_interval.reset_immediately();

self.query_dns().await;
if self.backends.is_empty() {
query_interval.reset_after(self.config.query_retry_if_no_records_found);
} else {
query_interval.reset();
}
}

async fn run(mut self, mut terminate_rx: tokio::sync::oneshot::Receiver<()>) {
let mut query_interval = tokio::time::interval(self.config.query_interval);
loop {
let next_tick = query_interval.tick();
let next_backend_expiration = self.sleep_until_next_backend_expiration();

// Cancel safety: All branches are cancel-safe.
//
// Futurelock safety: All select arms are queried concurrently. No
// awaiting happens outside this concurrent polling.
Comment on lines +204 to +205 (Member):

This one I think is important, because we are select!ing over an &mut future in a loop, which is a warning sign. I might also note that tick_and_query_dns does not interact with terminate_rx, although because all of that code runs in the select's future rather than in the body of the select, it would remain safe even if that changed.
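
A simplified sketch of that shape (illustrative only; names and timings are made up), showing why the long-lived &mut terminate_rx stays safe as long as every await happens inside a select arm rather than after the select! resolves:

```rust
use std::time::Duration;

// Stand-in for tick_and_query_dns: all of its awaiting happens inside the
// select arm's future, so it is polled concurrently with terminate_rx.
async fn do_periodic_work() {
    tokio::time::sleep(Duration::from_millis(5)).await;
}

#[tokio::main]
async fn main() {
    let (tx, mut terminate_rx) = tokio::sync::oneshot::channel::<()>();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(20)).await;
        let _ = tx.send(());
    });

    loop {
        tokio::select! {
            // The warning sign: an &mut future reused across loop iterations.
            _ = &mut terminate_rx => return,
            _ = do_periodic_work() => {}
        }
        // An `.await` added here, in the body after the select! resolves,
        // would run while `terminate_rx` goes unpolled; if the un-dropped
        // future held a lock or similar resource, that would be futurelock.
    }
}
```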

tokio::select! {
_ = &mut terminate_rx => {
return;
},
_ = next_tick => {
self.query_dns().await;
if self.backends.is_empty() {
query_interval.reset_after(self.config.query_retry_if_no_records_found);
}
},
_ = &mut terminate_rx => return,
_ = self.tick_and_query_dns(&mut query_interval) => {},
backend_name = next_backend_expiration => {
if self.backends.remove(&backend_name).is_some() {
self.watch_tx.send_modify(|backends| {
@@ -342,7 +361,8 @@ impl DnsResolverWorker {
});
}

async fn sleep_until_next_backend_expiration(&self) -> backend::Name {
// This function is cancel-safe.
fn sleep_until_next_backend_expiration(&self) -> impl Future<Output = backend::Name> {
let next_expiration = self.backends.iter().reduce(|soonest, backend| {
let Some(backend_expiration) = backend.1.expires_at else {
return soonest;
@@ -364,20 +384,21 @@ impl DnsResolverWorker {
}
});

let Some((
name,
BackendRecord {
expires_at: Some(deadline),
..
},
)) = next_expiration
else {
let () = futures::future::pending().await;
unreachable!();
let (name, deadline) = match next_expiration {
Some((
name,
BackendRecord {
expires_at: Some(deadline),
..
},
)) => (name.clone(), *deadline),
_ => return futures::future::Either::Left(futures::future::pending()),
};

tokio::time::sleep_until((*deadline).into()).await;
name.clone()
futures::future::Either::Right(async move {
tokio::time::sleep_until(deadline.into()).await;
name
})
}
}
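
For reference, the Either pattern used above in isolation (a hypothetical, simplified sketch; the names here are not from this crate): a plain fn does its bookkeeping synchronously and returns one of two future types, so the returned future owns everything it needs and is trivially cancel-safe.

```rust
use futures::future::Either;
use std::future::Future;
use std::time::{Duration, Instant};

// Either a sleep until the given deadline, or a future that never resolves,
// mirroring the "no backend ever expires" case above.
fn sleep_until_opt(deadline: Option<Instant>) -> impl Future<Output = ()> {
    match deadline {
        None => Either::Left(futures::future::pending()),
        Some(deadline) => Either::Right(tokio::time::sleep_until(deadline.into())),
    }
}

#[tokio::main]
async fn main() {
    sleep_until_opt(Some(Instant::now() + Duration::from_millis(10))).await;
    println!("deadline reached");
}
```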
