Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MigrationRunner::run_migrations() to call a helper #10232

Open
wants to merge 3 commits into
base: tristan957/inline-queries
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
testing = ["fail/failpoints"]

[dependencies]
base64.workspace = true
Expand All @@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
Expand Down
5 changes: 5 additions & 0 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
let scenario = failpoint_support::init();

let (build_tag, clap_args) = init()?;

// enable core dumping for all child processes
Expand Down Expand Up @@ -100,6 +103,8 @@ fn main() -> Result<()> {

maybe_delay_exit(delay_exit);

scenario.teardown();

deinit_and_exit(wait_pg_result);
}

Expand Down
15 changes: 13 additions & 2 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");

let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});

Ok::<(), anyhow::Error>(())
Expand Down
15 changes: 15 additions & 0 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::failpoint_support::failpoints_handler;
use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;

fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
Expand Down Expand Up @@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
match failpoints_handler(req, CancellationToken::new()).await {
Ok(r) => r,
Err(ApiError::BadRequest(e)) => {
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
}
Err(_) => {
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
Expand Down
129 changes: 78 additions & 51 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::Client;
use tracing::info;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}

impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);

Self { client, migrations }
}

/// Get the current value neon_migration.migration_id
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
Expand All @@ -25,77 +29,100 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}

/// Update the neon_migration.migration_id value
///
/// This function has a fail point called compute-migration, which can be
/// used if you would like to fail the application of a series of migrations
/// at some point.
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});

false
})();

if fail {
return Err(anyhow::anyhow!(format!(
"migration {} was configured to fail because of a failpoint",
migration_id
)));
}
}

self.client
.simple_query(&setval)
.query(
"UPDATE neon_migration.migration_id SET id = $1",
&[&migration_id],
)
.context("run_migrations update id")?;

Ok(())
}

fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;

let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;

let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;

let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;

let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;
/// Prepare the migrations the target database for handling migrations
fn prepare_database(&mut self) -> Result<()> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
self.client.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)?;
self.client
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
self.client
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;

Ok(())
}

pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
/// Run an individual migration
fn run_migration(&mut self, migration_id: i64, migration: &str) -> Result<()> {
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);

let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
macro_rules! migration_id {
($cm:expr) => {
($cm + 1) as i64
};
}
// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
self.update_migration_id(migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);

let migration = self.migrations[current_migration];
self.client
.simple_query("BEGIN")
.context("begin migration")?;

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));
} else {
info!(
"Running migration id={}:\n{}\n",
migration_id!(current_migration),
migration
);
self.client
.simple_query(migration)
.with_context(|| format!("run_migrations migration id={migration_id}"))?;

self.client
.simple_query("BEGIN")
.context("begin migration")?;
self.update_migration_id(migration_id)?;

self.client.simple_query(migration).with_context(|| {
format!(
"run_migrations migration id={}",
migration_id!(current_migration)
)
})?;
self.client
.simple_query("COMMIT")
.context("commit migration")?;

// Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;
info!("Finished migration id={}", migration_id);
}

Ok(())
}

self.client
.simple_query("COMMIT")
.context("commit migration")?;
/// Run the configrured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_database()?;

info!("Finished migration id={}", migration_id!(current_migration));
}
let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
self.run_migration(
(current_migration + 1) as i64,
self.migrations[current_migration],
)?;

current_migration += 1;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;
END $$;
25 changes: 25 additions & 0 deletions compute_tools/src/migrations/tests/0002-alter_roles.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
DO $$
DECLARE
role record;
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
END IF;
END LOOP;

FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;

IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'pg_monitor'::regrole;

IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;

IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
END IF;
END $$;
Loading
Loading