# [nexus] Explicitly terminate pools for qorb (#6881)
Fixes #6505, integrating usage of the new qorb APIs to terminate pools
cleanly: oxidecomputer/qorb#45

# Background

- [https://github.com/oxidecomputer/qorb](https://github.com/oxidecomputer/qorb)
is a connection pooling crate, which spins up tokio tasks that attempt to
connect to backends.
- When `qorb` was integrated into Omicron in
#5876, I used it to connect
to our database backend (CockroachDB). This included usage in tests,
even with a "single backend host" (one test-only CRDB server) -- I
wanted to ensure that we used the same pool and connector logic in tests
and prod.

# What Went Wrong

As identified in #6505, we saw some tests failing during
**termination**. The specific cause of the failure was a panic from
[async-bb8-diesel](https://github.com/oxidecomputer/async-bb8-diesel),
where we attempted to spawn tasks with a terminating tokio executor.
This issue stems from async-bb8-diesel's usage of
`tokio::task::spawn_blocking`, where the returned `JoinHandle` is
immediately awaited and **unwrapped**, with an expectation that "we
should always be able to spawn a blocking task".
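
The pattern at fault looks roughly like the following sketch (a
paraphrase, not async-bb8-diesel's exact code; `run_blocking` is a
hypothetical helper):

```rust
// Sketch of the panicking pattern: the JoinHandle returned by
// spawn_blocking is awaited and unwrapped on the spot.
async fn run_blocking<F, R>(f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    tokio::task::spawn_blocking(f)
        .await
        // If the runtime is shutting down, the blocking task is
        // cancelled and this expect() panics.
        .expect("we should always be able to spawn a blocking task")
}
```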

There was a separate discussion about "whether or not async-bb8-diesel
should be unwrapping in this case" (see:
oxidecomputer/async-bb8-diesel#77), but I chose
instead to focus on the question:

## Why are we trying to send requests to async-bb8-diesel while the tokio runtime is exiting?

The answer to this question lies in qorb's internals -- qorb itself
spawns many tokio tasks to handle ongoing work, including monitoring DNS
resolution, checking on backend health, and making connections before
they are requested. One of these qorb tasks, which calls `ping_async` to
check connection health, used the `async-bb8-diesel` interface that
ultimately panicked.

Within qorb, most of these tokio tasks have a `Drop` implementation of
the form:

```rust
struct MyQorbStructure {
  handle: tokio::task::JoinHandle<()>,
}

impl Drop for MyQorbStructure {
  fn drop(&mut self) {
    self.handle.abort();
  }
}
```

Tragically, without async drop support in Rust, this is the best we can
do implicitly -- signal that the background tasks should stop ASAP --
but that may not be soon enough! Calling `.abort()` on the `JoinHandle`
does not terminate the task immediately; it merely signals that the task
should shut down at its next yield point.
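
The following minimal sketch (my illustration, not qorb's code) shows
the gap: `abort()` returns immediately, but the task is only observed as
cancelled once it reaches an `.await`:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        loop {
            // Synchronous work between yield points: abort() cannot
            // interrupt this section.
            std::thread::sleep(Duration::from_millis(50));
            // Cancellation can only land here, at a yield point.
            tokio::task::yield_now().await;
        }
    });

    // Requests cancellation and returns immediately; the task may
    // still be mid-iteration at this point.
    handle.abort();
    assert!(handle.await.unwrap_err().is_cancelled());
}
```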

As a result, we can still see the flake observed in #6505:

- A qorb pool is initialized with background tasks
- One of the qorb worker tasks is about to make a call to check on the
health of a connection to a backend
- The test finishes and returns; the tokio runtime begins terminating
- We call `drop` on the `qorb` pool, which signals the background task
should abort
- The aforementioned qorb worker task makes the call to `ping_async`,
which calls `spawn_blocking`. This fails because the tokio runtime is
terminating, and returns a
[JoinError::Cancelled](https://buildomat.eng.oxide.computer/wg/0/details/01J9YQVN7X5EQNXFSEY6XJBH8B/zfviqPz9RoJp3bY4TafbyqXTwbhqdr7w4oupqBtVARR00CXF/01J9YQWAXY36WM0R2VG27QMFRK#S6049).
- `async-bb8-diesel` unwraps this `JoinError`, and the test panics.

# How do we mitigate this?

That's where this PR comes in. Using the new qorb APIs, we no longer rely
on the synchronous drop methods -- we explicitly call `.terminate().await`,
which does the following:

- Use `tokio::sync::oneshot`s as signals to `tokio::tasks` that they
should exit
- `.await` the `JoinHandle` for those tasks before returning

Doing this work explicitly as part of cleanup ensures that no background
tasks are attempting to do new work while the tokio runtime is
terminating.
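
A minimal sketch of the pattern (illustrative only; `Worker` and
`do_work` are hypothetical names, not qorb's actual internals):

```rust
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

struct Worker {
    stop_tx: oneshot::Sender<()>,
    handle: JoinHandle<()>,
}

impl Worker {
    fn spawn() -> Self {
        let (stop_tx, mut stop_rx) = oneshot::channel();
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Exit as soon as terminate() fires the oneshot.
                    _ = &mut stop_rx => break,
                    _ = do_work() => {}
                }
            }
        });
        Self { stop_tx, handle }
    }

    // Unlike a Drop impl calling abort(), this returns only after
    // the background task has actually exited.
    async fn terminate(self) {
        let _ = self.stop_tx.send(());
        let _ = self.handle.await;
    }
}

async fn do_work() {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
```

In the diff below, this surfaces as the explicit
`datastore.terminate().await` and `db.terminate().await` calls threaded
through tests and cleanup paths.
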
smklein authored Oct 18, 2024
1 parent 27a4365 commit 14edbf3
Showing 70 changed files with 1,215 additions and 1,115 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
proptest = "1.5.0"
qorb = "0.1.1"
qorb = "0.1.2"
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
12 changes: 5 additions & 7 deletions dev-tools/omdb/src/bin/omdb/db.rs
@@ -19,7 +19,6 @@ use crate::check_allow_destructive::DestructiveOperationToken;
use crate::helpers::CONNECTION_OPTIONS_HEADING;
use crate::helpers::DATABASE_OPTIONS_HEADING;
use crate::Omdb;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use async_bb8_diesel::AsyncConnection;
@@ -255,10 +254,7 @@ impl DbUrlOptions {
// doesn't match what we expect. So we use `DataStore::new_unchecked()`
// here. We will then check the schema version explicitly and warn the
// user if it doesn't match.
let datastore = Arc::new(
DataStore::new_unchecked(log.clone(), pool)
.map_err(|e| anyhow!(e).context("creating datastore"))?,
);
let datastore = Arc::new(DataStore::new_unchecked(log.clone(), pool));
check_schema_version(&datastore).await;
Ok(datastore)
}
@@ -785,7 +781,7 @@ impl DbArgs {
) -> Result<(), anyhow::Error> {
let datastore = self.db_url_opts.connect(omdb, log).await?;
let opctx = OpContext::for_tests(log.clone(), datastore.clone());
match &self.command {
let res = match &self.command {
DbCommands::Rack(RackArgs { command: RackCommands::List }) => {
cmd_db_rack_list(&opctx, &datastore, &self.fetch_opts).await
}
@@ -1013,7 +1009,9 @@ impl DbArgs {
DbCommands::Volumes(VolumeArgs {
command: VolumeCommands::List,
}) => cmd_db_volume_list(&datastore, &self.fetch_opts).await,
}
};
datastore.terminate().await;
res
}
}

2 changes: 1 addition & 1 deletion live-tests/macros/src/lib.rs
@@ -68,7 +68,7 @@ pub fn live_test(_attrs: TokenStream, input: TokenStream) -> TokenStream {
#func_ident_string
).await.expect("setting up LiveTestContext");
#func_ident(&ctx).await;
ctx.cleanup_successful();
ctx.cleanup_successful().await;
}
};
let mut sig = input_func.sig.clone();
8 changes: 4 additions & 4 deletions live-tests/tests/common/mod.rs
@@ -46,10 +46,10 @@ impl LiveTestContext {

/// Clean up this `LiveTestContext`
///
/// This mainly removes log files created by the test. We do this in this
/// explicit cleanup function rather than on `Drop` because we want the log
/// files preserved on test failure.
pub fn cleanup_successful(self) {
/// This removes log files and cleans up the [`DataStore`], which
/// must be terminated asynchronously.
pub async fn cleanup_successful(self) {
self.datastore.terminate().await;
self.logctx.cleanup_successful();
}

17 changes: 15 additions & 2 deletions nexus-config/src/nexus_config.rs
@@ -270,6 +270,7 @@ pub struct MgdConfig {
#[derive(Clone, Debug, Deserialize, PartialEq)]
struct UnvalidatedTunables {
max_vpc_ipv4_subnet_prefix: u8,
load_timeout: Option<std::time::Duration>,
}

/// Tunable configuration parameters, intended for use in test environments or
@@ -282,6 +283,11 @@ pub struct Tunables {
/// Note that this is the maximum _prefix_ size, which sets the minimum size
/// of the subnet.
pub max_vpc_ipv4_subnet_prefix: u8,

/// How long should we attempt to loop until the schema matches?
///
/// If "None", nexus loops forever during initialization.
pub load_timeout: Option<std::time::Duration>,
}

// Convert from the unvalidated tunables, verifying each parameter as needed.
@@ -292,6 +298,7 @@ impl TryFrom<UnvalidatedTunables> for Tunables {
Tunables::validate_ipv4_prefix(unvalidated.max_vpc_ipv4_subnet_prefix)?;
Ok(Tunables {
max_vpc_ipv4_subnet_prefix: unvalidated.max_vpc_ipv4_subnet_prefix,
load_timeout: unvalidated.load_timeout,
})
}
}
@@ -341,7 +348,10 @@ pub const MAX_VPC_IPV4_SUBNET_PREFIX: u8 = 26;

impl Default for Tunables {
fn default() -> Self {
Tunables { max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX }
Tunables {
max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX,
load_timeout: None,
}
}
}

@@ -1003,7 +1013,10 @@ mod test {
trusted_root: Utf8PathBuf::from("/path/to/root.json"),
}),
schema: None,
tunables: Tunables { max_vpc_ipv4_subnet_prefix: 27 },
tunables: Tunables {
max_vpc_ipv4_subnet_prefix: 27,
load_timeout: None
},
dendrite: HashMap::from([(
SwitchLocation::Switch0,
DpdConfig {
95 changes: 39 additions & 56 deletions nexus/db-queries/src/db/collection_attach.rs
@@ -577,15 +577,15 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::db::{self, identity::Resource as IdentityResource};
use crate::db::datastore::pub_test_utils::TestDatabase;
use crate::db::identity::Resource as IdentityResource;
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
use diesel::pg::Pg;
use diesel::QueryDsl;
use diesel::SelectableHelper;
use nexus_test_utils::db::test_setup_database;
use omicron_common::api::external::{IdentityMetadataCreateParams, Name};
use omicron_test_utils::dev;
use uuid::Uuid;
@@ -869,11 +869,9 @@ mod test {
async fn test_attach_missing_collection_fails() {
let logctx =
dev::test_setup_log("test_attach_missing_collection_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();
let resource_id = uuid::Uuid::new_v4();
Expand All @@ -891,16 +889,15 @@ mod test {

assert!(matches!(attach, Err(AttachError::CollectionNotFound)));

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_missing_resource_fails() {
let logctx = dev::test_setup_log("test_attach_missing_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();

let conn = setup_db(&pool).await;

@@ -928,16 +925,15 @@
// The collection should remain unchanged.
assert_eq!(collection, get_collection(collection_id, &conn).await);

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_once() {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();

let conn = setup_db(&pool).await;

@@ -976,16 +972,15 @@
);
assert_eq!(returned_resource, get_resource(resource_id, &conn).await);

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_once_synchronous() {
let logctx = dev::test_setup_log("test_attach_once_synchronous");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();

let conn = setup_db(&pool).await;

@@ -1025,18 +1020,16 @@
);
assert_eq!(returned_resource, get_resource(resource_id, &conn).await);

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_multiple_times() {
let logctx = dev::test_setup_log("test_attach_multiple_times");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

const RESOURCE_COUNT: u32 = 5;

@@ -1081,18 +1074,16 @@
);
}

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_beyond_capacity_fails() {
let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();

@@ -1145,18 +1136,16 @@
_ => panic!("Unexpected error: {:?}", err),
};

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_while_already_attached() {
let logctx = dev::test_setup_log("test_attach_while_already_attached");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();

@@ -1252,18 +1241,16 @@
_ => panic!("Unexpected error: {:?}", err),
};

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_with_filters() {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();
let resource_id = uuid::Uuid::new_v4();
@@ -1307,18 +1294,16 @@
assert_eq!(returned_resource, get_resource(resource_id, &conn).await);
assert_eq!(returned_resource.description(), "new description");

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_deleted_resource_fails() {
let logctx = dev::test_setup_log("test_attach_deleted_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();
let resource_id = uuid::Uuid::new_v4();
@@ -1352,18 +1337,16 @@
.await;
assert!(matches!(attach, Err(AttachError::ResourceNotFound)));

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_attach_without_update_filter() {
let logctx = dev::test_setup_log("test_attach_without_update_filter");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;
let db = TestDatabase::new_with_pool(&logctx.log).await;
let pool = db.pool();
let conn = setup_db(pool).await;

let collection_id = uuid::Uuid::new_v4();

@@ -1408,7 +1391,7 @@
.collection_id
.is_none());

db.cleanup().await.unwrap();
db.terminate().await;
logctx.cleanup_successful();
}
}
