Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests-on-push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
--package acropolis_module_epochs_state \
--package acropolis_module_genesis_bootstrapper \
--package acropolis_module_governance_state \
--package acropolis_module_historical_accounts_state \
--package acropolis_module_mithril_snapshot_fetcher \
--package acropolis_module_parameters_state \
--package acropolis_module_snapshot_bootstrapper \
Expand Down
4 changes: 2 additions & 2 deletions common/src/queries/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub enum AccountsStateQuery {
GetAccountRegistrationHistory { account: StakeAddress },
GetAccountDelegationHistory { account: StakeAddress },
GetAccountMIRHistory { account: StakeAddress },
GetAccountWithdrawalHistory { stake_key: Vec<u8> },
GetAccountWithdrawalHistory { account: StakeAddress },
GetAccountAssociatedAddresses { stake_key: Vec<u8> },
GetAccountAssets { stake_key: Vec<u8> },
GetAccountAssetsTotals { stake_key: Vec<u8> },
Expand Down Expand Up @@ -52,7 +52,7 @@ pub enum AccountsStateQueryResponse {
AccountRegistrationHistory(Vec<RegistrationUpdate>),
AccountDelegationHistory(Vec<DelegationUpdate>),
AccountMIRHistory(Vec<AccountWithdrawal>),
AccountWithdrawalHistory(AccountWithdrawalHistory),
AccountWithdrawalHistory(Vec<AccountWithdrawal>),
AccountAssociatedAddresses(AccountAssociatedAddresses),
AccountAssets(AccountAssets),
AccountAssetsTotals(AccountAssetsTotals),
Expand Down
7 changes: 7 additions & 0 deletions common/src/stake_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ mod tests {
}

mod withdrawal_tests {
use crate::TxIdentifier;

use super::*;

#[test]
Expand All @@ -880,6 +882,7 @@ mod tests {
let withdrawal = Withdrawal {
address: stake_address.clone(),
value: 40,
tx_identifier: TxIdentifier::default(),
};
stake_addresses.process_withdrawal(&withdrawal);

Expand All @@ -898,6 +901,7 @@ mod tests {
let withdrawal = Withdrawal {
address: stake_address.clone(),
value: 24,
tx_identifier: TxIdentifier::default(),
};
stake_addresses.process_withdrawal(&withdrawal);
assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 12);
Expand All @@ -906,6 +910,7 @@ mod tests {
let withdrawal = Withdrawal {
address: stake_address.clone(),
value: 2,
tx_identifier: TxIdentifier::default(),
};
stake_addresses.process_withdrawal(&withdrawal);
assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 10);
Expand All @@ -922,6 +927,7 @@ mod tests {
let withdrawal = Withdrawal {
address: stake_address.clone(),
value: 0,
tx_identifier: TxIdentifier::default(),
};

stake_addresses.process_withdrawal(&withdrawal);
Expand All @@ -936,6 +942,7 @@ mod tests {
let withdrawal = Withdrawal {
address: stake_address.clone(),
value: 10,
tx_identifier: TxIdentifier::default(),
};

// Should log error but not panic
Expand Down
3 changes: 3 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,9 @@ pub struct Withdrawal {

/// Value to withdraw
pub value: Lovelace,

// Identifier of withdrawal tx
pub tx_identifier: TxIdentifier,
}

/// Treasury pot account
Expand Down
1 change: 1 addition & 0 deletions modules/accounts_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,7 @@ mod tests {
withdrawals: vec![Withdrawal {
address: stake_address.clone(),
value: 39,
tx_identifier: TxIdentifier::default(),
}],
};

Expand Down
39 changes: 20 additions & 19 deletions modules/historical_accounts_state/src/historical_accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl HistoricalAccountsState {
if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) =
params_msg.as_ref()
{
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let mut state = state_mutex.lock().await;
state.volatile.start_new_epoch(block_info.number);
if let Some(shelley) = &params.params.shelley {
Expand All @@ -124,7 +124,7 @@ impl HistoricalAccountsState {
CardanoMessage::StakeRewardDeltas(rewards_msg),
)) = rewards_msg.as_ref()
{
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let mut state = state_mutex.lock().await;
state
.handle_rewards(rewards_msg)
Expand All @@ -142,12 +142,9 @@ impl HistoricalAccountsState {
);
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let mut state = state_mutex.lock().await;
state
.handle_tx_certificates(tx_certs_msg, block_info.epoch as u32)
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
.ok();
state.handle_tx_certificates(tx_certs_msg, block_info.epoch as u32);
}

_ => error!("Unexpected message type: {certs_message:?}"),
Expand All @@ -163,12 +160,9 @@ impl HistoricalAccountsState {
);
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let mut state = state_mutex.lock().await;
state
.handle_withdrawals(withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
.ok();
state.handle_withdrawals(withdrawals_msg);
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -184,7 +178,7 @@ impl HistoricalAccountsState {
);
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
{
let mut state = state_mutex.lock().await;
state
Expand All @@ -206,8 +200,7 @@ impl HistoricalAccountsState {

if should_prune {
let (store, cfg) = {
let mut state: tokio::sync::MutexGuard<'_, State> =
state_mutex.lock().await;
let mut state = state_mutex.lock().await;
state.prune_volatile().await;
(state.immutable.clone(), state.config.clone())
};
Expand Down Expand Up @@ -320,7 +313,7 @@ impl HistoricalAccountsState {

let response = match query {
AccountsStateQuery::GetAccountRegistrationHistory { account } => {
match state.lock().await.get_registration_history(&account).await {
match state.lock().await.get_registration_history(account).await {
Ok(Some(registrations)) => {
AccountsStateQueryResponse::AccountRegistrationHistory(
registrations,
Expand All @@ -331,7 +324,7 @@ impl HistoricalAccountsState {
}
}
AccountsStateQuery::GetAccountDelegationHistory { account } => {
match state.lock().await.get_delegation_history(&account).await {
match state.lock().await.get_delegation_history(account).await {
Ok(Some(delegations)) => {
AccountsStateQueryResponse::AccountDelegationHistory(delegations)
}
Expand All @@ -340,13 +333,21 @@ impl HistoricalAccountsState {
}
}
AccountsStateQuery::GetAccountMIRHistory { account } => {
match state.lock().await.get_mir_history(&account).await {
match state.lock().await.get_mir_history(account).await {
Ok(Some(mirs)) => AccountsStateQueryResponse::AccountMIRHistory(mirs),
Ok(None) => AccountsStateQueryResponse::NotFound,
Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
}
}

AccountsStateQuery::GetAccountWithdrawalHistory { account } => {
match state.lock().await.get_withdrawal_history(account).await {
Ok(Some(withdrawals)) => {
AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals)
}
Ok(None) => AccountsStateQueryResponse::NotFound,
Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
}
}
_ => AccountsStateQueryResponse::Error(format!(
"Unimplemented query variant: {:?}",
query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ImmutableHistoricalAccountStore {
if config.store_rewards_history {
batch.insert(
&self.rewards_history,
&epoch_key,
epoch_key,
to_vec(&entry.reward_history)?,
);
}
Expand All @@ -96,44 +96,44 @@ impl ImmutableHistoricalAccountStore {
if config.store_active_stake_history {
batch.insert(
&self.active_stake_history,
&epoch_key,
epoch_key,
to_vec(&entry.active_stake_history)?,
);
}

// Persist account delegation updates
if config.store_delegation_history {
if let Some(updates) = &entry.delegation_history {
batch.insert(&self.delegation_history, &epoch_key, to_vec(&updates)?);
batch.insert(&self.delegation_history, epoch_key, to_vec(updates)?);
}
}

// Persist account registration updates
if config.store_registration_history {
if let Some(updates) = &entry.registration_history {
batch.insert(&self.registration_history, &epoch_key, to_vec(&updates)?);
batch.insert(&self.registration_history, epoch_key, to_vec(updates)?);
}
}

// Persist withdrawal updates
if config.store_withdrawal_history {
if let Some(updates) = &entry.withdrawal_history {
batch.insert(&self.withdrawal_history, &epoch_key, to_vec(&updates)?);
batch.insert(&self.withdrawal_history, epoch_key, to_vec(updates)?);
}
}

// Persist MIR updates
if config.store_mir_history {
if let Some(updates) = &entry.mir_history {
batch.insert(&self.mir_history, &epoch_key, to_vec(&updates)?);
batch.insert(&self.mir_history, epoch_key, to_vec(updates)?);
}
}

// Persist address updates
// TODO: Deduplicate addresses across epochs
if config.store_addresses {
if let Some(updates) = &entry.addresses {
batch.insert(&self.addresses, &epoch_key, to_vec(&updates)?);
batch.insert(&self.addresses, epoch_key, to_vec(updates)?);
}
}
}
Expand All @@ -157,7 +157,7 @@ impl ImmutableHistoricalAccountStore {
account: &StakeAddress,
) -> Result<Option<Vec<RewardHistory>>> {
let mut immutable_rewards =
self.collect_partition::<RewardHistory>(&self.rewards_history, &account.get_hash())?;
self.collect_partition::<RewardHistory>(&self.rewards_history, account.get_hash())?;

self.merge_pending(
account,
Expand All @@ -175,7 +175,7 @@ impl ImmutableHistoricalAccountStore {
) -> Result<Option<Vec<ActiveStakeHistory>>> {
let mut immutable_active_stake = self.collect_partition::<ActiveStakeHistory>(
&self.active_stake_history,
&account.get_hash(),
account.get_hash(),
)?;

self.merge_pending(
Expand All @@ -194,7 +194,7 @@ impl ImmutableHistoricalAccountStore {
) -> Result<Option<Vec<RegistrationUpdate>>> {
let mut immutable_registrations = self.collect_partition::<RegistrationUpdate>(
&self.registration_history,
&account.get_hash(),
account.get_hash(),
)?;

self.merge_pending(
Expand All @@ -212,7 +212,7 @@ impl ImmutableHistoricalAccountStore {
account: &StakeAddress,
) -> Result<Option<Vec<DelegationUpdate>>> {
let mut immutable_delegations = self
.collect_partition::<DelegationUpdate>(&self.delegation_history, &account.get_hash())?;
.collect_partition::<DelegationUpdate>(&self.delegation_history, account.get_hash())?;

self.merge_pending(
account,
Expand All @@ -229,21 +229,19 @@ impl ImmutableHistoricalAccountStore {
account: &StakeAddress,
) -> Result<Option<Vec<AccountWithdrawal>>> {
let mut immutable_mirs =
self.collect_partition::<AccountWithdrawal>(&self.mir_history, &account.get_hash())?;
self.collect_partition::<AccountWithdrawal>(&self.mir_history, account.get_hash())?;

self.merge_pending(account, |e| e.mir_history.as_ref(), &mut immutable_mirs).await;

Ok((!immutable_mirs.is_empty()).then_some(immutable_mirs))
}

pub async fn _get_withdrawal_history(
pub async fn get_withdrawal_history(
&self,
account: &StakeAddress,
) -> Result<Option<Vec<AccountWithdrawal>>> {
let mut immutable_withdrawals = self.collect_partition::<AccountWithdrawal>(
&self.withdrawal_history,
&account.get_hash(),
)?;
let mut immutable_withdrawals = self
.collect_partition::<AccountWithdrawal>(&self.withdrawal_history, account.get_hash())?;

self.merge_pending(
account,
Expand All @@ -260,7 +258,7 @@ impl ImmutableHistoricalAccountStore {
account: &StakeAddress,
) -> Result<Option<Vec<ShelleyAddress>>> {
let mut immutable_addresses =
self.collect_partition::<ShelleyAddress>(&self.addresses, &account.get_hash())?;
self.collect_partition::<ShelleyAddress>(&self.addresses, account.get_hash())?;

self.merge_pending(account, |e| e.addresses.as_ref(), &mut immutable_addresses).await;

Expand Down
Loading
Loading