Skip to content

Commit

Permalink
Use retention limit from configuration instead of constants
Browse files Browse the repository at this point in the history
This allow to synchronise pruning between tables, else pruning of data
can occurs from tables that is used as a foreign key in another.
  • Loading branch information
Alenar committed Jul 21, 2023
1 parent 951c32d commit 77730aa
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
30 changes: 20 additions & 10 deletions mithril-aggregator/src/database/provider/epoch_setting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ use crate::ProtocolParametersStorer;
use mithril_common::StdError;
use tokio::sync::Mutex;

/// Delete epoch settings for Epoch older than this.
const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10);

/// Settings for an epoch, including the protocol parameters.
#[derive(Debug, PartialEq)]
pub struct EpochSettingRecord {
Expand Down Expand Up @@ -250,12 +247,19 @@ impl<'conn> DeleteEpochSettingProvider<'conn> {
/// Service to deal with epoch settings (read & write).
pub struct EpochSettingStore {
connection: Arc<Mutex<Connection>>,

/// Number of epochs before previous records will be pruned at the next call to
/// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
retention_limit: Option<u64>,
}

impl EpochSettingStore {
/// Create a new EpochSetting service
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
pub fn new(connection: Arc<Mutex<Connection>>, retention_limit: Option<u64>) -> Self {
Self {
connection,
retention_limit,
}
}
}

Expand All @@ -277,10 +281,12 @@ impl ProtocolParametersStorer for EpochSettingStore {
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;

// Prune useless old epoch settings.
let _ = DeleteEpochSettingProvider::new(connection)
.prune(epoch - EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD)
.map_err(AdapterError::QueryError)?
.count();
if let Some(threshold) = self.retention_limit {
let _ = DeleteEpochSettingProvider::new(connection)
.prune(epoch - threshold)
.map_err(AdapterError::QueryError)?
.count();
}

connection
.execute("commit transaction")
Expand Down Expand Up @@ -519,8 +525,12 @@ mod tests {
#[tokio::test]
async fn save_protocol_parameters_prune_older_epoch_settings() {
let connection = Connection::open(":memory:").unwrap();
const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: u64 = 5;
setup_epoch_setting_db(&connection, &[1, 2]).unwrap();
let store = EpochSettingStore::new(Arc::new(Mutex::new(connection)));
let store = EpochSettingStore::new(
Arc::new(Mutex::new(connection)),
Some(EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD),
);

store
.save_protocol_parameters(
Expand Down
28 changes: 20 additions & 8 deletions mithril-aggregator/src/database/provider/stake_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use mithril_common::StdError;
use tokio::sync::Mutex;

/// Delete stake pools for Epoch older than this.
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10);

/// Stake pool as read from Chain.
#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -234,12 +233,19 @@ impl<'conn> DeleteStakePoolProvider<'conn> {
/// Service to deal with stake pools (read & write).
pub struct StakePoolStore {
connection: Arc<Mutex<Connection>>,

/// Number of epochs before previous records will be pruned at the next call to
/// [save_protocol_parameters][StakePoolStore::save_stakes].
retention_limit: Option<u64>,
}

impl StakePoolStore {
/// Create a new StakePool service
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
pub fn new(connection: Arc<Mutex<Connection>>, retention_limit: Option<u64>) -> Self {
Self {
connection,
retention_limit,
}
}
}

Expand All @@ -265,10 +271,12 @@ impl StakeStorer for StakePoolStore {
}

// Prune useless old stake distributions.
let _ = DeleteStakePoolProvider::new(connection)
.prune(epoch - STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
.map_err(AdapterError::QueryError)?
.count();
if let Some(threshold) = self.retention_limit {
let _ = DeleteStakePoolProvider::new(connection)
.prune(epoch - threshold)
.map_err(AdapterError::QueryError)?
.count();
}

connection
.execute("commit transaction")
Expand Down Expand Up @@ -471,8 +479,12 @@ mod tests {
#[tokio::test]
async fn save_protocol_parameters_prune_older_epoch_settings() {
let connection = Connection::open(":memory:").unwrap();
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
setup_stake_db(&connection, &[1, 2]).unwrap();
let store = StakePoolStore::new(Arc::new(Mutex::new(connection)));
let store = StakePoolStore::new(
Arc::new(Mutex::new(connection)),
Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD),
);

store
.save_stakes(
Expand Down
6 changes: 5 additions & 1 deletion mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ impl DependenciesBuilder {
}

async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
let stake_pool_store = Arc::new(StakePoolStore::new(self.get_sqlite_connection().await?));
let stake_pool_store = Arc::new(StakePoolStore::new(
self.get_sqlite_connection().await?,
self.configuration.store_retention_limit.map(|l| l as u64),
));

Ok(stake_pool_store)
}
Expand Down Expand Up @@ -419,6 +422,7 @@ impl DependenciesBuilder {
) -> Result<Arc<dyn ProtocolParametersStorer>> {
Ok(Arc::new(EpochSettingStore::new(
self.get_sqlite_connection().await?,
self.configuration.store_retention_limit.map(|l| l as u64),
)))
}

Expand Down

0 comments on commit 77730aa

Please sign in to comment.