diff --git a/crates/amaru/tests/rewards.rs b/crates/amaru/tests/rewards.rs index a0355076..f88d5bf6 100644 --- a/crates/amaru/tests/rewards.rs +++ b/crates/amaru/tests/rewards.rs @@ -14,14 +14,14 @@ use std::path::PathBuf; -use amaru_ledger::{rewards::StakeDistributionSnapshot, store::RewardsSummary}; +use amaru_ledger::{rewards::StakeDistribution, store::RewardsSummary}; use amaru_stores::rocksdb::RocksDB; use pallas_primitives::Epoch; const LEDGER_DB: &str = "../../ledger.db"; fn compare_preprod_snapshot(epoch: Epoch) { - let snapshot = StakeDistributionSnapshot::new( + let snapshot = StakeDistribution::new( &RocksDB::from_snapshot(&PathBuf::from(LEDGER_DB), epoch) .unwrap_or_else(|_| panic!("Failed to open ledger snapshot for epoch {}", epoch)), ) diff --git a/crates/consensus/src/consensus/mod.rs b/crates/consensus/src/consensus/mod.rs index a3cd38fa..4823e6ef 100644 --- a/crates/consensus/src/consensus/mod.rs +++ b/crates/consensus/src/consensus/mod.rs @@ -111,6 +111,7 @@ impl Stage { .or_panic() } + #[instrument(level = Level::INFO, skip(self))] async fn switch_to_fork( &mut self, peer: &Peer, diff --git a/crates/ledger/src/rewards.rs b/crates/ledger/src/rewards.rs index 6526ab70..0b396986 100644 --- a/crates/ledger/src/rewards.rs +++ b/crates/ledger/src/rewards.rs @@ -61,20 +61,20 @@ each epoch (related to different snapshots) since it is a continuous cycle. Computing rewards[^1] using: - │ - snapshot(e + 1) for + │ - snapshot(e + 2) for │ - pool performances │ - treasury & reserves - Stake is delegated │ - snapshot(e) for: - │ │ - stake distribution - │ │ - pool parameters - │ Using snapshot(e - 1) │ - │ for leader schedule │ Distributing rewards - │ │ │ earned from (e) - │ │ │ │ -snapshot(e - 1) │ snapshot(e) │ snapshot(e + 1) │ │snapshot(e + 2) - ╽ ╽ ╽ ╽ ╽ ╽ ╽╽ + Stake is delegated │ - snapshot(e) for: + │ │ - stake distribution + │ │ - pool parameters + │ Using snapshot(e) │ + │ for leader schedule │ Distributing rewards + │ │ │ earned from (e) + │ │ │ │ + snapshot(e) │ snapshot(e+1) │ snapshot(e + 2) │ │snapshot(e + 3) + ╽ ╽ ╽ ╽ ╽ ╽ ╽╽ ━━━━━━━━━━━━╸╸╸╋━━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━> - e - 1 e e + 1 e + 2 e + 3 + e e + 1 e + 2 e + 3 e + 4 [^1]: Technically, we need to wait a few slots for the snapshot (e + 1) to stabilise; otherwise we risk doing an expensive computation which may be rolled back. In practice, the calculation @@ -124,9 +124,9 @@ const EVENT_TARGET: &str = "amaru::ledger::state::rewards"; /// Note that the `keys` and `scripts `field only contains _active_ accounts; that is, accounts /// delegated to a registered stake pool. #[derive(Debug)] -pub struct StakeDistributionSnapshot { +pub struct StakeDistribution { /// Epoch number for this snapshot (taken at the end of the epoch) - epoch: Epoch, + pub epoch: Epoch, /// Total stake, in Lovelace, delegated to registered pools active_stake: Lovelace, @@ -147,7 +147,7 @@ pub struct StakeDistributionSnapshot { pools: BTreeMap, } -impl StakeDistributionSnapshot { +impl StakeDistribution { /// Clompute a new stake distribution snapshot using data available in the `Store`. /// /// Invariant: The given store is expected to be a snapshot taken at the end of an epoch. @@ -238,15 +238,15 @@ impl StakeDistributionSnapshot { let epoch = db.most_recent_snapshot(); info!( - name: "stake_distribution.snapshot", target: EVENT_TARGET, epoch = ?epoch, active_stake = ?active_stake, accounts = ?(keys.len() + scripts.len()), pools = ?pools.len(), + "stake_distribution.snapshot", ); - Ok(StakeDistributionSnapshot { + Ok(StakeDistribution { epoch, active_stake, keys, @@ -256,9 +256,9 @@ impl StakeDistributionSnapshot { } } -impl serde::Serialize for StakeDistributionSnapshot { +impl serde::Serialize for StakeDistribution { fn serialize(&self, serializer: S) -> Result { - let mut s = serializer.serialize_struct("StakeDistributionSnapshot", 5)?; + let mut s = serializer.serialize_struct("StakeDistribution", 5)?; s.serialize_field("epoch", &self.epoch)?; s.serialize_field("active_stake", &self.active_stake)?; s.serialize_field("keys", &self.keys)?; @@ -604,10 +604,7 @@ impl serde::Serialize for RewardsSummary { } impl RewardsSummary { - pub fn new( - db: &impl Store, - snapshot: StakeDistributionSnapshot, - ) -> Result { + pub fn new(db: &impl Store, snapshot: StakeDistribution) -> Result { let pots = db.with_pots(|entry| Pots::from(entry.borrow()))?; let (mut blocks_count, mut blocks_per_pool) = RewardsSummary::count_blocks(db)?; @@ -682,7 +679,6 @@ impl RewardsSummary { }); info!( - name: "rewards.summary", target: EVENT_TARGET, epoch = ?snapshot.epoch, ?efficiency, @@ -694,6 +690,7 @@ impl RewardsSummary { pots.reserves = ?pots.reserves, pots.treasury = ?pots.treasury, pots.fees = ?pots.fees, + "rewards.summary", ); Ok(RewardsSummary { @@ -759,7 +756,7 @@ impl RewardsSummary { blocks_count: u64, available_rewards: Lovelace, total_stake: Lovelace, - snapshot: &StakeDistributionSnapshot, + snapshot: &StakeDistribution, pool: &PoolState, ) -> PoolRewards { let owner_stake = pool.owner_stake(&snapshot.keys); diff --git a/crates/ledger/src/state.rs b/crates/ledger/src/state.rs index 8df3a832..626096b7 100644 --- a/crates/ledger/src/state.rs +++ b/crates/ledger/src/state.rs @@ -23,13 +23,13 @@ use crate::{ self, epoch_from_slot, Hash, Hasher, MintedBlock, Point, PoolId, PoolParams, PoolSigma, TransactionInput, TransactionOutput, CONSENSUS_SECURITY_PARAM, STABILITY_WINDOW, }, - rewards::RewardsSummary, + rewards::{RewardsSummary, StakeDistribution}, state::volatile_db::{StoreUpdate, VolatileDB, VolatileState}, store::{columns::*, Store}, }; use std::{ borrow::Cow, - collections::BTreeSet, + collections::{BTreeSet, VecDeque}, sync::{Arc, Mutex}, }; use tracing::{debug, debug_span, info_span, Span}; @@ -61,10 +61,38 @@ where /// The computed rewards summary to be applied on the next epoch boundary. This is computed /// once in the epoch, and held until the end where it is reset. rewards_summary: Option, + + /// The latest stake distributions, used to determine the leader-schedule for the ongoing epoch + /// and as well as the rewards. + stake_distributions: VecDeque, } impl, E: std::fmt::Debug> State { pub fn new(stable: Arc>) -> Self { + let db = stable.lock().unwrap(); + + // NOTE: Initialize stake distribution held in-memory. The one before last is needed by the + // consensus layer to validate the leader schedule, while the one before that will be + // consumed for the rewards calculation. + // + // We store them as a bounded queue, such that we always ever have 2 stake distributions + // available. Once we consume the oldest one, we immediately insert a new one for the epoch + // that just passed, pushing the other forward. + // + // Note that a possible memory optimization could be to discard all accounts from the most + // recent ones, since we only truly need the stake pool distribution for the leader + // schedule whereas accounts can be quite numerous. + let latest_epoch = db.most_recent_snapshot(); + let mut stake_distributions = VecDeque::new(); + for epoch in latest_epoch - 2..=latest_epoch - 1 { + stake_distributions.push_front( + db.stake_distribution(epoch) + .unwrap_or_else(|e| panic!("unable to get current stake distribution: {e:?}")), + ); + } + + drop(db); + Self { stable, @@ -81,6 +109,8 @@ impl, E: std::fmt::Debug> State { volatile: VolatileDB::default(), rewards_summary: None, + + stake_distributions, } } @@ -184,7 +214,16 @@ impl, E: std::fmt::Debug> State { // Once we reach the stability window, if self.rewards_summary.is_none() && relative_slot >= STABILITY_WINDOW as u64 { self.rewards_summary = Some( - db.rewards_summary(current_epoch - 1) + db.rewards_summary( + self.stake_distributions + .pop_back() + .unwrap_or_else(|| panic!("no readily available stake distribution?")), + ) + .map_err(ForwardErr::StorageErr)?, + ); + + self.stake_distributions.push_front( + db.stake_distribution(current_epoch - 1) .map_err(ForwardErr::StorageErr)?, ); } @@ -201,6 +240,12 @@ impl, E: std::fmt::Debug> State { } pub fn backward<'b>(&mut self, to: &'b Point) -> Result<(), BackwardErr<'b>> { + // NOTE: This happens typically on start-up; The consensus layer will typically ask us to + // rollback to the last known point, which ought to be the tip of the database. + if self.volatile.is_empty() && self.tip().as_ref() == to { + return Ok(()); + } + self.volatile .rollback_to(to, BackwardErr::UnknownRollbackPoint(to)) } diff --git a/crates/ledger/src/store.rs b/crates/ledger/src/store.rs index 9e91ef8c..192a3e15 100644 --- a/crates/ledger/src/store.rs +++ b/crates/ledger/src/store.rs @@ -15,7 +15,7 @@ pub mod columns; use super::kernel::{Epoch, Point, PoolId, TransactionInput, TransactionOutput}; -pub use crate::rewards::RewardsSummary; +pub use crate::rewards::{RewardsSummary, StakeDistribution}; use columns::*; use std::{borrow::BorrowMut, iter}; @@ -75,8 +75,14 @@ pub trait Store: Send + Sync { input: &TransactionInput, ) -> Result, Self::Error>; - /// Compute rewards using database snapshots. - fn rewards_summary(&self, epoch: Epoch) -> Result; + /// Compute stake distribution using database snapshots. + fn stake_distribution(&self, epoch: Epoch) -> Result; + + /// Compute rewards using database snapshots and a previously computed stake distribution. + fn rewards_summary( + &self, + stake_distribution: StakeDistribution, + ) -> Result; /// Get current values of the treasury and reserves accounts. fn with_pots( diff --git a/crates/stores/src/rocksdb/mod.rs b/crates/stores/src/rocksdb/mod.rs index 35af68b5..8fc560f2 100644 --- a/crates/stores/src/rocksdb/mod.rs +++ b/crates/stores/src/rocksdb/mod.rs @@ -19,7 +19,7 @@ use amaru_ledger::{ borrow::{borrowable_proxy::BorrowableProxy, IterBorrow}, }, kernel::{Epoch, Point, PoolId, TransactionInput, TransactionOutput}, - rewards::StakeDistributionSnapshot, + rewards::StakeDistribution, store::{columns as scolumns, Columns, RewardsSummary, Store}, }; use columns::*; @@ -30,7 +30,7 @@ use std::{ fmt, fs, io, path::{Path, PathBuf}, }; -use tracing::{debug, debug_span, info, warn}; +use tracing::{debug, debug_span, info, info_span, warn}; pub mod columns; pub mod common; @@ -91,7 +91,6 @@ impl RocksDB { .unwrap_or_default() .parse::() { - info!(target: EVENT_TARGET, epoch, "new.found_snapshot"); snapshots.push(epoch); } else if entry.file_name() != DIR_LIVE_DB { warn!( @@ -104,6 +103,8 @@ impl RocksDB { snapshots.sort(); + info!(target: EVENT_TARGET, snapshots = ?snapshots, "new.known_snapshots"); + let mut opts = Options::default(); opts.create_if_missing(true); opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(PREFIX_LEN)); @@ -209,11 +210,11 @@ impl Store for RocksDB { pools::add(&batch, add.pools)?; accounts::add(&batch, add.accounts)?; + accounts::reset(&batch, withdrawals)?; + utxo::remove(&batch, remove.utxo)?; pools::remove(&batch, remove.pools)?; accounts::remove(&batch, remove.accounts)?; - - accounts::reset(&batch, withdrawals)?; } } @@ -235,7 +236,7 @@ impl Store for RocksDB { let snapshot = self.snapshots.last().map(|s| s + 1).unwrap_or(epoch); if snapshot == epoch { if let Some(mut rewards_summary) = rewards_summary { - debug_span!(target: EVENT_TARGET, "snapshot.applying_rewards").in_scope(|| { + info_span!(target: EVENT_TARGET, "snapshot.applying_rewards").in_scope(|| { self.with_accounts(|iterator| { for (account, mut row) in iterator { if let Some(rewards) = rewards_summary.extract_rewards(&account) { @@ -295,21 +296,25 @@ impl Store for RocksDB { Ok(()) } - fn rewards_summary(&self, epoch: Epoch) -> Result { - let stake_distr = StakeDistributionSnapshot::new( - &Self::from_snapshot(&self.dir, epoch - 2).unwrap_or_else(|e| { - panic!( - "unable to open database snapshot for epoch {:?}: {:?}", - epoch - 2, - e - ) - }), - )?; + fn stake_distribution(&self, epoch: Epoch) -> Result { + StakeDistribution::new(&Self::from_snapshot(&self.dir, epoch).unwrap_or_else(|e| { + panic!( + "unable to open database snapshot for epoch {:?}: {:?}", + epoch, e + ) + })) + } + + fn rewards_summary( + &self, + stake_distr: StakeDistribution, + ) -> Result { RewardsSummary::new( - &Self::from_snapshot(&self.dir, epoch).unwrap_or_else(|e| { + &Self::from_snapshot(&self.dir, stake_distr.epoch + 2).unwrap_or_else(|e| { panic!( "unable to open database snapshot for epoch {:?}: {:?}", - epoch, e + stake_distr.epoch + 2, + e ) }), stake_distr,