Skip to content

Commit

Permalink
Merge pull request #100 from pragma-org/rolling-stake-distributions
Browse files Browse the repository at this point in the history
Rolling stake distributions
  • Loading branch information
KtorZ authored Feb 6, 2025
2 parents 84722c6 + 43bd08b commit 7860df4
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 50 deletions.
4 changes: 2 additions & 2 deletions crates/amaru/tests/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

use std::path::PathBuf;

use amaru_ledger::{rewards::StakeDistributionSnapshot, store::RewardsSummary};
use amaru_ledger::{rewards::StakeDistribution, store::RewardsSummary};
use amaru_stores::rocksdb::RocksDB;
use pallas_primitives::Epoch;

const LEDGER_DB: &str = "../../ledger.db";

fn compare_preprod_snapshot(epoch: Epoch) {
let snapshot = StakeDistributionSnapshot::new(
let snapshot = StakeDistribution::new(
&RocksDB::from_snapshot(&PathBuf::from(LEDGER_DB), epoch)
.unwrap_or_else(|_| panic!("Failed to open ledger snapshot for epoch {}", epoch)),
)
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl Stage {
.or_panic()
}

#[instrument(level = Level::INFO, skip(self))]
async fn switch_to_fork(
&mut self,
peer: &Peer,
Expand Down
45 changes: 21 additions & 24 deletions crates/ledger/src/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,20 @@ each epoch (related to different snapshots) since it is a continuous cycle.
Computing rewards[^1] using:
│ - snapshot(e + 1) for
│ - snapshot(e + 2) for
│ - pool performances
│ - treasury & reserves
Stake is delegated │ - snapshot(e) for:
│ - stake distribution
│ - pool parameters
│ Using snapshot(e - 1)
for leader schedule │ Distributing rewards
│ │ earned from (e)
│ │ │
snapshot(e - 1) snapshot(e) │ snapshot(e + 1) │ │snapshot(e + 2)
╽ ╽ ╽ ╽ ╽╽
Stake is delegated │ - snapshot(e) for:
│ - stake distribution
│ - pool parameters
Using snapshot(e)
for leader schedule │ Distributing rewards
│ │ earned from (e)
│ │ │
snapshot(e) snapshot(e+1) │ snapshot(e + 2) │ │snapshot(e + 3)
╽ ╽ ╽ ╽ ╽╽
━━━━━━━━━━━━╸╸╸╋━━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━━━━━━━━╸╸╸╋╸╸╸━━━━━━━━>
e - 1 e e + 1 e + 2 e + 3
e e + 1 e + 2 e + 3 e + 4
[^1]: Technically, we need to wait a few slots for the snapshot (e + 1) to stabilise; otherwise
we risk doing an expensive computation which may be rolled back. In practice, the calculation
Expand Down Expand Up @@ -124,9 +124,9 @@ const EVENT_TARGET: &str = "amaru::ledger::state::rewards";
/// Note that the `keys` and `scripts `field only contains _active_ accounts; that is, accounts
/// delegated to a registered stake pool.
#[derive(Debug)]
pub struct StakeDistributionSnapshot {
pub struct StakeDistribution {
/// Epoch number for this snapshot (taken at the end of the epoch)
epoch: Epoch,
pub epoch: Epoch,

/// Total stake, in Lovelace, delegated to registered pools
active_stake: Lovelace,
Expand All @@ -147,7 +147,7 @@ pub struct StakeDistributionSnapshot {
pools: BTreeMap<PoolId, PoolState>,
}

impl StakeDistributionSnapshot {
impl StakeDistribution {
/// Clompute a new stake distribution snapshot using data available in the `Store`.
///
/// Invariant: The given store is expected to be a snapshot taken at the end of an epoch.
Expand Down Expand Up @@ -238,15 +238,15 @@ impl StakeDistributionSnapshot {
let epoch = db.most_recent_snapshot();

info!(
name: "stake_distribution.snapshot",
target: EVENT_TARGET,
epoch = ?epoch,
active_stake = ?active_stake,
accounts = ?(keys.len() + scripts.len()),
pools = ?pools.len(),
"stake_distribution.snapshot",
);

Ok(StakeDistributionSnapshot {
Ok(StakeDistribution {
epoch,
active_stake,
keys,
Expand All @@ -256,9 +256,9 @@ impl StakeDistributionSnapshot {
}
}

impl serde::Serialize for StakeDistributionSnapshot {
impl serde::Serialize for StakeDistribution {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut s = serializer.serialize_struct("StakeDistributionSnapshot", 5)?;
let mut s = serializer.serialize_struct("StakeDistribution", 5)?;
s.serialize_field("epoch", &self.epoch)?;
s.serialize_field("active_stake", &self.active_stake)?;
s.serialize_field("keys", &self.keys)?;
Expand Down Expand Up @@ -604,10 +604,7 @@ impl serde::Serialize for RewardsSummary {
}

impl RewardsSummary {
pub fn new<E>(
db: &impl Store<Error = E>,
snapshot: StakeDistributionSnapshot,
) -> Result<Self, E> {
pub fn new<E>(db: &impl Store<Error = E>, snapshot: StakeDistribution) -> Result<Self, E> {
let pots = db.with_pots(|entry| Pots::from(entry.borrow()))?;

let (mut blocks_count, mut blocks_per_pool) = RewardsSummary::count_blocks(db)?;
Expand Down Expand Up @@ -682,7 +679,6 @@ impl RewardsSummary {
});

info!(
name: "rewards.summary",
target: EVENT_TARGET,
epoch = ?snapshot.epoch,
?efficiency,
Expand All @@ -694,6 +690,7 @@ impl RewardsSummary {
pots.reserves = ?pots.reserves,
pots.treasury = ?pots.treasury,
pots.fees = ?pots.fees,
"rewards.summary",
);

Ok(RewardsSummary {
Expand Down Expand Up @@ -759,7 +756,7 @@ impl RewardsSummary {
blocks_count: u64,
available_rewards: Lovelace,
total_stake: Lovelace,
snapshot: &StakeDistributionSnapshot,
snapshot: &StakeDistribution,
pool: &PoolState,
) -> PoolRewards {
let owner_stake = pool.owner_stake(&snapshot.keys);
Expand Down
51 changes: 48 additions & 3 deletions crates/ledger/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use crate::{
self, epoch_from_slot, Hash, Hasher, MintedBlock, Point, PoolId, PoolParams, PoolSigma,
TransactionInput, TransactionOutput, CONSENSUS_SECURITY_PARAM, STABILITY_WINDOW,
},
rewards::RewardsSummary,
rewards::{RewardsSummary, StakeDistribution},
state::volatile_db::{StoreUpdate, VolatileDB, VolatileState},
store::{columns::*, Store},
};
use std::{
borrow::Cow,
collections::BTreeSet,
collections::{BTreeSet, VecDeque},
sync::{Arc, Mutex},
};
use tracing::{debug, debug_span, info_span, Span};
Expand Down Expand Up @@ -61,10 +61,38 @@ where
/// The computed rewards summary to be applied on the next epoch boundary. This is computed
/// once in the epoch, and held until the end where it is reset.
rewards_summary: Option<RewardsSummary>,

/// The latest stake distributions, used to determine the leader-schedule for the ongoing epoch
/// and as well as the rewards.
stake_distributions: VecDeque<StakeDistribution>,
}

impl<S: Store<Error = E>, E: std::fmt::Debug> State<S, E> {
pub fn new(stable: Arc<Mutex<S>>) -> Self {
let db = stable.lock().unwrap();

// NOTE: Initialize stake distribution held in-memory. The one before last is needed by the
// consensus layer to validate the leader schedule, while the one before that will be
// consumed for the rewards calculation.
//
// We store them as a bounded queue, such that we always ever have 2 stake distributions
// available. Once we consume the oldest one, we immediately insert a new one for the epoch
// that just passed, pushing the other forward.
//
// Note that a possible memory optimization could be to discard all accounts from the most
// recent ones, since we only truly need the stake pool distribution for the leader
// schedule whereas accounts can be quite numerous.
let latest_epoch = db.most_recent_snapshot();
let mut stake_distributions = VecDeque::new();
for epoch in latest_epoch - 2..=latest_epoch - 1 {
stake_distributions.push_front(
db.stake_distribution(epoch)
.unwrap_or_else(|e| panic!("unable to get current stake distribution: {e:?}")),
);
}

drop(db);

Self {
stable,

Expand All @@ -81,6 +109,8 @@ impl<S: Store<Error = E>, E: std::fmt::Debug> State<S, E> {
volatile: VolatileDB::default(),

rewards_summary: None,

stake_distributions,
}
}

Expand Down Expand Up @@ -184,7 +214,16 @@ impl<S: Store<Error = E>, E: std::fmt::Debug> State<S, E> {
// Once we reach the stability window,
if self.rewards_summary.is_none() && relative_slot >= STABILITY_WINDOW as u64 {
self.rewards_summary = Some(
db.rewards_summary(current_epoch - 1)
db.rewards_summary(
self.stake_distributions
.pop_back()
.unwrap_or_else(|| panic!("no readily available stake distribution?")),
)
.map_err(ForwardErr::StorageErr)?,
);

self.stake_distributions.push_front(
db.stake_distribution(current_epoch - 1)
.map_err(ForwardErr::StorageErr)?,
);
}
Expand All @@ -201,6 +240,12 @@ impl<S: Store<Error = E>, E: std::fmt::Debug> State<S, E> {
}

pub fn backward<'b>(&mut self, to: &'b Point) -> Result<(), BackwardErr<'b>> {
// NOTE: This happens typically on start-up; The consensus layer will typically ask us to
// rollback to the last known point, which ought to be the tip of the database.
if self.volatile.is_empty() && self.tip().as_ref() == to {
return Ok(());
}

self.volatile
.rollback_to(to, BackwardErr::UnknownRollbackPoint(to))
}
Expand Down
12 changes: 9 additions & 3 deletions crates/ledger/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub mod columns;

use super::kernel::{Epoch, Point, PoolId, TransactionInput, TransactionOutput};
pub use crate::rewards::RewardsSummary;
pub use crate::rewards::{RewardsSummary, StakeDistribution};
use columns::*;
use std::{borrow::BorrowMut, iter};

Expand Down Expand Up @@ -75,8 +75,14 @@ pub trait Store: Send + Sync {
input: &TransactionInput,
) -> Result<Option<TransactionOutput>, Self::Error>;

/// Compute rewards using database snapshots.
fn rewards_summary(&self, epoch: Epoch) -> Result<RewardsSummary, Self::Error>;
/// Compute stake distribution using database snapshots.
fn stake_distribution(&self, epoch: Epoch) -> Result<StakeDistribution, Self::Error>;

/// Compute rewards using database snapshots and a previously computed stake distribution.
fn rewards_summary(
&self,
stake_distribution: StakeDistribution,
) -> Result<RewardsSummary, Self::Error>;

/// Get current values of the treasury and reserves accounts.
fn with_pots<A>(
Expand Down
41 changes: 23 additions & 18 deletions crates/stores/src/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use amaru_ledger::{
borrow::{borrowable_proxy::BorrowableProxy, IterBorrow},
},
kernel::{Epoch, Point, PoolId, TransactionInput, TransactionOutput},
rewards::StakeDistributionSnapshot,
rewards::StakeDistribution,
store::{columns as scolumns, Columns, RewardsSummary, Store},
};
use columns::*;
Expand All @@ -30,7 +30,7 @@ use std::{
fmt, fs, io,
path::{Path, PathBuf},
};
use tracing::{debug, debug_span, info, warn};
use tracing::{debug, debug_span, info, info_span, warn};

pub mod columns;
pub mod common;
Expand Down Expand Up @@ -91,7 +91,6 @@ impl RocksDB {
.unwrap_or_default()
.parse::<Epoch>()
{
info!(target: EVENT_TARGET, epoch, "new.found_snapshot");
snapshots.push(epoch);
} else if entry.file_name() != DIR_LIVE_DB {
warn!(
Expand All @@ -104,6 +103,8 @@ impl RocksDB {

snapshots.sort();

info!(target: EVENT_TARGET, snapshots = ?snapshots, "new.known_snapshots");

let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(PREFIX_LEN));
Expand Down Expand Up @@ -209,11 +210,11 @@ impl Store for RocksDB {
pools::add(&batch, add.pools)?;
accounts::add(&batch, add.accounts)?;

accounts::reset(&batch, withdrawals)?;

utxo::remove(&batch, remove.utxo)?;
pools::remove(&batch, remove.pools)?;
accounts::remove(&batch, remove.accounts)?;

accounts::reset(&batch, withdrawals)?;
}
}

Expand All @@ -235,7 +236,7 @@ impl Store for RocksDB {
let snapshot = self.snapshots.last().map(|s| s + 1).unwrap_or(epoch);
if snapshot == epoch {
if let Some(mut rewards_summary) = rewards_summary {
debug_span!(target: EVENT_TARGET, "snapshot.applying_rewards").in_scope(|| {
info_span!(target: EVENT_TARGET, "snapshot.applying_rewards").in_scope(|| {
self.with_accounts(|iterator| {
for (account, mut row) in iterator {
if let Some(rewards) = rewards_summary.extract_rewards(&account) {
Expand Down Expand Up @@ -295,21 +296,25 @@ impl Store for RocksDB {
Ok(())
}

fn rewards_summary(&self, epoch: Epoch) -> Result<RewardsSummary, Self::Error> {
let stake_distr = StakeDistributionSnapshot::new(
&Self::from_snapshot(&self.dir, epoch - 2).unwrap_or_else(|e| {
panic!(
"unable to open database snapshot for epoch {:?}: {:?}",
epoch - 2,
e
)
}),
)?;
fn stake_distribution(&self, epoch: Epoch) -> Result<StakeDistribution, Self::Error> {
StakeDistribution::new(&Self::from_snapshot(&self.dir, epoch).unwrap_or_else(|e| {
panic!(
"unable to open database snapshot for epoch {:?}: {:?}",
epoch, e
)
}))
}

fn rewards_summary(
&self,
stake_distr: StakeDistribution,
) -> Result<RewardsSummary, Self::Error> {
RewardsSummary::new(
&Self::from_snapshot(&self.dir, epoch).unwrap_or_else(|e| {
&Self::from_snapshot(&self.dir, stake_distr.epoch + 2).unwrap_or_else(|e| {
panic!(
"unable to open database snapshot for epoch {:?}: {:?}",
epoch, e
stake_distr.epoch + 2,
e
)
}),
stake_distr,
Expand Down

0 comments on commit 7860df4

Please sign in to comment.