diff --git a/mithril-aggregator/src/database/provider/epoch_setting.rs b/mithril-aggregator/src/database/provider/epoch_setting.rs index ae523d4b0ad..1a15b95a667 100644 --- a/mithril-aggregator/src/database/provider/epoch_setting.rs +++ b/mithril-aggregator/src/database/provider/epoch_setting.rs @@ -17,9 +17,6 @@ use crate::ProtocolParametersStorer; use mithril_common::StdError; use tokio::sync::Mutex; -/// Delete epoch settings for Epoch older than this. -const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10); - /// Settings for an epoch, including the protocol parameters. #[derive(Debug, PartialEq)] pub struct EpochSettingRecord { @@ -250,12 +247,19 @@ impl<'conn> DeleteEpochSettingProvider<'conn> { /// Service to deal with epoch settings (read & write). pub struct EpochSettingStore { connection: Arc>, + + /// Number of epochs before previous records will be pruned at the next call to + /// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters]. + retention_limit: Option, } impl EpochSettingStore { /// Create a new EpochSetting service - pub fn new(connection: Arc>) -> Self { - Self { connection } + pub fn new(connection: Arc>, retention_limit: Option) -> Self { + Self { + connection, + retention_limit, + } } } @@ -277,10 +281,12 @@ impl ProtocolParametersStorer for EpochSettingStore { .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; // Prune useless old epoch settings. - let _ = DeleteEpochSettingProvider::new(connection) - .prune(epoch - EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD) - .map_err(AdapterError::QueryError)? - .count(); + if let Some(threshold) = self.retention_limit { + let _ = DeleteEpochSettingProvider::new(connection) + .prune(epoch - threshold) + .map_err(AdapterError::QueryError)? + .count(); + } connection .execute("commit transaction") @@ -519,8 +525,12 @@ mod tests { #[tokio::test] async fn save_protocol_parameters_prune_older_epoch_settings() { let connection = Connection::open(":memory:").unwrap(); + const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: u64 = 5; setup_epoch_setting_db(&connection, &[1, 2]).unwrap(); - let store = EpochSettingStore::new(Arc::new(Mutex::new(connection))); + let store = EpochSettingStore::new( + Arc::new(Mutex::new(connection)), + Some(EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD), + ); store .save_protocol_parameters( diff --git a/mithril-aggregator/src/database/provider/stake_pool.rs b/mithril-aggregator/src/database/provider/stake_pool.rs index a4b9c593353..c5e6b1bc783 100644 --- a/mithril-aggregator/src/database/provider/stake_pool.rs +++ b/mithril-aggregator/src/database/provider/stake_pool.rs @@ -18,7 +18,6 @@ use mithril_common::StdError; use tokio::sync::Mutex; /// Delete stake pools for Epoch older than this. -const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10); /// Stake pool as read from Chain. #[derive(Debug, PartialEq)] @@ -234,12 +233,19 @@ impl<'conn> DeleteStakePoolProvider<'conn> { /// Service to deal with stake pools (read & write). pub struct StakePoolStore { connection: Arc>, + + /// Number of epochs before previous records will be pruned at the next call to + /// [save_protocol_parameters][StakePoolStore::save_stakes]. + retention_limit: Option, } impl StakePoolStore { /// Create a new StakePool service - pub fn new(connection: Arc>) -> Self { - Self { connection } + pub fn new(connection: Arc>, retention_limit: Option) -> Self { + Self { + connection, + retention_limit, + } } } @@ -265,10 +271,12 @@ impl StakeStorer for StakePoolStore { } // Prune useless old stake distributions. - let _ = DeleteStakePoolProvider::new(connection) - .prune(epoch - STAKE_POOL_PRUNE_EPOCH_THRESHOLD) - .map_err(AdapterError::QueryError)? - .count(); + if let Some(threshold) = self.retention_limit { + let _ = DeleteStakePoolProvider::new(connection) + .prune(epoch - threshold) + .map_err(AdapterError::QueryError)? + .count(); + } connection .execute("commit transaction") @@ -471,8 +479,12 @@ mod tests { #[tokio::test] async fn save_protocol_parameters_prune_older_epoch_settings() { let connection = Connection::open(":memory:").unwrap(); + const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10; setup_stake_db(&connection, &[1, 2]).unwrap(); - let store = StakePoolStore::new(Arc::new(Mutex::new(connection))); + let store = StakePoolStore::new( + Arc::new(Mutex::new(connection)), + Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD), + ); store .save_stakes( diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index b5640d68747..7c50a499134 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -273,7 +273,10 @@ impl DependenciesBuilder { } async fn build_stake_store(&mut self) -> Result> { - let stake_pool_store = Arc::new(StakePoolStore::new(self.get_sqlite_connection().await?)); + let stake_pool_store = Arc::new(StakePoolStore::new( + self.get_sqlite_connection().await?, + self.configuration.store_retention_limit.map(|l| l as u64), + )); Ok(stake_pool_store) } @@ -419,6 +422,7 @@ impl DependenciesBuilder { ) -> Result> { Ok(Arc::new(EpochSettingStore::new( self.get_sqlite_connection().await?, + self.configuration.store_retention_limit.map(|l| l as u64), ))) }