From d51fec35cbf181f6c171bd43f98a9f4cd5e83eb7 Mon Sep 17 00:00:00 2001
From: David Estes
Date: Tue, 11 Jun 2024 15:05:28 -0600
Subject: [PATCH] feat: track version and data migrations

Created a struct to manage updating data across versions.
---
 Cargo.lock                                    |   6 +
 Cargo.toml                                    |   2 +-
 .../20240611165721_data_migration.down.sql    |   2 +
 .../20240611165721_data_migration.up.sql      |   9 +
 .../sqlite/20240611183747_version.down.sql    |   2 +
 .../sqlite/20240611183747_version.up.sql      |   6 +
 store/Cargo.toml                              |   1 +
 store/src/lib.rs                              |   2 +
 store/src/migration.rs                        | 245 ++++++++++++++++++
 store/src/sql/access/event.rs                 |  22 +-
 store/src/sql/query.rs                        |  23 --
 store/src/sql/test.rs                         |   9 -
 12 files changed, 277 insertions(+), 52 deletions(-)
 create mode 100644 migrations/sqlite/20240611165721_data_migration.down.sql
 create mode 100644 migrations/sqlite/20240611165721_data_migration.up.sql
 create mode 100644 migrations/sqlite/20240611183747_version.down.sql
 create mode 100644 migrations/sqlite/20240611183747_version.up.sql
 create mode 100644 store/src/migration.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9e3b521f1..0cfeb61f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1385,6 +1385,7 @@ dependencies = [
  "thiserror",
  "tmpdir",
  "tokio",
+ "tracing",
  "tracing-subscriber",
  "tracing-test",
  "uuid 1.8.0",
@@ -7685,6 +7686,7 @@ dependencies = [
  "atoi",
  "byteorder",
  "bytes 1.6.0",
+ "chrono",
  "crc",
  "crossbeam-queue",
  "either",
@@ -7745,6 +7747,7 @@ dependencies = [
  "sha2 0.10.8",
  "sqlx-core",
  "sqlx-mysql",
+ "sqlx-postgres",
  "sqlx-sqlite",
  "syn 1.0.109",
  "tempfile",
@@ -7763,6 +7766,7 @@ dependencies = [
  "bitflags 2.5.0",
  "byteorder",
  "bytes 1.6.0",
+ "chrono",
  "crc",
  "digest 0.10.7",
  "dotenvy",
@@ -7804,6 +7808,7 @@ dependencies = [
  "base64 0.21.7",
  "bitflags 2.5.0",
  "byteorder",
+ "chrono",
  "crc",
  "dotenvy",
  "etcetera",
@@ -7839,6 +7844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa"
 dependencies = [
  "atoi",
+ "chrono",
  "flume 0.11.0",
  "futures-channel",
  "futures-core",
diff --git a/Cargo.toml b/Cargo.toml
index 9a091b1e9..717ebeb67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -151,7 +151,7 @@ serde_qs = "0.10.1"
 serde_with = "2.1"
 sha2 = { version = "0.10", default-features = false }
 smallvec = "1.10"
-sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] }
+sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
 ssh-key = { version = "0.5.1", default-features = false }
 ssi = { version = "0.7", features = ["ed25519"] }
 swagger = { version = "6.1", features = [
diff --git a/migrations/sqlite/20240611165721_data_migration.down.sql b/migrations/sqlite/20240611165721_data_migration.down.sql
new file mode 100644
index 000000000..512ed0666
--- /dev/null
+++ b/migrations/sqlite/20240611165721_data_migration.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS ceramic_one_data_migration;
\ No newline at end of file
diff --git a/migrations/sqlite/20240611165721_data_migration.up.sql b/migrations/sqlite/20240611165721_data_migration.up.sql
new file mode 100644
index 000000000..1abb5af89
--- /dev/null
+++ b/migrations/sqlite/20240611165721_data_migration.up.sql
@@ -0,0 +1,9 @@
+-- Add up migration script here
+
+CREATE TABLE IF NOT EXISTS ceramic_one_data_migration (
+    "name" TEXT PRIMARY KEY NOT NULL,
+    "version" TEXT NOT NULL,
+    started_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    last_attempted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    completed_at TIMESTAMP
+);
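The three timestamp columns above give each data migration a small lifecycle: started_at records the first attempt, last_attempted_at is bumped on every retry, and a NULL completed_at marks work that is still outstanding. A minimal sketch of that lifecycle, mirroring the queries the DataMigration helpers issue later in this patch (the bare sqlx::SqlitePool here stands in for the crate's own pool wrapper):

```rust
use sqlx::SqlitePool;

// Sketch only: walks one row of ceramic_one_data_migration through its states.
async fn migration_row_lifecycle(pool: &SqlitePool) -> Result<(), sqlx::Error> {
    // First (or repeated) attempt: insert the row, or bump last_attempted_at on retry.
    sqlx::query(
        "INSERT INTO ceramic_one_data_migration (name, version) VALUES ($1, $2)
         ON CONFLICT (name) DO UPDATE SET last_attempted_at = CURRENT_TIMESTAMP;",
    )
    .bind("events_to_streams")
    .bind("0.23.0")
    .execute(pool)
    .await?;

    // Success: a non-NULL completed_at means this migration is never attempted again.
    sqlx::query(
        "UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1;",
    )
    .bind("events_to_streams")
    .execute(pool)
    .await?;
    Ok(())
}
```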
diff --git a/migrations/sqlite/20240611183747_version.down.sql b/migrations/sqlite/20240611183747_version.down.sql
new file mode 100644
index 000000000..234d3e445
--- /dev/null
+++ b/migrations/sqlite/20240611183747_version.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS "ceramic_one_version";
\ No newline at end of file
diff --git a/migrations/sqlite/20240611183747_version.up.sql b/migrations/sqlite/20240611183747_version.up.sql
new file mode 100644
index 000000000..0f8fdf25c
--- /dev/null
+++ b/migrations/sqlite/20240611183747_version.up.sql
@@ -0,0 +1,6 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS "ceramic_one_version" (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    "version" TEXT NOT NULL UNIQUE,
+    "installed_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
\ No newline at end of file
diff --git a/store/Cargo.toml b/store/Cargo.toml
index deb815be4..35318fa86 100644
--- a/store/Cargo.toml
+++ b/store/Cargo.toml
@@ -30,6 +30,7 @@ serde_ipld_dagcbor.workspace = true
 sqlx.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
+tracing.workspace = true
 
 [dev-dependencies]
 ceramic-event.workspace = true
diff --git a/store/src/lib.rs b/store/src/lib.rs
index 26ccb73b3..65a500300 100644
--- a/store/src/lib.rs
+++ b/store/src/lib.rs
@@ -4,10 +4,12 @@
 
 mod error;
 mod metrics;
+mod migration;
 mod sql;
 
 pub use error::Error;
 pub use metrics::{Metrics, StoreMetricsMiddleware};
+pub use migration::DataMigrator;
 pub use sql::{
     entities::EventInsertable, entities::EventInsertableBody, CandidateEvent, CeramicOneBlock,
     CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, InsertResult,
diff --git a/store/src/migration.rs b/store/src/migration.rs
new file mode 100644
index 000000000..691b45fbb
--- /dev/null
+++ b/store/src/migration.rs
@@ -0,0 +1,245 @@
+use std::sync::OnceLock;
+
+use sqlx::{prelude::FromRow, types::chrono};
+use tracing::info;
+
+use crate::{
+    sql::entities::{EventCid, EventHeader, ReconEventBlockRaw},
+    CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool,
+};
+
+static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();
+
+struct Migration {
+    /// The name of the migration.
+    name: &'static str,
+    /// The version in which this migration was released (anything below it needs to be migrated).
+    _version: &'static str,
+}
+
+/// The list of migrations that need to run in order to bring the database up to date before the server starts.
+/// Add new migrations to the end of the list.
+fn required_migrations() -> &'static Vec<Migration> {
+    MIGRATIONS.get_or_init(|| {
+        vec![Migration {
+            name: "events_to_streams",
+            _version: "0.23.0",
+        }]
+    })
+}
+
+#[derive(Debug, Clone, sqlx::FromRow)]
+struct Version {
+    id: i64,
+    version: String,
+    installed_at: chrono::NaiveDateTime,
+}
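required_migrations() above is the single registry the migrator consults, so shipping a new data migration in a later release amounts to appending an entry here and adding a matching arm in run_migration_by_name further down. A hypothetical second entry (name and version invented purely for illustration):

```rust
fn required_migrations() -> &'static Vec<Migration> {
    MIGRATIONS.get_or_init(|| {
        vec![
            Migration {
                name: "events_to_streams",
                _version: "0.23.0",
            },
            // Hypothetical future migration, not part of this patch: would run
            // on any node that has not yet recorded it as completed.
            Migration {
                name: "example_backfill",
                _version: "0.30.0",
            },
        ]
    })
}
```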
+/// The data migrator is responsible for running the data migrations the node requires as part of a version upgrade.
+pub struct DataMigrator {
+    prev_version: Option<Version>,
+    required_migrations: Vec<&'static Migration>,
+    pool: SqlitePool,
+}
+
+impl DataMigrator {
+    pub async fn try_new(pool: SqlitePool) -> Result<Self> {
+        let current_version = env!("CARGO_PKG_VERSION").to_string();
+
+        let prev_version: Option<Version> = sqlx::query_as(
+            "SELECT id, version, installed_at FROM ceramic_one_version ORDER BY installed_at DESC LIMIT 1;",
+        )
+        .fetch_optional(pool.reader())
+        .await?;
+
+        sqlx::query(
+            "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;",
+        )
+        .bind(&current_version)
+        .execute(pool.writer())
+        .await?;
+
+        let applied_migrations = DataMigration::fetch_all(&pool).await?;
+        // In the future, we can also filter out migrations that are not required based on the version
+        let required_migrations = required_migrations()
+            .iter()
+            .flat_map(|candidate| {
+                if applied_migrations
+                    .iter()
+                    .find(|m| m.name == candidate.name)
+                    .and_then(|m| m.completed_at)
+                    .is_some()
+                {
+                    None
+                } else {
+                    Some(candidate)
+                }
+            })
+            .collect();
+
+        Ok(Self {
+            prev_version,
+            required_migrations,
+            pool,
+        })
+    }
+
+    /// Determines whether migrations are needed. Marks all migrations as "complete" on a fresh install.
+    pub async fn needs_migration(&self) -> Result<bool> {
+        let new_install = self.is_new_install().await?;
+
+        if new_install {
+            info!("Setting up new node... data migrations are not required.");
+            for migration_info in &self.required_migrations {
+                DataMigration::insert_completed(&self.pool, migration_info.name).await?;
+            }
+            return Ok(false);
+        }
+
+        Ok(!self.required_migrations.is_empty())
+    }
+
+    /// In the future, we can check the version and run migrations based on that. For now, we need to know whether the node
+    /// is simply picking up the new version-tracking feature, or whether it is a fresh install. We use the presence of
+    /// recon event data to indicate that it's not new.
+    async fn is_new_install(&self) -> Result<bool> {
+        if self.prev_version.is_none() {
+            let x = sqlx::query(r#"SELECT cid FROM ceramic_one_event LIMIT 1;"#)
+                .fetch_optional(self.pool.reader())
+                .await?;
+            // no prior version row and no event data means this is a fresh install
+            Ok(x.is_none())
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// Run all migrations that have not been run yet.
+    pub async fn run_all(&self) -> Result<()> {
+        for migration_info in &self.required_migrations {
+            DataMigration::upsert(&self.pool, migration_info.name).await?;
+            self.run_migration_by_name(migration_info.name).await?;
+            DataMigration::mark_completed(&self.pool, migration_info.name).await?;
+        }
+        Ok(())
+    }
+
+    async fn run_migration_by_name(&self, name: &str) -> Result<()> {
+        info!("Starting migration: {}", name);
+        match name {
+            "events_to_streams" => self.migrate_events_to_streams().await,
+            _ => Err(Error::new_fatal(anyhow::anyhow!(
+                "Unknown migration: {}",
+                name
+            ))),
+        }
+    }
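The patch does not include the call site, but given the API above the intended startup flow is presumably along these lines (the wrapper function and its import paths are assumptions, not part of the patch):

```rust
use ceramic_store::{DataMigrator, Result, SqlitePool};

// Hypothetical startup hook: DataMigrator is exported from lib.rs above, but
// this function itself is invented for illustration.
async fn check_and_run_migrations(pool: SqlitePool) -> Result<()> {
    let migrator = DataMigrator::try_new(pool).await?;
    // On a fresh install this marks everything complete and returns false.
    if migrator.needs_migration().await? {
        migrator.run_all().await?;
    }
    Ok(())
}
```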
+
+    // This isn't the most efficient approach, but it's simple and we should only run it once.
+    // It is only ever expected to run on a sqlite database upgrading from version 0.22.0 or below.
+    async fn migrate_events_to_streams(&self) -> Result<()> {
+        let mut cid_cursor = Some(EventCid::default());
+
+        while let Some(last_cid) = cid_cursor {
+            cid_cursor = self.migrate_events_to_streams_batch(last_cid).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn migrate_events_to_streams_batch(
+        &self,
+        last_cid: EventCid,
+    ) -> Result<Option<EventCid>> {
+        let all_blocks: Vec<ReconEventBlockRaw> = sqlx::query_as(
+            r#"SELECT
+                    key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
+                FROM (
+                    SELECT
+                        e.cid as event_cid, e.order_key
+                    FROM ceramic_one_event e
+                    WHERE
+                        EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
+                        AND NOT EXISTS (SELECT 1 from ceramic_one_event_metadata where cid = e.cid)
+                        AND e.cid > $1
+                    ORDER BY e.cid
+                    LIMIT 1000
+                ) key
+                JOIN
+                    ceramic_one_event_block eb ON key.event_cid = eb.event_cid
+                JOIN ceramic_one_block b on b.multihash = eb.block_multihash
+                ORDER BY key.order_key, eb.idx;"#,
+        )
+        .bind(last_cid.to_bytes())
+        .fetch_all(self.pool.reader())
+        .await?;
+
+        let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
+
+        let last_cid = values.last().and_then(|(id, _)| id.cid());
+        if last_cid.is_none() {
+            return Ok(None);
+        }
+        let mut tx = self.pool.begin_tx().await?;
+        for (event_id, payload) in values {
+            // should we log and continue? this shouldn't be possible unless something bad happened,
+            // and erroring out will require manual intervention to recover
+            let event_cid = event_id.cid().ok_or_else(|| {
+                Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
+            })?;
+
+            let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;
+
+            if let EventHeader::Init { header, .. } = &insertable.header {
+                CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
+            }
+
+            CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
+        }
+        tx.commit().await?;
+
+        Ok(last_cid)
+    }
+}
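migrate_events_to_streams pages with a keyset cursor (e.cid > $1 ... ORDER BY e.cid LIMIT 1000) rather than LIMIT/OFFSET, so each pass only scans rows that still lack a ceramic_one_event_metadata entry. The generic shape of that loop, reduced to a sketch; the helper is illustrative and not part of the patch:

```rust
// Keyset-pagination driver: `fetch_batch` stands in for any query of the form
// `WHERE key > cursor ORDER BY key LIMIT n` that returns the last key it
// processed, or None once nothing is left to do.
async fn drain_by_keyset<K, F, Fut>(mut cursor: K, mut fetch_batch: F) -> anyhow::Result<()>
where
    F: FnMut(K) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<Option<K>>>,
{
    while let Some(next) = fetch_batch(cursor).await? {
        // The last key of each batch becomes the next cursor; because processed
        // rows stop matching the query, progress also survives restarts.
        cursor = next;
    }
    Ok(())
}
```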
+
+#[derive(Debug, Clone, FromRow)]
+struct DataMigration {
+    name: String,
+    version: String,
+    started_at: chrono::NaiveDateTime,
+    last_attempted_at: chrono::NaiveDateTime,
+    completed_at: Option<chrono::NaiveDateTime>,
+}
+
+impl DataMigration {
+    async fn fetch_all(pool: &SqlitePool) -> Result<Vec<Self>> {
+        Ok(sqlx::query_as(
+            "SELECT name, version, started_at, completed_at, last_attempted_at FROM ceramic_one_data_migration;",
+        )
+        .fetch_all(pool.reader())
+        .await?)
+    }
+
+    async fn upsert(pool: &SqlitePool, name: &str) -> Result<()> {
+        sqlx::query("INSERT INTO ceramic_one_data_migration (name, version) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET last_attempted_at = CURRENT_TIMESTAMP;")
+            .bind(name)
+            .bind(env!("CARGO_PKG_VERSION"))
+            .execute(pool.writer())
+            .await?;
+        Ok(())
+    }
+
+    async fn insert_completed(pool: &SqlitePool, name: &str) -> Result<()> {
+        sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP);")
+            .bind(name)
+            .bind(env!("CARGO_PKG_VERSION"))
+            .execute(pool.writer())
+            .await?;
+        Ok(())
+    }
+
+    async fn mark_completed(pool: &SqlitePool, name: &str) -> Result<()> {
+        sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1;")
+            .bind(name)
+            .execute(pool.writer())
+            .await?;
+        Ok(())
+    }
+}
diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs
index 89654517d..8109a8970 100644
--- a/store/src/sql/access/event.rs
+++ b/store/src/sql/access/event.rs
@@ -224,15 +224,15 @@ impl CeramicOneEvent {
 
         for (idx, (deliverable, item)) in to_add.iter().enumerate() {
             let new_key = Self::insert_key(&mut tx, &item.order_key, *deliverable).await?;
-            let candiadate = CandidateEvent::new(item.cid(), item.stream_cid());
+            let candidate = CandidateEvent::new(item.cid(), item.stream_cid());
             if *deliverable {
-                delivered.push(candiadate);
+                delivered.push(candidate);
                 // the insert failed so we didn't mark it as deliverable.. is this possible?
                 if !new_key {
                     Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?;
                 }
             } else {
-                undelivered.push(candiadate);
+                undelivered.push(candidate);
             }
             if new_key {
                 for block in item.body.blocks.iter() {
@@ -254,22 +254,6 @@ impl CeramicOneEvent {
         Ok(res)
     }
 
-    /// Find events that haven't been delivered to the client and may be ready
-    pub async fn undelivered_with_values(
-        pool: &SqlitePool,
-        offset: usize,
-        limit: usize,
-    ) -> Result<Vec<(EventId, Vec<u8>)>> {
-        let all_blocks: Vec<ReconEventBlockRaw> =
-            sqlx::query_as(EventQuery::undelivered_with_values())
-                .bind(limit as i64)
-                .bind(offset as i64)
-                .fetch_all(pool.reader())
-                .await?;
-
-        let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
-        Ok(values)
-    }
-
     /// Calculate the hash of a range of events
     pub async fn hash_range(
diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs
index fc652dfcf..df631d80c 100644
--- a/store/src/sql/query.rs
+++ b/store/src/sql/query.rs
@@ -79,29 +79,6 @@ impl EventQuery {
             ORDER BY key.order_key, eb.idx;"#
     }
 
-    /// Find event CIDs that have not yet been delivered to the client
-    /// Useful after a restart, or if the task managing delivery has availability to try old events
-    pub fn undelivered_with_values() -> &'static str {
-        r#"SELECT
-            key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
-        FROM (
-            SELECT
-                e.cid as event_cid, e.order_key
-            FROM ceramic_one_event e
-            WHERE
-                EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
-                AND e.delivered IS NULL
-            LIMIT
-                $1
-            OFFSET
-                $2
-        ) key
-        JOIN
-            ceramic_one_event_block eb ON key.event_cid = eb.event_cid
-        JOIN ceramic_one_block b on b.multihash = eb.block_multihash
-        ORDER BY key.order_key, eb.idx;"#
-    }
-
     /// Requires binding 2 parameters. Fetches the new rows as `DeliveredEvent` objects
     pub fn new_delivered_events() -> &'static str {
         r#"SELECT
diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs
index 365cfad2a..ad17e4073 100644
--- a/store/src/sql/test.rs
+++ b/store/src/sql/test.rs
@@ -135,15 +135,6 @@ async fn range_query() {
         .assert_debug_eq(&ids);
 }
 
-#[tokio::test]
-async fn undelivered_with_values() {
-    let pool = SqlitePool::connect_in_memory().await.unwrap();
-    let res = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000)
-        .await
-        .unwrap();
-    assert_eq!(res.len(), 0);
-}
-
 #[tokio::test]
 async fn range_with_values() {
     let pool = SqlitePool::connect_in_memory().await.unwrap();
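The removed test covered the deleted query; nothing in the patch exercises the new migrator itself. A hedged sketch of what such a test might look like, assuming SqlitePool::connect_in_memory applies the schema migrations the way the surrounding tests rely on:

```rust
#[tokio::test]
async fn fresh_install_needs_no_migration() {
    // Hypothetical test, not part of this patch.
    let pool = SqlitePool::connect_in_memory().await.unwrap();
    let migrator = DataMigrator::try_new(pool).await.unwrap();
    // A brand-new database has no prior version row and no recon event data,
    // so data migrations should be marked complete rather than run.
    assert!(!migrator.needs_migration().await.unwrap());
}
```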