From f54d61c5c6111c1a2455d05f3d91a41686cdff0a Mon Sep 17 00:00:00 2001 From: David Estes Date: Fri, 7 Jun 2024 15:21:36 -0600 Subject: [PATCH 1/9] feat: derive event header traits IPLD impls debug, PartialEq, Eq so unless we allow true floats I think this is okay --- event/src/bytes.rs | 2 +- event/src/unvalidated/payload/init.rs | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/event/src/bytes.rs b/event/src/bytes.rs index 58557977e..79334def4 100644 --- a/event/src/bytes.rs +++ b/event/src/bytes.rs @@ -6,7 +6,7 @@ use serde::{ }; /// Sequence of byte values. -#[derive(Clone, PartialEq, Default, Debug)] +#[derive(Clone, PartialEq, Eq, Default, Debug)] pub struct Bytes(Vec); impl Bytes { diff --git a/event/src/unvalidated/payload/init.rs b/event/src/unvalidated/payload/init.rs index f1a7c99f1..a89ee25c6 100644 --- a/event/src/unvalidated/payload/init.rs +++ b/event/src/unvalidated/payload/init.rs @@ -26,7 +26,7 @@ impl Payload { } /// Headers for an init event -#[derive(Default, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Header { #[serde(default, skip_serializing_if = "Vec::is_empty")] @@ -81,4 +81,14 @@ impl Header { pub fn should_index(&self) -> bool { self.should_index.unwrap_or(true) } + + /// The unique value for the stream + pub fn unique(&self) -> Option<&[u8]> { + self.unique.as_ref().map(Bytes::as_slice) + } + + /// The context value for the stream + pub fn context(&self) -> Option<&[u8]> { + self.context.as_ref().map(Bytes::as_slice) + } } From 70571750635149e83bb409ce7d4bb92d5d232656 Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 10 Jun 2024 11:56:04 -0600 Subject: [PATCH 2/9] chore: clean up errors and unused code --- service/src/error.rs | 29 ++++++------------- store/src/error.rs | 17 ++++++++++++ store/src/sql/entities/cid.rs | 52 ----------------------------------- 3 files changed, 25 insertions(+), 73 deletions(-) delete mode 100644 store/src/sql/entities/cid.rs diff --git a/service/src/error.rs b/service/src/error.rs index d2af9d6a2..f75c33d0a 100644 --- a/service/src/error.rs +++ b/service/src/error.rs @@ -81,18 +81,10 @@ impl Error { impl From for recon::Error { fn from(value: Error) -> Self { match value { - Error::Application { error } => recon::Error::Application { - error: error.context("recon error"), - }, - Error::Fatal { error } => recon::Error::Fatal { - error: error.context("recon error"), - }, - Error::Transient { error } => recon::Error::Transient { - error: error.context("recon error"), - }, - Error::InvalidArgument { error } => recon::Error::Application { - error: error.context("recon error"), - }, + Error::Application { error } => recon::Error::Application { error }, + Error::Fatal { error } => recon::Error::Fatal { error }, + Error::Transient { error } => recon::Error::Transient { error }, + Error::InvalidArgument { error } => recon::Error::Application { error }, } } } @@ -100,15 +92,10 @@ impl From for recon::Error { impl From for Error { fn from(value: ceramic_store::Error) -> Self { match value { - ceramic_store::Error::Application { error } => Error::Application { - error: error.context("store error"), - }, - ceramic_store::Error::Fatal { error } => Error::Fatal { - error: error.context("store error"), - }, - ceramic_store::Error::Transient { error } => Error::Transient { - error: error.context("store error"), - }, + ceramic_store::Error::Application { error } => Error::Application { error }, + 
ceramic_store::Error::Fatal { error } => Error::Fatal { error }, + ceramic_store::Error::Transient { error } => Error::Transient { error }, + ceramic_store::Error::InvalidArgument { error } => Error::InvalidArgument { error }, } } } diff --git a/store/src/error.rs b/store/src/error.rs index 75628acc4..afd574126 100644 --- a/store/src/error.rs +++ b/store/src/error.rs @@ -9,6 +9,12 @@ pub enum Error { /// The error details that may include context and other information error: anyhow::Error, }, + #[error("InvalidArgument: {error}")] + /// Invalid client input + InvalidArgument { + /// The error details that may include context and other information + error: anyhow::Error, + }, #[error("Fatal error encountered: {error}")] /// A fatal error that is unlikely to be recoverable, and may require terminating the process completely Fatal { @@ -45,6 +51,13 @@ impl Error { } } + /// Crate an InvalidArgument error + pub fn new_invalid_arg(error: impl Into) -> Self { + Self::InvalidArgument { + error: error.into(), + } + } + /// Add context to the internal error. Works identically to `anyhow::context` pub fn context(self, context: C) -> Self where @@ -60,6 +73,9 @@ impl Error { Error::Transient { error } => Self::Transient { error: error.context(context), }, + Error::InvalidArgument { error } => Self::InvalidArgument { + error: error.context(context), + }, } } } @@ -115,6 +131,7 @@ impl From for recon::Error { Error::Application { error } => recon::Error::Application { error }, Error::Fatal { error } => recon::Error::Fatal { error }, Error::Transient { error } => recon::Error::Transient { error }, + Error::InvalidArgument { error } => recon::Error::Application { error }, } } } diff --git a/store/src/sql/entities/cid.rs b/store/src/sql/entities/cid.rs deleted file mode 100644 index 5211faa3b..000000000 --- a/store/src/sql/entities/cid.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::borrow::Cow; - -use cid::Cid; -use sqlx::{ - encode::IsNull, - error::BoxDynError, - sqlite::{SqliteArgumentValue, SqliteRow, SqliteTypeInfo, SqliteValueRef}, - Decode, Encode, FromRow, Row, Sqlite, Type, -}; - -#[derive(Debug)] -pub struct CidBlob(pub(crate) Cid); - -impl Type for CidBlob { - fn type_info() -> SqliteTypeInfo { - <&[u8] as Type>::type_info() - } - - fn compatible(ty: &SqliteTypeInfo) -> bool { - <&[u8] as Type>::compatible(ty) - } -} - -impl<'q> Encode<'q, Sqlite> for CidBlob { - fn encode(self, args: &mut Vec>) -> IsNull { - args.push(SqliteArgumentValue::Blob(Cow::Owned(self.0.to_bytes()))); - - IsNull::No - } - - fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { - args.push(SqliteArgumentValue::Blob(Cow::Owned(self.0.to_bytes()))); - - IsNull::No - } -} - -impl<'r> Decode<'r, Sqlite> for CidBlob { - fn decode(value: SqliteValueRef<'r>) -> Result { - let v: &[u8] = <&[u8] as Decode>::decode(value)?; - let cid = Cid::try_from(v)?; - Ok(CidBlob(cid)) - } -} - -impl FromRow<'_, SqliteRow> for CidBlob { - fn from_row(row: &SqliteRow) -> std::result::Result { - let v: Vec = row.get(0); - let cid = Cid::try_from(v.as_slice()).map_err(|e| sqlx::Error::Decode(Box::new(e)))?; - Ok(CidBlob(cid)) - } -} From 7b63733fe5a7540ec62abed86166c69e0308e098 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 17:51:33 -0600 Subject: [PATCH 3/9] feat: crate tables to track data migrations --- migrations/sqlite/20240611165721_data_migration.down.sql | 2 ++ migrations/sqlite/20240611165721_data_migration.up.sql | 9 +++++++++ migrations/sqlite/20240611183747_version.down.sql | 2 ++ 
 migrations/sqlite/20240611183747_version.up.sql | 6 ++++++
 4 files changed, 19 insertions(+)
 create mode 100644 migrations/sqlite/20240611165721_data_migration.down.sql
 create mode 100644 migrations/sqlite/20240611165721_data_migration.up.sql
 create mode 100644 migrations/sqlite/20240611183747_version.down.sql
 create mode 100644 migrations/sqlite/20240611183747_version.up.sql

diff --git a/migrations/sqlite/20240611165721_data_migration.down.sql b/migrations/sqlite/20240611165721_data_migration.down.sql
new file mode 100644
index 000000000..512ed0666
--- /dev/null
+++ b/migrations/sqlite/20240611165721_data_migration.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS ceramic_one_data_migration;
\ No newline at end of file
diff --git a/migrations/sqlite/20240611165721_data_migration.up.sql b/migrations/sqlite/20240611165721_data_migration.up.sql
new file mode 100644
index 000000000..1abb5af89
--- /dev/null
+++ b/migrations/sqlite/20240611165721_data_migration.up.sql
@@ -0,0 +1,9 @@
+-- Add up migration script here
+
+CREATE TABLE IF NOT EXISTS ceramic_one_data_migration (
+    "name" TEXT PRIMARY KEY NOT NULL,
+    "version" TEXT NOT NULL,
+    started_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    last_attempted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    completed_at TIMESTAMP
+);
diff --git a/migrations/sqlite/20240611183747_version.down.sql b/migrations/sqlite/20240611183747_version.down.sql
new file mode 100644
index 000000000..234d3e445
--- /dev/null
+++ b/migrations/sqlite/20240611183747_version.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS "ceramic_one_version";
\ No newline at end of file
diff --git a/migrations/sqlite/20240611183747_version.up.sql b/migrations/sqlite/20240611183747_version.up.sql
new file mode 100644
index 000000000..0f8fdf25c
--- /dev/null
+++ b/migrations/sqlite/20240611183747_version.up.sql
@@ -0,0 +1,6 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS "ceramic_one_version" (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    "version" TEXT NOT NULL UNIQUE,
+    "installed_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
\ No newline at end of file

From f0adbe9965368050c58a9b9bc7a0519c243f79fc Mon Sep 17 00:00:00 2001
From: David Estes
Date: Thu, 13 Jun 2024 17:57:40 -0600
Subject: [PATCH 4/9] feat: initial data migrator that does nothing but track version

---
 Cargo.lock             | 6 ++
 Cargo.toml             | 2 +-
 store/Cargo.toml       | 1 +
 store/src/lib.rs       | 2 +
 store/src/migration.rs | 212 +++++++++++++++++++++++++++++++++++++++++
 store/src/sql/query.rs | 3 +
 6 files changed, 225 insertions(+), 1 deletion(-)
 create mode 100644 store/src/migration.rs

diff --git a/Cargo.lock b/Cargo.lock
index d48cb4a8b..a3d182581 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1384,6 +1384,7 @@ dependencies = [
  "thiserror",
  "tmpdir",
  "tokio",
+ "tracing",
  "tracing-subscriber",
  "tracing-test",
  "uuid 1.8.0",
@@ -7684,6 +7685,7 @@ dependencies = [
  "atoi",
  "byteorder",
  "bytes 1.6.0",
+ "chrono",
  "crc",
  "crossbeam-queue",
  "either",
@@ -7744,6 +7746,7 @@ dependencies = [
  "sha2 0.10.8",
  "sqlx-core",
  "sqlx-mysql",
+ "sqlx-postgres",
  "sqlx-sqlite",
  "syn 1.0.109",
  "tempfile",
@@ -7762,6 +7765,7 @@ dependencies = [
  "bitflags 2.5.0",
  "byteorder",
  "bytes 1.6.0",
+ "chrono",
  "crc",
  "digest 0.10.7",
  "dotenvy",
@@ -7803,6 +7807,7 @@ dependencies = [
  "base64 0.21.7",
  "bitflags 2.5.0",
  "byteorder",
+ "chrono",
  "crc",
  "dotenvy",
  "etcetera",
@@ -7838,6 +7843,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", + "chrono", "flume 0.11.0", "futures-channel", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 9a091b1e9..717ebeb67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ serde_qs = "0.10.1" serde_with = "2.1" sha2 = { version = "0.10", default-features = false } smallvec = "1.10" -sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] } +sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] } ssh-key = { version = "0.5.1", default-features = false } ssi = { version = "0.7", features = ["ed25519"] } swagger = { version = "6.1", features = [ diff --git a/store/Cargo.toml b/store/Cargo.toml index 250021320..df193391d 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -27,6 +27,7 @@ thiserror.workspace = true recon.workspace = true sqlx.workspace = true tokio.workspace = true +tracing.workspace = true [dev-dependencies] ceramic-event.workspace = true diff --git a/store/src/lib.rs b/store/src/lib.rs index 217c8751d..f76bdf598 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -4,10 +4,12 @@ mod error; mod metrics; +mod migration; mod sql; pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; +pub use migration::DataMigrator; pub use sql::{ entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore, diff --git a/store/src/migration.rs b/store/src/migration.rs new file mode 100644 index 000000000..9dbbcecbd --- /dev/null +++ b/store/src/migration.rs @@ -0,0 +1,212 @@ +use std::sync::OnceLock; + +use sqlx::{prelude::FromRow, types::chrono}; +use tracing::{debug, info}; + +use crate::{Error, Result, SqlitePool}; + +static MIGRATIONS: OnceLock> = OnceLock::new(); + +#[derive(Debug)] +struct Migration { + /// The name of the migration. + name: &'static str, + /// The version this migration was released (anything below this should be migrated). + _version: &'static str, +} + +/// The list of migrations that need to be run in order to get the database up to date and start the server. +/// Add new migrations to the end of the list. +fn data_migrations() -> &'static Vec { + MIGRATIONS.get_or_init(Vec::new) +} + +#[derive(Debug, Clone, sqlx::FromRow)] +// We want to retrieve these fields for logging but we don't refer to them directly +#[allow(dead_code)] +struct Version { + id: i64, + version: String, + installed_at: chrono::NaiveDateTime, +} + +/// The data migrator is responsible for running data migrations the node requires as part of the verison upgrade. +pub struct DataMigrator { + prev_version: Option, + required_migrations: Vec<&'static Migration>, + pool: SqlitePool, +} + +impl DataMigrator { + /// Create a new data migrator. This updates the version table with the current version and determine which migrations need to be run. 
+ pub async fn try_new(pool: SqlitePool) -> Result { + let current_version = env!("CARGO_PKG_VERSION").to_string(); + + let prev_version: Option = sqlx::query_as( + "SELECT id, version, installed_at FROM ceramic_one_version ORDER BY installed_at DESC limit 1;", + ) + .fetch_optional(pool.reader()) + .await?; + + sqlx::query( + "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;", + ) + .bind(¤t_version) + .execute(pool.writer()) + .await?; + + let applied_migrations = DataMigration::fetch_all(&pool).await?; + debug!(?prev_version, %current_version, ?applied_migrations, "Current data migration status"); + // In the future, we can filter out migrations that are not required based on the version as well + let required_migrations = data_migrations() + .iter() + .flat_map(|candidate| { + if applied_migrations + .iter() + .find(|m| m.name == candidate.name) + .and_then(|m| m.completed_at) + .is_some() + { + None + } else { + Some(candidate) + } + }) + .collect(); + + tracing::debug!("required migrations: {:?}", required_migrations); + + Ok(Self { + prev_version, + required_migrations, + pool, + }) + } + + /// Determines whether migrations are needed. Will mark all migrations as "complete" on a fresh install. + pub async fn needs_migration(&self) -> Result { + let new_install = self.is_new_install().await?; + + if new_install { + debug!("Setting up new node... data migrations are not required."); + for migration_info in &self.required_migrations { + DataMigration::insert_completed(&self.pool, migration_info.name).await?; + } + return Ok(false); + } + + Ok(!self.required_migrations.is_empty()) + } + + /// In the future, we can check the version and run migrations based on that. For now, we need to know if this is just getting + /// the new feature that tracks versions, or if it is a fresh install. We use the presence of recon event data to indicate it's not new. + async fn is_new_install(&self) -> Result { + if self.prev_version.is_none() { + let row = sqlx::query(r#"SELECT cid as rowid from ceramic_one_event;"#) + .fetch_optional(self.pool.reader()) + .await?; + Ok(row.is_none()) + } else { + Ok(false) + } + } + + /// Run all migrations that have not been run yet. 
+ pub async fn run_all(&self) -> Result<()> { + for migration_info in &self.required_migrations { + info!("Starting migration: {}", migration_info.name); + DataMigration::upsert(&self.pool, migration_info.name).await?; + self.run_migration_by_name(migration_info.name).await?; + DataMigration::mark_completed(&self.pool, migration_info.name).await?; + } + Ok(()) + } + + async fn run_migration_by_name(&self, name: &str) -> Result<()> { + #[allow(clippy::match_single_binding)] + let _res: Result<()> = match name { + _ => { + return Err(Error::new_fatal(anyhow::anyhow!( + "Unknown migration: {}", + name + ))) + } + }; + #[allow(unreachable_code)] + match _res { + Ok(_) => { + info!("Migration {} completed successfully", name); + Ok(()) + } + Err(e) => { + tracing::error!("Migration encountered error: {:?}", e); + match e { + Error::Fatal { error } => Err(Error::new_fatal(anyhow::anyhow!( + "Migration {} failed in irrecoverable way: {}", + name, + error + ))), + + e => { + let err = e.context("Migration failed but can be retried"); + Err(err) + } + } + } + } + } +} + +#[derive(Debug, Clone, FromRow)] +// we want to retrieve these fields for logging but we don't refer to them directly +#[allow(dead_code)] +struct DataMigration { + name: String, + version: String, + started_at: chrono::NaiveDateTime, + last_attempted_at: chrono::NaiveDateTime, + completed_at: Option, +} + +impl DataMigration { + async fn fetch_all(pool: &SqlitePool) -> Result> { + Ok(sqlx::query_as( + "SELECT name, version, started_at, completed_at, last_attempted_at FROM ceramic_one_data_migration;", + ) + .fetch_all(pool.reader()) + .await?) + } + + async fn upsert(pool: &SqlitePool, name: &str) -> Result<()> { + sqlx::query("INSERT INTO ceramic_one_data_migration (name, version) VALUES ($1, $2) on conflict (name) do update set last_attempted_at = CURRENT_TIMESTAMP;") + .bind(name) + .bind(env!("CARGO_PKG_VERSION")) + .execute(pool.writer()) + .await?; + Ok(()) + } + + async fn insert_completed(pool: &SqlitePool, name: &str) -> Result<()> { + sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP);") + .bind(name) + .bind(env!("CARGO_PKG_VERSION")) + .execute(pool.writer()) + .await?; + Ok(()) + } + + async fn mark_completed(pool: &SqlitePool, name: &str) -> Result<()> { + sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1;").bind(name).execute(pool.writer()).await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + #[tokio::test] + async fn correctly_skips_new_install() { + let pool = crate::sql::SqlitePool::connect_in_memory().await.unwrap(); + let migrator = crate::migration::DataMigrator::try_new(pool).await.unwrap(); + assert!(!migrator.needs_migration().await.unwrap()); + } +} diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs index 9282e079c..c73ffc9d1 100644 --- a/store/src/sql/query.rs +++ b/store/src/sql/query.rs @@ -120,6 +120,9 @@ impl EventQuery { } /// Updates the delivered column in the event table so it can be set to the client + /// Requires 2 parameters: + /// $1 = delivered (i64) + /// $2 = cid (bytes) pub fn mark_ready_to_deliver() -> &'static str { "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;" } From 2b3f86e3e60aa4f8831a0dcd616ec76892d5a1de Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 18:02:18 -0600 Subject: [PATCH 5/9] feat: add command to run data migrations automatically on daemon startup --- one/src/lib.rs | 25 +++++++++++++++++++--- 
service/src/interest/service.rs | 1 + service/src/lib.rs | 37 +++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/one/src/lib.rs b/one/src/lib.rs index 83030d288..6a12ae9f4 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -17,7 +17,7 @@ use ceramic_core::{EventId, Interest}; use ceramic_kubo_rpc::Multiaddr; use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; -use ceramic_service::{CeramicEventService, CeramicInterestService}; +use ceramic_service::{CeramicEventService, CeramicInterestService, CeramicService}; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures::StreamExt; use multibase::Base; @@ -157,6 +157,10 @@ struct DaemonOpts { /// The default is to use `db.sqlite3` in the store directory. #[arg(long, env = "CERAMIC_ONE_DATABASE_URL")] database_url: Option, + + #[arg(long, default_value_t = false, env = "CERAMIC_ONE_MIGRATE_DATA")] + /// Whether to apply any required data migrations at startup. If false and migrations are required, the server will exit. + migrate_data: bool, } #[derive(ValueEnum, Debug, Clone, Default)] @@ -280,11 +284,13 @@ impl DaemonOpts { async fn build_sqlite_dbs(path: &str) -> Result { let sql_pool = ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?; - let interest_store = Arc::new(CeramicInterestService::new(sql_pool.clone())); - let event_store = Arc::new(CeramicEventService::new(sql_pool).await?); + let ceramic_service = CeramicService::try_new(sql_pool).await?; + let interest_store = ceramic_service.interest_service().to_owned(); + let event_store = ceramic_service.event_service().to_owned(); println!("Connected to sqlite database: {}", path); Ok(Databases::Sqlite(SqliteBackend { + ceramic_service, event_store, interest_store, })) @@ -295,6 +301,7 @@ enum Databases { Sqlite(SqliteBackend), } struct SqliteBackend { + ceramic_service: CeramicService, interest_store: Arc, event_store: Arc, } @@ -312,6 +319,7 @@ impl Daemon { Databases::Sqlite(db) => { Daemon::run_int( opts, + db.ceramic_service, db.interest_store.clone(), db.interest_store, db.event_store.clone(), @@ -325,6 +333,7 @@ impl Daemon { async fn run_int( opts: DaemonOpts, + service: CeramicService, interest_api_store: Arc, interest_recon_store: Arc, model_api_store: Arc, @@ -389,6 +398,16 @@ impl Daemon { ); }); + let migrator = service.data_migrator().await?; + if migrator.needs_migration().await? { + if opts.migrate_data { + migrator.run_all().await?; + } else { + warn!("Data migrations are required, but --migrate-data is not set. Run with --migrate-data to apply migrations. Before doing so, you should back up your sqlite files from {:?}", dir); + return Ok(()); + } + } + let p2p_config = Libp2pConfig { mdns: false, bitswap_server: false, diff --git a/service/src/interest/service.rs b/service/src/interest/service.rs index f95cacd74..43756a262 100644 --- a/service/src/interest/service.rs +++ b/service/src/interest/service.rs @@ -2,6 +2,7 @@ use ceramic_store::SqlitePool; /// A Service that understands how to process and store Ceramic Interests. /// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::Interest`]. 
+#[derive(Debug, Clone)] pub struct CeramicInterestService { pub(crate) pool: SqlitePool, } diff --git a/service/src/lib.rs b/service/src/lib.rs index e42c6d02e..e667522fa 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -1,11 +1,48 @@ mod error; mod event; mod interest; + #[cfg(test)] mod tests; +use std::sync::Arc; + +use ceramic_store::{DataMigrator, SqlitePool}; pub use error::Error; pub use event::CeramicEventService; pub use interest::CeramicInterestService; pub(crate) type Result = std::result::Result; + +/// The ceramic service holds the logic needed by the other components (e.g. api, recon) to access the store and process events +/// in a way that makes sense to the ceramic protocol, and not just as raw bytes. +#[derive(Debug)] +pub struct CeramicService { + pub(crate) interest: Arc, + pub(crate) event: Arc, +} + +impl CeramicService { + /// Create a new CeramicService + pub async fn try_new(pool: SqlitePool) -> Result { + let interest = Arc::new(CeramicInterestService::new(pool.clone())); + let event = Arc::new(CeramicEventService::new(pool).await?); + Ok(Self { interest, event }) + } + + /// Get the interest service + pub fn interest_service(&self) -> &Arc { + &self.interest + } + + /// Get the event service + pub fn event_service(&self) -> &Arc { + &self.event + } + + /// Get the data migrator + pub async fn data_migrator(&self) -> Result { + let m = DataMigrator::try_new(self.event.pool.clone()).await?; + Ok(m) + } +} From 9b2634f35c14783e71ba83b9fe6bbd4852357508 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 18:12:41 -0600 Subject: [PATCH 6/9] feat: add tables to track stream/event metadata --- migrations/sqlite/20240529202212_stream.down.sql | 2 ++ migrations/sqlite/20240529202212_stream.up.sql | 8 ++++++++ .../sqlite/20240530125008_event_metadata.down.sql | 2 ++ .../sqlite/20240530125008_event_metadata.up.sql | 11 +++++++++++ 4 files changed, 23 insertions(+) create mode 100644 migrations/sqlite/20240529202212_stream.down.sql create mode 100644 migrations/sqlite/20240529202212_stream.up.sql create mode 100644 migrations/sqlite/20240530125008_event_metadata.down.sql create mode 100644 migrations/sqlite/20240530125008_event_metadata.up.sql diff --git a/migrations/sqlite/20240529202212_stream.down.sql b/migrations/sqlite/20240529202212_stream.down.sql new file mode 100644 index 000000000..28034f533 --- /dev/null +++ b/migrations/sqlite/20240529202212_stream.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS "ceramic_one_stream"; \ No newline at end of file diff --git a/migrations/sqlite/20240529202212_stream.up.sql b/migrations/sqlite/20240529202212_stream.up.sql new file mode 100644 index 000000000..197381ad0 --- /dev/null +++ b/migrations/sqlite/20240529202212_stream.up.sql @@ -0,0 +1,8 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS "ceramic_one_stream" ( + "cid" BLOB NOT NULL, -- init event cid + "sep" TEXT NOT NULL, + "sep_value" blob NOT NULL, + -- we ignore the composeDB/indexing related fields: should_index, unique, context + PRIMARY KEY(cid) +); diff --git a/migrations/sqlite/20240530125008_event_metadata.down.sql b/migrations/sqlite/20240530125008_event_metadata.down.sql new file mode 100644 index 000000000..cda23cd1f --- /dev/null +++ b/migrations/sqlite/20240530125008_event_metadata.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS "ceramic_one_event_metadata"; \ No newline at end of file diff --git a/migrations/sqlite/20240530125008_event_metadata.up.sql 
b/migrations/sqlite/20240530125008_event_metadata.up.sql new file mode 100644 index 000000000..98379173e --- /dev/null +++ b/migrations/sqlite/20240530125008_event_metadata.up.sql @@ -0,0 +1,11 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS "ceramic_one_event_metadata" ( + "cid" BLOB NOT NULL, -- event cid + "event_type" INTEGER NOT NULL, -- enum EventType: Init, Data, Time + "stream_cid" BLOB NOT NULL, -- id field in header. can't have FK because stream may not exist until we discover it but should reference ceramic_one_stream(cid) + "prev" BLOB, -- prev event cid. can't have a foreign key because node may not know about prev event but it should reference ceramic_one_event(cid) + PRIMARY KEY(cid), + FOREIGN KEY(cid) REFERENCES ceramic_one_event(cid) +); + +CREATE INDEX IF NOT EXISTS "idx_ceramic_one_event_metadata_stream_cid" ON "ceramic_one_event_metadata" ("stream_cid"); From 2198d3a519df0a80a3176d768eaa1d21ec08c6a4 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 18:29:27 -0600 Subject: [PATCH 7/9] chore: remove todo --- store/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/error.rs b/store/src/error.rs index afd574126..d7943e191 100644 --- a/store/src/error.rs +++ b/store/src/error.rs @@ -119,7 +119,7 @@ impl From for Error { sqlx::Error::Decode(e) => Self::new_app(anyhow!(e)), sqlx::Error::RowNotFound => Self::new_app(anyhow!("Row not found")), // non_exhaustive - // TODO: is there a way to skip a variant and throw a compilation error if one is ever added? + // is there a way to skip a variant and throw a compilation error if one is ever added? e => Self::new_app(e), } } From 100185b5295153e5290bacd5632c616fb8121db9 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 18:39:59 -0600 Subject: [PATCH 8/9] feat: use the stream and event_metadata tables when discovering events - populated when discovering events - rewrote IOD using it (been running `recon_lots_of_streams` in a loop and it keeps passing) - created crate specific InsertResult structs (api, recon, service, store). If any api batch write fails because of something in the service (i.e. 
no prev), it won't fail other writes in the batch --- api/src/lib.rs | 2 +- api/src/server.rs | 56 +- api/src/tests.rs | 55 +- service/Cargo.toml | 3 +- service/src/event/mod.rs | 1 + service/src/event/order_events.rs | 102 +++ service/src/event/ordering_task.rs | 851 +++++++---------------- service/src/event/service.rs | 366 +++------- service/src/event/store.rs | 23 +- service/src/tests/mod.rs | 89 +-- service/src/tests/ordering.rs | 135 +++- store/Cargo.toml | 7 +- store/src/lib.rs | 4 +- store/src/metrics.rs | 11 +- store/src/sql/access/event.rs | 120 +++- store/src/sql/access/mod.rs | 4 +- store/src/sql/access/stream.rs | 151 ++++ store/src/sql/entities/event.rs | 152 +++- store/src/sql/entities/event_block.rs | 4 + store/src/sql/entities/event_metadata.rs | 58 ++ store/src/sql/entities/hash.rs | 2 +- store/src/sql/entities/mod.rs | 8 +- store/src/sql/entities/stream.rs | 68 ++ store/src/sql/entities/utils.rs | 4 +- store/src/sql/mod.rs | 5 +- store/src/sql/query.rs | 8 +- store/src/sql/test.rs | 19 +- 27 files changed, 1258 insertions(+), 1050 deletions(-) create mode 100644 service/src/event/order_events.rs create mode 100644 store/src/sql/access/stream.rs create mode 100644 store/src/sql/entities/event_metadata.rs create mode 100644 store/src/sql/entities/stream.rs diff --git a/api/src/lib.rs b/api/src/lib.rs index 7cb8cca41..b306ac7c1 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -3,7 +3,7 @@ mod server; pub use resume_token::ResumeToken; -pub use server::{EventStore, InterestStore, Server}; +pub use server::{EventInsertResult, EventStore, InterestStore, Server}; #[cfg(test)] mod tests; diff --git a/api/src/server.rs b/api/src/server.rs index 2ce7d297a..5289f51de 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -6,6 +6,7 @@ mod event; +use std::collections::HashMap; use std::time::Duration; use std::{future::Future, ops::Range}; use std::{marker::PhantomData, ops::RangeBounds}; @@ -162,11 +163,28 @@ impl InterestStore for Arc { } } +#[derive(Debug, Clone)] +pub struct EventInsertResult { + id: EventId, + // if set, the reason this event couldn't be inserted + failed: Option, +} + +impl EventInsertResult { + pub fn new(id: EventId, failed: Option) -> Self { + Self { id, failed } + } + + pub fn success(&self) -> bool { + self.failed.is_none() + } +} + /// Trait for accessing persistent storage of Events #[async_trait] pub trait EventStore: Send + Sync { /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed. 
- async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -199,7 +217,7 @@ pub trait EventStore: Send + Sync { #[async_trait::async_trait] impl EventStore for Arc { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { self.as_ref().insert_many(items).await } @@ -241,7 +259,7 @@ impl EventStore for Arc { struct EventInsert { id: EventId, data: Vec, - tx: tokio::sync::oneshot::Sender>, + tx: tokio::sync::oneshot::Sender>, } struct InsertTask { @@ -325,25 +343,35 @@ where if events.is_empty() { return; } - let mut oneshots = Vec::with_capacity(events.len()); + let mut oneshots = HashMap::with_capacity(events.len()); let mut items = Vec::with_capacity(events.len()); events.drain(..).for_each(|req: EventInsert| { - oneshots.push(req.tx); + oneshots.insert(req.id.to_bytes(), req.tx); items.push((req.id, req.data)); }); tracing::trace!("calling insert many with {} items.", items.len()); match event_store.insert_many(&items).await { Ok(results) => { tracing::debug!("insert many returned {} results.", results.len()); - for (tx, result) in oneshots.into_iter().zip(results.into_iter()) { - if let Err(e) = tx.send(Ok(result)) { - tracing::warn!("failed to send success response to api listener: {:?}", e); + for result in results { + if let Some(tx) = oneshots.remove(&result.id.to_bytes()) { + if let Err(e) = tx.send(Ok(result)) { + tracing::warn!( + "failed to send success response to api listener: {:?}", + e + ); + } + } else { + tracing::warn!( + "lost channel to respond to API listener for event ID: {:?}", + result.id + ); } } } Err(e) => { tracing::warn!("failed to insert events: {e}"); - for tx in oneshots.into_iter() { + for tx in oneshots.into_values() { if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) { tracing::warn!("failed to send failed response to api listener: {:?}", e); } @@ -495,7 +523,7 @@ where .await? .map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?; - let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) + let new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) .await .map_err(|_| { ErrorResponse::new("Timeout waiting for database service response".to_owned()) @@ -503,7 +531,13 @@ where .map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))? .map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?; - Ok(EventsPostResponse::Success) + if let Some(failed) = new.failed { + Ok(EventsPostResponse::BadRequest(BadRequestResponse::new( + failed, + ))) + } else { + Ok(EventsPostResponse::Success) + } } pub async fn post_interests( diff --git a/api/src/tests.rs b/api/src/tests.rs index b7837951b..0b1de6e33 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -5,6 +5,7 @@ use std::{ops::Range, str::FromStr, sync::Arc}; use crate::server::decode_multibase_data; use crate::server::BuildResponse; use crate::server::Server; +use crate::EventInsertResult; use crate::{EventStore, InterestStore}; use anyhow::Result; @@ -121,7 +122,7 @@ mock! 
{ pub EventStoreTest {} #[async_trait] impl EventStore for EventStoreTest { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -198,7 +199,12 @@ async fn create_event() { .expect_insert_many() .with(predicate::eq(args)) .times(1) - .returning(|_| Ok(vec![true])); + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| EventInsertResult::new(id.clone(), None)) + .collect()) + }); let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( @@ -211,6 +217,51 @@ async fn create_event() { .unwrap(); assert!(matches!(resp, EventsPostResponse::Success)); } + +#[tokio::test] +async fn create_event_fails() { + let peer_id = PeerId::random(); + let network = Network::Mainnet; + let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); + + // Remove whitespace from event CAR file + let event_data = DATA_EVENT_CAR + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let mock_interest = MockAccessInterestStoreTest::new(); + let mut mock_event_store = MockEventStoreTest::new(); + mock_get_init_event(&mut mock_event_store); + let args = vec![( + expected_event_id.clone(), + decode_multibase_data(&event_data).unwrap(), + )]; + + mock_event_store + .expect_insert_many() + .with(predicate::eq(args)) + .times(1) + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| { + EventInsertResult::new(id.clone(), Some("Event is missing prev".to_string())) + }) + .collect()) + }); + let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let resp = server + .events_post( + models::EventData { + data: event_data.to_string(), + }, + &Context, + ) + .await + .unwrap(); + assert!(matches!(resp, EventsPostResponse::BadRequest(_))); +} + #[tokio::test] #[traced_test] async fn register_interest_sort_value() { diff --git a/service/Cargo.toml b/service/Cargo.toml index bb65f422f..b68033ba9 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -18,7 +18,6 @@ ceramic-store.workspace = true cid.workspace = true hex.workspace = true ipld-core.workspace = true -serde_ipld_dagcbor.workspace = true iroh-bitswap.workspace = true multihash-codetable.workspace = true recon.workspace = true @@ -32,8 +31,8 @@ ipld-core.workspace = true multibase.workspace = true paste = "1.0" rand.workspace = true -serde.workspace = true serde_ipld_dagcbor.workspace = true +serde.workspace = true test-log.workspace = true tmpdir.workspace = true tokio.workspace = true diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs index f7dce573a..69e12b26c 100644 --- a/service/src/event/mod.rs +++ b/service/src/event/mod.rs @@ -1,3 +1,4 @@ +mod order_events; mod ordering_task; mod service; mod store; diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs new file mode 100644 index 000000000..ae4c70e72 --- /dev/null +++ b/service/src/event/order_events.rs @@ -0,0 +1,102 @@ +use std::collections::HashSet; + +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; +use cid::Cid; + +use crate::Result; + +pub(crate) struct OrderEvents { + pub(crate) deliverable: Vec, + pub(crate) missing_history: Vec, +} + +impl OrderEvents { + /// Groups the events into lists those with a delivered prev and those without. This can be used to return an error if the event is required to have history. 
+    /// The events will be marked as deliverable so that they can be passed directly to the store to be persisted.
+    pub async fn try_new(
+        pool: &SqlitePool,
+        mut candidate_events: Vec<EventInsertable>,
+    ) -> Result<Self> {
+        // move all the init events to the front so we make sure to add them first and get the deliverable order correct
+        let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid()));
+        let mut deliverable = Vec::with_capacity(candidate_events.len());
+        candidate_events.retain(|e| {
+            if e.deliverable() {
+                deliverable.push(e.clone());
+                false
+            } else {
+                true
+            }
+        });
+        if candidate_events.is_empty() {
+            return Ok(OrderEvents {
+                deliverable,
+                missing_history: Vec::new(),
+            });
+        }
+
+        let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
+        let mut missing_history = Vec::with_capacity(candidate_events.len());
+
+        while let Some(mut event) = candidate_events.pop() {
+            match &event.prev() {
+                None => {
+                    unreachable!("Init events should have been filtered out since they're always deliverable");
+                }
+                Some(prev) => {
+                    if new_cids.contains(prev) {
+                        prevs_in_memory.push(event.clone());
+                        continue;
+                    } else {
+                        let (_exists, prev_deliverable) =
+                            CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
+                        if prev_deliverable {
+                            event.set_deliverable(true);
+                            deliverable.push(event);
+                        } else {
+                            // technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed,
+                            // but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have known how to
+                            // construct this event so we'll consider this missing history. This can be used to return an error if the event is required to have history.
+                            missing_history.push(event);
+                        }
+                    }
+                }
+            }
+        }

+        // We add the events to the deliverable list until nothing changes.
+        // It should be a small set and it will shrink each loop, so continually looping is acceptable.
+        loop {
+            let mut made_changes = false;
+            while let Some(mut event) = prevs_in_memory.pop() {
+                match &event.prev() {
+                    None => {
+                        unreachable!(
+                            "Init events should have been filtered out of the in memory set"
+                        );
+                    }
+                    Some(prev) => {
+                        // a hashset would be a better lookup but we're not going to have that many events so hashing
+                        // for a handful of lookups and then convert back to a vec probably isn't worth it.
+ if deliverable.iter().any(|e| e.cid() == *prev) { + event.set_deliverable(true); + deliverable.push(event); + made_changes = true; + } else { + prevs_in_memory.push(event); + } + } + } + } + if !made_changes { + missing_history.extend(prevs_in_memory); + break; + } + } + + Ok(OrderEvents { + deliverable, + missing_history, + }) + } +} diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index a1e13f204..6398e04d1 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -1,73 +1,21 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use anyhow::anyhow; -use ceramic_store::{CeramicOneEvent, SqlitePool}; +use ceramic_store::{ + CeramicOneEvent, CeramicOneStream, InsertedEvent, SqlitePool, StreamEventMetadata, +}; use cid::Cid; use tracing::{debug, error, info, trace, warn}; -use crate::{CeramicEventService, Error, Result}; +use crate::{Error, Result}; -/// How many events to select at once to see if they've become deliverable when we have downtime -/// Used at startup and occassionally in case we ever dropped something -/// We keep the number small for now as we may need to traverse many prevs for each one of these and load them into memory. -const DELIVERABLE_EVENTS_BATCH_SIZE: usize = 1000; -/// How many batches of undelivered events are we willing to process on start up? -/// To avoid an infinite loop. It's going to take a long time to process `DELIVERABLE_EVENTS_BATCH_SIZE * MAX_ITERATIONS` events -const MAX_ITERATIONS: usize = 100_000_000; - -/// How often should we try to process all undelivered events in case we missed something -const CHECK_ALL_INTERVAL_SECONDS: u64 = 60 * 10; // 10 minutes - -type InitCid = cid::Cid; -type PrevCid = cid::Cid; -type EventCid = cid::Cid; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DeliveredEvent { - pub(crate) cid: Cid, - pub(crate) init_cid: InitCid, -} - -impl DeliveredEvent { - pub fn new(cid: Cid, init_cid: InitCid) -> Self { - Self { cid, init_cid } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub(crate) struct DeliverableMetadata { - pub(crate) init_cid: InitCid, - pub(crate) prev: PrevCid, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DeliverableEvent { - pub(crate) cid: EventCid, - pub(crate) meta: DeliverableMetadata, - attempts: usize, - last_attempt: std::time::Instant, - started: std::time::Instant, - expires: Option, -} - -impl DeliverableEvent { - pub fn new(cid: Cid, meta: DeliverableMetadata, expires: Option) -> Self { - Self { - cid, - meta, - attempts: 0, - last_attempt: std::time::Instant::now(), - started: std::time::Instant::now(), - expires, - } - } -} +type StreamCid = Cid; +type EventCid = Cid; +type PrevCid = Cid; #[derive(Debug)] pub struct DeliverableTask { pub(crate) _handle: tokio::task::JoinHandle<()>, - pub(crate) tx: tokio::sync::mpsc::Sender, - pub(crate) tx_new: tokio::sync::mpsc::Sender, + pub(crate) tx_delivered: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -75,30 +23,27 @@ pub struct OrderingTask {} impl OrderingTask { pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask { - let (tx, rx) = tokio::sync::mpsc::channel::(q_depth); - let (tx_new, rx_new) = tokio::sync::mpsc::channel::(q_depth); + let (tx_delivered, rx_delivered) = tokio::sync::mpsc::channel::(q_depth); let handle = - tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx, rx_new).await }); + tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_delivered).await }); 
DeliverableTask { _handle: handle, - tx, - tx_new, + tx_delivered, } } async fn run_loop( pool: SqlitePool, load_undelivered: bool, - mut rx: tokio::sync::mpsc::Receiver, - mut rx_new: tokio::sync::mpsc::Receiver, + mut rx_delivered: tokio::sync::mpsc::Receiver, ) { // before starting, make sure we've updated any events in the database we missed let mut state = OrderingState::new(); if load_undelivered && state - .process_all_undelivered_events(&pool, MAX_ITERATIONS) + .process_all_undelivered_events(&pool) .await .map_err(Self::log_error) .is_err() @@ -106,53 +51,38 @@ impl OrderingTask { return; } - let mut last_processed = std::time::Instant::now(); loop { - let mut modified: Option> = None; - let mut need_prev_buf = Vec::with_capacity(100); - let mut newly_added_buf = Vec::with_capacity(100); - - tokio::select! { - incoming = rx.recv_many(&mut need_prev_buf, 100) => { - if incoming > 0 { - modified = Some(state.add_incoming_batch(need_prev_buf)); - } - } - new = rx_new.recv_many(&mut newly_added_buf, 100) => { - if new > 0 { - modified = Some(newly_added_buf.into_iter().map(|ev| ev.init_cid).collect::>()); - } + let mut delivered_events = Vec::with_capacity(100); + + if rx_delivered.recv_many(&mut delivered_events, 100).await > 0 { + debug!(?delivered_events, "new delivered events!"); + for event in delivered_events { + state.add_stream(event.stream_cid); } - else => { - info!(stream_count=%state.pending_by_stream.len(), "Server dropped the ordering task. Processing once more before exiting..."); - let _ = state - .process_events(&pool, None) - .await - .map_err(Self::log_error); + + if state + .process_streams(&pool) + .await + .map_err(Self::log_error) + .is_err() + { return; } - }; - // Given the math on OrderingState and the generally low number of updates to streams, we are going - // to ignore pruning until there's more of an indication that it's necessary. Just log some stats. - if last_processed.elapsed().as_secs() > CHECK_ALL_INTERVAL_SECONDS { - let stream_count = state.pending_by_stream.len(); - if stream_count > 1000 { - info!(%stream_count, "Over 1000 pending streams without recent updates."); - } else { - debug!(%stream_count, "Fewer than 1000 streams pending without recent updates."); - } - } + } else if rx_delivered.is_closed() { + debug!( + "Server dropped the delivered events channel. Attempting to processing streams in memory once more before exiting." + ); - if modified.is_some() - && state - .process_events(&pool, modified) + if state + .process_streams(&pool) .await .map_err(Self::log_error) .is_err() - { - return; + { + return; + } + break; } - last_processed = std::time::Instant::now(); } } @@ -175,599 +105,294 @@ impl OrderingTask { } } -#[derive(Debug)] -/// Rough size estimate: -/// pending_by_stream: 96 * stream_cnt + 540 * event_cnt -/// ready_events: 96 * ready_event_cnt -/// so for stream_cnt = 1000, event_cnt = 2, ready_event_cnt = 1000 -/// we get about 1 MB of memory used. -pub struct OrderingState { - /// Map of undelivered events by init CID (i.e. the stream CID). - pending_by_stream: HashMap, - /// Queue of events that can be marked ready to deliver. - /// Can be added as long as their prev is stored or in this list ahead of them. - ready_events: VecDeque, -} - #[derive(Debug, Clone, Default)] /// ~540 bytes per event in this struct pub(crate) struct StreamEvents { + /// Map of `event.prev` to `event.cid` for quick lookup of the next event in the stream. 
prev_map: HashMap, - cid_map: HashMap, + /// Map of `event.cid` to `metadata` for quick lookup of the event metadata. + cid_map: HashMap, + /// Events that can be delivered FIFO order for the stream + deliverable: VecDeque, + /// The total number of events in the stream when we started + total_events: usize, } -impl FromIterator for StreamEvents { - fn from_iter>(iter: T) -> Self { - let mut stream = Self::new(); - for item in iter { - stream.add_event(item); +impl StreamEvents { + fn new(_cid: StreamCid, events: I) -> Self + where + I: ExactSizeIterator, + { + let total_events = events.len(); + let mut new = Self { + prev_map: HashMap::with_capacity(total_events), + cid_map: HashMap::with_capacity(total_events), + deliverable: VecDeque::with_capacity(total_events), + total_events, + }; + + for event in events { + new.add_event(event); } - stream + new } -} -impl StreamEvents { - pub fn new() -> Self { - Self::default() + async fn new_from_db(stream: StreamCid, pool: &SqlitePool) -> Result { + let stream_events = CeramicOneStream::load_stream_events(pool, stream).await?; + trace!(?stream_events, "Loaded stream events for ordering"); + Ok(Self::new(stream, stream_events.into_iter())) } - /// returns Some(Stream Init CID) if this is a new event, else None. - pub fn add_event(&mut self, event: DeliverableEvent) -> Option { - let res = if self.prev_map.insert(event.meta.prev, event.cid).is_none() { - Some(event.meta.init_cid) - } else { - None - }; - self.cid_map.insert(event.cid, event); - res + fn is_empty(&self) -> bool { + // We only care if we have things that are pending to be delivered + self.prev_map.is_empty() } - pub fn is_empty(&self) -> bool { - // these should always match - self.prev_map.is_empty() && self.cid_map.is_empty() + /// returns true if this is a new event. + fn add_event(&mut self, event: StreamEventMetadata) -> bool { + if let Some(prev) = event.prev { + self.prev_map.insert(prev, event.cid); + } + self.cid_map.insert(event.cid, event).is_none() } - fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.cid_map.remove(cid) { - self.prev_map.remove(&cid.meta.prev); - Some(cid) + fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { + if let Some(ev) = self.cid_map.remove(cid) { + if let Some(prev) = ev.prev { + self.prev_map.remove(&prev); + } + Some(ev) } else { None } } - fn remove_by_prev_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.prev_map.remove(cid) { + fn remove_by_prev_cid(&mut self, prev: &Cid) -> Option { + if let Some(cid) = self.prev_map.remove(prev) { self.cid_map.remove(&cid); Some(cid) } else { None } } -} -impl OrderingState { - pub fn new() -> Self { - Self { - pending_by_stream: HashMap::new(), - ready_events: VecDeque::new(), - } + fn delivered_events(&self) -> impl Iterator { + self.cid_map + .iter() + .filter_map(|(cid, event)| if event.deliverable { Some(cid) } else { None }) } - /// This will review all the events for any streams known to have undelivered events and see if any of them are now deliverable. - /// If `streams_to_process` is None, all streams will be processed, otherwise only the streams in the set will be processed. - /// Processing all streams could take a long time and not necessarily do anything productive (if we're missing a key event, we're still blocked). - /// However, passing a value for `streams_to_process` when we know something has changed is likely to have positive results and be much faster. 
- pub(crate) async fn process_events( - &mut self, - pool: &SqlitePool, - streams_to_process: Option>, - ) -> Result<()> { - self.persist_ready_events(pool).await?; - for (cid, stream_events) in self.pending_by_stream.iter_mut() { - if streams_to_process - .as_ref() - .map_or(false, |to_do| !to_do.contains(cid)) - { - continue; - } - let deliverable = Self::discover_deliverable_events(pool, stream_events).await?; - if !deliverable.is_empty() { - self.ready_events.extend(deliverable) - } - } - if !self.ready_events.is_empty() { - self.persist_ready_events(pool).await?; + async fn order_events(pool: &SqlitePool, stream: StreamCid) -> Result { + let mut to_process = Self::new_from_db(stream, pool).await?; + if to_process.delivered_events().count() == 0 { + return Ok(to_process); } - Ok(()) - } + let stream_event_count = to_process.cid_map.len(); + let delivered_cids = to_process.delivered_events().cloned().collect::>(); + let mut start_with = VecDeque::with_capacity(stream_event_count - delivered_cids.len()); - /// Removes deliverable events from the `prev_map` and returns them. This means prev is already delivered or in the - /// list to be marked as delivered. The input is expected to be a list of CIDs for a given stream that are waiting - /// to be processed. It will still work if it's intermixed for multiple streams, but it's not the most efficient way to use it. - /// The returned CIDs in the VeqDeque are for events that are expected to be updated FIFO i.e. vec.pop_front() - /// - /// This breaks with multi-prev as we expect a single prev for each event. The input map is expected to contain the - /// (prev <- event) relationship (that is, the value is the event that depends on the key). - pub(crate) async fn discover_deliverable_events( - pool: &SqlitePool, - stream_map: &mut StreamEvents, - ) -> Result> { - if stream_map.is_empty() { - return Ok(VecDeque::new()); + for cid in delivered_cids { + if let Some(next_event) = to_process.remove_by_prev_cid(&cid) { + to_process.remove_by_event_cid(&next_event); + start_with.push_back(next_event); + } } - let mut deliverable = VecDeque::new(); - let prev_map_cln = stream_map.prev_map.clone(); - for (prev, ev_cid) in prev_map_cln { - if stream_map.cid_map.contains_key(&prev) { - trace!( - ?prev, - cid=?ev_cid, - "Found event that depends on another event in memory" - ); - // we have it in memory so we need to order it related to others to insert correctly - // although it may not be possible if the chain just goes back to some unknown event - // once we find the first event that's deliverable, we can go back through and find the rest - continue; - } else { - let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?; - if delivered { - trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list"); - deliverable.push_back(ev_cid); - stream_map.remove_by_event_cid(&ev_cid); - } else if exists { - trace!("Found undelivered prev in database. Building data to check for deliverable."); - // if it's not in memory, we need to read it from the db and parse it for the prev value to add it to our set - let data = CeramicOneEvent::value_by_cid(pool, &prev) - .await? 
- .ok_or_else(|| { - Error::new_app(anyhow!( - "Missing data for event that exists should be impossible" - )) - })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(prev, &data).await?; - - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - trace!(cid=%event.cid, "Adding event discovered in database to stream pending list"); - stream_map.add_event(event); - } else { - warn!(event_cid=%insertable_body.cid,"Found undelivered event with no prev while processing pending. Should not happen."); - deliverable.push_back(insertable_body.cid); - stream_map.remove_by_event_cid(&ev_cid); - } - } else { - trace!( - ?ev_cid, - "Found event that depends on unknown event. Will check later." - ); - } + while let Some(new_tip) = start_with.pop_front() { + to_process.deliverable.push_back(new_tip); + let mut tip = new_tip; + while let Some(next_event) = to_process.remove_by_prev_cid(&tip) { + to_process.deliverable.push_back(next_event); + tip = next_event; } } - let mut newly_ready = deliverable.clone(); - while let Some(cid) = newly_ready.pop_front() { - if let Some(now_ready_ev) = stream_map.remove_by_prev_cid(&cid) { - deliverable.push_back(now_ready_ev); - newly_ready.push_back(now_ready_ev); - } + Ok(to_process) + } +} + +#[derive(Debug)] +pub struct OrderingState { + streams: HashSet, + deliverable: VecDeque, +} + +impl OrderingState { + fn new() -> Self { + Self { + streams: HashSet::new(), + deliverable: VecDeque::new(), } - debug!(?deliverable, "deliverable events discovered"); + } - Ok(deliverable) + /// Add a stream to the list of streams to process. This implies it has undelivered events and is worthwhile to attempt. + fn add_stream(&mut self, stream: StreamCid) -> bool { + self.streams.insert(stream) } - /// Process all undelivered events in the database. This is a blocking operation that could take a long time. - /// It is intended to be run at startup but could be used on an interval or after some errors to recover. - pub(crate) async fn process_all_undelivered_events( - &mut self, - pool: &SqlitePool, - max_iterations: usize, - ) -> Result<()> { - let mut cnt = 0; - let mut offset: usize = 0; - while cnt < max_iterations { - cnt += 1; - let (new, found) = self - .add_undelivered_batch(pool, offset, DELIVERABLE_EVENTS_BATCH_SIZE) - .await?; - if new == 0 { - break; - } else { - // We can start processing and we'll follow the stream history if we have it. In that case, we either arrive - // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory. - // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them - // or otherwise mark them ignored somehow. - self.process_events(pool, None).await?; - if new < DELIVERABLE_EVENTS_BATCH_SIZE { - break; + /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to process commit things in batches, + /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried. Events that are + /// delivered multiple times will not change the original delivered state. + async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> { + let mut stream_cnt = HashMap::new(); + // we need to handle the fact that new writes can come in without knowing they're deliverable because we're still in the process of updating them. 
+ // so when we finish the loop and we had streams we couldn't complete, we try again to see if they had new writes. if nothing changed we exit. + // this could certainly be optimized. we could only query the count of events, or we could load undelivered events and keep track of our + // total state, as anything forking that was deliverable would arrive on the incoming channel. for now, streams are short and this is probably sufficient. + loop { + let mut processed_streams = Vec::with_capacity(self.streams.len()); + for stream in &self.streams { + let ordered_events = StreamEvents::order_events(pool, *stream).await?; + stream_cnt.insert(*stream, ordered_events.total_events); + if ordered_events.is_empty() { + processed_streams.push(*stream); } - offset = offset.saturating_add(found); - } - if cnt >= max_iterations { - warn!(batch_size=DELIVERABLE_EVENTS_BATCH_SIZE, iterations=%max_iterations, "Exceeded max iterations for finding undelivered events!"); - break; + self.deliverable.extend(ordered_events.deliverable); } - } - if self.ready_events.is_empty() { - Ok(()) - } else { - self.persist_ready_events(pool).await?; - Ok(()) - } - } - /// Add a batch of events from the database to the pending list to be processed. - /// Returns the (#events new events found , #events returned by query) - async fn add_undelivered_batch( - &mut self, - pool: &SqlitePool, - offset: usize, - limit: usize, - ) -> Result<(usize, usize)> { - let undelivered = CeramicOneEvent::undelivered_with_values(pool, offset, limit).await?; - trace!(count=%undelivered.len(), "Found undelivered events to process"); - if undelivered.is_empty() { - return Ok((0, 0)); - } - let found = undelivered.len(); - let mut new = 0; - for (key, data) in undelivered { - let event_cid = key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", key)) - })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(event_cid, &data).await?; - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - if self.track_pending(event).is_some() { - new += 1; + let found_events = !self.deliverable.is_empty(); + if found_events { + tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver"); + let mut tx = pool.begin_tx().await?; + // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them. + // Could use `pop_front` but we want to make sure we commit and then clear everything at once. + for cid in &self.deliverable { + CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; } - } else { - // safe to ignore in tests, shows up because when we mark init events as undelivered even though they don't have a prev - info!(event_cid=%insertable_body.cid, "Found undelivered event with no prev while processing undelivered. Should not happen. 
Likely means events were dropped before."); - self.ready_events.push_back(insertable_body.cid); - new += 1; // we treat this as new since it might unlock something else but it's not actually going in our queue is it's a bit odd + tx.commit().await?; + self.deliverable.clear(); } - } - trace!(%new, %found, "Adding undelivered events to pending set"); - Ok((new, found)) - } - - fn add_incoming_batch(&mut self, events: Vec) -> HashSet { - let mut updated_streams = HashSet::with_capacity(events.len()); - for event in events { - if let Some(updated_stream) = self.track_pending(event) { - updated_streams.insert(updated_stream); + self.streams + .retain(|stream| !processed_streams.contains(stream)); + // not strictly necessary as the next loop will not do anything but we can avoid allocating + if self.streams.is_empty() || !found_events { + break; } + + debug!(stream_state=?self, ?processed_streams, "Finished processing streams loop with more to do"); } - updated_streams - } + debug!(stream_state=?self, "Finished processing streams"); - /// returns the init event CID (stream CID) if this is a new event - fn track_pending(&mut self, event: DeliverableEvent) -> Option { - self.pending_by_stream - .entry(event.meta.init_cid) - .or_default() - .add_event(event) + Ok(()) } - /// Modify all the events that are ready to be marked as delivered. - - /// We should improve the error handling and likely add some batching if the number of ready events is very high. - /// We copy the events up front to avoid losing any events if the task is cancelled. - async fn persist_ready_events(&mut self, pool: &SqlitePool) -> Result<()> { - if !self.ready_events.is_empty() { - let mut to_process = self.ready_events.clone(); // to avoid cancel loss - tracing::debug!(count=%self.ready_events.len(), "Marking events as ready to deliver"); - let mut tx = pool.begin_tx().await?; - - // We process the ready events as a FIFO queue so they are marked delivered before events - // that were added after and depend on them. - while let Some(cid) = to_process.pop_front() { - CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?; + /// Processes all streams with undelivered events returning the total number of streams identified. This is a recursive function that will + /// continue to process streams until it finds no more streams with undelivered events. This is useful for bootstrapping the ordering task + /// in case we missed/dropped something in the past. Anything we can't process now requires discovering from a peer, so there isn't really + /// any advantage to keeping it in memory and trying again later. 
+ async fn process_all_undelivered_events(&mut self, pool: &SqlitePool) -> Result { + tracing::trace!("Processing all undelivered events for ordering"); + + let mut streams_discovered = 0; + let mut resume_at = Some(0); + while let Some(highwater_mark) = resume_at { + let (cids, hw_mark) = + CeramicOneStream::load_stream_cids_with_undelivered_events(pool, highwater_mark) + .await?; + resume_at = hw_mark; + trace!(count=cids.len(), stream_cids=?cids, "Discovered streams with undelivered events"); + for cid in cids { + if self.add_stream(cid) { + streams_discovered += 1; + } } - tx.commit().await?; - self.ready_events.clear(); // safe to clear since we are past any await points and hold exclusive access + self.process_streams(pool).await?; } - Ok(()) + + Ok(streams_discovered) } } #[cfg(test)] mod test { use ceramic_store::EventInsertable; - use multihash_codetable::{Code, MultihashDigest}; - use recon::ReconItem; - use crate::tests::{build_event, check_deliverable, random_block, TestEventInfo}; + use crate::tests::get_n_events; use super::*; - /// these events are init events so they should have been delivered - /// need to build with data events that have the prev stored already - async fn build_insertable_undelivered() -> EventInsertable { - let TestEventInfo { - event_id: id, car, .. - } = build_event().await; - let cid = id.cid().unwrap(); - - let (body, _meta) = CeramicEventService::parse_event_carfile(cid, &car) - .await - .unwrap(); - assert!(!body.deliverable); - EventInsertable::try_new(id, body).unwrap() - } - - fn assert_stream_map_elems(map: &StreamEvents, size: usize) { - assert_eq!(size, map.cid_map.len(), "{:?}", map); - assert_eq!(size, map.prev_map.len(), "{:?}", map); - } - - fn build_linked_events( - number: usize, - stream_cid: Cid, - first_prev: Cid, - ) -> Vec { - let mut events = Vec::with_capacity(number); - - let first_cid = random_block().cid; - events.push(DeliverableEvent::new( - first_cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: first_prev, - }, - None, - )); - - for i in 1..number { - let random = random_block(); - let ev = DeliverableEvent::new( - random.cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: events[i - 1].cid, - }, - None, - ); - events.push(ev); - } - - events - } - #[tokio::test] - async fn test_none_deliverable_without_first() { - // they events all point to the one before but A has never been delivered so we can't do anything - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - let events = build_linked_events(4, stream_cid, missing); - let mut prev_map = StreamEvents::from_iter(events); - + async fn test_undelivered_batch_empty() { + let _ = ceramic_metrics::init_local_tracing(); let pool = SqlitePool::connect_in_memory().await.unwrap(); - - let deliverable = super::OrderingState::discover_deliverable_events(&pool, &mut prev_map) + let total = OrderingState::new() + .process_all_undelivered_events(&pool) .await .unwrap(); - - assert_eq!(0, deliverable.len()); + assert_eq!(0, total); } #[tokio::test] - async fn test_all_deliverable_one_stream() { + async fn test_undelivered_streams_all() { let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. 
- } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - - let events = build_linked_events(4, stream_cid, one_cid); - let expected = VecDeque::from_iter(events.iter().map(|ev| ev.cid)); - let mut prev_map = StreamEvents::from_iter(events); + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let s1_events = get_n_events(3).await; + let s2_events = get_n_events(5).await; + let mut all_insertable = Vec::with_capacity(8); - assert_stream_map_elems(&prev_map, 4); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + for event in s1_events.iter() { + let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); - assert_eq!(4, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 0); - } - - #[tokio::test] - async fn test_some_deliverable_one_stream() { - let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - - let mut deliverable_events = build_linked_events(6, stream_cid, one_cid); - let stuck_events = build_linked_events(8, stream_cid, missing); - let expected = VecDeque::from_iter(deliverable_events.iter().map(|ev| ev.cid)); - deliverable_events.extend(stuck_events); - let mut prev_map = StreamEvents::from_iter(deliverable_events); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + all_insertable.push(insertable); + } + for event in s2_events.iter() { + let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); - assert_eq!(6, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 8); - } + all_insertable.push(insertable); + } - #[tokio::test] - // expected to be per stream but all events are combined for the history required version currently so - // this needs to work as well - async fn test_all_deliverable_multiple_streams() { - let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let TestEventInfo { - event_id: two_id, - car: two_car, - .. 
- } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let two_cid = two_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert_many( - &store, - &[ - ReconItem::new(&one_id, &one_car), - ReconItem::new(&two_id, &two_car), - ], - ) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - check_deliverable(&store.pool, &two_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-one")); - let stream_cid_2 = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-two")); - - let mut events_a = build_linked_events(4, stream_cid, one_cid); - let mut events_b = build_linked_events(10, stream_cid_2, two_cid); - let expected_a = VecDeque::from_iter(events_a.iter().map(|ev| ev.cid)); - let expected_b = VecDeque::from_iter(events_b.iter().map(|ev| ev.cid)); - // we expect the events to be in the prev chain order, but they can be intervleaved across streams - // we reverse the items in the input to proov this (it's a hashmap internally so there is no order, but still) - events_a.reverse(); - events_b.reverse(); - events_a.extend(events_b); - assert_eq!(14, events_a.len()); - let mut prev_map = StreamEvents::from_iter(events_a); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + for event in &all_insertable { + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) .await .unwrap(); - - assert_eq!(14, deliverable.len()); - assert_eq!(0, prev_map.cid_map.len(), "{:?}", prev_map); - assert_eq!(0, prev_map.prev_map.len(), "{:?}", prev_map); - - let mut split_a = VecDeque::new(); - let mut split_b = VecDeque::new(); - for cid in deliverable { - if expected_a.contains(&cid) { - split_a.push_back(cid); - } else if expected_b.contains(&cid) { - split_b.push_back(cid); + // init events are always delivered and the others should have been skipped + if event.cid() == event.stream_cid() + || event.order_key == s1_events[0].0 + || event.order_key == s2_events[0].0 + { + assert!( + delivered, + "Event {:?} was not delivered. 
init={:?}, s1={:?}, s2={:?}", + event.cid(), + event.stream_cid(), + s1_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + s2_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + ); } else { - panic!("Unexpected CID in deliverable list: {:?}", cid); + assert!(!delivered); } } - - assert_eq!(expected_a, split_a); - assert_eq!(expected_b, split_b); - } - - #[tokio::test] - async fn test_undelivered_batch_empty() { - let _ = ceramic_metrics::init_local_tracing(); - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let (new, found) = OrderingState::new() - .add_undelivered_batch(&pool, 0, 10) - .await - .unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - } - - #[tokio::test] - async fn test_undelivered_batch_offset() { - let _ = ceramic_metrics::init_local_tracing(); - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let insertable = build_insertable_undelivered().await; - - let _new = CeramicOneEvent::insert_many(&pool, &[insertable]) + let total = OrderingState::new() + .process_all_undelivered_events(&pool) .await .unwrap(); - let mut state = OrderingState::new(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(1, found); - assert_eq!(1, new); - let (new, found) = state.add_undelivered_batch(&pool, 10, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - state.persist_ready_events(&pool).await.unwrap(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - } - - #[tokio::test] - async fn test_undelivered_batch_all() { - let _ = ceramic_metrics::init_local_tracing(); - - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let mut undelivered = Vec::with_capacity(10); - for _ in 0..10 { - let insertable = build_insertable_undelivered().await; - undelivered.push(insertable); + assert_eq!(2, total); + for event in &all_insertable { + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) + .await + .unwrap(); + assert!(delivered); } - - let (hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) - .await - .unwrap(); - assert_eq!(0, hw); - assert!(event.is_empty()); - - let _new = CeramicOneEvent::insert_many(&pool, &undelivered[..]) - .await - .unwrap(); - - let mut state = OrderingState::new(); - state - .process_all_undelivered_events(&pool, 1) - .await - .unwrap(); - - let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) - .await - .unwrap(); - assert_eq!(event.len(), 10); } } diff --git a/service/src/event/service.rs b/service/src/event/service.rs index daddd3531..296e8e30f 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,16 +1,10 @@ -use std::collections::{HashMap, HashSet}; - use ceramic_core::EventId; -use ceramic_event::unvalidated; -use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; -use cid::Cid; -use ipld_core::ipld::Ipld; -use recon::{InsertResult, ReconItem}; +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; use tracing::{trace, warn}; -use super::ordering_task::{ - DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState, - OrderingTask, StreamEvents, +use super::{ + order_events::OrderEvents, + ordering_task::{DeliverableTask, OrderingTask}, }; use crate::{Error, Result}; @@ -41,7 +35,7 @@ impl CeramicEventService { } /// Skip loading all undelivered events from the database on startup (for 
testing) - #[allow(dead_code)] // used in tests + #[cfg(test)] pub(crate) async fn new_without_undelivered(pool: SqlitePool) -> Result { CeramicOneEvent::init_delivered_order(&pool).await?; @@ -72,71 +66,16 @@ impl CeramicEventService { Ok(()) } - /// This function is used to parse the event from the carfile and return the insertable event and the previous cid pointer. - /// Probably belongs in the event crate. - pub(crate) async fn parse_event_carfile( - event_cid: cid::Cid, - carfile: &[u8], - ) -> Result<(EventInsertableBody, Option)> { - let insertable = EventInsertableBody::try_from_carfile(event_cid, carfile).await?; - let ev_block = insertable.block_for_cid(&insertable.cid)?; - - trace!(count=%insertable.blocks.len(), cid=%event_cid, "parsing event blocks"); - let event_ipld: unvalidated::RawEvent = - serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("event block is not valid event format"), - ) - })?; - - let maybe_init_prev = match event_ipld { - unvalidated::RawEvent::Time(t) => Some((t.id(), t.prev())), - unvalidated::RawEvent::Signed(signed) => { - let link = signed.link().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("event should have a link")) - })?; - let link = insertable.block_for_cid(&link).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("prev CID missing from carfile"), - ) - })?; - let payload: unvalidated::Payload = - serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("Failed to follow event link"), - ) - })?; - - match payload { - unvalidated::Payload::Data(d) => Some((*d.id(), *d.prev())), - unvalidated::Payload::Init(_init) => None, - } - } - unvalidated::RawEvent::Unsigned(_init) => None, - }; - let meta = maybe_init_prev.map(|(cid, prev)| DeliverableMetadata { - init_cid: cid, - prev, - }); - Ok((insertable, meta)) - } - #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] /// This function is used to insert events from a carfile requiring that the history is local to the node. /// This is likely used in API contexts when a user is trying to insert events. Events discovered from /// peers can come in any order and we will discover the prev chain over time. Use /// `insert_events_from_carfiles_remote_history` for that case. - pub(crate) async fn insert_events_from_carfiles_local_history<'a>( + pub(crate) async fn insert_events_from_carfiles_local_api<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - ) -> Result { - if items.is_empty() { - return Ok(InsertResult::default()); - } - - let ordering = - InsertEventOrdering::discover_deliverable_local_history(items, &self.pool).await?; - self.process_events(ordering).await + ) -> Result { + self.insert_events(items, true).await } #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] @@ -144,233 +83,110 @@ impl CeramicEventService { /// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and /// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use /// `insert_events_from_carfiles_local_history`. 
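// A small, self-contained sketch of the order-preserving mapping used below
// (hypothetical stand-ins: `&str` keys instead of `EventId`, a plain slice
// instead of the store's insert result). Recon expects one bool per input
// item, in input order, even though the store may report insertions in a
// different order or skip keys it already had.
fn new_key_flags(items: &[&str], inserted: &[(&str, bool)]) -> Vec<bool> {
    items
        .iter()
        .map(|key| {
            inserted
                .iter()
                .find(|(k, _)| k == key)
                .map_or(false, |(_, new_key)| *new_key)
        })
        .collect()
}

fn main() {
    let items = ["a", "b", "c"];
    // results can come back in any order and omit keys that were not inserted
    let inserted = [("c", true), ("a", false)];
    assert_eq!(new_key_flags(&items, &inserted), vec![false, false, true]);
}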
- pub(crate) async fn insert_events_from_carfiles_remote_history<'a>( + pub(crate) async fn insert_events_from_carfiles_recon<'a>( &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { - if items.is_empty() { - return Ok(InsertResult::default()); + let res = self.insert_events(items, false).await?; + let mut keys = vec![false; items.len()]; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + for (i, item) in items.iter().enumerate() { + let new_key = res + .store_result + .inserted + .iter() + .find(|e| e.order_key == *item.key) + .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set + keys[i] = new_key; } - - let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?; - self.process_events(ordering).await + Ok(recon::InsertResult::new(keys)) } - async fn process_events(&self, ordering: InsertEventOrdering) -> Result { - let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?; - - for ev in ordering.background_task_deliverable { - trace!(cid=%ev.0, prev=%ev.1.prev, init=%ev.1.init_cid, "sending to delivery task"); - if let Err(e) = self - .delivery_task - .tx - .try_send(DeliverableEvent::new(ev.0, ev.1, None)) - { - match e { - tokio::sync::mpsc::error::TrySendError::Full(e) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(cid=%e.cid, meta=?e.meta, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - for new in ordering.notify_task_new { - if let Err(e) = self.delivery_task.tx_new.try_send(new) { - match e { - tokio::sync::mpsc::error::TrySendError::Full(ev) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(attempt=?ev, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Notify new task full"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - Ok(res) - } -} - -struct InsertEventOrdering { - insert_now: Vec, - notify_task_new: Vec, - background_task_deliverable: HashMap, -} - -impl InsertEventOrdering { - /// This will mark events as deliverable if their prev exists locally. Otherwise they will be - /// sorted into the bucket for the background task to process. 
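// A minimal sketch of the split described in the comments around here, using
// assumed stand-in types: events are (cid, Option<prev>) pairs and `known` is
// the set of CIDs whose history is already complete. Init events (no prev) and
// events whose prev is known are deliverable immediately; the rest wait for
// the background ordering task.
use std::collections::HashSet;

fn split_deliverable<'a>(
    events: &[(&'a str, Option<&'a str>)],
    known: &HashSet<&str>,
) -> (Vec<&'a str>, Vec<&'a str>) {
    let (mut now, mut later) = (Vec::new(), Vec::new());
    for (cid, prev) in events {
        match prev {
            None => now.push(*cid),
            Some(p) if known.contains(p) => now.push(*cid),
            Some(_) => later.push(*cid),
        }
    }
    (now, later)
}

fn main() {
    let known: HashSet<&str> = ["init"].into_iter().collect();
    let events = [("a", Some("init")), ("b", Some("unknown")), ("c", None)];
    let (now, later) = split_deliverable(&events, &known);
    assert_eq!(now, vec!["a", "c"]);
    assert_eq!(later, vec!["b"]);
}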
- pub(crate) async fn discover_deliverable_remote_history<'a>( - items: &[ReconItem<'a, EventId>], - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - result.mark_event_deliverable_later(insertable, meta); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } + async fn insert_events<'a>( + &self, + items: &[recon::ReconItem<'a, EventId>], + history_required: bool, + ) -> Result { + if items.is_empty() { + return Ok(InsertResult::default()); } - Ok(result) - } + let mut to_insert = Vec::with_capacity(items.len()); - /// This will error if any of the events doesn't have its prev on the local node (in the database/memory or in this batch). - pub(crate) async fn discover_deliverable_local_history<'a>( - items: &[ReconItem<'a, EventId>], - pool: &SqlitePool, - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; - - let mut insert_after_history_check: Vec<(DeliverableMetadata, EventInsertable)> = - Vec::with_capacity(items.len()); - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - insert_after_history_check.push((meta, insertable)); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } + for event in items { + let insertable = EventInsertable::try_new(event.key.to_owned(), event.value).await?; + to_insert.push(insertable); } - trace!(local_events_checking=%insert_after_history_check.len(), "checking local history"); - result - .verify_history_inline(pool, insert_after_history_check) - .await?; - Ok(result) - } - - async fn parse_item<'a>( - item: &ReconItem<'a, EventId>, - ) -> Result<(EventInsertable, Option)> { - let cid = item.key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", item.key)) - })?; - // we want to end a conversation if any of the events aren't ceramic events and not store them - // this includes making sure the key matched the body cid - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(cid, item.value).await?; - let insertable = EventInsertable::try_new(item.key.to_owned(), insertable_body)?; - Ok((insertable, maybe_prev)) - } - - fn mark_event_deliverable_later( - &mut self, - insertable: EventInsertable, - meta: DeliverableMetadata, - ) { - self.background_task_deliverable - .insert(insertable.body.cid, meta); - self.insert_now.push(insertable); - } - - fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.deliverable(true); - self.notify_task_new - .push(DeliveredEvent::new(ev.body.cid, init_cid)); - self.insert_now.push(ev); - } - - async fn verify_history_inline( - &mut self, - pool: &SqlitePool, - to_check: Vec<(DeliverableMetadata, EventInsertable)>, - ) -> Result<()> { - if to_check.is_empty() { - return Ok(()); - } + let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; - let incoming_deliverable_cids: HashSet = self - .insert_now + let missing_history = ordered + .missing_history .iter() - .filter_map(|e| { - if e.body.deliverable { - Some(e.body.cid) - } else { - 
None - } - }) + .map(|e| e.order_key.clone()) .collect(); - // ideally, this map would be per stream, but we are just processing all of them together for now - let mut to_check_map = StreamEvents::new(); - - let required_to_find = to_check.len(); - let mut found_in_batch = 0; - let mut insert_if_greenlit = HashMap::with_capacity(required_to_find); - - for (meta, ev) in to_check { - if incoming_deliverable_cids.contains(&meta.prev) { - trace!(new=%ev.body.cid, prev=%meta.prev, "prev event being added in same batch"); - found_in_batch += 1; - self.mark_event_deliverable_now(ev, meta.init_cid); - } else { - trace!(new=%ev.body.cid, prev=%meta.prev, "will check for prev event in db"); + let to_insert = if history_required { + ordered.deliverable + } else { + ordered + .deliverable + .into_iter() + .chain(ordered.missing_history) + .collect() + }; - let _new = to_check_map.add_event(DeliverableEvent::new( - ev.body.cid, - meta.to_owned(), - None, - )); - insert_if_greenlit.insert(ev.body.cid, (meta, ev)); + let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + // api writes shouldn't have any missed pieces that need ordering so we don't send those + if !history_required { + for ev in &res.inserted { + if ev.deliverable { + trace!(event=?ev, "sending delivered to ordering task"); + if let Err(e) = self.delivery_task.tx_delivered.try_send(ev.clone()) { + match e { + tokio::sync::mpsc::error::TrySendError::Full(e) => { + // we should only be doing this during recon, in which case we can rediscover events. + // the delivery task will start picking up these events once it's drained since they are stored in the db + warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + warn!("Delivery task closed. shutting down"); + return Err(Error::new_fatal(anyhow::anyhow!( + "Delivery task closed" + ))); + } + } + } + } } } - if to_check_map.is_empty() { - return Ok(()); - } - - let deliverable = - OrderingState::discover_deliverable_events(pool, &mut to_check_map).await?; - if deliverable.len() != required_to_find - found_in_batch { - let missing = insert_if_greenlit - .values() - .filter_map(|(_, ev)| { - if !deliverable.contains(&ev.body.cid) { - Some(ev.body.cid) - } else { - None - } - }) - .collect::>(); + Ok(InsertResult { + store_result: res, + missing_history, + }) + } +} - tracing::info!(?missing, ?deliverable, "Missing required `prev` event CIDs"); +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, +} - Err(Error::new_invalid_arg(anyhow::anyhow!( - "Missing required `prev` event CIDs: {:?}", - missing - ))) - } else { - // we need to use the deliverable list's order because we might depend on something in the same batch, and that function will - // ensure we have a queue in the correct order. So we follow the order and use our insert_if_greenlit map to get the details. 
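// Sketch of the chain-following delivery order that the ordering code in this
// patch relies on, with assumed stand-ins: `&str` instead of `Cid` and a plain
// `prev -> child` map instead of the in-memory stream state. Starting from
// already-delivered tips, each child is emitted only after its `prev`, so
// marking events delivered in this order keeps dependents behind their parents.
use std::collections::{HashMap, VecDeque};

fn deliverable_order(delivered: &[&str], prev_to_child: &mut HashMap<String, String>) -> Vec<String> {
    let mut out = Vec::new();
    let mut tips: VecDeque<String> = delivered.iter().map(|s| s.to_string()).collect();
    while let Some(tip) = tips.pop_front() {
        if let Some(child) = prev_to_child.remove(&tip) {
            out.push(child.clone());
            tips.push_back(child);
        }
    }
    out
}

fn main() {
    // init (already delivered) <- a <- b, plus an event whose prev is unknown
    let mut edges = HashMap::new();
    edges.insert("init".to_string(), "a".to_string());
    edges.insert("a".to_string(), "b".to_string());
    edges.insert("unknown".to_string(), "orphan".to_string());
    assert_eq!(deliverable_order(&["init"], &mut edges), vec!["a", "b"]);
    assert!(edges.contains_key("unknown")); // stays pending until its prev shows up
}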
- for cid in deliverable { - if let Some((meta, insertable)) = insert_if_greenlit.remove(&cid) { - self.mark_event_deliverable_now(insertable, meta.init_cid); - } else { - warn!(%cid, "Didn't find event to insert in memory when it was expected"); - } - } - Ok(()) +impl From for Vec { + fn from(res: InsertResult) -> Self { + let mut api_res = + Vec::with_capacity(res.store_result.inserted.len() + res.missing_history.len()); + for ev in res.store_result.inserted { + api_res.push(ceramic_api::EventInsertResult::new(ev.order_key, None)); + } + for ev in res.missing_history { + api_res.push(ceramic_api::EventInsertResult::new( + ev, + Some("Failed to insert event as `prev` event was missing".to_owned()), + )); } + api_res } } diff --git a/service/src/event/store.rs b/service/src/event/store.rs index e61506bcf..fef6f4661 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService { async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { let res = self - .insert_events_from_carfiles_remote_history(&[item.to_owned()]) + .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; Ok(res.keys.first().copied().unwrap_or(false)) @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { - let res = self - .insert_events_from_carfiles_remote_history(items) - .await?; + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { + let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -106,15 +107,19 @@ impl iroh_bitswap::Store for CeramicEventService { #[async_trait::async_trait] impl ceramic_api::EventStore for CeramicEventService { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let items = items .iter() .map(|(key, val)| ReconItem::new(key, val.as_slice())) .collect::>(); let res = self - .insert_events_from_carfiles_local_history(&items[..]) + .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + + Ok(res.into()) } async fn range_with_values( diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index de36fb97a..25a29e424 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -114,22 +114,12 @@ fn gen_rand_bytes() -> [u8; SIZE] { arr } -pub(crate) fn random_block() -> Block { - let mut data = [0u8; 1024]; - rand::Rng::fill(&mut ::rand::thread_rng(), &mut data); - let hash = Code::Sha2_256.digest(&data); - Block { - cid: Cid::new_v1(0x00, hash), - data: data.to_vec().into(), - } -} - pub(crate) async fn check_deliverable( pool: &ceramic_store::SqlitePool, cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid) + let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); @@ -172,28 +162,13 @@ async fn data_event( 
signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap() } -async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec); 3] { +// returns init + N events +async fn get_init_plus_n_events_with_model( + model: &StreamId, + number: usize, +) -> Vec<(EventId, Vec)> { let signer = Box::new(signer().await); - let data = gen_rand_bytes::<50>(); - let data2 = gen_rand_bytes::<50>(); - - let data = ipld!({ - "radius": 1, - "red": 2, - "green": 3, - "blue": 4, - "raw": data.as_slice(), - }); - - let data2 = ipld!({ - "radius": 1, - "red": 2, - "green": 3, - "blue": 4, - "raw": data2.as_slice(), - }); - let init = init_event(model, &signer).await; let init_cid = init.envelope_cid(); let (event_id, car) = ( @@ -202,33 +177,45 @@ async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec); 3] { ); let init_cid = event_id.cid().unwrap(); - let data = data_event(init_cid, init_cid, data, &signer).await; - let cid = data.envelope_cid(); - let (data_id, data_car) = ( - build_event_id(&data.envelope_cid(), &init_cid, model), - data.encode_car().await.unwrap(), - ); - let data2 = data_event(init_cid, cid, data2, &signer).await; - let (data_id_2, data_car_2) = ( - build_event_id(&data2.envelope_cid(), &init_cid, model), - data2.encode_car().await.unwrap(), - ); - [ - (event_id, car), - (data_id, data_car), - (data_id_2, data_car_2), - ] + let mut events = Vec::with_capacity(number); + events.push((event_id, car)); + let mut prev = init_cid; + for _ in 0..number { + let data = gen_rand_bytes::<50>(); + let data = ipld!({ + "radius": 1, + "red": 2, + "green": 3, + "blue": 4, + "raw": data.as_slice(), + }); + + let data = data_event(init_cid, prev, data, &signer).await; + let (data_id, data_car) = ( + build_event_id(&data.envelope_cid(), &init_cid, model), + data.encode_car().await.unwrap(), + ); + prev = data_id.cid().unwrap(); + events.push((data_id, data_car)); + } + events } -pub(crate) async fn get_events_return_model() -> (StreamId, [(EventId, Vec); 3]) { +pub(crate) async fn get_events_return_model() -> (StreamId, Vec<(EventId, Vec)>) { let model = StreamId::document(random_cid()); - let events = get_events_with_model(&model).await; + let events = get_init_plus_n_events_with_model(&model, 3).await; (model, events) } // builds init -> data -> data that are a stream (will be a different stream each call) -pub(crate) async fn get_events() -> [(EventId, Vec); 3] { +pub(crate) async fn get_events() -> Vec<(EventId, Vec)> { let model = StreamId::document(random_cid()); - get_events_with_model(&model).await + get_init_plus_n_events_with_model(&model, 3).await +} + +// Get N events with the same model (init + N-1 data events) +pub(crate) async fn get_n_events(number: usize) -> Vec<(EventId, Vec)> { + let model = &StreamId::document(random_cid()); + get_init_plus_n_events_with_model(model, number - 1).await } diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index ddcbc7f23..61caae825 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -1,5 +1,7 @@ use ceramic_api::EventStore; use ceramic_core::EventId; +use rand::seq::SliceRandom; +use rand::thread_rng; use recon::ReconItem; use crate::{ @@ -12,14 +14,16 @@ async fn setup_service() -> CeramicEventService { let conn = ceramic_store::SqlitePool::connect_in_memory() .await .unwrap(); + CeramicEventService::new_without_undelivered(conn) .await .unwrap() } async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { + 
tracing::trace!("inserted event: {}", item.key.cid().unwrap()); let new = store - .insert_events_from_carfiles_remote_history(&[item]) + .insert_events_from_carfiles_recon(&[item]) .await .unwrap(); let new = new.keys.into_iter().filter(|k| *k).count(); @@ -28,10 +32,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_local_history(&[item]) + .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -45,30 +49,17 @@ async fn test_init_event_delivered() { } #[tokio::test] -async fn test_missing_prev_error_history_required() { +async fn test_missing_prev_history_required_not_inserted() { let store = setup_service().await; let events = get_events().await; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)]) - .await; - match new { - Ok(v) => panic!("should have errored: {:?}", v), - Err(e) => { - match e { - crate::Error::InvalidArgument { error } => { - // yes fragile, but we want to make sure it's not a parsing error or something unexpected - assert!(error - .to_string() - .contains("Missing required `prev` event CIDs")); - } - e => { - panic!("unexpected error: {:?}", e); - } - }; - } - }; + .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) + .await + .unwrap(); + assert!(new.store_result.inserted.is_empty()); + assert_eq!(1, new.missing_history.len()); } #[tokio::test] @@ -100,13 +91,13 @@ async fn test_prev_in_same_write_history_required() { let init: &(EventId, Vec) = &events[0]; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ + .insert_events_from_carfiles_local_api(&[ ReconItem::new(&data.0, &data.1), ReconItem::new(&init.0, &init.1), ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; @@ -178,7 +169,6 @@ async fn missing_prev_pending_recon_should_deliver_without_stream_update() { // now we add the second event, it should quickly become deliverable let data = &events[1]; add_and_assert_new_recon_event(&store, ReconItem::new(&data.0, &data.1)).await; - check_deliverable(&store.pool, &data.0.cid().unwrap(), false).await; // This happens out of band, so give it a moment to make sure everything is updated tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -234,7 +224,7 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat // this _could_ be deliverable immediately if we checked but for now we just send to the other task, // so `check_deliverable` could return true or false depending on timing (but probably false). 
// as this is an implementation detail and we'd prefer true, we just use HW ordering to make sure it's been delivered - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) .await @@ -266,3 +256,96 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat ]; assert_eq!(expected, delivered); } + +async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: usize) { + loop { + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + let total = delivered.len(); + if total < expected_delivered { + tracing::trace!( + "found {} delivered, waiting for {}", + total, + expected_delivered + ); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } else { + break; + } + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn recon_lots_of_streams() { + // adds 100 events to 10 streams, mixes up the event order for each stream, inserts half + // the events for each stream before mixing up the stream order and inserting the rest + let per_stream = 100; + let num_streams = 10; + let store = setup_service().await; + let mut streams = Vec::new(); + let mut all_cids = Vec::new(); + let expected = per_stream * num_streams; + for _ in 0..num_streams { + let mut events = crate::tests::get_n_events(per_stream).await; + let cids = events + .iter() + .map(|e| e.0.cid().unwrap()) + .collect::>(); + all_cids.extend(cids); + assert_eq!(per_stream, events.len()); + events.shuffle(&mut thread_rng()); + streams.push(events); + } + let mut total_added = 0; + + assert_eq!(expected, all_cids.len()); + tracing::debug!(?all_cids, "starting test"); + for stream in streams.iter_mut() { + while let Some(event) = stream.pop() { + if stream.len() > per_stream / 2 { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + } else { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + break; + } + } + } + streams.shuffle(&mut thread_rng()); + for stream in streams.iter_mut() { + while let Some(event) = stream.pop() { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + } + } + // first just make sure they were all inserted (not delivered yet) + for (i, cid) in all_cids.iter().enumerate() { + let (exists, _delivered) = + ceramic_store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid) + .await + .unwrap(); + assert!(exists, "idx: {}. 
missing cid: {}", i, cid); + } + + assert_eq!(expected, total_added); + tokio::time::timeout( + std::time::Duration::from_secs(5), + validate_all_delivered(&store, expected), + ) + .await + .unwrap(); + + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + + assert_eq!(expected, delivered.len()); + // now we check that all the events are deliverable + for cid in all_cids.iter() { + check_deliverable(&store.pool, cid, true).await; + } +} diff --git a/store/Cargo.toml b/store/Cargo.toml index df193391d..35318fa86 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -13,19 +13,22 @@ anyhow.workspace = true async-trait.workspace = true ceramic-api.workspace = true ceramic-core.workspace = true +ceramic-event.workspace = true ceramic-metrics.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true +ipld-core.workspace = true iroh-bitswap.workspace = true iroh-car.workspace = true itertools = "0.12.0" -multihash.workspace = true multihash-codetable.workspace = true +multihash.workspace = true prometheus-client.workspace = true -thiserror.workspace = true recon.workspace = true +serde_ipld_dagcbor.workspace = true sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/store/src/lib.rs b/store/src/lib.rs index f76bdf598..8c419696a 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -12,8 +12,8 @@ pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use migration::DataMigrator; pub use sql::{ entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, - CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore, - SqliteTransaction, + CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, InsertResult, InsertedEvent, + Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, StreamEventMetadata, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 855d4e960..18fbfbf78 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -13,7 +13,7 @@ use prometheus_client::{ }, registry::Registry, }; -use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; use tokio::time::Instant; #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -158,7 +158,10 @@ impl ceramic_api::EventStore for StoreMetricsMiddleware where S: ceramic_api::EventStore, { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let new_keys = StoreMetricsMiddleware::::record( &self.metrics, "api_insert_many", @@ -166,7 +169,7 @@ where ) .await?; - let key_cnt = new_keys.iter().filter(|k| **k).count(); + let key_cnt = new_keys.iter().filter(|k| k.success()).count(); self.metrics.record(&InsertEvent { cnt: key_cnt as u64, @@ -253,7 +256,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 88014e1c9..4c19fe3ad 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -7,33 +7,93 @@ use std::{ use anyhow::anyhow; use ceramic_core::{event_id::InvalidEventId, EventId}; use cid::Cid; -use 
recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a}; + +use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a}; use crate::{ sql::{ entities::{ - rebuild_car, BlockRow, CountRow, DeliveredEvent, EventInsertable, OrderKey, - ReconEventBlockRaw, ReconHash, + rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventHeader, EventInsertable, + OrderKey, ReconEventBlockRaw, ReconHash, StreamCid, }, query::{EventQuery, ReconQuery, ReconType, SqlBackend}, sqlite::SqliteTransaction, }, - CeramicOneBlock, CeramicOneEventBlock, Error, Result, SqlitePool, + CeramicOneBlock, CeramicOneEventBlock, CeramicOneStream, Error, Result, SqlitePool, }; static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event that was inserted into the database +pub struct InsertedEvent { + /// The event order key that was inserted + pub order_key: EventId, + /// The Stream CID + pub stream_cid: StreamCid, + /// Whether the event was marked as deliverable + pub deliverable: bool, + /// Whether the event was a new key + pub new_key: bool, +} + +impl InsertedEvent { + /// Create a new delivered event + fn new(order_key: EventId, new_key: bool, stream_cid: StreamCid, deliverable: bool) -> Self { + Self { + order_key, + stream_cid, + deliverable, + new_key, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +/// The result of inserting events into the database +pub struct InsertResult { + /// The events that were marked as delivered in this batch + pub inserted: Vec, +} + +impl InsertResult { + /// The count of new keys added in this batch + pub fn count_new_keys(&self) -> usize { + self.inserted.iter().filter(|e| e.new_key).count() + } +} + +impl InsertResult { + fn new(inserted: Vec) -> Self { + Self { inserted } + } +} + /// Access to the ceramic event table and related logic pub struct CeramicOneEvent {} impl CeramicOneEvent { - async fn insert_key(tx: &mut SqliteTransaction<'_>, key: &EventId) -> Result { + fn next_deliverable() -> i64 { + GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst) + } + + /// Insert the event and its hash into the ceramic_one_event table + async fn insert_event( + tx: &mut SqliteTransaction<'_>, + key: &EventId, + deliverable: bool, + ) -> Result { let id = key.as_bytes(); let cid = key .cid() .map(|cid| cid.to_bytes()) .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?; let hash = Sha256a::digest(key); + let delivered: Option = if deliverable { + Some(Self::next_deliverable()) + } else { + None + }; let resp = sqlx::query(ReconQuery::insert_event()) .bind(id) @@ -46,6 +106,7 @@ impl CeramicOneEvent { .bind(hash.as_u32s()[5]) .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) + .bind(delivered) .execute(&mut **tx.inner()) .await; @@ -87,7 +148,7 @@ impl CeramicOneEvent { pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> { // Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value sqlx::query(EventQuery::mark_ready_to_deliver()) - .bind(GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)) + .bind(Self::next_deliverable()) .bind(&key.to_bytes()) .execute(&mut **conn.inner()) .await?; @@ -95,33 +156,51 @@ impl CeramicOneEvent { Ok(()) } - /// Insert many events into the database. This is the main function to use when storing events. + /// Insert many events into the database. 
The events and their blocks and metadata are inserted in a single + /// transaction and either all successful or rolled back. + /// + /// IMPORTANT: + /// It is the caller's responsibility to order events marked deliverable correctly. + /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering + /// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events + /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. pub async fn insert_many( pool: &SqlitePool, to_add: &[EventInsertable], ) -> Result { - let mut new_keys = vec![false; to_add.len()]; + let mut inserted = Vec::with_capacity(to_add.len()); let mut tx = pool.begin_tx().await.map_err(Error::from)?; - for (idx, item) in to_add.iter().enumerate() { - let new_key = Self::insert_key(&mut tx, &item.order_key).await?; + for item in to_add { + let new_key = + Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?; + inserted.push(InsertedEvent::new( + item.order_key.clone(), + new_key, + item.stream_cid(), + item.body.deliverable, + )); + // the insert failed so we didn't mark it as deliverable.. is this possible? + if item.body.deliverable && !new_key { + Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?; + } if new_key { for block in item.body.blocks.iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } + if let EventHeader::Init { header, .. } = &item.body.header { + CeramicOneStream::insert_tx(&mut tx, item.stream_cid(), header).await?; + } + + CeramicOneStream::insert_event_header_tx(&mut tx, &item.body.header).await?; } - if item.body.deliverable { - Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?; - } - new_keys[idx] = new_key; } tx.commit().await.map_err(Error::from)?; - let res = InsertResult::new(new_keys); + let res = InsertResult::new(inserted); Ok(res) } - /// Find events that haven't been delivered to the client and may be ready pub async fn undelivered_with_values( pool: &SqlitePool, @@ -220,13 +299,13 @@ impl CeramicOneEvent { delivered: i64, limit: i64, ) -> Result<(i64, Vec)> { - let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) + let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) .bind(delivered) .bind(limit) .fetch_all(pool.reader()) .await?; - DeliveredEvent::parse_query_results(delivered, rows) + DeliveredEventRow::parse_query_results(delivered, rows) } /// Finds the event data by a given EventId i.e. "order key". @@ -248,8 +327,9 @@ impl CeramicOneEvent { } /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. - /// (bool, bool) = (exists, delivered) - pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { + /// returns (bool, bool) = (exists, deliverable) + /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. 
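// A hypothetical usage sketch for the accessor below (assumes this file's
// imports; `is_complete` is not part of the patch, only an illustration of how
// the (exists, deliverable) pair is meant to be read):
async fn is_complete(pool: &SqlitePool, cid: &Cid) -> Result<bool> {
    // `exists`: the event is stored at all.
    // `deliverable`: its prev chain back to the init event is known, so consumers
    // are allowed to see it; it may not have been read by anyone yet.
    let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(pool, cid).await?;
    Ok(exists && deliverable)
}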
+ pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { #[derive(sqlx::FromRow)] struct CidExists { exists: bool, diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs index 912a462ac..bd6eefc3c 100644 --- a/store/src/sql/access/mod.rs +++ b/store/src/sql/access/mod.rs @@ -2,8 +2,10 @@ mod block; mod event; mod event_block; mod interest; +mod stream; pub use block::CeramicOneBlock; -pub use event::CeramicOneEvent; +pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; pub use interest::CeramicOneInterest; +pub use stream::{CeramicOneStream, StreamEventMetadata}; diff --git a/store/src/sql/access/stream.rs b/store/src/sql/access/stream.rs new file mode 100644 index 000000000..2197ee69e --- /dev/null +++ b/store/src/sql/access/stream.rs @@ -0,0 +1,151 @@ +use anyhow::anyhow; +use cid::Cid; + +use crate::{ + sql::entities::{ + EventHeader, EventMetadataRow, EventType, IncompleteStream, StreamCid, StreamEventRow, + StreamRow, + }, + Error, Result, SqlitePool, SqliteTransaction, +}; + +/// Access to the stream and related tables. Generally querying events as a stream. +pub struct CeramicOneStream {} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// Represents a stream event in a way that allows ordering it in the stream. It is metadata and not the event payload itself. +pub struct StreamEventMetadata { + /// The event CID + pub cid: Cid, + /// The previous event CID + pub prev: Option, + /// Whether the event is deliverable + pub deliverable: bool, +} + +impl TryFrom for StreamEventMetadata { + type Error = crate::Error; + + fn try_from(row: StreamEventRow) -> std::result::Result { + let cid = Cid::try_from(row.cid) + .map_err(|e| Error::new_app(anyhow!("Invalid event cid: {}", e)))?; + let prev = row + .prev + .map(Cid::try_from) + .transpose() + .map_err(|e| Error::new_app(anyhow!("Invalid event prev: {}", e)))?; + Ok(Self { + cid, + prev, + deliverable: row.deliverable, + }) + } +} + +impl CeramicOneStream { + /// Load the events for a given stream. Will return nothing if the stream does not exist (i.e. the init event is undiscovered). + pub async fn load_stream_events( + pool: &SqlitePool, + stream_cid: StreamCid, + ) -> Result> { + let rows: Vec<(Vec, Option>, bool)> = + sqlx::query_as(StreamEventRow::fetch_by_stream_cid()) + .bind(stream_cid.to_bytes()) + .fetch_all(pool.reader()) + .await?; + + let res = rows + .into_iter() + .map(|(cid, prev, delivered)| { + let cid = Cid::try_from(cid).expect("cid"); + let prev = prev.map(Cid::try_from).transpose().expect("prev"); + + StreamEventMetadata { + cid, + prev, + deliverable: delivered, + } + }) + .collect(); + + Ok(res) + } + + /// Load streams with undelivered events to see if they need to be delivered now. + /// highwater_mark is the i64 processed that you want to start after. + /// Start with `0` to start at the beginning. Will return None if there are no more streams to process. 
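// Sketch of the resume-at loop this query is designed for (it mirrors
// `process_all_undelivered_events` earlier in the patch, but with an in-memory
// stand-in instead of the database): keep asking for the next batch after the
// returned mark until the query reports nothing left.
fn load_batch(rows: &[i64], after: i64, limit: usize) -> (Vec<i64>, Option<i64>) {
    let batch: Vec<i64> = rows.iter().copied().filter(|v| *v > after).take(limit).collect();
    let next_mark = batch.iter().max().copied();
    (batch, next_mark)
}

fn main() {
    let rows = [1, 2, 3, 4, 5];
    let mut seen = Vec::new();
    let mut resume_at = Some(0);
    while let Some(mark) = resume_at {
        let (batch, next) = load_batch(&rows, mark, 2);
        seen.extend(batch);
        resume_at = next; // becomes None once a batch comes back empty
    }
    assert_eq!(seen, vec![1, 2, 3, 4, 5]);
}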
+ pub async fn load_stream_cids_with_undelivered_events( + pool: &SqlitePool, + highwater_mark: i64, + ) -> Result<(Vec, Option)> { + let streams: Vec = + sqlx::query_as(IncompleteStream::fetch_all_with_undelivered()) + .bind(highwater_mark) + .bind(100) + .fetch_all(pool.reader()) + .await?; + + let row_id = streams.iter().map(|s| s.row_id).max(); + let streams = streams.into_iter().map(|s| s.stream_cid).collect(); + Ok((streams, row_id)) + } + + pub(crate) async fn insert_tx( + tx: &mut SqliteTransaction<'_>, + stream_cid: StreamCid, + header: &ceramic_event::unvalidated::init::Header, + ) -> Result<()> { + let _resp = sqlx::query(StreamRow::insert()) + .bind(stream_cid.to_bytes()) + .bind(header.sep()) + .bind(header.model()) + .fetch_one(&mut **tx.inner()) + .await?; + + Ok(()) + } + + pub(crate) async fn insert_event_header_tx( + tx: &mut SqliteTransaction<'_>, + header: &EventHeader, + ) -> Result<()> { + let (cid, event_type, stream_cid, prev) = match header { + EventHeader::Init { cid, .. } => ( + cid.to_bytes(), + EventType::Init, + header.stream_cid().to_bytes(), + None, + ), + EventHeader::Data { + cid, + stream_cid, + prev, + } => ( + cid.to_bytes(), + EventType::Data, + stream_cid.to_bytes(), + Some(prev.to_bytes()), + ), + EventHeader::Time { + cid, + stream_cid, + prev, + } => ( + cid.to_bytes(), + EventType::Time, + stream_cid.to_bytes(), + Some(prev.to_bytes()), + ), + }; + + let _res = sqlx::query(EventMetadataRow::insert()) + .bind(cid) + .bind(stream_cid) + .bind(event_type) + .bind(prev) + .execute(&mut **tx.inner()) + .await?; + + Ok(()) + } +} diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 32610eb12..442b9323b 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -1,6 +1,8 @@ use anyhow::anyhow; use ceramic_core::EventId; +use ceramic_event::unvalidated; use cid::Cid; +use ipld_core::ipld::Ipld; use iroh_car::{CarHeader, CarReader, CarWriter}; use std::collections::BTreeSet; @@ -10,6 +12,8 @@ use crate::{ Error, Result, }; +use super::{EventHeader, EventType}; + pub async fn rebuild_car(blocks: Vec) -> Result>> { if blocks.is_empty() { return Ok(None); @@ -50,20 +54,41 @@ pub struct EventInsertable { } impl EventInsertable { - /// Try to build the EventInsertable struct. Will error if the key and body don't match. - pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result { - if order_key.cid().as_ref() != Some(&body.cid) { - return Err(Error::new_app(anyhow!( - "Event ID and body CID do not match: {:?} != {:?}", - order_key.cid(), - body.cid - )))?; - } + /// Try to build the EventInsertable struct from a carfile. + pub async fn try_new(order_key: EventId, body: &[u8]) -> Result { + let cid = order_key.cid().ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key)) + })?; + let body = EventInsertableBody::try_from_carfile(cid, body).await?; Ok(Self { order_key, body }) } - /// change the deliverable status of the event - pub fn deliverable(&mut self, deliverable: bool) { + /// Get the CID of the event + pub fn cid(&self) -> Cid { + self.body.cid + } + + /// Get the stream CID + pub fn stream_cid(&self) -> Cid { + self.body.header.stream_cid() + } + + /// Get the previous event CID if any + pub fn prev(&self) -> Option { + match &self.body.header { + EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev), + EventHeader::Init { .. 
} => None, + } + } + + /// Whether this event is deliverable currently + pub fn deliverable(&self) -> bool { + self.body.deliverable + } + + /// Mark the event as deliverable. + /// This will be used when inserting the event to make sure the field is updated accordingly. + pub fn set_deliverable(&mut self, deliverable: bool) { self.body.deliverable = deliverable; } } @@ -72,24 +97,48 @@ impl EventInsertable { /// The type we use to insert events into the database pub struct EventInsertableBody { /// The event CID i.e. the root CID from the car file - pub cid: Cid, - /// Whether this event is deliverable to clients or is waiting for more data - pub deliverable: bool, + pub(crate) cid: Cid, + /// The event header data about the event type and stream + pub(crate) header: EventHeader, + /// Whether the event is deliverable i.e. it's prev has been delivered and the chain is continuous to an init event + pub(crate) deliverable: bool, /// The blocks of the event // could use a map but there aren't that many blocks per event (right?) - pub blocks: Vec, + pub(crate) blocks: Vec, } impl EventInsertableBody { /// Create a new EventInsertRaw struct. Deliverable is set to false by default. - pub fn new(cid: Cid, blocks: Vec) -> Self { + pub fn new( + cid: Cid, + header: EventHeader, + blocks: Vec, + deliverable: bool, + ) -> Self { Self { cid, - deliverable: false, + header, blocks, + deliverable, } } + /// Get the CID of the event + pub fn cid(&self) -> Cid { + self.cid + } + + /// Whether this event is immediately deliverable to clients or the history chain needs to be reviewed + /// false indicates it can be stored and delivered immediately + pub fn event_type(&self) -> EventType { + self.header.event_type() + } + + /// Get the blocks of the event + pub fn blocks(&self) -> &Vec { + &self.blocks + } + /// Find a block from the carfile for a given CID if it's included pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> { self.blocks @@ -136,6 +185,73 @@ impl EventInsertableBody { blocks.push(ebr); idx += 1; } - Ok(Self::new(event_cid, blocks)) + + let ev_block = blocks + .iter() + .find(|b| b.cid() == event_cid) + .ok_or_else(|| { + Error::new_app(anyhow!( + "event block not found in car file: cid={}", + event_cid + )) + })?; + let event_ipld: unvalidated::RawEvent = + serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| { + Error::new_invalid_arg( + anyhow::anyhow!(e).context("event block is not valid event format"), + ) + })?; + + let cid = event_cid; + + let (deliverable, header) = match event_ipld { + unvalidated::RawEvent::Time(t) => ( + false, + EventHeader::Time { + cid, + stream_cid: t.id(), + prev: t.prev(), + }, + ), + unvalidated::RawEvent::Signed(signed) => { + let link = signed.link().ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("event should have a link")) + })?; + let link = blocks.iter().find(|b| b.cid() == link).ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("prev CID missing from carfile")) + })?; + let payload: unvalidated::Payload = + serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| { + Error::new_invalid_arg( + anyhow::anyhow!(e).context("Failed to follow event link"), + ) + })?; + + match payload { + unvalidated::Payload::Data(d) => ( + false, + EventHeader::Data { + cid, + stream_cid: *d.id(), + prev: *d.prev(), + }, + ), + unvalidated::Payload::Init(init) => { + let header = init.header().to_owned(); + + (true, EventHeader::Init { cid, header }) + } + } + } + unvalidated::RawEvent::Unsigned(init) => ( + true, + 
EventHeader::Init { + cid, + header: init.header().to_owned(), + }, + ), + }; + + Ok(Self::new(event_cid, header, blocks, deliverable)) } } diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index e40698a69..58b80f2f6 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -134,4 +134,8 @@ impl EventBlockRaw { bytes, }) } + + pub fn cid(&self) -> Cid { + Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner()) + } } diff --git a/store/src/sql/entities/event_metadata.rs b/store/src/sql/entities/event_metadata.rs new file mode 100644 index 000000000..b40d829b0 --- /dev/null +++ b/store/src/sql/entities/event_metadata.rs @@ -0,0 +1,58 @@ +use cid::Cid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +pub enum EventType { + Init, + Data, + Time, +} + +#[derive(Debug, Clone)] +pub struct EventMetadataRow {} + +impl EventMetadataRow { + pub fn insert() -> &'static str { + "INSERT INTO ceramic_one_event_metadata (cid, stream_cid, event_type, prev) VALUES ($1, $2, $3, $4)" + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event header wrapper for use in the store crate. +/// TODO: replace this with something from the event crate +pub enum EventHeader { + Init { + cid: Cid, + header: ceramic_event::unvalidated::init::Header, + }, + Data { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, + Time { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, +} + +impl EventHeader { + /// Returns the event type of the event header + pub(crate) fn event_type(&self) -> EventType { + match self { + EventHeader::Init { .. } => EventType::Init, + EventHeader::Data { .. } => EventType::Data, + EventHeader::Time { .. } => EventType::Time, + } + } + + /// Returns the stream CID of the event + pub(crate) fn stream_cid(&self) -> Cid { + match self { + EventHeader::Init { cid, .. } => *cid, + EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. 
} => { + *stream_cid + } + } + } +} diff --git a/store/src/sql/entities/hash.rs b/store/src/sql/entities/hash.rs index 926e6af1e..a914b808f 100644 --- a/store/src/sql/entities/hash.rs +++ b/store/src/sql/entities/hash.rs @@ -4,7 +4,7 @@ use sqlx::{sqlite::SqliteRow, Row as _}; use crate::{Error, Result}; -#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] pub struct BlockHash(Multihash<64>); impl BlockHash { diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs index 8103296af..f03d7c9ce 100644 --- a/store/src/sql/entities/mod.rs +++ b/store/src/sql/entities/mod.rs @@ -1,12 +1,18 @@ mod block; mod event; mod event_block; +mod event_metadata; mod hash; +mod stream; mod utils; pub use block::{BlockBytes, BlockRow}; pub use event::{rebuild_car, EventInsertable, EventInsertableBody}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; +pub use event_metadata::{EventHeader, EventMetadataRow, EventType}; pub use hash::{BlockHash, ReconHash}; +pub use stream::{IncompleteStream, StreamEventRow, StreamRow}; -pub use utils::{CountRow, DeliveredEvent, OrderKey}; +pub use utils::{CountRow, DeliveredEventRow, OrderKey}; + +pub type StreamCid = cid::Cid; diff --git a/store/src/sql/entities/stream.rs b/store/src/sql/entities/stream.rs new file mode 100644 index 000000000..9916fcd4f --- /dev/null +++ b/store/src/sql/entities/stream.rs @@ -0,0 +1,68 @@ +use cid::Cid; +use sqlx::{sqlite::SqliteRow, Row}; + +use super::StreamCid; + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StreamRow { + pub cid: Vec, + pub sep: String, + pub sep_val: Vec, +} + +impl StreamRow { + pub fn insert() -> &'static str { + "INSERT INTO ceramic_one_stream (cid, sep, sep_value) VALUES ($1, $2, $3) returning cid" + } +} + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StreamEventRow { + pub cid: Vec, + pub prev: Option>, + pub deliverable: bool, +} + +impl StreamEventRow { + /// Requires binding one argument: + /// $1 = stream_cid (bytes) + pub fn fetch_by_stream_cid() -> &'static str { + r#" + SELECT e.cid as "cid", eh.prev as "prev", + e.delivered IS NOT NULL as "deliverable" + FROM ceramic_one_stream s + JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid + JOIN ceramic_one_event e on e.cid = eh.cid + WHERE s.cid = $1"# + } +} + +#[derive(Debug, Clone)] +pub struct IncompleteStream { + pub stream_cid: StreamCid, + pub row_id: i64, +} + +impl IncompleteStream { + /// Requires binding two arguments: + /// $1 = highwater mark (i64) + /// $2 = limit (usize) + pub fn fetch_all_with_undelivered() -> &'static str { + r#" + SELECT DISTINCT s.cid as "stream_cid", s.rowid + FROM ceramic_one_stream s + JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid + JOIN ceramic_one_event e on e.cid = eh.cid + WHERE e.delivered is NULL and s.rowid > $1 + LIMIT $2"# + } +} + +impl sqlx::FromRow<'_, SqliteRow> for IncompleteStream { + fn from_row(row: &SqliteRow) -> sqlx::Result { + let cid: Vec = row.try_get("stream_cid")?; + let row_id: i64 = row.try_get("rowid")?; + let stream_cid = Cid::try_from(cid).map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + Ok(Self { stream_cid, row_id }) + } +} diff --git a/store/src/sql/entities/utils.rs b/store/src/sql/entities/utils.rs index d8efc5f5a..3b6555607 100644 --- a/store/src/sql/entities/utils.rs +++ b/store/src/sql/entities/utils.rs @@ -24,12 +24,12 @@ impl TryFrom for EventId { } #[derive(sqlx::FromRow)] -pub struct DeliveredEvent { +pub struct DeliveredEventRow { pub cid: Vec, pub 
new_highwater_mark: i64, } -impl DeliveredEvent { +impl DeliveredEventRow { /// assumes rows are sorted by `delivered` ascending pub fn parse_query_results(current: i64, rows: Vec) -> Result<(i64, Vec)> { let max: i64 = rows.last().map_or(current, |r| r.new_highwater_mark + 1); diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs index 884b04b9b..886b241b5 100644 --- a/store/src/sql/mod.rs +++ b/store/src/sql/mod.rs @@ -6,7 +6,10 @@ mod sqlite; #[cfg(test)] mod test; -pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest}; +pub use access::{ + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, + InsertResult, InsertedEvent, StreamEventMetadata, +}; pub use root::SqliteRootStore; pub use sqlite::{SqlitePool, SqliteTransaction}; diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs index c73ffc9d1..fcc935b9a 100644 --- a/store/src/sql/query.rs +++ b/store/src/sql/query.rs @@ -124,7 +124,7 @@ impl EventQuery { /// $1 = delivered (i64) /// $2 = cid (bytes) pub fn mark_ready_to_deliver() -> &'static str { - "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;" + "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;" } } @@ -170,11 +170,13 @@ impl ReconQuery { "INSERT INTO ceramic_one_event ( order_key, cid, ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7 + ahash_4, ahash_5, ahash_6, ahash_7, + delivered ) VALUES ( $1, $2, $3, $4, $5, $6, - $7, $8, $9, $10 + $7, $8, $9, $10, + $11 );" } diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index d0e55c777..6e581977d 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -32,8 +32,19 @@ fn random_event(cid: &str) -> EventInsertable { order_key, body: EventInsertableBody { cid, - deliverable: false, blocks: vec![], + deliverable: true, + header: super::entities::EventHeader::Init { + cid, + header: ceramic_event::unvalidated::init::Header::new( + vec![CONTROLLER.to_string()], + SEP_KEY.to_string(), + vec![3, 2, 45, 8], + None, + None, + None, + ), + }, }, } } @@ -48,8 +59,7 @@ async fn hash_range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let hash = CeramicOneEvent::hash_range( &pool, @@ -70,8 +80,7 @@ async fn range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let ids = CeramicOneEvent::range( &pool, From d7e194b8b58d959e201e09ce891e143b4112cc33 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 19:53:01 -0600 Subject: [PATCH 9/9] chore: write migration to populate new tables from blockstore --- store/src/migration.rs | 116 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/store/src/migration.rs b/store/src/migration.rs index 9dbbcecbd..9205297c4 100644 --- a/store/src/migration.rs +++ b/store/src/migration.rs @@ -3,7 +3,10 @@ use std::sync::OnceLock; use sqlx::{prelude::FromRow, types::chrono}; use tracing::{debug, info}; -use crate::{Error, Result, SqlitePool}; +use crate::{ + sql::entities::{EventHeader, ReconEventBlockRaw}, + CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool, +}; static MIGRATIONS: OnceLock> = OnceLock::new(); @@ -18,7 +21,12 @@ struct Migration { /// The list of migrations that need to be run in order to get the database up to date and start the server. 
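A note on the feed cursor handling in `DeliveredEventRow::parse_query_results`, renamed earlier in this hunk: rows arrive sorted by `delivered` ascending and the returned mark is one past the last row seen, so an empty page leaves the caller's mark unchanged and the returned value can be passed straight back in on the next poll. A simplified mirror of that behavior, with an assumed `Vec<u8>` CID column rather than the crate's exact types:

    // Simplified mirror of DeliveredEventRow and its highwater handling.
    struct DeliveredRow {
        cid: Vec<u8>,
        new_highwater_mark: i64,
    }

    fn next_page(current_mark: i64, rows: Vec<DeliveredRow>) -> (i64, Vec<Vec<u8>>) {
        // One past the last delivered value seen; unchanged when the page is empty.
        let next_mark = rows
            .last()
            .map_or(current_mark, |r| r.new_highwater_mark + 1);
        let cids = rows.into_iter().map(|r| r.cid).collect();
        (next_mark, cids)
    }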
/// Add new migrations to the end of the list. fn data_migrations() -> &'static Vec { - MIGRATIONS.get_or_init(Vec::new) + MIGRATIONS.get_or_init(|| { + vec![Migration { + name: "events_to_streams", + _version: "0.22.0", + }] + }) } #[derive(Debug, Clone, sqlx::FromRow)] @@ -123,8 +131,8 @@ impl DataMigrator { } async fn run_migration_by_name(&self, name: &str) -> Result<()> { - #[allow(clippy::match_single_binding)] - let _res: Result<()> = match name { + let res = match name { + "events_to_streams" => self.migrate_events_to_streams().await, _ => { return Err(Error::new_fatal(anyhow::anyhow!( "Unknown migration: {}", @@ -132,8 +140,7 @@ impl DataMigrator { ))) } }; - #[allow(unreachable_code)] - match _res { + match res { Ok(_) => { info!("Migration {} completed successfully", name); Ok(()) @@ -155,6 +162,103 @@ impl DataMigrator { } } } + + // This isn't the most efficient approach but it's simple and we should only run it once. + // It isn't expected to ever be run on something that isn't a sqlite database upgrading from version 0.22.0 or below. + async fn migrate_events_to_streams(&self) -> Result<()> { + let mut cid_cursor = Some(0); + + struct EventToStreamRow { + row_blocks: ReconEventBlockRaw, + row_id: i64, + } + use sqlx::Row; + impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for EventToStreamRow { + fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result { + let row_id: i64 = row.try_get("rowid")?; + let row_blocks = ReconEventBlockRaw::from_row(row)?; + Ok(Self { row_id, row_blocks }) + } + } + + let limit = 1000; + let mut total_migrated = 0; + while let Some(last_cid) = cid_cursor { + let mut tx = self.pool.begin_tx().await?; + + // RowID starts from 1 so we can just use greater than 0 to start + let all_blocks: Vec = sqlx::query_as( + r#"WITH key AS ( + SELECT + e.cid AS event_cid, e.order_key, e.rowid + FROM ceramic_one_event e + WHERE + EXISTS ( + SELECT 1 + FROM ceramic_one_event_block + WHERE event_cid = e.cid + ) + AND NOT EXISTS ( + SELECT 1 + FROM ceramic_one_event_metadata + WHERE cid = e.cid + ) + AND e.rowid > $1 + ORDER BY e.rowid + LIMIT $2 + ) + SELECT + key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid + FROM key + JOIN ceramic_one_event_block eb ON key.event_cid = eb.event_cid + JOIN ceramic_one_block b ON b.multihash = eb.block_multihash + ORDER BY key.order_key, eb.idx;"#, + ) + .bind(last_cid) + .bind(limit) + .fetch_all(&mut **tx.inner()) + .await?; + + let new_max = all_blocks.iter().map(|v| v.row_id).max(); + if all_blocks.is_empty() || new_max.is_none() { + tx.commit().await?; + return Ok(()); + } else { + cid_cursor = new_max; + } + + let values = ReconEventBlockRaw::into_carfiles( + all_blocks.into_iter().map(|b| b.row_blocks).collect(), + ) + .await?; + total_migrated += values.len(); + if total_migrated % 10000 == 0 { + debug!("Migrated {} events to the stream format", total_migrated); + } + tracing::trace!("found {} values to migrate", values.len()); + + for (event_id, payload) in values { + tracing::trace!("Migrating event: {}", event_id); + // should we log and continue if anything fails? It shouldn't be possible unless something bad happened + // and we allowed unexpected data into the system, but if we error, it will require manual intervention to recover + // as the bad data will need to be deleted by hand. temporary failures to write can be retried. 
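The backfill pages through `ceramic_one_event` by `rowid` rather than by OFFSET, which keeps each batch cheap on large tables: remember the largest rowid returned and ask only for rows strictly greater than it next time. A stripped-down sketch of that cursor loop, with the SQL and the per-event work replaced by a hypothetical `fetch_batch` callback so only the pagination shape remains:

    // Keyset pagination over an integer rowid, the same shape migrate_events_to_streams uses.
    // `fetch_batch(cursor, limit)` stands in for the real query; it must return at most
    // `limit` (rowid, item) pairs, all with rowid > cursor.
    async fn drain_in_batches<T, F, Fut>(mut fetch_batch: F, limit: i64) -> anyhow::Result<Vec<T>>
    where
        F: FnMut(i64, i64) -> Fut,
        Fut: std::future::Future<Output = anyhow::Result<Vec<(i64, T)>>>,
    {
        let mut cursor = 0i64; // rowids start at 1, so 0 means "from the beginning"
        let mut out = Vec::new();
        loop {
            let batch = fetch_batch(cursor, limit).await?;
            let Some(max_rowid) = batch.iter().map(|(rowid, _)| *rowid).max() else {
                break; // an empty batch means nothing is left beyond the cursor
            };
            cursor = max_rowid;
            out.extend(batch.into_iter().map(|(_, item)| item));
        }
        Ok(out)
    }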
+ let event_cid = event_id.cid().ok_or_else(|| { + Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id)) + })?; + + let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?; + + if let EventHeader::Init { header, .. } = &insertable.header { + CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?; + } + + CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?; + } + tx.commit().await?; + } + + Ok(()) + } } #[derive(Debug, Clone, FromRow)]
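Taken together, the migration and the normal insert path write the same two kinds of rows: a `ceramic_one_stream` row for init events and a `ceramic_one_event_metadata` row for every event. The part worth keeping in mind when reading `insert_event_header_tx` is the header-to-column mapping. A simplified mirror of it, with plain `String` ids standing in for `Cid` and a local enum standing in for the store's `EventHeader`:

    // Simplified mirrors of the store's EventHeader and EventType, using String ids.
    enum Header {
        Init { cid: String },
        Data { cid: String, stream_cid: String, prev: String },
        Time { cid: String, stream_cid: String, prev: String },
    }

    // Columns bound by the ceramic_one_event_metadata insert: (cid, stream_cid, event_type, prev).
    fn metadata_columns(h: &Header) -> (String, String, &'static str, Option<String>) {
        match h {
            // An init event is its own stream: stream_cid == cid and there is no prev.
            Header::Init { cid } => (cid.clone(), cid.clone(), "init", None),
            Header::Data { cid, stream_cid, prev } => {
                (cid.clone(), stream_cid.clone(), "data", Some(prev.clone()))
            }
            Header::Time { cid, stream_cid, prev } => {
                (cid.clone(), stream_cid.clone(), "time", Some(prev.clone()))
            }
        }
    }

An init event has no prev and is its own stream, which is why `stream_cid()` on `EventHeader::Init` returns the event's own CID in the code above.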