Skip to content

Commit

Permalink
Use retention limit from configuration instead of constants
Browse files Browse the repository at this point in the history
This allow to synchronise pruning between tables, else pruning of data
can occurs from tables that is used as a foreign key in another.
Also, enable data pruning in the e2e tests for the aggregator in order
to test it.
  • Loading branch information
Alenar committed Jul 24, 2023
1 parent 00bca23 commit 04bc285
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
30 changes: 20 additions & 10 deletions mithril-aggregator/src/database/provider/epoch_setting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ use crate::ProtocolParametersStorer;
use mithril_common::StdError;
use tokio::sync::Mutex;

/// Delete epoch settings for Epoch older than this.
const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10);

/// Settings for an epoch, including the protocol parameters.
#[derive(Debug, PartialEq)]
pub struct EpochSettingRecord {
Expand Down Expand Up @@ -250,12 +247,19 @@ impl<'conn> DeleteEpochSettingProvider<'conn> {
/// Service to deal with epoch settings (read & write).
pub struct EpochSettingStore {
connection: Arc<Mutex<Connection>>,

/// Number of epochs before previous records will be pruned at the next call to
/// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
retention_limit: Option<u64>,
}

impl EpochSettingStore {
/// Create a new EpochSetting service
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
pub fn new(connection: Arc<Mutex<Connection>>, retention_limit: Option<u64>) -> Self {
Self {
connection,
retention_limit,
}
}
}

Expand All @@ -277,10 +281,12 @@ impl ProtocolParametersStorer for EpochSettingStore {
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;

// Prune useless old epoch settings.
let _ = DeleteEpochSettingProvider::new(connection)
.prune(epoch - EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD)
.map_err(AdapterError::QueryError)?
.count();
if let Some(threshold) = self.retention_limit {
let _ = DeleteEpochSettingProvider::new(connection)
.prune(epoch - threshold)
.map_err(AdapterError::QueryError)?
.count();
}

connection
.execute("commit transaction")
Expand Down Expand Up @@ -519,8 +525,12 @@ mod tests {
#[tokio::test]
async fn save_protocol_parameters_prune_older_epoch_settings() {
let connection = Connection::open(":memory:").unwrap();
const EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD: u64 = 5;
setup_epoch_setting_db(&connection, &[1, 2]).unwrap();
let store = EpochSettingStore::new(Arc::new(Mutex::new(connection)));
let store = EpochSettingStore::new(
Arc::new(Mutex::new(connection)),
Some(EPOCH_SETTING_PRUNE_EPOCH_THRESHOLD),
);

store
.save_protocol_parameters(
Expand Down
30 changes: 20 additions & 10 deletions mithril-aggregator/src/database/provider/stake_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ use mithril_common::{
use mithril_common::StdError;
use tokio::sync::Mutex;

/// Delete stake pools for Epoch older than this.
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(10);

/// Stake pool as read from Chain.
#[derive(Debug, PartialEq)]
pub struct StakePool {
Expand Down Expand Up @@ -234,12 +231,19 @@ impl<'conn> DeleteStakePoolProvider<'conn> {
/// Service to deal with stake pools (read & write).
pub struct StakePoolStore {
connection: Arc<Mutex<Connection>>,

/// Number of epochs before previous records will be pruned at the next call to
/// [save_protocol_parameters][StakePoolStore::save_stakes].
retention_limit: Option<u64>,
}

impl StakePoolStore {
/// Create a new StakePool service
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
pub fn new(connection: Arc<Mutex<Connection>>, retention_limit: Option<u64>) -> Self {
Self {
connection,
retention_limit,
}
}
}

Expand All @@ -265,10 +269,12 @@ impl StakeStorer for StakePoolStore {
}

// Prune useless old stake distributions.
let _ = DeleteStakePoolProvider::new(connection)
.prune(epoch - STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
.map_err(AdapterError::QueryError)?
.count();
if let Some(threshold) = self.retention_limit {
let _ = DeleteStakePoolProvider::new(connection)
.prune(epoch - threshold)
.map_err(AdapterError::QueryError)?
.count();
}

connection
.execute("commit transaction")
Expand Down Expand Up @@ -471,8 +477,12 @@ mod tests {
#[tokio::test]
async fn save_protocol_parameters_prune_older_epoch_settings() {
let connection = Connection::open(":memory:").unwrap();
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
setup_stake_db(&connection, &[1, 2]).unwrap();
let store = StakePoolStore::new(Arc::new(Mutex::new(connection)));
let store = StakePoolStore::new(
Arc::new(Mutex::new(connection)),
Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD),
);

store
.save_stakes(
Expand Down
6 changes: 5 additions & 1 deletion mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ impl DependenciesBuilder {
}

async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
let stake_pool_store = Arc::new(StakePoolStore::new(self.get_sqlite_connection().await?));
let stake_pool_store = Arc::new(StakePoolStore::new(
self.get_sqlite_connection().await?,
self.configuration.store_retention_limit.map(|l| l as u64),
));

Ok(stake_pool_store)
}
Expand Down Expand Up @@ -419,6 +422,7 @@ impl DependenciesBuilder {
) -> Result<Arc<dyn ProtocolParametersStorer>> {
Ok(Arc::new(EpochSettingStore::new(
self.get_sqlite_connection().await?,
self.configuration.store_retention_limit.map(|l| l as u64),
)))
}

Expand Down
4 changes: 2 additions & 2 deletions mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl<'a> Spec<'a> {
.await?
.unwrap_or_default();

// Wait 3 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate
let mut target_epoch = start_epoch + 3;
// Wait 4 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate
let mut target_epoch = start_epoch + 4;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
target_epoch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl Aggregator {
"CARDANO_NODE_SOCKET_PATH",
bft_node.socket_path.to_str().unwrap(),
),
("STORE_RETENTION_LIMIT", "5"),
("CARDANO_CLI_PATH", cardano_cli_path.to_str().unwrap()),
("GENESIS_VERIFICATION_KEY", "5b33322c3235332c3138362c3230312c3137372c31312c3131372c3133352c3138372c3136372c3138312c3138382c32322c35392c3230362c3130352c3233312c3135302c3231352c33302c37382c3231322c37362c31362c3235322c3138302c37322c3133342c3133372c3234372c3136312c36385d"),
("GENESIS_SECRET_KEY", "5b3131382c3138342c3232342c3137332c3136302c3234312c36312c3134342c36342c39332c3130362c3232392c38332c3133342c3138392c34302c3138392c3231302c32352c3138342c3136302c3134312c3233372c32362c3136382c35342c3233392c3230342c3133392c3131392c31332c3139395d"),
Expand Down

0 comments on commit 04bc285

Please sign in to comment.