From 2e4287b4656eb649db4dc8ad21915d5f673f52c6 Mon Sep 17 00:00:00 2001
From: John Gallagher
Date: Tue, 27 Feb 2024 16:27:37 -0500
Subject: [PATCH] Blueprint execution: Add dataset records for Crucible zones
 (#5143)

On each invocation, for every Crucible zone present in the blueprint
(all of which have already successfully been sent to sled-agent by this
point), we attempt to insert a `dataset` record for that zone, but only
if a record does not already exist. The datasets themselves are created
by sled-agent when we send the zone configs.

Fixes #5118.
---
 Cargo.lock                                   |   1 +
 nexus/blueprint-execution/Cargo.toml         |   1 +
 nexus/blueprint-execution/src/datasets.rs    | 315 +++++++++++++++++++
 nexus/blueprint-execution/src/lib.rs         |  13 +-
 nexus/db-model/src/dataset_kind.rs           |   2 +-
 nexus/db-queries/src/db/collection_insert.rs |  21 ++
 nexus/db-queries/src/db/datastore/dataset.rs | 265 +++++++++++++++-
 7 files changed, 614 insertions(+), 4 deletions(-)
 create mode 100644 nexus/blueprint-execution/src/datasets.rs

diff --git a/Cargo.lock b/Cargo.lock
index 85b7e5a186..27b5bbd206 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4271,6 +4271,7 @@ dependencies = [
  "dns-service-client",
  "futures",
  "httptest",
+ "illumos-utils",
  "internal-dns",
  "ipnet",
  "nexus-db-model",
diff --git a/nexus/blueprint-execution/Cargo.toml b/nexus/blueprint-execution/Cargo.toml
index 3284bda27e..164559b468 100644
--- a/nexus/blueprint-execution/Cargo.toml
+++ b/nexus/blueprint-execution/Cargo.toml
@@ -10,6 +10,7 @@ omicron-rpaths.workspace = true
 anyhow.workspace = true
 dns-service-client.workspace = true
 futures.workspace = true
+illumos-utils.workspace = true
 internal-dns.workspace = true
 nexus-db-model.workspace = true
 nexus-db-queries.workspace = true
diff --git a/nexus/blueprint-execution/src/datasets.rs b/nexus/blueprint-execution/src/datasets.rs
new file mode 100644
index 0000000000..97d8324fdb
--- /dev/null
+++ b/nexus/blueprint-execution/src/datasets.rs
@@ -0,0 +1,315 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Ensures dataset records required by a given blueprint
+
+use anyhow::Context;
+use illumos_utils::zpool::ZpoolName;
+use nexus_db_model::Dataset;
+use nexus_db_model::DatasetKind;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db::DataStore;
+use nexus_types::deployment::OmicronZoneConfig;
+use nexus_types::deployment::OmicronZoneType;
+use nexus_types::identity::Asset;
+use slog::info;
+use slog::warn;
+use slog_error_chain::InlineErrorChain;
+use std::collections::BTreeSet;
+use std::net::SocketAddrV6;
+
+/// For each crucible zone in `blueprint`, ensure that a corresponding dataset
+/// record exists in `datastore`
+///
+/// Does not modify any existing dataset records. Returns the number of
+/// datasets inserted.
+pub(crate) async fn ensure_crucible_dataset_records_exist(
+    opctx: &OpContext,
+    datastore: &DataStore,
+    all_omicron_zones: impl Iterator<Item = &OmicronZoneConfig>,
+) -> anyhow::Result<usize> {
+    // Before attempting to insert any datasets, first query for any existing
+    // dataset records so we can filter them out. This looks like a typical
+    // TOCTOU issue, but it is purely a performance optimization. We expect
+    // almost all executions of this function to do nothing: new crucible
+    // datasets are created very rarely relative to how frequently blueprint
+    // realization happens.
+    // We could remove this check and filter and instead run the below
+    // "insert if not exists" query on every crucible zone, and the behavior
+    // would still be correct. However, that would issue far more queries
+    // than necessary in the very common case of "we don't need to do
+    // anything at all".
+    let mut crucible_datasets = datastore
+        .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+        .await
+        .context("failed to list all datasets")?
+        .into_iter()
+        .map(|dataset| dataset.id())
+        .collect::<BTreeSet<_>>();
+
+    let mut num_inserted = 0;
+    let mut num_already_exist = 0;
+
+    for zone in all_omicron_zones {
+        let OmicronZoneType::Crucible { address, dataset } = &zone.zone_type
+        else {
+            continue;
+        };
+
+        let id = zone.id;
+
+        // If already present in the datastore, move on.
+        if crucible_datasets.remove(&id) {
+            num_already_exist += 1;
+            continue;
+        }
+
+        // Map progenitor client strings into the types we need. We never
+        // expect these to fail.
+        let addr: SocketAddrV6 = match address.parse() {
+            Ok(addr) => addr,
+            Err(err) => {
+                warn!(
+                    opctx.log, "failed to parse crucible zone address";
+                    "address" => address,
+                    "err" => InlineErrorChain::new(&err),
+                );
+                continue;
+            }
+        };
+        let zpool_name: ZpoolName = match dataset.pool_name.parse() {
+            Ok(name) => name,
+            Err(err) => {
+                warn!(
+                    opctx.log, "failed to parse crucible zone pool name";
+                    "pool_name" => &*dataset.pool_name,
+                    "err" => err,
+                );
+                continue;
+            }
+        };
+
+        let pool_id = zpool_name.id();
+        let dataset = Dataset::new(id, pool_id, addr, DatasetKind::Crucible);
+        let maybe_inserted = datastore
+            .dataset_insert_if_not_exists(dataset)
+            .await
+            .with_context(|| {
+                format!("failed to insert dataset record for dataset {id}")
+            })?;
+
+        // If we succeeded in inserting, log it; if `maybe_inserted` is
+        // `None`, we must have lost the TOCTOU race described above, and
+        // another Nexus must have inserted this dataset before we could.
+        if maybe_inserted.is_some() {
+            info!(
+                opctx.log,
+                "inserted new dataset for crucible zone";
+                "id" => %id,
+            );
+            num_inserted += 1;
+        } else {
+            num_already_exist += 1;
+        }
+    }
+
+    // We don't currently support removing datasets, so this would be
+    // surprising: the database contains dataset records that are no longer
+    // in our blueprint. We can't do anything about this, so just warn.
+    if !crucible_datasets.is_empty() {
+        warn!(
+            opctx.log,
+            "database contains {} unexpected crucible datasets",
+            crucible_datasets.len();
+            "dataset_ids" => ?crucible_datasets,
+        );
+    }
+
+    info!(
+        opctx.log,
+        "ensured all crucible zones have dataset records";
+        "num_inserted" => num_inserted,
+        "num_already_existed" => num_already_exist,
+    );
+
+    Ok(num_inserted)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use nexus_db_model::SledBaseboard;
+    use nexus_db_model::SledSystemHardware;
+    use nexus_db_model::SledUpdate;
+    use nexus_db_model::Zpool;
+    use nexus_test_utils_macros::nexus_test;
+    use sled_agent_client::types::OmicronZoneDataset;
+    use uuid::Uuid;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
+
+    #[nexus_test]
+    async fn test_ensure_crucible_dataset_records_exist(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        // Set up.
+        let nexus = &cptestctx.server.apictx().nexus;
+        let datastore = nexus.datastore();
+        let opctx = OpContext::for_tests(
+            cptestctx.logctx.log.clone(),
+            datastore.clone(),
+        );
+        let opctx = &opctx;
+
+        // Use the standard representative inventory collection.
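+        // (The representative collection is built by
+        // `nexus_inventory::examples` and should contain multiple sleds
+        // running a mix of zone types, crucible zones among them; that is
+        // what gives `ncrucible_zones` below a nonzero value.)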
+        let representative = nexus_inventory::examples::representative();
+        let collection = representative.builder.build();
+
+        // Record the sleds and zpools contained in this collection.
+        let rack_id = Uuid::new_v4();
+        for (&sled_id, config) in &collection.omicron_zones {
+            let sled = SledUpdate::new(
+                sled_id,
+                "[::1]:0".parse().unwrap(),
+                SledBaseboard {
+                    serial_number: format!("test-{sled_id}"),
+                    part_number: "test-sled".to_string(),
+                    revision: 0,
+                },
+                SledSystemHardware {
+                    is_scrimlet: false,
+                    usable_hardware_threads: 128,
+                    usable_physical_ram: (64 << 30).try_into().unwrap(),
+                    reservoir_size: (16 << 30).try_into().unwrap(),
+                },
+                rack_id,
+            );
+            datastore
+                .sled_upsert(sled)
+                .await
+                .expect("failed to upsert sled")
+                .unwrap();
+
+            for zone in &config.zones.zones {
+                let OmicronZoneType::Crucible { dataset, .. } = &zone.zone_type
+                else {
+                    continue;
+                };
+                let zpool_name: ZpoolName =
+                    dataset.pool_name.parse().expect("invalid zpool name");
+                let zpool = Zpool::new(
+                    zpool_name.id(),
+                    sled_id,
+                    Uuid::new_v4(), // physical_disk_id
+                    (1 << 30).try_into().unwrap(), // total_size
+                );
+                datastore
+                    .zpool_upsert(zpool)
+                    .await
+                    .expect("failed to upsert zpool");
+            }
+        }
+
+        // How many crucible zones are there?
+        let ncrucible_zones = collection
+            .all_omicron_zones()
+            .filter(|z| matches!(z.zone_type, OmicronZoneType::Crucible { .. }))
+            .count();
+
+        // Prior to ensuring datasets exist, there should be none.
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            0
+        );
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones(),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+
+        // We should have inserted a dataset for each crucible zone.
+        assert_eq!(ncrucible_zones, ndatasets_inserted);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones,
+        );
+
+        // Ensuring the same crucible datasets again should insert no new
+        // records.
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones(),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+        assert_eq!(0, ndatasets_inserted);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones,
+        );
+
+        // Create another zpool on one of the sleds, so we can add a new
+        // crucible zone that uses it.
+        let new_zpool_id = Uuid::new_v4();
+        for &sled_id in collection.omicron_zones.keys().take(1) {
+            let zpool = Zpool::new(
+                new_zpool_id,
+                sled_id,
+                Uuid::new_v4(), // physical_disk_id
+                (1 << 30).try_into().unwrap(), // total_size
+            );
+            datastore
+                .zpool_upsert(zpool)
+                .await
+                .expect("failed to upsert zpool");
+        }
+
+        // Call `ensure_crucible_dataset_records_exist` again, adding a new
+        // crucible zone. It should insert only this new zone.
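+        // (The address values used here are placeholders:
+        // `ensure_crucible_dataset_records_exist` only parses and records
+        // them, so they don't need to point at a live crucible zone.)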
+        let new_zone = OmicronZoneConfig {
+            id: Uuid::new_v4(),
+            underlay_address: "::1".parse().unwrap(),
+            zone_type: OmicronZoneType::Crucible {
+                address: "[::1]:0".to_string(),
+                dataset: OmicronZoneDataset {
+                    pool_name: ZpoolName::new_external(new_zpool_id)
+                        .to_string()
+                        .parse()
+                        .unwrap(),
+                },
+            },
+        };
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones().chain(std::iter::once(&new_zone)),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+        assert_eq!(ndatasets_inserted, 1);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones + 1,
+        );
+    }
+}
diff --git a/nexus/blueprint-execution/src/lib.rs b/nexus/blueprint-execution/src/lib.rs
index d6f5f8fc31..531c4f57a8 100644
--- a/nexus/blueprint-execution/src/lib.rs
+++ b/nexus/blueprint-execution/src/lib.rs
@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
 use std::net::SocketAddrV6;
 use uuid::Uuid;
 
+mod datasets;
 mod dns;
 mod omicron_zones;
 mod resource_allocation;
@@ -89,6 +90,14 @@ where
     omicron_zones::deploy_zones(&opctx, &sleds_by_id, &blueprint.omicron_zones)
         .await?;
 
+    datasets::ensure_crucible_dataset_records_exist(
+        &opctx,
+        datastore,
+        blueprint.all_omicron_zones().map(|(_sled_id, zone)| zone),
+    )
+    .await
+    .map_err(|err| vec![err])?;
+
     dns::deploy_dns(
         &opctx,
         datastore,
@@ -97,5 +106,7 @@ where
         &sleds_by_id,
     )
     .await
-    .map_err(|e| vec![anyhow!("{}", InlineErrorChain::new(&e))])
+    .map_err(|e| vec![anyhow!("{}", InlineErrorChain::new(&e))])?;
+
+    Ok(())
 }
diff --git a/nexus/db-model/src/dataset_kind.rs b/nexus/db-model/src/dataset_kind.rs
index 00317592e8..86495b9d61 100644
--- a/nexus/db-model/src/dataset_kind.rs
+++ b/nexus/db-model/src/dataset_kind.rs
@@ -11,7 +11,7 @@ impl_enum_type!(
     #[diesel(postgres_type(name = "dataset_kind", schema = "public"))]
     pub struct DatasetKindEnum;
 
-    #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)]
+    #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)]
     #[diesel(sql_type = DatasetKindEnum)]
     pub enum DatasetKind;
 
diff --git a/nexus/db-queries/src/db/collection_insert.rs b/nexus/db-queries/src/db/collection_insert.rs
index b295f0574d..ef2a4a4d48 100644
--- a/nexus/db-queries/src/db/collection_insert.rs
+++ b/nexus/db-queries/src/db/collection_insert.rs
@@ -202,6 +202,27 @@ where
             .map_err(|e| Self::translate_async_error(e))
     }
 
+    /// Issues the CTE asynchronously and parses the result.
+    ///
+    /// The four outcomes are:
+    /// - Ok(Some(new row))
+    /// - Ok(None)
+    /// - Error(collection not found)
+    /// - Error(other diesel error)
+    pub async fn insert_and_get_optional_result_async(
+        self,
+        conn: &async_bb8_diesel::Connection<DbConnection>,
+    ) -> AsyncInsertIntoCollectionResult<Option<ResourceType>>
+    where
+        // We require this bound to ensure that "Self" is runnable as query.
+        Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>,
+    {
+        self.get_result_async::<ResourceType>(conn)
+            .await
+            .optional()
+            .map_err(|e| Self::translate_async_error(e))
+    }
+
     /// Issues the CTE asynchronously and parses the result.
     ///
     /// The three outcomes are:
diff --git a/nexus/db-queries/src/db/datastore/dataset.rs b/nexus/db-queries/src/db/datastore/dataset.rs
index 0b26789e8f..3c1fd0afb1 100644
--- a/nexus/db-queries/src/db/datastore/dataset.rs
+++ b/nexus/db-queries/src/db/datastore/dataset.rs
@@ -5,6 +5,9 @@
 //! [`DataStore`] methods on [`Dataset`]s.
 
 use super::DataStore;
+use super::SQL_BATCH_SIZE;
+use crate::authz;
+use crate::context::OpContext;
 use crate::db;
 use crate::db::collection_insert::AsyncInsertError;
 use crate::db::collection_insert::DatastoreCollection;
@@ -13,12 +16,17 @@ use crate::db::error::ErrorHandler;
 use crate::db::identity::Asset;
 use crate::db::model::Dataset;
 use crate::db::model::Zpool;
+use crate::db::pagination::paginated;
+use crate::db::pagination::Paginator;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use chrono::Utc;
 use diesel::prelude::*;
 use diesel::upsert::excluded;
+use nexus_db_model::DatasetKind;
 use omicron_common::api::external::CreateResult;
+use omicron_common::api::external::DataPageParams;
 use omicron_common::api::external::Error;
+use omicron_common::api::external::ListResultVec;
 use omicron_common::api::external::LookupResult;
 use omicron_common::api::external::LookupType;
 use omicron_common::api::external::ResourceType;
@@ -45,11 +53,12 @@ impl DataStore {
     ) -> CreateResult<Dataset> {
         use db::schema::dataset::dsl;
 
+        let dataset_id = dataset.id();
         let zpool_id = dataset.pool_id;
         Zpool::insert_resource(
             zpool_id,
             diesel::insert_into(dsl::dataset)
-                .values(dataset.clone())
+                .values(dataset)
                 .on_conflict(dsl::id)
                 .do_update()
                 .set((
@@ -73,9 +82,261 @@ impl DataStore {
                 e,
                 ErrorHandler::Conflict(
                     ResourceType::Dataset,
-                    &dataset.id().to_string(),
+                    &dataset_id.to_string(),
                 ),
             ),
         })
     }
+
+    /// Stores a new dataset in the database, but only if a dataset with the
+    /// given `id` does not already exist
+    ///
+    /// Does not update existing rows. If a dataset with the given ID already
+    /// exists, returns `Ok(None)`.
+    pub async fn dataset_insert_if_not_exists(
+        &self,
+        dataset: Dataset,
+    ) -> CreateResult<Option<Dataset>> {
+        use db::schema::dataset::dsl;
+
+        let zpool_id = dataset.pool_id;
+        Zpool::insert_resource(
+            zpool_id,
+            diesel::insert_into(dsl::dataset)
+                .values(dataset)
+                .on_conflict(dsl::id)
+                .do_nothing(),
+        )
+        .insert_and_get_optional_result_async(
+            &*self.pool_connection_unauthorized().await?,
+        )
+        .await
+        .map_err(|e| match e {
+            AsyncInsertError::CollectionNotFound => Error::ObjectNotFound {
+                type_name: ResourceType::Zpool,
+                lookup_type: LookupType::ById(zpool_id),
+            },
+            AsyncInsertError::DatabaseError(e) => {
+                public_error_from_diesel(e, ErrorHandler::Server)
+            }
+        })
+    }
+
+    /// List one page of datasets
+    ///
+    /// If `filter_kind` is `Some(value)`, only datasets with a `kind` matching
+    /// `value` will be returned. If `filter_kind` is `None`, all datasets will
+    /// be returned.
+    async fn dataset_list(
+        &self,
+        opctx: &OpContext,
+        filter_kind: Option<DatasetKind>,
+        pagparams: &DataPageParams<'_, Uuid>,
+    ) -> ListResultVec<Dataset> {
+        opctx.authorize(authz::Action::ListChildren, &authz::FLEET).await?;
+        use db::schema::dataset::dsl;
+
+        let mut query = paginated(dsl::dataset, dsl::id, pagparams)
+            .filter(dsl::time_deleted.is_null());
+
+        if let Some(kind) = filter_kind {
+            query = query.filter(dsl::kind.eq(kind));
+        }
+
+        query
+            .select(Dataset::as_select())
+            .load_async(&*self.pool_connection_authorized(opctx).await?)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
+
+    /// List all datasets, making as many queries as needed to get them all
+    ///
+    /// If `filter_kind` is `Some(value)`, only datasets with a `kind` matching
+    /// `value` will be returned. If `filter_kind` is `None`, all datasets will
+    /// be returned.
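+    ///
+    /// (For example, blueprint execution calls
+    /// `dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))` to
+    /// load every crucible dataset record before deciding which zone
+    /// records to insert.)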
+    ///
+    /// This should generally not be used in API handlers or other
+    /// latency-sensitive contexts, but it can make sense in saga actions or
+    /// background tasks.
+    pub async fn dataset_list_all_batched(
+        &self,
+        opctx: &OpContext,
+        filter_kind: Option<DatasetKind>,
+    ) -> ListResultVec<Dataset> {
+        opctx.authorize(authz::Action::ListChildren, &authz::FLEET).await?;
+        opctx.check_complex_operations_allowed()?;
+
+        let mut all_datasets = Vec::new();
+        let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+        while let Some(p) = paginator.next() {
+            let batch = self
+                .dataset_list(opctx, filter_kind, &p.current_pagparams())
+                .await?;
+            paginator =
+                p.found_batch(&batch, &|d: &nexus_db_model::Dataset| d.id());
+            all_datasets.extend(batch);
+        }
+
+        Ok(all_datasets)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::db::datastore::test_utils::datastore_test;
+    use nexus_db_model::SledBaseboard;
+    use nexus_db_model::SledSystemHardware;
+    use nexus_db_model::SledUpdate;
+    use nexus_test_utils::db::test_setup_database;
+    use omicron_test_utils::dev;
+
+    #[tokio::test]
+    async fn test_insert_if_not_exists() {
+        let logctx = dev::test_setup_log("test_insert_if_not_exists");
+        let mut db = test_setup_database(&logctx.log).await;
+        let (opctx, datastore) = datastore_test(&logctx, &db).await;
+        let opctx = &opctx;
+
+        // There should be no datasets initially.
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            []
+        );
+
+        // Create a fake sled that holds our fake zpool.
+        let sled_id = Uuid::new_v4();
+        let sled = SledUpdate::new(
+            sled_id,
+            "[::1]:0".parse().unwrap(),
+            SledBaseboard {
+                serial_number: "test-sn".to_string(),
+                part_number: "test-pn".to_string(),
+                revision: 0,
+            },
+            SledSystemHardware {
+                is_scrimlet: false,
+                usable_hardware_threads: 128,
+                usable_physical_ram: (64 << 30).try_into().unwrap(),
+                reservoir_size: (16 << 30).try_into().unwrap(),
+            },
+            Uuid::new_v4(),
+        );
+        datastore
+            .sled_upsert(sled)
+            .await
+            .expect("failed to upsert sled")
+            .unwrap();
+
+        // Create a fake zpool that backs our fake datasets.
+        let zpool_id = Uuid::new_v4();
+        let zpool = Zpool::new(
+            zpool_id,
+            sled_id,
+            Uuid::new_v4(),
+            (1 << 30).try_into().unwrap(),
+        );
+        datastore.zpool_upsert(zpool).await.expect("failed to upsert zpool");
+
+        // Inserting a new dataset should succeed.
+        let dataset1 = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                Uuid::new_v4(),
+                zpool_id,
+                "[::1]:0".parse().unwrap(),
+                DatasetKind::Crucible,
+            ))
+            .await
+            .expect("failed to insert dataset")
+            .expect("insert found unexpected existing dataset");
+        let mut expected_datasets = vec![dataset1.clone()];
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Cockroach))
+                .await
+                .unwrap(),
+            [],
+        );
+
+        // Attempting to insert another dataset with the same ID should
+        // succeed without updating the existing record. We'll check this by
+        // passing a different socket address and kind.
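+        // (If the insert did modify the existing row, the list assertions
+        // below would fail, since `expected_datasets` still holds the
+        // original address and kind.)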
+        let insert_again_result = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                dataset1.id(),
+                zpool_id,
+                "[::1]:12345".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to do-nothing insert dataset");
+        assert_eq!(insert_again_result, None);
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+
+        // We can also upsert a different dataset...
+        let dataset2 = datastore
+            .dataset_upsert(Dataset::new(
+                Uuid::new_v4(),
+                zpool_id,
+                "[::1]:0".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to upsert dataset");
+        expected_datasets.push(dataset2.clone());
+        expected_datasets.sort_by_key(|d| d.id());
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap(),
+            [dataset1.clone()],
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Cockroach))
+                .await
+                .unwrap(),
+            [dataset2.clone()],
+        );
+
+        // ... and trying to `insert_if_not_exists` should similarly return
+        // `None`.
+        let insert_again_result = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                dataset1.id(),
+                zpool_id,
+                "[::1]:12345".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to do-nothing insert dataset");
+        assert_eq!(insert_again_result, None);
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+}