diff --git a/Cargo.lock b/Cargo.lock
index 9306520b3f6..5ddba4b9b06 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3859,7 +3859,7 @@ dependencies = [
 [[package]]
 name = "mithril-persistence"
-version = "0.2.34"
+version = "0.2.35"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -3903,7 +3903,7 @@ dependencies = [
 [[package]]
 name = "mithril-signer"
-version = "0.2.214"
+version = "0.2.215"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/internal/mithril-persistence/Cargo.toml b/internal/mithril-persistence/Cargo.toml
index 9b8d2333e32..5e2da4bfe87 100644
--- a/internal/mithril-persistence/Cargo.toml
+++ b/internal/mithril-persistence/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-persistence"
-version = "0.2.34"
+version = "0.2.35"
 description = "Common types, interfaces, and utilities to persist data for Mithril nodes."
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/internal/mithril-persistence/src/database/version_checker.rs b/internal/mithril-persistence/src/database/version_checker.rs
index 84e91fa50b9..fa609915181 100644
--- a/internal/mithril-persistence/src/database/version_checker.rs
+++ b/internal/mithril-persistence/src/database/version_checker.rs
@@ -156,7 +156,7 @@ insert into db_version (application_type, version, updated_at) values ('{applica
 }
 
 /// Represent a file containing SQL structure or data alterations.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SqlMigration {
     /// The semver version this migration targets.
     pub version: DbVersion,
diff --git a/internal/mithril-persistence/src/sqlite/connection_builder.rs b/internal/mithril-persistence/src/sqlite/connection_builder.rs
index 207029e29d6..eb0e9eb0a59 100644
--- a/internal/mithril-persistence/src/sqlite/connection_builder.rs
+++ b/internal/mithril-persistence/src/sqlite/connection_builder.rs
@@ -107,21 +107,8 @@ impl ConnectionBuilder {
                 .with_context(|| "SQLite initialization: could not enable FOREIGN KEY support.")?;
         }
 
-        if self.sql_migrations.is_empty().not() {
-            // Check database migrations
-            debug!(logger, "Applying database migrations");
-            let mut db_checker =
-                DatabaseVersionChecker::new(self.base_logger, self.node_type, &connection);
-
-            for migration in self.sql_migrations {
-                db_checker.add_migration(migration);
-            }
-
-            db_checker
-                .apply()
-                .with_context(|| "Database migration error")?;
-        }
-
+        let migrations = self.sql_migrations.clone();
+        self.apply_migrations(&connection, migrations)?;
         if self
             .options
             .contains(&ConnectionOptions::ForceDisableForeignKeys)
@@ -131,9 +118,37 @@ impl ConnectionBuilder {
                 .execute("pragma foreign_keys=false")
                 .with_context(|| "SQLite initialization: could not disable FOREIGN KEY support.")?;
         }
-
         Ok(connection)
     }
+
+    /// Apply a list of migrations to the connection.
+    pub fn apply_migrations(
+        &self,
+        connection: &ConnectionThreadSafe,
+        sql_migrations: Vec<SqlMigration>,
+    ) -> StdResult<()> {
+        let logger = self.base_logger.new_with_component_name::<Self>();
+
+        if sql_migrations.is_empty().not() {
+            // Check database migrations
+            debug!(logger, "Applying database migrations");
+            let mut db_checker = DatabaseVersionChecker::new(
+                self.base_logger.clone(),
+                self.node_type.clone(),
+                connection,
+            );
+
+            for migration in sql_migrations {
+                db_checker.add_migration(migration.clone());
+            }
+
+            db_checker
+                .apply()
+                .with_context(|| "Database migration error")?;
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -278,4 +293,31 @@ mod tests {
         let foreign_keys = execute_single_cell_query(&connection, "pragma foreign_keys;");
         assert_eq!(Value::Integer(false.into()), foreign_keys);
     }
+
+    #[test]
+    fn test_apply_partial_migrations() {
+        let migrations = vec![
+            SqlMigration::new(1, "create table first(id integer);"),
+            SqlMigration::new(2, "create table second(id integer);"),
+        ];
+
+        let connection = ConnectionBuilder::open_memory().build().unwrap();
+
+        assert!(connection.prepare("select * from first;").is_err());
+        assert!(connection.prepare("select * from second;").is_err());
+
+        ConnectionBuilder::open_memory()
+            .apply_migrations(&connection, migrations[0..1].to_vec())
+            .unwrap();
+
+        assert!(connection.prepare("select * from first;").is_ok());
+        assert!(connection.prepare("select * from second;").is_err());
+
+        ConnectionBuilder::open_memory()
+            .apply_migrations(&connection, migrations)
+            .unwrap();
+
+        assert!(connection.prepare("select * from first;").is_ok());
+        assert!(connection.prepare("select * from second;").is_ok());
+    }
 }
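The `apply_migrations` split above is what allows a database to be opened first and migrated later, which the signer's migration tests below rely on to seed legacy data before the schema changes run. A minimal sketch of that two-phase flow, using only APIs from this diff (the table and migration contents are illustrative):

use mithril_persistence::database::SqlMigration;
use mithril_persistence::sqlite::ConnectionBuilder;

fn two_phase_setup() -> mithril_common::StdResult<()> {
    // Phase 1: a bare connection, no migration applied yet.
    let connection = ConnectionBuilder::open_memory().build()?;
    connection.execute("create table legacy (key_hash text primary key, key json, value json)")?;

    // ... seed `legacy` with old-format rows here ...

    // Phase 2: apply the migrations on top of the seeded database.
    let migrations = vec![SqlMigration::new(1, "create table new_format (id integer);")];
    ConnectionBuilder::open_memory().apply_migrations(&connection, migrations)?;

    Ok(())
}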
diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml
index 905551e4440..fe1d1525b3c 100644
--- a/mithril-signer/Cargo.toml
+++ b/mithril-signer/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-signer"
-version = "0.2.214"
+version = "0.2.215"
 description = "A Mithril Signer"
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/mithril-signer/src/database/migration.rs b/mithril-signer/src/database/migration.rs
index 3b87c78670d..bf26d72c65d 100644
--- a/mithril-signer/src/database/migration.rs
+++ b/mithril-signer/src/database/migration.rs
@@ -63,12 +63,61 @@ create index signed_beacon_signed_entity_type_id on signed_beacon(signed_entity_
         // Migration 4
         // Remove `network` from cardano immutable files full beacons in `signed_beacon` table
         SqlMigration::new(
-            31,
+            4,
             r#"
 update signed_beacon
     set beacon = json_remove(beacon, '$.network')
 where signed_beacon.signed_entity_type_id = 2;
 "#,
         ),
+        // Migration 5
+        // Add the `stake_pool` table and migrate data from the previous
+        // `stake_store` JSON format.
+        SqlMigration::new(
+            5,
+            r#"
+create table stake_pool (
+    stake_pool_id text not null,
+    epoch integer not null,
+    stake integer not null,
+    created_at text not null,
+    primary key (epoch, stake_pool_id)
+);
+create table if not exists stake (key_hash text primary key, key json not null, value json not null);
+insert into stake_pool (epoch, stake_pool_id, stake, created_at)
+    select
+        stake.key as epoch,
+        stake_dis.key as stake_pool_id,
+        stake_dis.value as stake,
+        strftime('%Y-%m-%dT%H:%M:%fZ', current_timestamp)
+    from stake, json_each(stake.value) as stake_dis
+    order by epoch asc;
+drop table stake;
+"#,
+        ),
+        // Migration 6
+        // Add the `protocol_initializer` table and migrate data from the previous
+        // `protocol_initializer` JSON format.
+        SqlMigration::new(
+            6,
+            r#"
+create table new_protocol_initializer (
+    epoch integer not null,
+    protocol json not null,
+    created_at text not null,
+    primary key (epoch)
+);
+create table if not exists protocol_initializer (key_hash text primary key, key json not null, value json not null);
+insert into new_protocol_initializer (epoch, protocol, created_at)
+    select
+        protocol_initializer.key as epoch,
+        protocol_initializer.value,
+        strftime('%Y-%m-%dT%H:%M:%fZ', current_timestamp)
+    from protocol_initializer
+    order by epoch asc;
+drop table protocol_initializer;
+alter table new_protocol_initializer rename to protocol_initializer;
+"#,
+        ),
     ]
 }
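Migrations 5 and 6 share one pattern: the legacy adapter stored one row per key with JSON-encoded `key` and `value` columns, and the SQL above re-expands that JSON into relational rows (`json_each` for the stake map, a direct projection for the protocol initializer). An illustrative sketch of what `json_each` does to a legacy `stake` row, with made-up values:

use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionExtensions};

fn json_each_expansion() -> mithril_common::StdResult<()> {
    let connection = ConnectionBuilder::open_memory().build()?;
    connection.execute(
        "create table stake (key_hash text primary key, key json not null, value json not null);
         insert into stake values ('HashEpoch5', '5', '{\"pool-A\": 1000, \"pool-B\": 1200}');",
    )?;

    // Same shape as the `insert ... select` in migration 5: one output row per pool
    // in the JSON map, keyed by the epoch stored in `stake.key`.
    let rows: i64 =
        connection.query_single_cell("select count(*) from stake, json_each(stake.value)", &[])?;
    assert_eq!(2, rows);
    Ok(())
}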
diff --git a/mithril-signer/src/database/mod.rs b/mithril-signer/src/database/mod.rs
index efd73711816..48ee00feec9 100644
--- a/mithril-signer/src/database/mod.rs
+++ b/mithril-signer/src/database/mod.rs
@@ -7,3 +7,5 @@ pub mod record;
 pub mod repository;
 #[cfg(test)]
 pub(crate) mod test_helper;
+#[cfg(test)]
+pub(crate) mod tests;
diff --git a/mithril-signer/src/database/query/mod.rs b/mithril-signer/src/database/query/mod.rs
index 5be4618bf4d..2f5b5001218 100644
--- a/mithril-signer/src/database/query/mod.rs
+++ b/mithril-signer/src/database/query/mod.rs
@@ -1,5 +1,9 @@
 //! Signer related database queries
 
+mod protocol_initializer;
 mod signed_beacon;
+mod stake_pool;
 
+pub use protocol_initializer::*;
 pub use signed_beacon::*;
+pub use stake_pool::*;
diff --git a/mithril-signer/src/database/query/protocol_initializer/delete_protocol_initializer.rs b/mithril-signer/src/database/query/protocol_initializer/delete_protocol_initializer.rs
new file mode 100644
index 00000000000..68a2e0f6e21
--- /dev/null
+++ b/mithril-signer/src/database/query/protocol_initializer/delete_protocol_initializer.rs
@@ -0,0 +1,42 @@
+use sqlite::Value;
+
+use mithril_common::entities::Epoch;
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::ProtocolInitializerRecord;
+
+/// Query to delete old [ProtocolInitializerRecord] from the sqlite database
+pub struct DeleteProtocolInitializerQuery {
+    condition: WhereCondition,
+}
+
+impl Query for DeleteProtocolInitializerQuery {
+    type Entity = ProtocolInitializerRecord;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        // it is important to alias the fields with the same name as the table
+        // since the table cannot be aliased in a RETURNING statement in SQLite.
+        let projection = Self::Entity::get_projection().expand(SourceAlias::new(&[(
+            "{:protocol_initializer:}",
+            "protocol_initializer",
+        )]));
+
+        format!("delete from protocol_initializer where {condition} returning {projection}")
+    }
+}
+
+impl DeleteProtocolInitializerQuery {
+    /// Create the SQL query to prune data older than the given Epoch.
+    pub fn below_epoch_threshold(epoch_threshold: Epoch) -> Self {
+        let condition = WhereCondition::new(
+            "epoch < ?*",
+            vec![Value::Integer(epoch_threshold.try_into().unwrap())],
+        );
+
+        Self { condition }
+    }
+}
diff --git a/mithril-signer/src/database/query/protocol_initializer/get_protocol_initializer.rs b/mithril-signer/src/database/query/protocol_initializer/get_protocol_initializer.rs
new file mode 100644
index 00000000000..6146c851d7f
--- /dev/null
+++ b/mithril-signer/src/database/query/protocol_initializer/get_protocol_initializer.rs
@@ -0,0 +1,53 @@
+use sqlite::Value;
+
+use mithril_common::entities::Epoch;
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::ProtocolInitializerRecord;
+
+/// Simple queries to retrieve [ProtocolInitializerRecord] from the sqlite database.
+pub struct GetProtocolInitializerQuery {
+    condition: WhereCondition,
+    limit: Option<usize>,
+}
+
+impl GetProtocolInitializerQuery {
+    /// Get the protocol initializer that matches the epoch.
+    pub fn for_epoch(epoch: Epoch) -> Self {
+        let epoch_i64: i64 = epoch.try_into().unwrap();
+        let condition = WhereCondition::new(
+            "protocol_initializer.epoch = ?",
+            vec![Value::Integer(epoch_i64)],
+        );
+
+        Self {
+            condition,
+            limit: None,
+        }
+    }
+
+    /// Get the last `limit` protocol initializers, most recent first.
+    pub fn last_n(limit: usize) -> Self {
+        let condition = WhereCondition::default();
+        Self {
+            condition,
+            limit: Some(limit),
+        }
+    }
+}
+
+impl Query for GetProtocolInitializerQuery {
+    type Entity = ProtocolInitializerRecord;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        let aliases = SourceAlias::new(&[("{:protocol_initializer:}", "protocol_initializer")]);
+        let projection = Self::Entity::get_projection().expand(aliases);
+        let limit = self
+            .limit
+            .map_or("".to_string(), |limit| format!(" limit {}", limit));
+        format!("select {projection} from protocol_initializer where {condition} order by rowid desc{limit}")
+    }
+}
diff --git a/mithril-signer/src/database/query/protocol_initializer/insert_protocol_initializer.rs b/mithril-signer/src/database/query/protocol_initializer/insert_protocol_initializer.rs
new file mode 100644
index 00000000000..58bb70189e4
--- /dev/null
+++ b/mithril-signer/src/database/query/protocol_initializer/insert_protocol_initializer.rs
@@ -0,0 +1,46 @@
+use sqlite::Value;
+
+use mithril_common::StdResult;
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::ProtocolInitializerRecord;
+
+/// Query to insert or replace [ProtocolInitializerRecord] in the sqlite database
+pub struct InsertOrReplaceProtocolInitializerQuery {
+    condition: WhereCondition,
+}
+
+impl InsertOrReplaceProtocolInitializerQuery {
+    pub fn one(record: ProtocolInitializerRecord) -> StdResult<Self> {
+        let value = serde_json::to_string(&record.protocol_initializer).unwrap();
+        let condition = WhereCondition::new(
+            "(epoch, protocol, created_at) values (?*, ?*, ?*)",
+            vec![
+                Value::Integer(record.epoch.try_into()?),
+                Value::String(value),
+                Value::String(record.created_at.to_rfc3339()),
+            ],
+        );
+
+        Ok(Self { condition })
+    }
+}
+
+impl Query for InsertOrReplaceProtocolInitializerQuery {
+    type Entity = ProtocolInitializerRecord;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        // it is important to alias the fields with the same name as the table
+        // since the table cannot be aliased in a RETURNING statement in SQLite.
+        let projection = Self::Entity::get_projection().expand(SourceAlias::new(&[(
+            "{:protocol_initializer:}",
+            "protocol_initializer",
+        )]));
+
+        format!("insert or replace into protocol_initializer {condition} returning {projection}")
+    }
+}
diff --git a/mithril-signer/src/database/query/protocol_initializer/mod.rs b/mithril-signer/src/database/query/protocol_initializer/mod.rs
new file mode 100644
index 00000000000..a33fce5893c
--- /dev/null
+++ b/mithril-signer/src/database/query/protocol_initializer/mod.rs
@@ -0,0 +1,7 @@
+mod delete_protocol_initializer;
+mod get_protocol_initializer;
+mod insert_protocol_initializer;
+
+pub use delete_protocol_initializer::*;
+pub use get_protocol_initializer::*;
+pub use insert_protocol_initializer::*;
diff --git a/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs b/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs
new file mode 100644
index 00000000000..5c7f19c0b38
--- /dev/null
+++ b/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs
@@ -0,0 +1,77 @@
+use sqlite::Value;
+
+use mithril_common::{entities::Epoch, StdResult};
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::StakePool;
+
+/// Query to delete old [StakePool] from the sqlite database
+pub struct DeleteStakePoolQuery {
+    condition: WhereCondition,
+}
+
+impl Query for DeleteStakePoolQuery {
+    type Entity = StakePool;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        // it is important to alias the fields with the same name as the table
+        // since the table cannot be aliased in a RETURNING statement in SQLite.
+        let projection = Self::Entity::get_projection()
+            .expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")]));
+
+        format!("delete from stake_pool where {condition} returning {projection}")
+    }
+}
+
+impl DeleteStakePoolQuery {
+    /// Create the SQL query to prune data older than the given Epoch.
+    pub fn below_epoch_threshold(epoch_threshold: Epoch) -> Self {
+        let condition = WhereCondition::new(
+            "epoch < ?*",
+            vec![Value::Integer(epoch_threshold.try_into().unwrap())],
+        );
+
+        Self { condition }
+    }
+
+    /// Create the SQL query to delete the given Epoch.
+    pub fn by_epoch(epoch: Epoch) -> StdResult<Self> {
+        let condition = WhereCondition::new("epoch = ?*", vec![Value::Integer(epoch.try_into()?)]);
+
+        Ok(Self { condition })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::database::query::GetStakePoolQuery;
+    use crate::database::test_helper::{insert_stake_pool, main_db_connection};
+    use mithril_persistence::sqlite::ConnectionExtensions;
+
+    use super::*;
+
+    #[test]
+    fn test_prune_below_epoch_threshold() {
+        let connection = main_db_connection().unwrap();
+        insert_stake_pool(&connection, &[1, 2]).unwrap();
+
+        let cursor = connection
+            .fetch(DeleteStakePoolQuery::below_epoch_threshold(Epoch(2)))
+            .unwrap();
+        assert_eq!(3, cursor.count());
+
+        let cursor = connection
+            .fetch(GetStakePoolQuery::by_epoch(Epoch(1)).unwrap())
+            .unwrap();
+        assert_eq!(0, cursor.count());
+
+        let cursor = connection
+            .fetch(GetStakePoolQuery::by_epoch(Epoch(2)).unwrap())
+            .unwrap();
+        assert_eq!(3, cursor.count());
+    }
+}
diff --git a/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs b/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs
new file mode 100644
index 00000000000..032e48ca465
--- /dev/null
+++ b/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs
@@ -0,0 +1,81 @@
+use sqlite::Value;
+
+use mithril_common::entities::Epoch;
+use mithril_common::StdResult;
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::StakePool;
+
+/// Simple queries to retrieve [StakePool] from the sqlite database.
+pub struct GetStakePoolQuery {
+    condition: WhereCondition,
+}
+
+impl GetStakePoolQuery {
+    /// Get StakePools for a given Epoch.
+    pub fn by_epoch(epoch: Epoch) -> StdResult<Self> {
+        let condition = WhereCondition::new("epoch = ?*", vec![Value::Integer(epoch.try_into()?)]);
+
+        Ok(Self { condition })
+    }
+
+    #[cfg(test)]
+    pub(crate) fn all() -> Self {
+        Self {
+            condition: WhereCondition::default(),
+        }
+    }
+}
+
+impl Query for GetStakePoolQuery {
+    type Entity = StakePool;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        let aliases = SourceAlias::new(&[("{:stake_pool:}", "sp")]);
+        let projection = Self::Entity::get_projection().expand(aliases);
+
+        format!("select {projection} from stake_pool as sp where {condition} order by epoch asc, stake desc, stake_pool_id asc")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::database::{query::InsertOrReplaceStakePoolQuery, test_helper::main_db_connection};
+    use mithril_persistence::sqlite::ConnectionExtensions;
+
+    #[test]
+    fn test_query_sorts_the_returned_stake_pool_by_epoch_stake_and_stake_pool_id() {
+        let connection = main_db_connection().unwrap();
+        connection
+            .apply(InsertOrReplaceStakePoolQuery::many(vec![
+                ("pool-A".to_string(), Epoch(1), 1500),
+                ("pool-D".to_string(), Epoch(2), 1250),
+                ("pool-B".to_string(), Epoch(1), 1000),
+                ("pool-E".to_string(), Epoch(1), 1600),
+                ("pool-C".to_string(), Epoch(1), 1600),
+            ]))
+            .unwrap();
+
+        let stake_pool_in_database: Vec<StakePool> =
+            connection.fetch_collect(GetStakePoolQuery::all()).unwrap();
+
+        assert_eq!(
+            vec![
+                ("pool-C".to_string(), Epoch(1), 1600),
+                ("pool-E".to_string(), Epoch(1), 1600),
+                ("pool-A".to_string(), Epoch(1), 1500),
+                ("pool-B".to_string(), Epoch(1), 1000),
+                ("pool-D".to_string(), Epoch(2), 1250),
+            ],
+            stake_pool_in_database
+                .into_iter()
+                .map(|s| (s.stake_pool_id, s.epoch, s.stake))
+                .collect::<Vec<_>>()
+        );
+    }
+}
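A query type above only contributes a `WhereCondition` and a SQL template; the `ConnectionExtensions` helpers (`fetch`, `fetch_first`, `fetch_collect`, `apply`) bind the parameters and hydrate the returned rows. A sketch of the intended call pattern, reusing the signer's test helper for the connection (the epoch value is illustrative):

use mithril_common::entities::Epoch;
use mithril_persistence::sqlite::ConnectionExtensions;

use crate::database::query::GetStakePoolQuery;
use crate::database::record::StakePool;
use crate::database::test_helper::main_db_connection;

fn read_pools_for_epoch() -> mithril_common::StdResult<()> {
    let connection = main_db_connection()?;

    // `fetch_collect` expands the projection, binds the condition parameters,
    // then hydrates every returned row into the query's `Entity`.
    let pools: Vec<StakePool> =
        connection.fetch_collect(GetStakePoolQuery::by_epoch(Epoch(1))?)?;

    for pool in pools {
        println!("{} -> {}", pool.stake_pool_id, pool.stake);
    }
    Ok(())
}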
diff --git a/mithril-signer/src/database/query/stake_pool/insert_or_replace_stake_pool.rs b/mithril-signer/src/database/query/stake_pool/insert_or_replace_stake_pool.rs
new file mode 100644
index 00000000000..a5412048ce8
--- /dev/null
+++ b/mithril-signer/src/database/query/stake_pool/insert_or_replace_stake_pool.rs
@@ -0,0 +1,93 @@
+use std::iter::repeat;
+
+use chrono::Utc;
+use sqlite::Value;
+
+use mithril_common::entities::{Epoch, PartyId, Stake};
+use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
+
+use crate::database::record::StakePool;
+
+/// Query to insert or replace [StakePool] in the sqlite database
+pub struct InsertOrReplaceStakePoolQuery {
+    condition: WhereCondition,
+}
+
+impl InsertOrReplaceStakePoolQuery {
+    pub fn many(records: Vec<(PartyId, Epoch, Stake)>) -> Self {
+        let columns = "(stake_pool_id, epoch, stake, created_at)";
+        let values_columns: Vec<&str> = repeat("(?*, ?*, ?*, ?*)").take(records.len()).collect();
+        let values = records
+            .into_iter()
+            .flat_map(|(stake_pool_id, epoch, stake)| {
+                vec![
+                    Value::String(stake_pool_id),
+                    Value::Integer(epoch.try_into().unwrap()),
+                    Value::Integer(i64::try_from(stake).unwrap()),
+                    Value::String(Utc::now().to_rfc3339()),
+                ]
+            })
+            .collect();
+        let condition = WhereCondition::new(
+            format!("{columns} values {}", values_columns.join(", ")).as_str(),
+            values,
+        );
+
+        Self { condition }
+    }
+}
+
+impl Query for InsertOrReplaceStakePoolQuery {
+    type Entity = StakePool;
+
+    fn filters(&self) -> WhereCondition {
+        self.condition.clone()
+    }
+
+    fn get_definition(&self, condition: &str) -> String {
+        // it is important to alias the fields with the same name as the table
+        // since the table cannot be aliased in a RETURNING statement in SQLite.
+        let projection = Self::Entity::get_projection()
+            .expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")]));
+
+        format!("insert or replace into stake_pool {condition} returning {projection}")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::database::query::GetStakePoolQuery;
+    use crate::database::test_helper::{insert_stake_pool, main_db_connection};
+    use mithril_persistence::sqlite::ConnectionExtensions;
+
+    use super::*;
+
+    #[test]
+    fn test_update_stakes() {
+        let connection = main_db_connection().unwrap();
+        insert_stake_pool(&connection, &[3]).unwrap();
+
+        let pools: Vec<StakePool> = connection
+            .fetch_collect(InsertOrReplaceStakePoolQuery::many(vec![(
+                "pool4".to_string(),
+                Epoch(3),
+                9999,
+            )]))
+            .unwrap();
+        let stake_pool = pools.first().unwrap();
+
+        assert_eq!("pool4".to_string(), stake_pool.stake_pool_id);
+        assert_eq!(Epoch(3), stake_pool.epoch);
+        assert_eq!(9999, stake_pool.stake);
+
+        let mut cursor = connection
+            .fetch(GetStakePoolQuery::by_epoch(Epoch(3)).unwrap())
+            .unwrap();
+        let stake_pool = cursor.next().expect("Should have a stake pool 'pool4'.");
+
+        assert_eq!("pool4".to_string(), stake_pool.stake_pool_id);
+        assert_eq!(Epoch(3), stake_pool.epoch);
+        assert_eq!(9999, stake_pool.stake);
+        assert_eq!(3, cursor.count());
+    }
+}
diff --git a/mithril-signer/src/database/query/stake_pool/mod.rs b/mithril-signer/src/database/query/stake_pool/mod.rs
new file mode 100644
index 00000000000..b9f60c850d4
--- /dev/null
+++ b/mithril-signer/src/database/query/stake_pool/mod.rs
@@ -0,0 +1,7 @@
+mod delete_stake_pool;
+mod get_stake_pool;
+mod insert_or_replace_stake_pool;
+
+pub use delete_stake_pool::*;
+pub use get_stake_pool::*;
+pub use insert_or_replace_stake_pool::*;
diff --git a/mithril-signer/src/database/record/mod.rs b/mithril-signer/src/database/record/mod.rs
index 9d493c54652..53c95a39f66 100644
--- a/mithril-signer/src/database/record/mod.rs
+++ b/mithril-signer/src/database/record/mod.rs
@@ -1,5 +1,9 @@
 //! Signer related database records
 
+mod protocol_initializer_record;
 mod signed_beacon_record;
+mod stake_pool;
 
+pub use protocol_initializer_record::*;
 pub use signed_beacon_record::*;
+pub use stake_pool::*;
diff --git a/mithril-signer/src/database/record/protocol_initializer_record.rs b/mithril-signer/src/database/record/protocol_initializer_record.rs
new file mode 100644
index 00000000000..b3c540139f3
--- /dev/null
+++ b/mithril-signer/src/database/record/protocol_initializer_record.rs
@@ -0,0 +1,60 @@
+use chrono::{DateTime, Utc};
+
+use mithril_common::{crypto_helper::ProtocolInitializer, entities::Epoch};
+use mithril_persistence::sqlite::{HydrationError, Projection, SqLiteEntity};
+
+/// Protocol initializer.
+#[derive(Debug)]
+pub struct ProtocolInitializerRecord {
+    /// Epoch
+    pub epoch: Epoch,
+
+    /// Protocol Initializer
+    pub protocol_initializer: ProtocolInitializer,
+
+    /// DateTime of the record creation.
+    pub created_at: DateTime<Utc>,
+}
+
+impl SqLiteEntity for ProtocolInitializerRecord {
+    fn hydrate(row: sqlite::Row) -> Result<Self, HydrationError>
+    where
+        Self: Sized,
+    {
+        let epoch_int = row.read::<i64, _>(0);
+        let protocol = row.read::<&str, _>(1);
+        let datetime = &row.read::<&str, _>(2);
+
+        let record = Self {
+            protocol_initializer: serde_json::from_str(protocol).map_err(|e| {
+                HydrationError::InvalidData(format!(
+                    "Could not cast string ({}) to ProtocolInitializer. Error: '{e}'",
+                    protocol
+                ))
+            })?,
+            epoch: Epoch(epoch_int.try_into().map_err(|e| {
+                HydrationError::InvalidData(format!(
+                    "Could not cast i64 ({epoch_int}) to u64. Error: '{e}'"
+                ))
+            })?),
+            created_at: DateTime::parse_from_rfc3339(datetime)
+                .map_err(|e| {
+                    HydrationError::InvalidData(format!(
+                        "Could not turn string '{datetime}' to rfc3339 Datetime. Error: {e}"
+                    ))
+                })?
+                .with_timezone(&Utc),
+        };
+
+        Ok(record)
+    }
+
+    fn get_projection() -> Projection {
+        let mut projection = Projection::default();
+        projection.add_field("epoch", "{:protocol_initializer:}.epoch", "integer");
+        projection.add_field("protocol", "{:protocol_initializer:}.protocol", "text");
+        projection.add_field("created_at", "{:protocol_initializer:}.created_at", "text");
+
+        projection
+    }
+}
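`SqLiteEntity` ties the two directions together: `get_projection` declares the aliased columns a query must select, in the positional order that `hydrate` later reads them back (epoch at index 0, protocol at 1, created_at at 2). A small sketch of the expansion step, assuming the `Projection::expand`/`SourceAlias` behavior used by the queries in this diff:

use mithril_persistence::sqlite::{SourceAlias, SqLiteEntity};

use crate::database::record::ProtocolInitializerRecord;

fn show_projection() {
    // Expanding the projection with a concrete table alias yields the select list,
    // e.g. "protocol_initializer.epoch as epoch, protocol_initializer.protocol as protocol, ..."
    let projection = ProtocolInitializerRecord::get_projection().expand(SourceAlias::new(&[(
        "{:protocol_initializer:}",
        "protocol_initializer",
    )]));

    // Column order matters: `hydrate` reads the fields back by index, not by name.
    println!("select {projection} from protocol_initializer");
}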
diff --git a/mithril-signer/src/database/record/stake_pool.rs b/mithril-signer/src/database/record/stake_pool.rs
new file mode 100644
index 00000000000..8a8f1c65708
--- /dev/null
+++ b/mithril-signer/src/database/record/stake_pool.rs
@@ -0,0 +1,64 @@
+use chrono::{DateTime, Utc};
+
+use mithril_common::entities::{Epoch, PartyId};
+use mithril_persistence::sqlite::{HydrationError, Projection, SqLiteEntity};
+
+/// Stake pool as read from Chain.
+#[derive(Debug, PartialEq)]
+pub struct StakePool {
+    /// Pool Id
+    pub stake_pool_id: PartyId,
+
+    /// Total stake of this pool.
+    pub stake: u64,
+
+    /// Epoch at which this pool is valid.
+    pub epoch: Epoch,
+
+    /// DateTime of the record creation.
+    pub created_at: DateTime<Utc>,
+}
+
+impl SqLiteEntity for StakePool {
+    fn hydrate(row: sqlite::Row) -> Result<Self, HydrationError>
+    where
+        Self: Sized,
+    {
+        let epoch_int = row.read::<i64, _>(2);
+        let datetime = &row.read::<&str, _>(3);
+        let stake = row.read::<i64, _>(1);
+
+        let stake_pool = Self {
+            stake_pool_id: row.read::<&str, _>(0).to_string(),
+            stake: u64::try_from(stake).map_err(|e| {
+                HydrationError::InvalidData(format!(
+                    "Could not cast the StakePool.stake from internal db I64 → U64. Error: '{e}'.",
+                ))
+            })?,
+            epoch: Epoch(epoch_int.try_into().map_err(|e| {
+                HydrationError::InvalidData(format!(
+                    "Could not cast i64 ({epoch_int}) to u64. Error: '{e}'"
+                ))
+            })?),
+            created_at: DateTime::parse_from_rfc3339(datetime)
+                .map_err(|e| {
+                    HydrationError::InvalidData(format!(
+                        "Could not turn string '{datetime}' to rfc3339 Datetime. Error: {e}"
+                    ))
+                })?
+                .with_timezone(&Utc),
+        };
+
+        Ok(stake_pool)
+    }
+
+    fn get_projection() -> Projection {
+        let mut projection = Projection::default();
+        projection.add_field("stake_pool_id", "{:stake_pool:}.stake_pool_id", "text");
+        projection.add_field("stake", "{:stake_pool:}.stake", "integer");
+        projection.add_field("epoch", "{:stake_pool:}.epoch", "integer");
+        projection.add_field("created_at", "{:stake_pool:}.created_at", "text");
+
+        projection
+    }
+}
diff --git a/mithril-signer/src/database/repository/mod.rs b/mithril-signer/src/database/repository/mod.rs
index 750bcbea6a1..162285b1809 100644
--- a/mithril-signer/src/database/repository/mod.rs
+++ b/mithril-signer/src/database/repository/mod.rs
@@ -1,6 +1,10 @@
 //! Signer related database repositories
 
 mod cardano_transaction_repository;
+mod protocol_initializer_repository;
 mod signed_beacon_repository;
+mod stake_pool_store;
 
+pub use protocol_initializer_repository::*;
 pub use signed_beacon_repository::*;
+pub use stake_pool_store::*;
diff --git a/mithril-signer/src/database/repository/protocol_initializer_repository.rs b/mithril-signer/src/database/repository/protocol_initializer_repository.rs
new file mode 100644
index 00000000000..9803ddac918
--- /dev/null
+++ b/mithril-signer/src/database/repository/protocol_initializer_repository.rs
@@ -0,0 +1,94 @@
+use std::sync::Arc;
+
+use anyhow::Ok;
+use async_trait::async_trait;
+
+use crate::database::query::{
+    DeleteProtocolInitializerQuery, InsertOrReplaceProtocolInitializerQuery,
+};
+use crate::database::record::ProtocolInitializerRecord;
+use crate::{
+    database::query::GetProtocolInitializerQuery, services::EpochPruningTask,
+    store::ProtocolInitializerStorer,
+};
+use mithril_common::{crypto_helper::ProtocolInitializer, entities::Epoch, StdResult};
+use mithril_persistence::sqlite::ConnectionExtensions;
+use mithril_persistence::sqlite::SqliteConnection;
+
+/// Implementation of the ProtocolInitializerStorer
+pub struct ProtocolInitializerRepository {
+    connection: Arc<SqliteConnection>,
+    retention_limit: Option<u64>,
+}
+
+impl ProtocolInitializerRepository {
+    /// Create a new ProtocolInitializerRepository.
+    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
+        Self {
+            connection,
+            retention_limit,
+        }
+    }
+}
+
+#[async_trait]
+impl EpochPruningTask for ProtocolInitializerRepository {
+    fn pruned_data(&self) -> &'static str {
+        "Protocol initializer"
+    }
+
+    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
+        if let Some(threshold) = self.retention_limit {
+            self.connection
+                .apply(DeleteProtocolInitializerQuery::below_epoch_threshold(
+                    epoch - threshold,
+                ))?;
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl ProtocolInitializerStorer for ProtocolInitializerRepository {
+    async fn save_protocol_initializer(
+        &self,
+        epoch: Epoch,
+        protocol_initializer: ProtocolInitializer,
+    ) -> StdResult<Option<ProtocolInitializer>> {
+        let previous_protocol_initializer = self.get_protocol_initializer(epoch).await?;
+        let record = ProtocolInitializerRecord {
+            epoch,
+            protocol_initializer: protocol_initializer.clone(),
+            created_at: chrono::Utc::now(),
+        };
+        self.connection
+            .apply(InsertOrReplaceProtocolInitializerQuery::one(record).unwrap())?;
+
+        Ok(previous_protocol_initializer)
+    }
+
+    async fn get_protocol_initializer(
+        &self,
+        epoch: Epoch,
+    ) -> StdResult<Option<ProtocolInitializer>> {
+        let record = self
+            .connection
+            .fetch_first(GetProtocolInitializerQuery::for_epoch(epoch))?;
+
+        Ok(record.map(|record| record.protocol_initializer))
+    }
+
+    async fn get_last_protocol_initializer(
+        &self,
+        last: usize,
+    ) -> StdResult<Vec<(Epoch, ProtocolInitializer)>> {
+        let record: Vec<ProtocolInitializerRecord> = self
+            .connection
+            .fetch_collect(GetProtocolInitializerQuery::last_n(last))?;
+
+        Ok(record
+            .iter()
+            .map(|record| (record.epoch, record.protocol_initializer.to_owned()))
+            .collect())
+    }
+}
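Both repositories also act as `EpochPruningTask`s: the retention limit is a number of epochs, and `prune(current_epoch)` deletes records strictly below `current_epoch - limit`. A sketch of the save-then-prune lifecycle using only types from this diff (the epoch numbers and the limit are illustrative):

use std::sync::Arc;

use mithril_common::entities::Epoch;
use mithril_common::test_utils::fake_data;

use crate::database::repository::ProtocolInitializerRepository;
use crate::database::test_helper::main_db_connection;
use crate::services::EpochPruningTask;
use crate::store::ProtocolInitializerStorer;

async fn save_then_prune() -> mithril_common::StdResult<()> {
    // Keep at most 2 epochs of protocol initializers.
    let repository =
        Arc::new(ProtocolInitializerRepository::new(Arc::new(main_db_connection()?), Some(2)));

    repository
        .save_protocol_initializer(Epoch(1), fake_data::protocol_initializer("seed", 1000))
        .await?;

    // At epoch 10, epoch 1 sits below the 10 - 2 = 8 threshold and is deleted.
    repository.prune(Epoch(10)).await?;
    assert!(repository.get_protocol_initializer(Epoch(1)).await?.is_none());
    Ok(())
}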
diff --git a/mithril-signer/src/database/repository/stake_pool_store.rs b/mithril-signer/src/database/repository/stake_pool_store.rs
new file mode 100644
index 00000000000..d412e94558d
--- /dev/null
+++ b/mithril-signer/src/database/repository/stake_pool_store.rs
@@ -0,0 +1,106 @@
+use std::ops::Not;
+use std::sync::Arc;
+
+use anyhow::Context;
+use async_trait::async_trait;
+
+use mithril_common::entities::{Epoch, StakeDistribution};
+use mithril_common::signable_builder::StakeDistributionRetriever;
+use mithril_common::StdResult;
+use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
+use mithril_persistence::store::StakeStorer;
+
+use crate::database::query::{
+    DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
+};
+use crate::database::record::StakePool;
+use crate::services::EpochPruningTask;
+
+/// Service to deal with stake pools (read & write).
+pub struct StakePoolStore {
+    connection: Arc<SqliteConnection>,
+
+    /// Number of epochs before previous records will be pruned at the next call to
+    /// [save_stakes][StakePoolStore::save_stakes].
+    retention_limit: Option<u64>,
+}
+
+impl StakePoolStore {
+    /// Create a new StakePool service
+    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
+        Self {
+            connection,
+            retention_limit,
+        }
+    }
+}
+
+#[async_trait]
+impl StakeStorer for StakePoolStore {
+    async fn save_stakes(
+        &self,
+        epoch: Epoch,
+        stakes: StakeDistribution,
+    ) -> StdResult<Option<StakeDistribution>> {
+        // We should create a transaction including delete and insert but it's not possible
+        // with the current implementation because the connection is shared.
+        self.connection
+            .apply(DeleteStakePoolQuery::by_epoch(epoch)?)
+            .with_context(|| format!("delete stakes failure, epoch: {epoch}"))?;
+
+        let pools: Vec<StakePool> = self
+            .connection
+            .fetch_collect(InsertOrReplaceStakePoolQuery::many(
+                stakes
+                    .into_iter()
+                    .map(|(pool_id, stake)| (pool_id, epoch, stake))
+                    .collect(),
+            ))
+            .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?;
+
+        Ok(Some(StakeDistribution::from_iter(
+            pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
+        )))
+    }
+
+    async fn get_stakes(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
+        let cursor = self
+            .connection
+            .fetch(GetStakePoolQuery::by_epoch(epoch)?)
+            .with_context(|| format!("get stakes failure, epoch: {epoch}"))?;
+        let mut stake_distribution = StakeDistribution::new();
+
+        for stake_pool in cursor {
+            stake_distribution.insert(stake_pool.stake_pool_id, stake_pool.stake);
+        }
+
+        Ok(stake_distribution
+            .is_empty()
+            .not()
+            .then_some(stake_distribution))
+    }
+}
+
+#[async_trait]
+impl StakeDistributionRetriever for StakePoolStore {
+    async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
+        self.get_stakes(epoch).await
+    }
+}
+
+#[async_trait]
+impl EpochPruningTask for StakePoolStore {
+    fn pruned_data(&self) -> &'static str {
+        "Stake pool"
+    }
+
+    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
+        if let Some(threshold) = self.retention_limit {
+            self.connection
+                .apply(DeleteStakePoolQuery::below_epoch_threshold(
+                    epoch - threshold,
+                ))?;
+        }
+        Ok(())
+    }
+}
diff --git a/mithril-signer/src/database/test_helper.rs b/mithril-signer/src/database/test_helper.rs
index 6600c0a59db..0bef3b049fc 100644
--- a/mithril-signer/src/database/test_helper.rs
+++ b/mithril-signer/src/database/test_helper.rs
@@ -1,11 +1,16 @@
 use std::path::Path;
+use std::sync::Arc;
 
+use chrono::Utc;
+use mithril_common::entities::Epoch;
 use mithril_common::StdResult;
 use mithril_persistence::sqlite::{
-    ConnectionBuilder, ConnectionExtensions, ConnectionOptions, SqliteConnection,
+    ConnectionBuilder, ConnectionExtensions, ConnectionOptions, Query, SqliteConnection,
 };
+use serde::Serialize;
+use sqlite::Value;
 
-use crate::database::query::InsertSignedBeaconRecordQuery;
+use crate::database::query::{InsertOrReplaceStakePoolQuery, InsertSignedBeaconRecordQuery};
 use crate::database::record::SignedBeaconRecord;
 
 /// In-memory sqlite database without foreign key support with migrations applied
@@ -61,3 +66,90 @@ pub fn insert_signed_beacons(connection: &SqliteConnection, records: Vec<SignedBeaconRecord>) {
+pub fn insert_stake_pool(
+    connection: &SqliteConnection,
+    epoch_to_insert_stake_pools: &[i64],
+) -> StdResult<()> {
+    let query = {
+        // leverage the expanded parameter from this query which is unit
+        // tested on its own above.
+        let (sql_values, _) =
+            InsertOrReplaceStakePoolQuery::many(vec![("pool_id".to_string(), Epoch(1), 1000)])
+                .filters()
+                .expand();
+
+        format!("insert into stake_pool {sql_values}")
+    };
+
+    // Note: decreasing stakes for pool3 so we can test that the order has changed
+    for (pool_id, epoch, stake) in epoch_to_insert_stake_pools.iter().flat_map(|epoch| {
+        [
+            ("pool1", *epoch, 1000 + (epoch - 1) * 40),
+            ("pool2", *epoch, 1100 + (epoch - 1) * 45),
+            ("pool3", *epoch, 1200 - (epoch - 1) * 50),
+        ]
+    }) {
+        let mut statement = connection.prepare(&query)?;
+        statement
+            .bind::<&[(_, Value)]>(&[
+                (1, pool_id.to_string().into()),
+                (2, Value::Integer(epoch)),
+                (3, Value::Integer(stake)),
+                (4, Utc::now().to_rfc3339().into()),
+            ])
+            .unwrap();
+        statement.next().unwrap();
+    }
+
+    Ok(())
+}
+
+/// A simple struct that helps to initialize a database with the old adapter behavior for testing purposes.
+pub struct FakeStoreAdapter {
+    connection: Arc<SqliteConnection>,
+    table: &'static str,
+}
+
+impl FakeStoreAdapter {
+    pub fn new(connection: Arc<SqliteConnection>, table: &'static str) -> Self {
+        Self { connection, table }
+    }
+
+    pub fn create_table(&self) {
+        let sql = format!(
+            "create table {} (key_hash text primary key, key json not null, value json not null)",
+            self.table
+        );
+        self.connection.execute(sql).unwrap();
+    }
+
+    pub fn is_key_hash_exist(&self, key_hash: &str) -> bool {
+        let sql = format!(
+            "select exists(select 1 from {} where key_hash = ?1) as record_exists",
+            self.table
+        );
+        let parameters = [Value::String(key_hash.to_string())];
+        let result: i64 = self.connection.query_single_cell(sql, &parameters).unwrap();
+        result == 1
+    }
+
+    pub fn store_record<K: Serialize, V: Serialize>(
+        &self,
+        key_hash: &str,
+        key: &K,
+        record: &V,
+    ) -> StdResult<()> {
+        let sql = format!(
+            "insert into {} (key_hash, key, value) values (?1, ?2, ?3) on conflict (key_hash) do update set value = excluded.value",
+            self.table
+        );
+        let mut statement = self.connection.prepare(sql)?;
+        statement.bind((1, key_hash))?;
+        statement.bind((2, serde_json::to_string(&key)?.as_str()))?;
+        statement.bind((3, serde_json::to_string(record)?.as_str()))?;
+        let _ = statement.next()?;
+
+        Ok(())
+    }
+}
diff --git a/mithril-signer/src/database/tests/mod.rs b/mithril-signer/src/database/tests/mod.rs
new file mode 100644
index 00000000000..bca47c18311
--- /dev/null
+++ b/mithril-signer/src/database/tests/mod.rs
@@ -0,0 +1,2 @@
+mod protocol_initializer;
+mod stake_pool;
diff --git a/mithril-signer/src/database/tests/protocol_initializer.rs b/mithril-signer/src/database/tests/protocol_initializer.rs
new file mode 100644
index 00000000000..9c8ce72e899
--- /dev/null
+++ b/mithril-signer/src/database/tests/protocol_initializer.rs
@@ -0,0 +1,229 @@
+use std::sync::Arc;
+
+use mithril_common::test_utils::fake_data;
+use mithril_common::{crypto_helper::ProtocolInitializer, entities::Epoch};
+use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionExtensions};
+
+use crate::database::repository::ProtocolInitializerRepository;
+use crate::database::test_helper::{main_db_connection, FakeStoreAdapter};
+use crate::services::EpochPruningTask;
+use crate::store::ProtocolInitializerStorer;
+
+fn setup_protocol_initializers(nb_epoch: u64) -> Vec<(Epoch, ProtocolInitializer)> {
+    let mut values: Vec<(Epoch, ProtocolInitializer)> = Vec::new();
+    for epoch in 1..=nb_epoch {
+        let stake = (epoch + 1) * 100;
+        let protocol_initializer = fake_data::protocol_initializer("1", stake);
+        values.push((Epoch(epoch), protocol_initializer));
+    }
+    values
+}
+
+async fn init_store(
+    values: &[(Epoch, ProtocolInitializer)],
+    retention_limit: Option<u64>,
+) -> ProtocolInitializerRepository {
+    let store = ProtocolInitializerRepository::new(
+        Arc::new(main_db_connection().unwrap()),
+        retention_limit,
+    );
+
+    store_protocol_initializers(&store, values).await;
+
+    store
+}
+
+async fn store_protocol_initializers(
+    store: &ProtocolInitializerRepository,
+    values: &[(Epoch, ProtocolInitializer)],
+) {
+    for value in values.iter() {
+        store
+            .save_protocol_initializer(value.0, value.1.clone())
+            .await
+            .unwrap();
+    }
+}
+
+mod request {
+    use super::*;
+
+    #[tokio::test]
+    async fn save_key_in_empty_store_return_none_as_previous_value() {
+        let protocol_initializers = setup_protocol_initializers(1);
+
+        let store = init_store(&[], None).await;
+        let res = store
+            .save_protocol_initializer(
+                protocol_initializers[0].0,
+                protocol_initializers[0].1.clone(),
+            )
+            .await
+            .unwrap();
+
+        assert!(res.is_none());
+    }
+
+    #[tokio::test]
+    async fn update_protocol_initializer_in_store_return_previous_value() {
+        let protocol_initializers = setup_protocol_initializers(2);
+        let store = init_store(&protocol_initializers[0..1], None).await;
+
+        let res = store
+            .save_protocol_initializer(
+                protocol_initializers[0].0,
+                protocol_initializers[1].1.clone(),
+            )
+            .await
+            .unwrap();
+
+        assert!(res.is_some());
+        assert_eq!(
+            protocol_initializers[0].1.get_stake(),
+            res.unwrap().get_stake()
+        );
+    }
+
+    #[tokio::test]
+    async fn get_protocol_initializer_for_empty_epoch() {
+        let store = init_store(&setup_protocol_initializers(2), None).await;
+
+        let res = store.get_protocol_initializer(Epoch(0)).await.unwrap();
+
+        assert!(res.is_none());
+    }
+
+    #[tokio::test]
+    async fn get_protocol_initializer_for_existing_epoch() {
+        let store = init_store(&setup_protocol_initializers(2), None).await;
+
+        let res = store.get_protocol_initializer(Epoch(1)).await.unwrap();
+
+        assert!(res.is_some());
+    }
+
+    #[tokio::test]
+    async fn get_last_protocol_initializer_return_last_one_first() {
+        let values = setup_protocol_initializers(10);
+        let store = init_store(&values, None).await;
+
+        let res = store.get_last_protocol_initializer(3).await.unwrap();
+
+        assert_eq!(3, res.len());
+        assert_eq!(values[9].0, res[0].0);
+        assert_eq!(values[8].0, res[1].0);
+        assert_eq!(values[7].0, res[2].0);
+    }
+
+    #[tokio::test]
+    async fn get_last_protocol_initializer_return_all_when_too_few_records() {
+        let values = setup_protocol_initializers(2);
+        let store = init_store(&values, None).await;
+
+        let res = store.get_last_protocol_initializer(3).await.unwrap();
+
+        assert_eq!(2, res.len());
+        assert_eq!(values[1].0, res[0].0);
+        assert_eq!(values[0].0, res[1].0);
+    }
+}
+
+mod pruning {
+    use super::*;
+
+    async fn get_epochs_in_database(store: &ProtocolInitializerRepository) -> Vec<Epoch> {
+        let result = store.get_last_protocol_initializer(10).await.unwrap();
+        result.into_iter().map(|(epoch, _)| epoch).collect()
+    }
+
+    #[tokio::test]
+    async fn prune_epoch_older_than_threshold() {
+        const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
+
+        let nb_epochs = 5;
+        let store = init_store(
+            &setup_protocol_initializers(nb_epochs),
+            Some(PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD),
+        )
+        .await;
+
+        assert_eq!(
+            vec!(Epoch(5), Epoch(4), Epoch(3), Epoch(2), Epoch(1)),
+            get_epochs_in_database(&store).await
+        );
+
+        let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
+        store.prune(current_epoch).await.unwrap();
+
+        assert_eq!(
+            vec!(Epoch(5), Epoch(4)),
+            get_epochs_in_database(&store).await
+        );
+    }
+
+    #[tokio::test]
+    async fn without_threshold_nothing_is_pruned() {
+        let nb_epochs = 5;
+        let store = init_store(&setup_protocol_initializers(nb_epochs), None).await;
+
+        store.prune(Epoch(100)).await.unwrap();
+
+        let result = store.get_last_protocol_initializer(10).await.unwrap();
+        assert_eq!(nb_epochs as usize, result.len());
+    }
+}
+
+mod migration {
+    use super::*;
+
+    #[tokio::test]
+    async fn should_migrate_data_from_adapter() {
+        let migrations = crate::database::migration::get_migrations();
+
+        let connection = Arc::new(ConnectionBuilder::open_memory().build().unwrap());
+        let protocol_initializer_adapter =
+            FakeStoreAdapter::new(connection.clone(), "protocol_initializer");
+        protocol_initializer_adapter.create_table();
+
+        assert!(connection
+            .prepare("select key_hash from protocol_initializer;")
+            .is_ok());
+
+        // Here we can add some data with the old schema.
+        let (_, protocol_initializer_to_retrieve) = &setup_protocol_initializers(1)[0];
+
+        assert!(!protocol_initializer_adapter.is_key_hash_exist("HashEpoch5"));
+
+        protocol_initializer_adapter
+            .store_record("HashEpoch5", &Epoch(5), protocol_initializer_to_retrieve)
+            .unwrap();
+
+        assert!(protocol_initializer_adapter.is_key_hash_exist("HashEpoch5"));
+
+        // We finish the migration
+        ConnectionBuilder::open_memory()
+            .apply_migrations(&connection, migrations)
+            .unwrap();
+
+        assert!(connection
+            .prepare("select key_hash from protocol_initializer;")
+            .is_err());
+        assert!(connection
+            .prepare("select * from protocol_initializer;")
+            .is_ok());
+
+        let value: i64 = connection
+            .query_single_cell("select count(*) from protocol_initializer", &[])
+            .unwrap();
+        assert_eq!(value, 1);
+
+        // We can check that data are migrated.
+        let store = ProtocolInitializerRepository::new(connection, None);
+        let protocol_initializer = store.get_protocol_initializer(Epoch(5)).await.unwrap();
+
+        assert_eq!(
+            protocol_initializer.unwrap().get_stake(),
+            protocol_initializer_to_retrieve.get_stake()
+        );
+    }
+}
diff --git a/mithril-signer/src/database/tests/stake_pool.rs b/mithril-signer/src/database/tests/stake_pool.rs
new file mode 100644
index 00000000000..000cc79d31e
--- /dev/null
+++ b/mithril-signer/src/database/tests/stake_pool.rs
@@ -0,0 +1,290 @@
+use std::sync::Arc;
+
+use mithril_common::entities::{Epoch, StakeDistribution};
+use mithril_common::signable_builder::StakeDistributionRetriever;
+use mithril_persistence::sqlite::ConnectionBuilder;
+use mithril_persistence::store::StakeStorer;
+
+use crate::database::repository::StakePoolStore;
+use crate::database::test_helper::{insert_stake_pool, main_db_connection, FakeStoreAdapter};
+use crate::services::EpochPruningTask;
+
+mod request {
+
+    use super::*;
+
+    #[tokio::test]
+    async fn retrieve_with_no_stakes_returns_none() {
+        let connection = main_db_connection().unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+
+        let result = store.retrieve(Epoch(1)).await.unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn retrieve_returns_stake_distribution() {
+        let stake_distribution_epoch_100 =
+            StakeDistribution::from([("pool-A".to_string(), 1000), ("pool-B".to_string(), 1200)]);
+        let stake_distribution_epoch_200 = StakeDistribution::from([
+            ("pool-A".to_string(), 2500),
+            ("pool-B".to_string(), 2000),
+            ("pool-C".to_string(), 2600),
+        ]);
+        let connection = main_db_connection().unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+        store
+            .save_stakes(Epoch(100), stake_distribution_epoch_100.clone())
+            .await
+            .unwrap();
+        store
+            .save_stakes(Epoch(200), stake_distribution_epoch_200.clone())
+            .await
+            .unwrap();
+
+        {
+            let stake_distribution_in_database = store.retrieve(Epoch(100)).await.unwrap().unwrap();
+
+            assert_eq!(2, stake_distribution_in_database.len());
+            assert_eq!(1000, stake_distribution_in_database["pool-A"]);
+            assert_eq!(1200, stake_distribution_in_database["pool-B"]);
+        }
+
+        {
+            let stake_distribution_in_database = store.retrieve(Epoch(200)).await.unwrap().unwrap();
+
+            assert_eq!(3, stake_distribution_in_database.len());
+            assert_eq!(2500, stake_distribution_in_database["pool-A"]);
+            assert_eq!(2000, stake_distribution_in_database["pool-B"]);
+            assert_eq!(2600, stake_distribution_in_database["pool-C"]);
+        }
+    }
+
+    #[tokio::test]
+    async fn save_stake_distribution_return_inserted_records() {
+        let epoch = Epoch(100);
+
+        let connection = main_db_connection().unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+
+        {
+            let stake_distribution = StakeDistribution::from([
+                ("pool-A".to_string(), 1000),
+                ("pool-B".to_string(), 1200),
+            ]);
+
+            let save_result = store
+                .save_stakes(epoch, stake_distribution.clone())
+                .await
+                .unwrap();
+
+            assert_eq!(stake_distribution, save_result.unwrap());
+        }
+
+        {
+            let stake_distribution = StakeDistribution::from([
+                ("pool-A".to_string(), 2000),
+                ("pool-C".to_string(), 2300),
+            ]);
+
+            let save_result = store
+                .save_stakes(epoch, stake_distribution.clone())
+                .await
+                .unwrap();
+
+            assert_eq!(stake_distribution, save_result.unwrap());
+        }
+    }
+
+    #[tokio::test]
+    async fn save_stake_distribution_replace_all_stake_for_the_epoch() {
+        let epoch = Epoch(100);
+
+        let connection = main_db_connection().unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+
+        {
+            let stake_distribution = StakeDistribution::from([
+                ("pool-A".to_string(), 1000),
+                ("pool-B".to_string(), 1200),
+            ]);
+            store
+                .save_stakes(epoch, stake_distribution.clone())
+                .await
+                .unwrap();
+
+            let stake_distribution_in_database = store.retrieve(epoch).await.unwrap().unwrap();
+
+            assert_eq!(2, stake_distribution_in_database.len());
+            assert_eq!(1000, stake_distribution_in_database["pool-A"]);
+            assert_eq!(1200, stake_distribution_in_database["pool-B"]);
+        }
+
+        {
+            let stake_distribution = StakeDistribution::from([
+                ("pool-B".to_string(), 2000),
+                ("pool-C".to_string(), 2300),
+            ]);
+            store
+                .save_stakes(epoch, stake_distribution.clone())
+                .await
+                .unwrap();
+
+            let stake_distribution_in_database = store.retrieve(epoch).await.unwrap().unwrap();
+
+            assert_eq!(2, stake_distribution_in_database.len());
+            assert_eq!(2000, stake_distribution_in_database["pool-B"]);
+            assert_eq!(2300, stake_distribution_in_database["pool-C"]);
+        }
+    }
+
+    #[tokio::test]
+    async fn save_stake_distribution_do_not_change_other_epoch() {
+        let connection = main_db_connection().unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+
+        let stake_distribution_99 = StakeDistribution::from([("pool-A".to_string(), 50)]);
+        store
+            .save_stakes(Epoch(99), stake_distribution_99.clone())
+            .await
+            .unwrap();
+
+        let stake_distribution_100 = StakeDistribution::from([("pool-A".to_string(), 1000)]);
+        store
+            .save_stakes(Epoch(100), stake_distribution_100.clone())
+            .await
+            .unwrap();
+
+        let stake_distribution_101 = StakeDistribution::from([("pool-A".to_string(), 5000)]);
+        store
+            .save_stakes(Epoch(101), stake_distribution_101.clone())
+            .await
+            .unwrap();
+
+        {
+            let stake_distribution_100_updated =
+                StakeDistribution::from([("pool-A".to_string(), 1111)]);
+            store
+                .save_stakes(Epoch(100), stake_distribution_100_updated.clone())
+                .await
+                .unwrap();
+
+            let stake_distribution_in_database = store.retrieve(Epoch(100)).await.unwrap().unwrap();
+            assert_eq!(
+                stake_distribution_100_updated,
+                stake_distribution_in_database
+            );
+
+            let stake_distribution_in_database = store.retrieve(Epoch(99)).await.unwrap().unwrap();
+            assert_eq!(stake_distribution_99, stake_distribution_in_database);
+
+            let stake_distribution_in_database = store.retrieve(Epoch(101)).await.unwrap().unwrap();
+            assert_eq!(stake_distribution_101, stake_distribution_in_database);
+        }
+    }
+}
+
+mod pruning {
+    use super::*;
+
+    async fn get_epochs_in_database_until(
+        store: &StakePoolStore,
+        until_epoch: Epoch,
+    ) -> Vec<Epoch> {
+        let mut epochs_in_database = vec![];
+        let mut current_epoch = Epoch(1);
+        while current_epoch <= until_epoch {
+            if store.get_stakes(current_epoch).await.unwrap().is_some() {
+                epochs_in_database.push(current_epoch);
+            }
+            current_epoch += 1;
+        }
+        epochs_in_database
+    }
+
+    #[tokio::test]
+    async fn prune_stake_pools_older_than_threshold() {
+        const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
+
+        let connection = main_db_connection().unwrap();
+        insert_stake_pool(&connection, &[1, 2, 3, 4, 5]).unwrap();
+        let store =
+            StakePoolStore::new(Arc::new(connection), Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD));
+
+        assert_eq!(
+            vec!(Epoch(1), Epoch(2), Epoch(3), Epoch(4), Epoch(5)),
+            get_epochs_in_database_until(&store, Epoch(8)).await
+        );
+
+        let current_epoch = Epoch(4) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD;
+        store.prune(current_epoch).await.unwrap();
+
+        assert_eq!(
+            vec!(Epoch(4), Epoch(5)),
+            get_epochs_in_database_until(&store, Epoch(8)).await
+        );
+    }
+
+    #[tokio::test]
+    async fn without_threshold_nothing_is_pruned() {
+        let connection = main_db_connection().unwrap();
+        insert_stake_pool(&connection, &[1, 2]).unwrap();
+        let store = StakePoolStore::new(Arc::new(connection), None);
+
+        store.prune(Epoch(100)).await.unwrap();
+
+        let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
+        let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();
+
+        assert!(
+            epoch1_stakes.is_some(),
+            "Stakes at epoch 1 should not have been pruned",
+        );
+        assert!(
+            epoch2_stakes.is_some(),
+            "Stakes at epoch 2 should still exist",
+        );
+    }
+}
+
+mod migration {
+    use super::*;
+
+    #[tokio::test]
+    async fn should_migrate_data_from_adapter() {
+        let migrations = crate::database::migration::get_migrations();
+
+        let connection = Arc::new(ConnectionBuilder::open_memory().build().unwrap());
+
+        let stake_adapter = FakeStoreAdapter::new(connection.clone(), "stake");
+        stake_adapter.create_table();
+
+        assert!(connection.prepare("select * from stake;").is_ok());
+        assert!(connection.prepare("select * from db_version;").is_err());
+        assert!(connection.prepare("select * from stake_pool;").is_err());
+
+        // Here we can add some data with the old schema.
+        let stake_distribution_to_retrieve =
+            StakeDistribution::from([("pool-123".to_string(), 123)]);
+
+        // If we don't want to use the adapter anymore, we can execute requests directly.
+        assert!(!stake_adapter.is_key_hash_exist("HashEpoch5"));
+        stake_adapter
+            .store_record("HashEpoch5", &Epoch(5), &stake_distribution_to_retrieve)
+            .unwrap();
+        assert!(stake_adapter.is_key_hash_exist("HashEpoch5"));
+
+        // We finish the migration
+        ConnectionBuilder::open_memory()
+            .apply_migrations(&connection, migrations)
+            .unwrap();
+        assert!(connection.prepare("select * from stake;").is_err());
+        assert!(connection.prepare("select * from stake_pool;").is_ok());
+
+        // We can check that data are migrated.
+        let store = StakePoolStore::new(connection, None);
+        let stake_distribution = store.retrieve(Epoch(5)).await.unwrap();
+        assert_eq!(stake_distribution, Some(stake_distribution_to_retrieve));
+    }
+}
diff --git a/mithril-signer/src/dependency_injection/builder.rs b/mithril-signer/src/dependency_injection/builder.rs
index 60cb7d4b26d..746d687e018 100644
--- a/mithril-signer/src/dependency_injection/builder.rs
+++ b/mithril-signer/src/dependency_injection/builder.rs
@@ -32,9 +32,10 @@ use mithril_common::{MithrilTickerService, StdResult, TickerService};
 use mithril_persistence::database::repository::CardanoTransactionRepository;
 use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
 use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
-use mithril_persistence::store::adapter::SQLiteAdapter;
 
-use crate::database::repository::SignedBeaconRepository;
+use crate::database::repository::{
+    ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
+};
 use crate::dependency_injection::SignerDependencyContainer;
 use crate::services::{
     AggregatorHTTPClient, CardanoTransactionsImporter,
@@ -43,7 +44,7 @@ use crate::services::{
     SignerUpkeepService, TransactionsImporterByChunk, TransactionsImporterWithPruner,
     TransactionsImporterWithVacuum,
 };
-use crate::store::{MKTreeStoreSqlite, ProtocolInitializerStore, StakeStore};
+use crate::store::MKTreeStoreSqlite;
 use crate::{
     Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
     SQLITE_FILE_CARDANO_TRANSACTION,
@@ -212,21 +213,20 @@ impl<'a> DependenciesBuilder<'a> {
         );
 
         let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(SQLiteAdapter::new(
-                "protocol_initializer",
-                sqlite_connection.clone(),
-            )?),
-            self.config.store_retention_limit,
+
+        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
+            sqlite_connection.clone(),
+            self.config.store_retention_limit.map(|limit| limit as u64),
         ));
+
         let digester = Arc::new(CardanoImmutableDigester::new(
             network.to_string(),
            self.build_digester_cache_provider().await?,
            self.root_logger(),
         ));
-        let stake_store = Arc::new(StakeStore::new(
-            Box::new(SQLiteAdapter::new("stake", sqlite_connection.clone())?),
-            self.config.store_retention_limit,
+        let stake_store = Arc::new(StakePoolStore::new(
+            sqlite_connection.clone(),
+            self.config.store_retention_limit.map(|limit| limit as u64),
         ));
         let chain_observer = {
             let builder = self.chain_observer_builder;
diff --git a/mithril-signer/src/dependency_injection/containers.rs b/mithril-signer/src/dependency_injection/containers.rs
index 4564e41446e..7171fd7cf01 100644
--- a/mithril-signer/src/dependency_injection/containers.rs
+++ b/mithril-signer/src/dependency_injection/containers.rs
@@ -8,15 +8,17 @@ use mithril_common::era::{EraChecker, EraReader};
 use mithril_common::signable_builder::SignableBuilderService;
 use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
 use mithril_common::TickerService;
+
+use mithril_persistence::store::StakeStorer;
 use tokio::sync::RwLock;
 
 use crate::services::{
     AggregatorClient, CertifierService, EpochService, SingleSigner, UpkeepService,
 };
-use crate::store::{ProtocolInitializerStorer, StakeStore};
+use crate::store::ProtocolInitializerStorer;
 use crate::MetricsService;
 
-type StakeStoreService = Arc<StakeStore>;
+type StakeStoreService = Arc<dyn StakeStorer>;
 type CertificateHandlerService = Arc<dyn AggregatorClient>;
 type ChainObserverService = Arc<dyn ChainObserver>;
 type DigesterService = Arc<dyn ImmutableDigester>;
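Widening `StakeStoreService` from the concrete `StakeStore` to `Arc<dyn StakeStorer>` is what lets the container accept the new `StakePoolStore` without further changes. A minimal sketch of the coercion, using types from this diff:

use std::sync::Arc;

use mithril_persistence::store::StakeStorer;

use crate::database::repository::StakePoolStore;
use crate::database::test_helper::main_db_connection;

fn widen_to_trait_object() -> mithril_common::StdResult<()> {
    let store = Arc::new(StakePoolStore::new(Arc::new(main_db_connection()?), None));

    // An Arc<StakePoolStore> coerces to the trait object the container expects.
    let stake_store_service: Arc<dyn StakeStorer> = store;
    let _ = stake_store_service;
    Ok(())
}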
diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs
index c14eff4b07a..14882391992 100644
--- a/mithril-signer/src/runtime/runner.rs
+++ b/mithril-signer/src/runtime/runner.rs
@@ -10,7 +10,6 @@ use mithril_common::entities::{
 };
 use mithril_common::logging::LoggerExtensions;
 use mithril_common::StdResult;
-use mithril_persistence::store::StakeStorer;
 
 use crate::dependency_injection::SignerDependencyContainer;
 use crate::entities::{BeaconToSign, SignerEpochSettings};
@@ -341,9 +340,7 @@ mod tests {
             CardanoTransactionsPreloader, CardanoTransactionsPreloaderActivation,
         },
         chain_observer::FakeObserver,
-        crypto_helper::{
-            MKMap, MKMapNode, MKTreeNode, MKTreeStoreInMemory, MKTreeStorer, ProtocolInitializer,
-        },
+        crypto_helper::{MKMap, MKMapNode, MKTreeNode, MKTreeStoreInMemory, MKTreeStorer},
         digesters::{DumbImmutableDigester, DumbImmutableFileObserver},
         entities::{BlockNumber, BlockRange, Epoch, SignedEntityTypeDiscriminants},
         era::{adapters::EraReaderBootstrapAdapter, EraChecker, EraReader},
@@ -357,10 +354,10 @@ mod tests {
         test_utils::{fake_data, MithrilFixtureBuilder},
         MithrilTickerService, TickerService,
     };
-    use mithril_persistence::store::adapter::{DumbStoreAdapter, MemoryAdapter};
-    use mithril_persistence::store::StakeStorer;
 
-    use crate::database::repository::SignedBeaconRepository;
+    use crate::database::repository::{
+        ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
+    };
     use crate::database::test_helper::main_db_connection;
     use crate::metrics::MetricsService;
     use crate::services::{
@@ -368,7 +365,6 @@ mod tests {
         MithrilSingleSigner, MockTransactionStore, MockUpkeepService, SignerCertifierService,
         SignerSignableSeedBuilder, SignerSignedEntityConfigProvider,
     };
-    use crate::store::{ProtocolInitializerStore, StakeStore};
     use crate::test_tools::TestLogger;
 
     use super::*;
@@ -404,7 +400,6 @@ mod tests {
     async fn init_services() -> SignerDependencyContainer {
         let logger = TestLogger::stdout();
         let sqlite_connection = Arc::new(main_db_connection().unwrap());
-        let adapter: MemoryAdapter<Epoch, ProtocolInitializer> = MemoryAdapter::new(None).unwrap();
         let stake_distribution_signers = fake_data::signers_with_stakes(2);
         let party_id = stake_distribution_signers[1].party_id.clone();
         let fake_observer = FakeObserver::default();
@@ -447,12 +442,14 @@ mod tests {
             transactions_importer.clone(),
             block_range_root_retriever,
         ));
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
+        let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone(), None));
         let cardano_stake_distribution_builder = Arc::new(
             CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
         );
-        let protocol_initializer_store =
-            Arc::new(ProtocolInitializerStore::new(Box::new(adapter), None));
+        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
+            sqlite_connection.clone(),
+            None,
+        ));
         let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
             stake_store.clone(),
             protocol_initializer_store.clone(),
diff --git a/mithril-signer/src/services/epoch_service.rs b/mithril-signer/src/services/epoch_service.rs
index 98f9d0af3a7..68aba151483 100644
--- a/mithril-signer/src/services/epoch_service.rs
+++ b/mithril-signer/src/services/epoch_service.rs
@@ -316,20 +316,21 @@ impl SignedEntityConfigProvider for SignerSignedEntityConfigProvider {
     }
 }
 
+#[cfg(test)]
+use crate::database::repository::ProtocolInitializerRepository;
+
 #[cfg(test)]
 impl MithrilEpochService {
     /// `TEST ONLY` - Create a new instance of the service using dumb dependencies.
     pub fn new_with_dumb_dependencies() -> Self {
-        use crate::store::ProtocolInitializerStore;
-        use crate::store::StakeStore;
+        use crate::database::repository::StakePoolStore;
+        use crate::database::test_helper::main_db_connection;
         use crate::test_tools::TestLogger;
-        use mithril_persistence::store::adapter::DumbStoreAdapter;
 
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let sqlite_connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(sqlite_connection, None));
 
         Self::new(
             stake_store,
@@ -430,11 +431,11 @@ mod tests {
     use mithril_common::entities::{Epoch, StakeDistribution};
     use mithril_common::test_utils::{fake_data, MithrilFixtureBuilder};
-    use mithril_persistence::store::adapter::{DumbStoreAdapter, MemoryAdapter};
 
+    use crate::database::repository::{ProtocolInitializerRepository, StakePoolStore};
+    use crate::database::test_helper::main_db_connection;
     use crate::entities::SignerEpochSettings;
     use crate::services::MithrilProtocolInitializerBuilder;
-    use crate::store::{ProtocolInitializerStore, StakeStore};
     use crate::test_tools::TestLogger;
 
     use super::*;
@@ -450,11 +451,10 @@
             None,
         )
         .unwrap();
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
         let service = MithrilEpochService::new(
             stake_store,
             protocol_initializer_store,
@@ -475,11 +475,11 @@
             .to_owned();
         let epoch = Epoch(12);
         let signers = fixtures.signers();
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
 
         let epoch_settings = SignerEpochSettings {
             epoch,
@@ -615,15 +615,14 @@
             .collect();
 
         // Init stores
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
         stake_store
             .save_stakes(epoch, stake_distribution.clone())
             .await
             .expect("save_stakes should not fail");
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
 
         // Build service and register epoch settings
         let service = MithrilEpochService::new(
@@ -649,11 +648,10 @@
         let signers = fake_data::signers(10);
 
         // Init stores
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
 
         // Epoch settings
         let epoch_settings = SignerEpochSettings {
@@ -740,26 +738,27 @@
         let stake_distribution: StakeDistribution = build_stake_distribution(&signers, 100);
         let next_stake_distribution: StakeDistribution = build_stake_distribution(&signers, 500);
-        let stake_store = Arc::new(StakeStore::new(
-            Box::new(
-                MemoryAdapter::<Epoch, StakeDistribution>::new(Some(vec![
-                    (
-                        epoch.offset_to_signer_retrieval_epoch().unwrap(),
-                        stake_distribution.clone(),
-                    ),
-                    (
-                        epoch.offset_to_next_signer_retrieval_epoch(),
-                        next_stake_distribution.clone(),
-                    ),
-                ]))
-                .unwrap(),
-            ),
-            None,
-        ));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = {
+            let store = Arc::new(StakePoolStore::new(connection.clone(), None));
+            store
+                .save_stakes(
+                    epoch.offset_to_signer_retrieval_epoch().unwrap(),
+                    stake_distribution.clone(),
+                )
+                .await
+                .unwrap();
+            store
+                .save_stakes(
+                    epoch.offset_to_next_signer_retrieval_epoch(),
+                    next_stake_distribution.clone(),
+                )
+                .await
+                .unwrap();
+            store
+        };
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
 
         // Epoch settings
         let epoch_settings = SignerEpochSettings {
@@ -807,17 +806,17 @@
     async fn test_protocol_initializer_is_available_after_register_epoch_settings_call_if_in_store() {
         let epoch = Epoch(12);
-        let stake_store = Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(
-                MemoryAdapter::new(Some(vec![(
-                    epoch.offset_to_signer_retrieval_epoch().unwrap(),
-                    fake_data::protocol_initializer("seed", 1245),
-                )]))
-                .unwrap(),
-            ),
-            None,
-        ));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
+        protocol_initializer_store
+            .save_protocol_initializer(
+                epoch.offset_to_signer_retrieval_epoch().unwrap(),
+                fake_data::protocol_initializer("seed", 1245),
+            )
+            .await
+            .unwrap();
 
         let mut service = MithrilEpochService::new(
             stake_store,
@@ -842,14 +841,10 @@
     #[tokio::test]
     async fn is_source_of_signed_entity_config() {
-        let stake_store = Arc::new(StakeStore::new(
-            Box::new(MemoryAdapter::<Epoch, StakeDistribution>::new(None).unwrap()),
-            None,
-        ));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = Arc::new(StakePoolStore::new(connection.clone(), None));
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
         let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
             stake_store,
             protocol_initializer_store,
diff --git a/mithril-signer/src/services/single_signer.rs b/mithril-signer/src/services/single_signer.rs
index 554e4e1e8d9..418e09ce0a1 100644
--- a/mithril-signer/src/services/single_signer.rs
+++ b/mithril-signer/src/services/single_signer.rs
@@ -172,14 +172,14 @@ mod tests {
     use std::sync::Arc;
     use tokio::sync::RwLock;
 
+    use crate::database::repository::{ProtocolInitializerRepository, StakePoolStore};
+    use crate::database::test_helper::main_db_connection;
+    use crate::services::MithrilEpochService;
+    use crate::test_tools::TestLogger;
     use mithril_common::crypto_helper::ProtocolClerk;
     use mithril_common::entities::{Epoch, ProtocolMessagePartKey};
     use mithril_common::test_utils::MithrilFixtureBuilder;
-    use mithril_persistence::store::adapter::{DumbStoreAdapter, MemoryAdapter};
-
-    use crate::services::MithrilEpochService;
-    use crate::store::{ProtocolInitializerStore, StakeStore};
-    use crate::test_tools::TestLogger;
+    use mithril_persistence::store::StakeStorer;
 
     use super::*;
@@ -191,20 +191,20 @@ mod tests {
         let clerk = ProtocolClerk::from_signer(&current_signer.protocol_signer);
         let avk = clerk.compute_avk();
         let logger = TestLogger::stdout();
-        let stake_store = Arc::new(StakeStore::new(
-            Box::new(
-                MemoryAdapter::new(Some(vec![(
+        let connection = Arc::new(main_db_connection().unwrap());
+        let stake_store = {
+            let store = Arc::new(StakePoolStore::new(connection.clone(), None));
+            store
+                .save_stakes(
                     Epoch(10).offset_to_signer_retrieval_epoch().unwrap(),
                     fixture.stake_distribution(),
-                )]))
-                .unwrap(),
-            ),
-            None,
-        ));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(DumbStoreAdapter::new()),
-            None,
-        ));
+                )
+                .await
+                .unwrap();
+            store
+        };
+        let protocol_initializer_store =
+            Arc::new(ProtocolInitializerRepository::new(connection, None));
         let epoch_service =
             MithrilEpochService::new(stake_store, protocol_initializer_store, logger.clone())
                 .set_data_to_default_or_fake(Epoch(10))
diff --git a/mithril-signer/src/store/mod.rs b/mithril-signer/src/store/mod.rs
index a7784730e95..ebcd46647c9 100644
--- a/mithril-signer/src/store/mod.rs
+++ b/mithril-signer/src/store/mod.rs
@@ -2,8 +2,6 @@
 mod mktree_store_sqlite;
 mod protocol_initializer_store;
-mod stake_store;
 
 pub use mktree_store_sqlite::*;
 pub use protocol_initializer_store::*;
-pub use stake_store::*;
diff --git a/mithril-signer/src/store/protocol_initializer_store.rs b/mithril-signer/src/store/protocol_initializer_store.rs
index a76fe31e564..139e15d8e93 100644
--- a/mithril-signer/src/store/protocol_initializer_store.rs
+++ b/mithril-signer/src/store/protocol_initializer_store.rs
@@ -1,12 +1,6 @@
 use async_trait::async_trait;
-use tokio::sync::RwLock;
 
 use mithril_common::{crypto_helper::ProtocolInitializer, entities::Epoch, StdResult};
-use mithril_persistence::store::{adapter::StoreAdapter, StorePruner};
-
-use crate::services::EpochPruningTask;
-
-type Adapter = Box<dyn StoreAdapter<Key = Epoch, Record = ProtocolInitializer>>;
 
 #[cfg_attr(test, mockall::automock)]
 #[async_trait]
@@ -32,188 +26,3 @@ pub trait ProtocolInitializerStorer: Sync + Send {
         last: usize,
     ) -> StdResult<Vec<(Epoch, ProtocolInitializer)>>;
 }
-/// Implementation of the ProtocolInitializerStorer
-pub struct ProtocolInitializerStore {
-    adapter: RwLock<Adapter>,
-    retention_limit: Option<usize>,
-}
-
-impl ProtocolInitializerStore {
-    /// Create a new ProtocolInitializerStore.
-    pub fn new(adapter: Adapter, retention_limit: Option<usize>) -> Self {
-        Self {
-            adapter: RwLock::new(adapter),
-            retention_limit,
-        }
-    }
-}
-
-#[async_trait]
-impl EpochPruningTask for ProtocolInitializerStore {
-    fn pruned_data(&self) -> &'static str {
-        "Protocol initializer"
-    }
-
-    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
-        mithril_persistence::store::StorePruner::prune(self).await
-    }
-}
-
-#[async_trait]
-impl StorePruner for ProtocolInitializerStore {
-    type Key = Epoch;
-    type Record = ProtocolInitializer;
-
-    fn get_adapter(
-        &self,
-    ) -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>> {
-        &self.adapter
-    }
-
-    fn get_max_records(&self) -> Option<usize> {
-        self.retention_limit
-    }
-}
-
-#[async_trait]
-impl ProtocolInitializerStorer for ProtocolInitializerStore {
-    async fn save_protocol_initializer(
-        &self,
-        epoch: Epoch,
-        protocol_initializer: ProtocolInitializer,
-    ) -> StdResult<Option<ProtocolInitializer>> {
-        let previous_protocol_initializer = self.adapter.read().await.get_record(&epoch).await?;
-        self.adapter
-            .write()
-            .await
-            .store_record(&epoch, &protocol_initializer)
-            .await?;
-
-        Ok(previous_protocol_initializer)
-    }
-
-    async fn get_protocol_initializer(
-        &self,
-        epoch: Epoch,
-    ) -> StdResult<Option<ProtocolInitializer>> {
-        let record = self.adapter.read().await.get_record(&epoch).await?;
-        Ok(record)
-    }
-
-    async fn get_last_protocol_initializer(
-        &self,
-        last: usize,
-    ) -> StdResult<Vec<(Epoch, ProtocolInitializer)>> {
-        let records = self.adapter.read().await.get_last_n_records(last).await?;
-
-        Ok(records)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use mithril_common::test_utils::fake_data;
-    use mithril_persistence::store::adapter::MemoryAdapter;
-
-    use super::*;
-
-    fn setup_protocol_initializers(nb_epoch: u64) -> Vec<(Epoch, ProtocolInitializer)> {
-        let mut values: Vec<(Epoch, ProtocolInitializer)> = Vec::new();
-        for epoch in 1..=nb_epoch {
-            let stake = (epoch + 1) * 100;
-            let protocol_initializer = fake_data::protocol_initializer("1", stake);
-            values.push((Epoch(epoch), protocol_initializer));
-        }
-        values
-    }
-
-    fn init_store(nb_epoch: u64, retention_limit: Option<usize>) -> ProtocolInitializerStore {
-        let values = setup_protocol_initializers(nb_epoch);
-
-        let values = if !values.is_empty() {
-            Some(values)
-        } else {
-            None
-        };
-        let adapter: MemoryAdapter<Epoch, ProtocolInitializer> =
-            MemoryAdapter::new(values).unwrap();
-        ProtocolInitializerStore::new(Box::new(adapter), retention_limit)
-    }
-
-    #[tokio::test]
-    async fn save_key_in_empty_store() {
-        let protocol_initializers = setup_protocol_initializers(1);
-        let store = init_store(0, None);
-        let res = store
-            .save_protocol_initializer(
-                protocol_initializers[0].0,
-                protocol_initializers[0].1.clone(),
-            )
-            .await
-            .unwrap();
-
-        assert!(res.is_none());
-    }
-
-    #[tokio::test]
-    async fn update_protocol_initializer_in_store() {
-        let protocol_initializers = setup_protocol_initializers(2);
-        let store = init_store(1, None);
-        let res = store
-            .save_protocol_initializer(
-                protocol_initializers[0].0,
-                protocol_initializers[1].1.clone(),
-            )
-            .await
-            .unwrap();
-
-        assert!(res.is_some());
-        assert_eq!(
-            protocol_initializers[0].1.get_stake(),
-            res.unwrap().get_stake()
-        );
-    }
-
-    #[tokio::test]
-    async fn get_protocol_initializer_for_empty_epoch() {
-        let store = init_store(2, None);
-        let res = store.get_protocol_initializer(Epoch(0)).await.unwrap();
-
-        assert!(res.is_none());
-    }
-
-    #[tokio::test]
-    async fn get_protocol_initializer_for_existing_epoch() {
-        let store = init_store(2, None);
-        let res = store.get_protocol_initializer(Epoch(1)).await.unwrap();
-
-        assert!(res.is_some());
-    }
-
-    #[tokio::test]
-    async fn check_retention_limit() {
-        let store = init_store(3, Some(2));
-        let _protocol_initializers = setup_protocol_initializers(1);
-
-        assert!(store
-            .get_protocol_initializer(Epoch(1))
-            .await
-            .unwrap()
-            .is_some());
-
-        // Whatever the epoch, it's the retention limit that matters.
-        EpochPruningTask::prune(&store, Epoch(99)).await.unwrap();
-
-        assert!(store
-            .get_protocol_initializer(Epoch(1))
-            .await
-            .unwrap()
-            .is_none());
-
-        assert!(store
-            .get_protocol_initializer(Epoch(2))
-            .await
-            .unwrap()
-            .is_some());
-    }
-}
diff --git a/mithril-signer/src/store/stake_store.rs b/mithril-signer/src/store/stake_store.rs
deleted file mode 100644
index e78456414a2..00000000000
--- a/mithril-signer/src/store/stake_store.rs
+++ /dev/null
@@ -1,203 +0,0 @@
-use async_trait::async_trait;
-use mithril_common::entities::{Epoch, StakeDistribution};
-use mithril_common::signable_builder::StakeDistributionRetriever;
-use mithril_common::StdResult;
-use mithril_persistence::store::StakeStorer;
-use tokio::sync::RwLock;
-
-use mithril_persistence::store::{adapter::StoreAdapter, StorePruner};
-
-use crate::services::EpochPruningTask;
-
-type Adapter = Box<dyn StoreAdapter<Key = Epoch, Record = StakeDistribution>>;
-
-/// A [StakeStorer] that uses a [StoreAdapter] to store data.
-pub struct StakeStore {
-    adapter: RwLock<Adapter>,
-    retention_limit: Option<usize>,
-}
-
-impl StakeStore {
-    /// StakeStore factory
-    pub fn new(adapter: Adapter, retention_limit: Option<usize>) -> Self {
-        Self {
-            adapter: RwLock::new(adapter),
-            retention_limit,
-        }
-    }
-}
-
-#[async_trait]
-impl EpochPruningTask for StakeStore {
-    fn pruned_data(&self) -> &'static str {
-        "Stake"
-    }
-
-    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
-        mithril_persistence::store::StorePruner::prune(self).await
-    }
-}
-
-#[async_trait]
-impl StorePruner for StakeStore {
-    type Key = Epoch;
-    type Record = StakeDistribution;
-
-    fn get_adapter(
-        &self,
-    ) -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>> {
-        &self.adapter
-    }
-
-    fn get_max_records(&self) -> Option<usize> {
-        self.retention_limit
-    }
-}
-
-#[async_trait]
-impl StakeStorer for StakeStore {
-    async fn save_stakes(
-        &self,
-        epoch: Epoch,
-        stakes: StakeDistribution,
-    ) -> StdResult<Option<StakeDistribution>> {
-        let signers = {
-            let mut adapter = self.adapter.write().await;
-            let signers = adapter.get_record(&epoch).await?;
-            adapter.store_record(&epoch, &stakes).await?;
-
-            signers
-        };
-
-        Ok(signers)
-    }
-
-    async fn get_stakes(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
-        Ok(self.adapter.read().await.get_record(&epoch).await?)
-    }
-}
-
-#[async_trait]
-impl StakeDistributionRetriever for StakeStore {
-    async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
-        let stake_distribution = self.get_stakes(epoch).await?;
-
-        Ok(stake_distribution)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use mithril_persistence::store::adapter::MemoryAdapter;
-
-    fn init_store(
-        nb_epoch: u64,
-        signers_per_epoch: u64,
-        retention_limit: Option<usize>,
-    ) -> StakeStore {
-        let mut values: Vec<(Epoch, StakeDistribution)> = Vec::new();
-
-        for epoch in 1..=nb_epoch {
-            let mut signers: StakeDistribution = StakeDistribution::new();
-
-            for party_idx in 1..=signers_per_epoch {
-                let party_id = format!("{party_idx}");
-                signers.insert(party_id.clone(), 100 * party_idx + 1);
-            }
-            values.push((Epoch(epoch), signers));
-        }
-
-        let values = if !values.is_empty() {
-            Some(values)
-        } else {
-            None
-        };
-        let adapter: MemoryAdapter<Epoch, StakeDistribution> = MemoryAdapter::new(values).unwrap();
-        StakeStore::new(Box::new(adapter), retention_limit)
-    }
-
-    #[tokio::test]
-    async fn save_key_in_empty_store() {
-        let store = init_store(0, 0, None);
-        let res = store
-            .save_stakes(Epoch(1), StakeDistribution::from([("1".to_string(), 123)]))
-            .await
-            .expect("Test adapter should not fail.");
-
-        assert!(res.is_none());
-    }
-
-    #[tokio::test]
-    async fn update_signer_in_store() {
-        let store = init_store(1, 1, None);
-        let res = store
-            .save_stakes(Epoch(1), StakeDistribution::from([("1".to_string(), 123)]))
-            .await
-            .expect("Test adapter should not fail.");
-
-        assert_eq!(
-            StakeDistribution::from([("1".to_string(), 101)]),
-            res.expect("the result should not be empty"),
-        );
-    }
-
-    #[tokio::test]
-    async fn get_stakes_for_empty_epoch() {
-        let store = init_store(2, 1, None);
-        let res = store
-            .get_stakes(Epoch(0))
-            .await
-            .expect("Test adapter should not fail.");
-
-        assert!(res.is_none());
-    }
-
-    #[tokio::test]
-    async fn get_stakes_for_existing_epoch() {
-        let store = init_store(2, 2, None);
-        let res = store
-            .get_stakes(Epoch(1))
-            .await
-            .expect("Test adapter should not fail.");
-
-        assert!(res.is_some());
-        assert_eq!(2, res.expect("Query result should not be empty.").len());
-    }
-
-    #[tokio::test]
-    async fn check_retention_limit() {
-        let store = init_store(3, 2, Some(2));
-        assert!(store.get_stakes(Epoch(1)).await.unwrap().is_some());
-
-        // Whatever the epoch, it's the retention limit that matters.
-        EpochPruningTask::prune(&store, Epoch(99)).await.unwrap();
-        assert!(store.get_stakes(Epoch(1)).await.unwrap().is_none());
-        assert!(store.get_stakes(Epoch(2)).await.unwrap().is_some());
-        assert!(store.get_stakes(Epoch(3)).await.unwrap().is_some());
-    }
-
-    #[tokio::test]
-    async fn retrieve_with_no_stakes_returns_none() {
-        let store = init_store(0, 0, None);
-
-        let result = store.retrieve(Epoch(1)).await.unwrap();
-
-        assert!(result.is_none());
-    }
-
-    #[tokio::test]
-    async fn retrieve_returns_stake_distribution() {
-        let stake_distribution_to_retrieve =
-            StakeDistribution::from([("pool-123".to_string(), 123)]);
-        let store = init_store(0, 0, None);
-        store
-            .save_stakes(Epoch(1), stake_distribution_to_retrieve.clone())
-            .await
-            .unwrap();
-
-        let stake_distribution = store.retrieve(Epoch(1)).await.unwrap();
-
-        assert_eq!(stake_distribution, Some(stake_distribution_to_retrieve));
-    }
-}
diff --git a/mithril-signer/tests/test_extensions/state_machine_tester.rs b/mithril-signer/tests/test_extensions/state_machine_tester.rs
index 1915748f8a9..812e1c36d36 100644
--- a/mithril-signer/tests/test_extensions/state_machine_tester.rs
+++ b/mithril-signer/tests/test_extensions/state_machine_tester.rs
@@ -37,18 +37,18 @@ use mithril_common::{
 };
 use mithril_persistence::{
     database::repository::CardanoTransactionRepository, sqlite::SqliteConnectionPool,
-    store::adapter::SQLiteAdapter, store::StakeStorer,
+    store::StakeStorer,
 };
 use mithril_signer::{
-    database::repository::SignedBeaconRepository,
+    database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
     dependency_injection::{DependenciesBuilder, SignerDependencyContainer},
     services::{
         AggregatorClient, CardanoTransactionsImporter, MithrilEpochService, MithrilSingleSigner,
         SignerCertifierService, SignerSignableSeedBuilder, SignerSignedEntityConfigProvider,
         SignerUpkeepService,
     },
-    store::{MKTreeStoreSqlite, ProtocolInitializerStore, ProtocolInitializerStorer, StakeStore},
+    store::{MKTreeStoreSqlite, ProtocolInitializerStorer},
     Configuration, MetricsService, RuntimeError, SignerRunner, SignerState, StateMachine,
 };
@@ -77,8 +77,8 @@ pub struct StateMachineTester {
     immutable_observer: Arc<DumbImmutableFileObserver>,
     chain_observer: Arc<FakeObserver>,
     certificate_handler: Arc<FakeAggregator>,
-    protocol_initializer_store: Arc<ProtocolInitializerStore>,
-    stake_store: Arc<StakeStore>,
+    protocol_initializer_store: Arc<ProtocolInitializerRepository>,
+    stake_store: Arc<StakePoolStore>,
     era_checker: Arc<EraChecker>,
     era_reader_adapter: Arc<EraReaderDummyAdapter>,
     block_scanner: Arc<DumbBlockScanner>,
@@ -158,15 +158,13 @@ impl StateMachineTester {
             ticker_service.clone(),
         ));
         let digester = Arc::new(DumbImmutableDigester::new("DIGEST", true));
-        let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new(
-            Box::new(
-                SQLiteAdapter::new("protocol_initializer", sqlite_connection.clone()).unwrap(),
-            ),
-            config.store_retention_limit,
+        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
+            sqlite_connection.clone(),
+            config.store_retention_limit.map(|limit| limit as u64),
         ));
-        let stake_store = Arc::new(StakeStore::new(
-            Box::new(SQLiteAdapter::new("stake", sqlite_connection.clone()).unwrap()),
-            config.store_retention_limit,
+        let stake_store = Arc::new(StakePoolStore::new(
+            sqlite_connection.clone(),
+            config.store_retention_limit.map(|limit| limit as u64),
         ));
         let era_reader_adapter = Arc::new(EraReaderDummyAdapter::from_markers(vec![
            (EraMarker {
@@ -448,7 +446,7 @@ impl StateMachineTester {
             .map_err(TestError::SubsystemError)?;
         self.assert(maybe_protocol_initializer.is_some(), format!(
-                "there should be a protocol intializer in store for Epoch {}, here is the last 3 in store: {:?}",
+                "there should be a protocol initializer in store for Epoch {}, here is the last 3 in store: {:?}",
                 epoch,
                 self.protocol_initializer_store
                     .get_last_protocol_initializer(2)