From 91ee5d1d26c7369623dc42f7e146981b04559dbd Mon Sep 17 00:00:00 2001 From: Adam Hendel Date: Mon, 13 Mar 2023 09:39:03 -0500 Subject: [PATCH] coredb-operator: add extensions to status (#157) --- .github/workflows/operator.yml | 2 +- coredb-operator/src/controller.rs | 19 ++- coredb-operator/src/extensions.rs | 154 ++++++++++++++++++++- coredb-operator/src/fixtures.rs | 7 +- coredb-operator/src/statefulset.rs | 2 +- coredb-operator/tests/integration_tests.rs | 8 +- coredb-operator/yaml/crd.yaml | 29 ++++ 7 files changed, 209 insertions(+), 12 deletions(-) diff --git a/.github/workflows/operator.yml b/.github/workflows/operator.yml index 2ca3042d2..045c8cd51 100644 --- a/.github/workflows/operator.yml +++ b/.github/workflows/operator.yml @@ -16,7 +16,7 @@ on: push: branches: - main - - 'release/[0-9]+.[0-9]+' + - 'release/**' jobs: lint: diff --git a/coredb-operator/src/controller.rs b/coredb-operator/src/controller.rs index 4580fa905..55bb59e8c 100644 --- a/coredb-operator/src/controller.rs +++ b/coredb-operator/src/controller.rs @@ -23,7 +23,8 @@ use kube::{ }; use crate::{ - extensions::manage_extensions, postgres_exporter_role::create_postgres_exporter_role, + extensions::{get_all_extensions, manage_extensions}, + postgres_exporter_role::create_postgres_exporter_role, secret::reconcile_secret, }; use k8s_openapi::{ @@ -73,6 +74,7 @@ pub struct CoreDBSpec { #[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] pub struct CoreDBStatus { pub running: bool, + pub extensions: Option>, #[serde(default = "defaults::default_storage")] pub storage: Quantity, } @@ -201,20 +203,31 @@ impl CoreDB { return Ok(Action::requeue(Duration::from_secs(1))); } + let mut extensions: Vec = + get_all_extensions(self, ctx.clone()).await.unwrap_or_else(|_| { + panic!( + "Error getting extensions on CoreDB {}", + self.metadata.name.clone().unwrap() + ) + }); + + // TODO(chuckhend) - reconcile extensions before create/drop in manage_extensions manage_extensions(self, ctx.clone()).await.unwrap_or_else(|_| { panic!( "Error updating extensions on CoreDB {}", self.metadata.name.clone().unwrap() ) }); - + // must be sorted same, else reconcile will trigger again + extensions.sort_by_key(|e| e.name.clone()); // always overwrite status object with what we saw let new_status = Patch::Apply(json!({ "apiVersion": "coredb.io/v1alpha1", "kind": "CoreDB", "status": CoreDBStatus { running: true, - storage: self.spec.storage.clone() + storage: self.spec.storage.clone(), + extensions: Some(extensions), } })); let ps = PatchParams::apply("cntrlr").force(); diff --git a/coredb-operator/src/extensions.rs b/coredb-operator/src/extensions.rs index 2ba301567..ef127dabc 100644 --- a/coredb-operator/src/extensions.rs +++ b/coredb-operator/src/extensions.rs @@ -1,8 +1,69 @@ -use crate::{Context, CoreDB, Error}; +use crate::{ + controller::{Extension, ExtensionInstallLocation}, + Context, CoreDB, Error, +}; use regex::Regex; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info, warn}; +#[derive(Debug)] +pub struct ExtRow { + pub name: String, + pub version: String, + pub enabled: bool, + pub schema: String, +} + +const LIST_DATABASES_QUERY: &str = r#"SELECT datname FROM pg_database WHERE datistemplate = false;"#; +const LIST_EXTENSIONS_QUERY: &str = r#"select +distinct on +(name) * +from +( +select + * +from + ( + select + t0.extname as name, + t0.extversion as version, + true as enabled, + t1.nspname as schema + from + ( + select + extnamespace, + extname, + extversion + from + pg_extension + ) t0, + ( + select + oid, + nspname + from + pg_namespace + ) t1 + where + t1.oid = t0.extnamespace +) installed +union +select + name, + default_version as version, + false as enabled, + 'public' as schema +from + pg_catalog.pg_available_extensions +order by + enabled asc +) combined +order by +name asc, +enabled desc +"#; + pub async fn manage_extensions(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); let extensions = &cdb.spec.extensions; @@ -66,3 +127,92 @@ pub async fn manage_extensions(cdb: &CoreDB, ctx: Arc) -> Result<(), Er } Ok(()) } + +pub async fn list_databases(cdb: &CoreDB, ctx: Arc) -> Result, Error> { + let client = ctx.client.clone(); + let psql_out = cdb + .psql( + LIST_DATABASES_QUERY.to_owned(), + "postgres".to_owned(), + client.clone(), + ) + .await + .unwrap(); + let result_string = psql_out.stdout.unwrap(); + let mut databases = vec![]; + for line in result_string.lines().skip(2) { + let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect(); + if fields.is_empty() { + debug!("Done:{:?}", fields); + continue; + } + databases.push(fields[0].to_string()); + } + let num_databases = databases.len(); + info!("Found {} databases", num_databases); + Ok(databases) +} + +pub async fn list_extensions(cdb: &CoreDB, ctx: Arc, database: &str) -> Result, Error> { + let client = ctx.client.clone(); + let psql_out = cdb + .psql( + LIST_EXTENSIONS_QUERY.to_owned(), + database.to_owned(), + client.clone(), + ) + .await + .unwrap(); + let result_string = psql_out.stdout.unwrap(); + let mut extensions = vec![]; + for line in result_string.lines().skip(2) { + let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect(); + if fields.len() < 4 { + debug!("Done:{:?}", fields); + continue; + } + let package = ExtRow { + name: fields[0].to_owned(), + version: fields[1].to_owned(), + enabled: fields[2] == "t", + schema: fields[3].to_owned(), + }; + extensions.push(package); + } + let num_extensions = extensions.len(); + info!("Found {} extensions", num_extensions); + Ok(extensions) +} + +/// list databases then get all extensions from each database +pub async fn get_all_extensions(cdb: &CoreDB, ctx: Arc) -> Result, Error> { + let databases = list_databases(cdb, ctx.clone()).await?; + + let mut ext_hashmap: HashMap> = HashMap::new(); + // query every database for extensions + // transform results by extension name, rather than by database + for db in databases { + let extensions = list_extensions(cdb, ctx.clone(), &db).await?; + for ext in extensions { + let extlocation = ExtensionInstallLocation { + database: db.clone(), + version: Some(ext.version), + enabled: ext.enabled, + schema: ext.schema, + }; + ext_hashmap + .entry(ext.name) + .or_insert_with(Vec::new) + .push(extlocation); + } + } + + let mut ext_spec: Vec = Vec::new(); + for (ext_name, ext_locations) in &ext_hashmap { + ext_spec.push(Extension { + name: ext_name.clone(), + locations: ext_locations.clone(), + }); + } + Ok(ext_spec) +} diff --git a/coredb-operator/src/fixtures.rs b/coredb-operator/src/fixtures.rs index 631355a47..8b0e3194a 100644 --- a/coredb-operator/src/fixtures.rs +++ b/coredb-operator/src/fixtures.rs @@ -75,7 +75,6 @@ impl ApiServerVerifier { let coredb = coredb_.clone(); tokio::spawn(async move { pin_mut!(handle); - // After the PATCH to CoreDB, we expect a GET on Secrets let (request, send) = handle .next_request() @@ -168,9 +167,9 @@ impl ApiServerVerifier { assert_eq!( request.uri().to_string(), format!( - "/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}/status?&force=true&fieldManager=cntrlr", - coredb.name_any() - ) + "/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}/status?&force=true&fieldManager=cntrlr", + coredb.name_any() + ) ); let req_body = to_bytes(request.into_body()).await.unwrap(); let json: serde_json::Value = diff --git a/coredb-operator/src/statefulset.rs b/coredb-operator/src/statefulset.rs index ea1d332a0..18b36e5da 100644 --- a/coredb-operator/src/statefulset.rs +++ b/coredb-operator/src/statefulset.rs @@ -270,7 +270,7 @@ async fn update_pvc(client: Client, sts: &StatefulSet, cdb: &CoreDB) { Api::namespaced(client, &sts.clone().metadata.namespace.unwrap()); let sts_name = sts.clone().metadata.name.unwrap(); - let pvc_name = format!("data-{}-0", sts_name); + let pvc_name = format!("data-{sts_name}-0"); let mut pvc_requests: BTreeMap = BTreeMap::new(); pvc_requests.insert("storage".to_string(), cdb.spec.storage.clone()); diff --git a/coredb-operator/tests/integration_tests.rs b/coredb-operator/tests/integration_tests.rs index f68d999c0..5d547b561 100644 --- a/coredb-operator/tests/integration_tests.rs +++ b/coredb-operator/tests/integration_tests.rs @@ -313,7 +313,13 @@ mod test { println!("{}", result.stdout.clone().unwrap()); // assert does not contain postgis - assert!(!result.stdout.unwrap().contains("postgis")); + assert!(!result.stdout.clone().unwrap().contains("postgis")); + + // assert extensions made it into the status + let spec = coredbs.get(name).await.unwrap(); + let status = spec.status.unwrap(); + let extensions = status.extensions.unwrap(); + assert!(extensions.len() > 0); } #[tokio::test] diff --git a/coredb-operator/yaml/crd.yaml b/coredb-operator/yaml/crd.yaml index d608a4ded..4485077ce 100644 --- a/coredb-operator/yaml/crd.yaml +++ b/coredb-operator/yaml/crd.yaml @@ -200,6 +200,35 @@ spec: description: The status object of `CoreDB` nullable: true properties: + extensions: + items: + properties: + locations: + items: + properties: + database: + default: postrgres + type: string + enabled: + type: boolean + schema: + default: public + type: string + version: + nullable: true + type: string + required: + - enabled + type: object + type: array + name: + type: string + required: + - locations + - name + type: object + nullable: true + type: array running: type: boolean storage: