Skip to content

Commit

Permalink
Migrate pruning of signer registrations in the upkeep service
Browse files Browse the repository at this point in the history
  • Loading branch information
sfauvel committed Nov 26, 2024
1 parent b4ec1a6 commit 82e196a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ impl VerificationKeyStorer for SignerRegistrationStore {
}

async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> StdResult<()> {
let _deleted_records = self
.connection
.fetch_first(
// we want to prune including the given epoch (+1)
DeleteSignerRegistrationRecordQuery::below_epoch_threshold(max_epoch_to_prune + 1),
)
self.connection
.apply(DeleteSignerRegistrationRecordQuery::below_epoch_threshold(
max_epoch_to_prune,
))
.map_err(AdapterError::QueryError)?;

Ok(())
Expand Down Expand Up @@ -190,7 +188,7 @@ mod tests {

pub fn init_signer_registration_store(
initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
) -> Arc<dyn VerificationKeyStorer> {
) -> Arc<SignerRegistrationStore> {
let connection = main_db_connection().unwrap();
let initial_data: Vec<(Epoch, Vec<SignerWithStake>)> = initial_data
.into_iter()
Expand Down Expand Up @@ -328,12 +326,50 @@ mod tests {
"Keys should exist before pruning"
);
store
.prune_verification_keys(Epoch(epoch))
.prune_verification_keys(Epoch(epoch) + 1)
.await
.expect("Pruning should not fail");

let pruned_epoch_keys = store.get_verification_keys(Epoch(epoch)).await.unwrap();
assert_eq!(None, pruned_epoch_keys);
}
}

async fn get_epochs_in_database_until(
store: &SignerRegistrationStore,
until_epoch: Epoch,
) -> Vec<Epoch> {
let mut epochs_in_database = vec![];
let mut current_epoch = Epoch(1);
while current_epoch <= until_epoch {
if store
.get_verification_keys(current_epoch)
.await
.unwrap()
.is_some()
{
epochs_in_database.push(current_epoch);
}
current_epoch += 1;
}
epochs_in_database
}

#[tokio::test]
async fn prune_epoch_older_than_threshold() {
let signers = build_signers(5, 2);
let store = init_signer_registration_store(signers);

assert_eq!(
vec!(Epoch(1), Epoch(2), Epoch(3), Epoch(4), Epoch(5)),
get_epochs_in_database_until(&store, Epoch(8)).await
);

store.prune_verification_keys(Epoch(4)).await.unwrap();

assert_eq!(
vec!(Epoch(4), Epoch(5)),
get_epochs_in_database_until(&store, Epoch(8)).await
);
}
}
7 changes: 6 additions & 1 deletion mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1315,14 +1315,19 @@ impl DependenciesBuilder {
async fn build_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
let stake_pool_pruning_task = self.get_stake_store().await?;
let epoch_settings_pruning_task = self.get_epoch_settings_store().await?;
let mithril_registerer_pruning_task = self.get_mithril_registerer().await?;

let upkeep_service = Arc::new(AggregatorUpkeepService::new(
self.get_sqlite_connection().await?,
self.get_sqlite_connection_cardano_transaction_pool()
.await?,
self.get_event_store_sqlite_connection().await?,
self.get_signed_entity_lock().await?,
vec![stake_pool_pruning_task, epoch_settings_pruning_task],
vec![
stake_pool_pruning_task,
epoch_settings_pruning_task,
mithril_registerer_pruning_task,
],
self.root_logger(),
));

Expand Down
101 changes: 56 additions & 45 deletions mithril-aggregator/src/signer_registerer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mithril_common::{
StdError, StdResult,
};

use crate::VerificationKeyStorer;
use crate::{services::EpochPruningTask, VerificationKeyStorer};

use mithril_common::chain_observer::ChainObserverError;

Expand Down Expand Up @@ -164,6 +164,26 @@ impl SignerRegistrationRoundOpener for MithrilSignerRegisterer {
stake_distribution,
});

Ok(())
}

async fn close_registration_round(&self) -> StdResult<()> {
let mut current_round = self.current_round.write().await;
*current_round = None;

Ok(())
}
}

#[async_trait]
impl EpochPruningTask for MithrilSignerRegisterer {
fn pruned_data(&self) -> &'static str {
"Signer registration"
}

async fn prune(&self, epoch: Epoch) -> StdResult<()> {
let registration_epoch = epoch.offset_to_recording_epoch();

if let Some(retention_limit) = self.verification_key_epoch_retention_limit {
self.verification_key_store
.prune_verification_keys(registration_epoch - retention_limit)
Expand All @@ -173,19 +193,11 @@ impl SignerRegistrationRoundOpener for MithrilSignerRegisterer {
"VerificationKeyStorer can not prune verification keys below epoch: '{}'",
registration_epoch - retention_limit
)
})
.map_err(|e| SignerRegistrationError::StoreError(anyhow!(e)))?;
})?;
}

Ok(())
}

async fn close_registration_round(&self) -> StdResult<()> {
let mut current_round = self.current_round.write().await;
*current_round = None;

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -288,14 +300,16 @@ mod tests {
use mithril_common::{
chain_observer::FakeObserver,
entities::{Epoch, Signer},
test_utils::{fake_data, MithrilFixtureBuilder},
test_utils::MithrilFixtureBuilder,
};
use mockall::predicate::eq;

use crate::{
database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
MithrilSignerRegisterer, SignerRegisterer, SignerRegistrationRoundOpener,
VerificationKeyStorer,
};
use crate::{services::EpochPruningTask, store::MockVerificationKeyStorer};

use super::MockSignerRecorder;

Expand Down Expand Up @@ -418,47 +432,44 @@ mod tests {
}

#[tokio::test]
async fn should_prune_verification_keys_older_than_two_epochs_at_round_opening() {
let verification_key_store = Arc::new(SignerRegistrationStore::new(Arc::new(
main_db_connection().unwrap(),
)));
for initial_key in 1..=5 {
let signer_with_stake = fake_data::signers_with_stakes(1).pop().unwrap();
verification_key_store
.save_verification_key(Epoch(initial_key), signer_with_stake)
.await
.unwrap();
}
async fn mock_prune_epoch_older_than_threshold() {
const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
let retention_limit = Some(PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD);

let mut verification_key_store = MockVerificationKeyStorer::new();
verification_key_store
.expect_prune_verification_keys()
.with(eq(Epoch(4).offset_to_recording_epoch()))
.times(1)
.returning(|_| Ok(()));

let signer_recorder = MockSignerRecorder::new();
let signer_registerer = MithrilSignerRegisterer::new(
Arc::new(FakeObserver::default()),
verification_key_store.clone(),
Arc::new(signer_recorder),
Some(2),
Arc::new(verification_key_store),
Arc::new(MockSignerRecorder::new()),
retention_limit,
);
let fixture = MithrilFixtureBuilder::default().with_signers(5).build();

signer_registerer
.open_registration_round(Epoch(5), fixture.stake_distribution())
.await
.expect("Opening a registration round should not fail");
let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
signer_registerer.prune(current_epoch).await.unwrap();
}

for epoch in 1..=3 {
let verification_keys = verification_key_store
.get_verification_keys(Epoch(epoch))
.await
.unwrap();
assert_eq!(None, verification_keys);
}
#[tokio::test]
async fn mock_without_threshold_nothing_is_pruned() {
let retention_limit = None;

let verification_keys = verification_key_store
.get_verification_keys(Epoch(4))
.await
.unwrap();
assert!(
verification_keys.is_some(),
"Verification keys of the previous epoch should not have been pruned"
let mut verification_key_store = MockVerificationKeyStorer::new();
verification_key_store
.expect_prune_verification_keys()
.never();

let signer_registerer = MithrilSignerRegisterer::new(
Arc::new(FakeObserver::default()),
Arc::new(verification_key_store),
Arc::new(MockSignerRecorder::new()),
retention_limit,
);

signer_registerer.prune(Epoch(100)).await.unwrap();
}
}

0 comments on commit 82e196a

Please sign in to comment.