Skip to content

Commit

Permalink
coredb-operator: add extensions to status (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Mar 13, 2023
1 parent dbefc43 commit 91ee5d1
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
push:
branches:
- main
- 'release/[0-9]+.[0-9]+'
- 'release/**'

jobs:
lint:
Expand Down
19 changes: 16 additions & 3 deletions coredb-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use kube::{
};

use crate::{
extensions::manage_extensions, postgres_exporter_role::create_postgres_exporter_role,
extensions::{get_all_extensions, manage_extensions},
postgres_exporter_role::create_postgres_exporter_role,
secret::reconcile_secret,
};
use k8s_openapi::{
Expand Down Expand Up @@ -73,6 +74,7 @@ pub struct CoreDBSpec {
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct CoreDBStatus {
pub running: bool,
pub extensions: Option<Vec<Extension>>,
#[serde(default = "defaults::default_storage")]
pub storage: Quantity,
}
Expand Down Expand Up @@ -201,20 +203,31 @@ impl CoreDB {
return Ok(Action::requeue(Duration::from_secs(1)));
}

let mut extensions: Vec<Extension> =
get_all_extensions(self, ctx.clone()).await.unwrap_or_else(|_| {
panic!(
"Error getting extensions on CoreDB {}",
self.metadata.name.clone().unwrap()
)
});

// TODO(chuckhend) - reconcile extensions before create/drop in manage_extensions
manage_extensions(self, ctx.clone()).await.unwrap_or_else(|_| {
panic!(
"Error updating extensions on CoreDB {}",
self.metadata.name.clone().unwrap()
)
});

// must be sorted same, else reconcile will trigger again
extensions.sort_by_key(|e| e.name.clone());
// always overwrite status object with what we saw
let new_status = Patch::Apply(json!({
"apiVersion": "coredb.io/v1alpha1",
"kind": "CoreDB",
"status": CoreDBStatus {
running: true,
storage: self.spec.storage.clone()
storage: self.spec.storage.clone(),
extensions: Some(extensions),
}
}));
let ps = PatchParams::apply("cntrlr").force();
Expand Down
154 changes: 152 additions & 2 deletions coredb-operator/src/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,69 @@
use crate::{Context, CoreDB, Error};
use crate::{
controller::{Extension, ExtensionInstallLocation},
Context, CoreDB, Error,
};
use regex::Regex;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, info, warn};

#[derive(Debug)]
pub struct ExtRow {
pub name: String,
pub version: String,
pub enabled: bool,
pub schema: String,
}

const LIST_DATABASES_QUERY: &str = r#"SELECT datname FROM pg_database WHERE datistemplate = false;"#;
const LIST_EXTENSIONS_QUERY: &str = r#"select
distinct on
(name) *
from
(
select
*
from
(
select
t0.extname as name,
t0.extversion as version,
true as enabled,
t1.nspname as schema
from
(
select
extnamespace,
extname,
extversion
from
pg_extension
) t0,
(
select
oid,
nspname
from
pg_namespace
) t1
where
t1.oid = t0.extnamespace
) installed
union
select
name,
default_version as version,
false as enabled,
'public' as schema
from
pg_catalog.pg_available_extensions
order by
enabled asc
) combined
order by
name asc,
enabled desc
"#;

pub async fn manage_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Error> {
let client = ctx.client.clone();
let extensions = &cdb.spec.extensions;
Expand Down Expand Up @@ -66,3 +127,92 @@ pub async fn manage_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Er
}
Ok(())
}

pub async fn list_databases(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<String>, Error> {
let client = ctx.client.clone();
let psql_out = cdb
.psql(
LIST_DATABASES_QUERY.to_owned(),
"postgres".to_owned(),
client.clone(),
)
.await
.unwrap();
let result_string = psql_out.stdout.unwrap();
let mut databases = vec![];
for line in result_string.lines().skip(2) {
let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
if fields.is_empty() {
debug!("Done:{:?}", fields);
continue;
}
databases.push(fields[0].to_string());
}
let num_databases = databases.len();
info!("Found {} databases", num_databases);
Ok(databases)
}

pub async fn list_extensions(cdb: &CoreDB, ctx: Arc<Context>, database: &str) -> Result<Vec<ExtRow>, Error> {
let client = ctx.client.clone();
let psql_out = cdb
.psql(
LIST_EXTENSIONS_QUERY.to_owned(),
database.to_owned(),
client.clone(),
)
.await
.unwrap();
let result_string = psql_out.stdout.unwrap();
let mut extensions = vec![];
for line in result_string.lines().skip(2) {
let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
if fields.len() < 4 {
debug!("Done:{:?}", fields);
continue;
}
let package = ExtRow {
name: fields[0].to_owned(),
version: fields[1].to_owned(),
enabled: fields[2] == "t",
schema: fields[3].to_owned(),
};
extensions.push(package);
}
let num_extensions = extensions.len();
info!("Found {} extensions", num_extensions);
Ok(extensions)
}

/// list databases then get all extensions from each database
pub async fn get_all_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
let databases = list_databases(cdb, ctx.clone()).await?;

let mut ext_hashmap: HashMap<String, Vec<ExtensionInstallLocation>> = HashMap::new();
// query every database for extensions
// transform results by extension name, rather than by database
for db in databases {
let extensions = list_extensions(cdb, ctx.clone(), &db).await?;
for ext in extensions {
let extlocation = ExtensionInstallLocation {
database: db.clone(),
version: Some(ext.version),
enabled: ext.enabled,
schema: ext.schema,
};
ext_hashmap
.entry(ext.name)
.or_insert_with(Vec::new)
.push(extlocation);
}
}

let mut ext_spec: Vec<Extension> = Vec::new();
for (ext_name, ext_locations) in &ext_hashmap {
ext_spec.push(Extension {
name: ext_name.clone(),
locations: ext_locations.clone(),
});
}
Ok(ext_spec)
}
7 changes: 3 additions & 4 deletions coredb-operator/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ impl ApiServerVerifier {
let coredb = coredb_.clone();
tokio::spawn(async move {
pin_mut!(handle);

// After the PATCH to CoreDB, we expect a GET on Secrets
let (request, send) = handle
.next_request()
Expand Down Expand Up @@ -168,9 +167,9 @@ impl ApiServerVerifier {
assert_eq!(
request.uri().to_string(),
format!(
"/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}/status?&force=true&fieldManager=cntrlr",
coredb.name_any()
)
"/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}/status?&force=true&fieldManager=cntrlr",
coredb.name_any()
)
);
let req_body = to_bytes(request.into_body()).await.unwrap();
let json: serde_json::Value =
Expand Down
2 changes: 1 addition & 1 deletion coredb-operator/src/statefulset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn update_pvc(client: Client, sts: &StatefulSet, cdb: &CoreDB) {
Api::namespaced(client, &sts.clone().metadata.namespace.unwrap());

let sts_name = sts.clone().metadata.name.unwrap();
let pvc_name = format!("data-{}-0", sts_name);
let pvc_name = format!("data-{sts_name}-0");
let mut pvc_requests: BTreeMap<String, Quantity> = BTreeMap::new();
pvc_requests.insert("storage".to_string(), cdb.spec.storage.clone());

Expand Down
8 changes: 7 additions & 1 deletion coredb-operator/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,13 @@ mod test {

println!("{}", result.stdout.clone().unwrap());
// assert does not contain postgis
assert!(!result.stdout.unwrap().contains("postgis"));
assert!(!result.stdout.clone().unwrap().contains("postgis"));

// assert extensions made it into the status
let spec = coredbs.get(name).await.unwrap();
let status = spec.status.unwrap();
let extensions = status.extensions.unwrap();
assert!(extensions.len() > 0);
}

#[tokio::test]
Expand Down
29 changes: 29 additions & 0 deletions coredb-operator/yaml/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,35 @@ spec:
description: The status object of `CoreDB`
nullable: true
properties:
extensions:
items:
properties:
locations:
items:
properties:
database:
default: postrgres
type: string
enabled:
type: boolean
schema:
default: public
type: string
version:
nullable: true
type: string
required:
- enabled
type: object
type: array
name:
type: string
required:
- locations
- name
type: object
nullable: true
type: array
running:
type: boolean
storage:
Expand Down

0 comments on commit 91ee5d1

Please sign in to comment.