From 68360e8399600544e6b4a016f7d7f3651ee16f7b Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Sat, 21 Sep 2024 21:43:53 -0700 Subject: [PATCH] pindexer: supply: first refactoring pass This removes a lot of repeated logic around database accesses, and then also modularizes them to hide some of the implementation details in the rest of the file. --- crates/bin/pindexer/src/indexer_ext.rs | 2 +- crates/bin/pindexer/src/supply.rs | 620 +++++++++++++------------ 2 files changed, 329 insertions(+), 293 deletions(-) diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 2dc0247847..8ee1939318 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -11,6 +11,6 @@ impl IndexerExt for cometindex::Indexer { .with_index(crate::stake::UndelegationTxs {}) .with_index(crate::governance::GovernanceProposals {}) .with_index(crate::dex::Component::new()) - .with_index(crate::supply::Supply::new()) + .with_index(crate::supply::Component::new()) } } diff --git a/crates/bin/pindexer/src/supply.rs b/crates/bin/pindexer/src/supply.rs index 117a01583f..cea3fc4199 100644 --- a/crates/bin/pindexer/src/supply.rs +++ b/crates/bin/pindexer/src/supply.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Context, Result}; use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; use penumbra_app::genesis::AppState; use penumbra_asset::asset; -use penumbra_num::{fixpoint::U128x128, Amount}; +use penumbra_num::Amount; use penumbra_proto::{ event::ProtoEvent, penumbra::core::component::funding::v1 as pb_funding, penumbra::core::component::stake::v1 as pb_stake, @@ -12,130 +12,301 @@ use penumbra_proto::{ use penumbra_stake::{rate::RateData, validator::Validator, IdentityKey}; use sqlx::{PgPool, Postgres, Transaction}; -const BPS_SQUARED: u64 = 1_0000_0000u64; +mod unstaked_supply { + //! This module handles updates around the unstaked supply. + use anyhow::Result; + use cometindex::PgTransaction; -#[derive(Clone, Copy)] -struct DelegatedSupply { - um: u64, - del_um: u64, - rate_bps2: u64, -} + /// Initialize the database tables for this module. + pub async fn init_db(dbtx: &mut PgTransaction<'_>) -> Result<()> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS supply_total_unstaked ( + height BIGINT PRIMARY KEY, + um BIGINT NOT NULL + ); + "#, + ) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } -impl Default for DelegatedSupply { - fn default() -> Self { - Self { - um: 0, - del_um: 0, - rate_bps2: BPS_SQUARED, - } + /// Get the supply for at a given height. + async fn get_supply(dbtx: &mut PgTransaction<'_>, height: u64) -> Result> { + let row: Option = sqlx::query_scalar( + "SELECT um FROM supply_total_unstaked WHERE height <= $1 ORDER BY height DESC LIMIT 1", + ) + .bind(i64::try_from(height)?) + .fetch_optional(dbtx.as_mut()) + .await?; + row.map(|x| u64::try_from(x)) + .transpose() + .map_err(Into::into) + } + + /// Set the supply at a given height. + async fn set_supply(dbtx: &mut PgTransaction<'_>, height: u64, supply: u64) -> Result<()> { + sqlx::query( + r#" + INSERT INTO + supply_total_unstaked + VALUES ($1, $2) + ON CONFLICT (height) + DO UPDATE SET + um = excluded.um + "#, + ) + .bind(i64::try_from(height)?) + .bind(i64::try_from(supply)?) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + + /// Modify the supply at a given height. + /// + /// This will take the supply at the given height, and replace it with the + /// new result produced by the function. + pub async fn modify( + dbtx: &mut PgTransaction<'_>, + height: u64, + f: impl FnOnce(Option) -> Result, + ) -> Result<()> { + let supply = get_supply(dbtx, height).await?; + let new_supply = f(supply)?; + set_supply(dbtx, height, new_supply).await } } -impl DelegatedSupply { - fn modify(self, delta: u64) -> anyhow::Result { - let rate = U128x128::ratio(self.rate_bps2, BPS_SQUARED)?; - let um_delta = delta; - let del_um_delta = if rate == U128x128::from(0u128) { - 0u64 - } else { - let del_um_delta = (U128x128::from(delta) / rate)?; - let rounded = if NEGATE { - // So that we don't remove too few del_um - del_um_delta.round_up()? - } else { - // So that we don't add too many del_um - del_um_delta.round_down() - }; - rounded.try_into()? - }; - let out = if NEGATE { - Self { - um: self - .um - .checked_sub(um_delta) - .ok_or(anyhow!("supply modification failed"))?, - del_um: self - .del_um - .checked_sub(del_um_delta) - .ok_or(anyhow!("supply modification failed"))?, - rate_bps2: self.rate_bps2, - } - } else { +mod delegated_supply { + //! This module handles updates around the delegated supply to a validator. + use anyhow::{anyhow, Result}; + use cometindex::PgTransaction; + use penumbra_num::fixpoint::U128x128; + use penumbra_stake::{rate::RateData, IdentityKey}; + + const BPS_SQUARED: u64 = 1_0000_0000u64; + + /// Represents the supply around a given validator. + /// + /// The supply needs to track the amount of delegated tokens to that validator, + /// as well as the conversion rate from those tokens to the native token. + #[derive(Clone, Copy)] + pub struct Supply { + um: u64, + del_um: u64, + rate_bps2: u64, + } + + impl Default for Supply { + fn default() -> Self { Self { - um: self - .um - .checked_add(um_delta) - .ok_or(anyhow!("supply modification failed"))?, - del_um: self - .del_um - .checked_add(del_um_delta) - .ok_or(anyhow!("supply modification failed"))?, - rate_bps2: self.rate_bps2, + um: 0, + del_um: 0, + rate_bps2: BPS_SQUARED, } - }; - Ok(out) + } } - fn rate_change(self, rate_data: &RateData) -> Result { - let um = rate_data - .unbonded_amount(self.del_um.into()) - .value() - .try_into()?; + impl Supply { + /// Change the amount of um in this supply, by adding or removing um. + pub fn add_um(self, delta: i64) -> Result { + let rate = U128x128::ratio(self.rate_bps2, BPS_SQUARED)?; + let negate = delta.is_negative(); + let delta = delta.unsigned_abs(); + let um_delta = delta; + let del_um_delta = if rate == U128x128::from(0u128) { + 0u64 + } else { + let del_um_delta = (U128x128::from(delta) / rate)?; + let rounded = if negate { + // So that we don't remove too few del_um + del_um_delta.round_up()? + } else { + // So that we don't add too many del_um + del_um_delta.round_down() + }; + rounded.try_into()? + }; + let out = if negate { + Self { + um: self + .um + .checked_sub(um_delta) + .ok_or(anyhow!("supply modification failed"))?, + del_um: self + .del_um + .checked_sub(del_um_delta) + .ok_or(anyhow!("supply modification failed"))?, + rate_bps2: self.rate_bps2, + } + } else { + Self { + um: self + .um + .checked_add(um_delta) + .ok_or(anyhow!("supply modification failed"))?, + del_um: self + .del_um + .checked_add(del_um_delta) + .ok_or(anyhow!("supply modification failed"))?, + rate_bps2: self.rate_bps2, + } + }; + Ok(out) + } - Ok(Self { - um, - del_um: self.del_um, - rate_bps2: rate_data.validator_exchange_rate.value().try_into()?, - }) + /// Change the conversion rate between delegated_um and um in this supply. + pub fn change_rate(self, rate: &RateData) -> Result { + let um = rate + .unbonded_amount(self.del_um.into()) + .value() + .try_into()?; + + Ok(Self { + um, + del_um: self.del_um, + rate_bps2: rate.validator_exchange_rate.value().try_into()?, + }) + } } -} -async fn add_validator<'d>( - dbtx: &mut Transaction<'d, Postgres>, - identity_key: &IdentityKey, -) -> anyhow::Result { - let ik_string = identity_key.to_string(); - let id: Option = - sqlx::query_scalar(r#"SELECT id FROM supply_validators WHERE identity_key = $1"#) - .bind(&ik_string) - .fetch_optional(dbtx.as_mut()) - .await?; - if let Some(id) = id { - return Ok(id); + /// Initialize the database tables for this module. + pub async fn init_db<'d>(dbtx: &mut PgTransaction<'d>) -> Result<()> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS supply_validators ( + id SERIAL PRIMARY KEY, + identity_key TEXT NOT NULL + ); + "#, + ) + .execute(dbtx.as_mut()) + .await?; + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS supply_total_staked ( + validator_id INT REFERENCES supply_validators(id), + height BIGINT NOT NULL, + um BIGINT NOT NULL, + del_um BIGINT NOT NULL, + rate_bps2 BIGINT NOT NULL, + PRIMARY KEY (validator_id, height) + ); + "#, + ) + .execute(dbtx.as_mut()) + .await?; + Ok(()) } - let id = - sqlx::query_scalar(r#"INSERT INTO supply_validators VALUES (DEFAULT, $1) RETURNING id"#) - .bind(&ik_string) - .fetch_one(dbtx.as_mut()) - .await?; - Ok(id) -} -async fn delegated_supply_current<'d>( - dbtx: &mut Transaction<'d, Postgres>, - val_id: i32, -) -> Result> { - let row: Option<(i64, i64, i64)> = sqlx::query_as("SELECT um, del_um, rate_bps2 FROM supply_total_staked WHERE validator_id = $1 ORDER BY height DESC LIMIT 1") - .bind(val_id).fetch_optional(dbtx.as_mut()).await?; - row.map(|(um, del_um, rate_bps2)| { - let um = um.try_into()?; - let del_um = del_um.try_into()?; - let rate_bps2 = rate_bps2.try_into()?; - Ok(DelegatedSupply { - um, - del_um, - rate_bps2, + /// An opaque internal identifier for a given validator. + #[derive(Clone, Copy, PartialEq)] + pub struct ValidatorID(i32); + + /// Define a validator, returning its internal ID. + /// + /// This will have no effect if the validator has already been defined. + pub async fn define_validator( + dbtx: &mut PgTransaction<'_>, + identity_key: &IdentityKey, + ) -> Result { + let ik_string = identity_key.to_string(); + + let id: Option = + sqlx::query_scalar(r#"SELECT id FROM supply_validators WHERE identity_key = $1"#) + .bind(&ik_string) + .fetch_optional(dbtx.as_mut()) + .await?; + + if let Some(id) = id { + return Ok(ValidatorID(id)); + } + let id = sqlx::query_scalar( + r#"INSERT INTO supply_validators VALUES (DEFAULT, $1) RETURNING id"#, + ) + .bind(&ik_string) + .fetch_one(dbtx.as_mut()) + .await?; + Ok(ValidatorID(id)) + } + + /// Get the supply for a given validator at a given height. + async fn get_supply( + dbtx: &mut PgTransaction<'_>, + validator: ValidatorID, + height: u64, + ) -> Result> { + let row: Option<(i64, i64, i64)> = sqlx::query_as( + r#" + SELECT + um, del_um, rate_bps2 + FROM + supply_total_staked + WHERE + validator_id = $1 AND height <= $2 + ORDER BY height DESC + LIMIT 1 + "#, + ) + .bind(validator.0) + .bind(i64::try_from(height)?) + .fetch_optional(dbtx.as_mut()) + .await?; + row.map(|(um, del_um, rate_bps2)| { + let um = um.try_into()?; + let del_um = del_um.try_into()?; + let rate_bps2 = rate_bps2.try_into()?; + Ok(Supply { + um, + del_um, + rate_bps2, + }) }) - }) - .transpose() -} + .transpose() + } -async fn supply_current<'d>(dbtx: &mut Transaction<'d, Postgres>) -> Result { - let row: Option = - sqlx::query_scalar("SELECT um FROM supply_total_unstaked ORDER BY height DESC LIMIT 1") - .fetch_optional(dbtx.as_mut()) - .await?; - Ok(row.unwrap_or_default().try_into()?) + /// Set the supply for a given validator at a given height. + async fn set_supply( + dbtx: &mut PgTransaction<'_>, + validator: ValidatorID, + height: u64, + supply: Supply, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO + supply_total_staked + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (validator_id, height) + DO UPDATE SET + um = excluded.um, + del_um = excluded.del_um, + rate_bps2 = excluded.rate_bps2 + "#, + ) + .bind(validator.0) + .bind(i64::try_from(height)?) + .bind(i64::try_from(supply.um)?) + .bind(i64::try_from(supply.del_um)?) + .bind(i64::try_from(supply.rate_bps2)?) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + + /// Modify the supply for a given validator, at a given height. + pub async fn modify( + dbtx: &mut PgTransaction<'_>, + validator: ValidatorID, + height: u64, + f: impl FnOnce(Option) -> Result, + ) -> Result<()> { + let supply = get_supply(dbtx, validator, height).await?; + let new_supply = f(supply)?; + set_supply(dbtx, validator, height, new_supply).await + } } /// Supply-relevant events. @@ -185,156 +356,58 @@ impl Event { identity_key, amount, } => { - let amount = u64::try_from(amount.value())?; - - let val_id = add_validator(dbtx, identity_key).await?; - let current_supply = delegated_supply_current(dbtx, val_id).await?; - let new_supply = current_supply.unwrap_or_default().modify::(amount)?; - sqlx::query( - r#" - INSERT INTO - supply_total_staked - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (validator_id, height) - DO UPDATE SET - um = excluded.um, - del_um = excluded.del_um, - rate_bps2 = excluded.rate_bps2 - "#, - ) - .bind(val_id) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_supply.um)?) - .bind(i64::try_from(new_supply.del_um)?) - .bind(i64::try_from(new_supply.rate_bps2)?) - .execute(dbtx.as_mut()) - .await?; - let current_um = supply_current(dbtx).await?; - let new_um = current_um - .checked_sub(amount) - .ok_or(anyhow!("um supply underflowed"))?; - sqlx::query( - r#" - INSERT INTO - supply_total_unstaked - VALUES ($1, $2) - ON CONFLICT (height) - DO UPDATE SET - um = excluded.um - "#, - ) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_um)?) - .execute(dbtx.as_mut()) + let amount = i64::try_from(amount.value())?; + + unstaked_supply::modify(dbtx, *height, |current| { + Ok(current.unwrap_or_default() - amount as u64) + }) .await?; - Ok(()) + let validator = delegated_supply::define_validator(dbtx, identity_key).await?; + delegated_supply::modify(dbtx, validator, *height, |current| { + current.unwrap_or_default().add_um(amount) + }) + .await } Event::Undelegate { height, identity_key, unbonded_amount, } => { - let amount = u64::try_from(unbonded_amount.value())?; - - let val_id = add_validator(dbtx, identity_key).await?; - let current_supply = delegated_supply_current(dbtx, val_id).await?; - let new_supply = current_supply.unwrap_or_default().modify::(amount)?; - sqlx::query( - r#" - INSERT INTO - supply_total_staked - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (validator_id, height) - DO UPDATE SET - um = excluded.um, - del_um = excluded.del_um, - rate_bps2 = excluded.rate_bps2 - "#, - ) - .bind(val_id) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_supply.um)?) - .bind(i64::try_from(new_supply.del_um)?) - .bind(i64::try_from(new_supply.rate_bps2)?) - .execute(dbtx.as_mut()) - .await?; - let current_um = supply_current(dbtx).await?; - let new_um = current_um - .checked_add(amount) - .ok_or(anyhow!("um supply overflowed"))?; - sqlx::query( - r#" - INSERT INTO - supply_total_unstaked - VALUES ($1, $2) - ON CONFLICT (height) - DO UPDATE SET - um = excluded.um - "#, - ) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_um)?) - .execute(dbtx.as_mut()) + let amount = i64::try_from(unbonded_amount.value())?; + + unstaked_supply::modify(dbtx, *height, |current| { + Ok(current.unwrap_or_default() + amount as u64) + }) .await?; - Ok(()) + let validator = delegated_supply::define_validator(dbtx, identity_key).await?; + delegated_supply::modify(dbtx, validator, *height, |current| { + current.unwrap_or_default().add_um(-amount) + }) + .await } Event::FundingStreamReward { height, reward_amount, } => { let amount = u64::try_from(reward_amount.value())?; - let current_um = supply_current(dbtx).await?; - let new_um = current_um - .checked_add(amount) - .ok_or(anyhow!("um supply overflowed"))?; - sqlx::query( - r#" - INSERT INTO - supply_total_unstaked - VALUES ($1, $2) - ON CONFLICT (height) - DO UPDATE SET - um = excluded.um - "#, - ) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_um)?) - .execute(dbtx.as_mut()) - .await?; - Ok(()) + unstaked_supply::modify(dbtx, *height, |current| { + Ok(current.unwrap_or_default() + amount) + }) + .await } Event::RateDataChange { height, identity_key, rate_data, } => { - let val_id = add_validator(dbtx, identity_key).await?; - let current_supply = delegated_supply_current(dbtx, val_id).await?; - let new_supply = current_supply.unwrap_or_default().rate_change(rate_data)?; - sqlx::query( - r#" - INSERT INTO - supply_total_staked - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (validator_id, height) - DO UPDATE SET - um = excluded.um, - del_um = excluded.del_um, - rate_bps2 = excluded.rate_bps2 - "#, - ) - .bind(val_id) - .bind(i64::try_from(*height)?) - .bind(i64::try_from(new_supply.um)?) - .bind(i64::try_from(new_supply.del_um)?) - .bind(i64::try_from(new_supply.rate_bps2)?) - .execute(dbtx.as_mut()) - .await?; - - Ok(()) + let validator = delegated_supply::define_validator(dbtx, identity_key).await?; + delegated_supply::modify(dbtx, validator, *height, |current| { + current.unwrap_or_default().change_rate(rate_data) + }) + .await } } } @@ -433,12 +506,9 @@ async fn add_genesis_native_token_allocation_supply<'a>( } // Add community pool allocation unstaked_native_token_sum += content.community_pool_content.initial_balance.amount; + let unstaked_native_token_sum = u64::try_from(unstaked_native_token_sum.value())?; - sqlx::query("INSERT INTO supply_total_unstaked (height, um) VALUES ($1, $2)") - .bind(0i64) - .bind(unstaked_native_token_sum.value() as i64) - .execute(dbtx.as_mut()) - .await?; + unstaked_supply::modify(dbtx, 0, |_| Ok(unstaked_native_token_sum)).await?; let mut allos = BTreeMap::::new(); for allo in &content.shielded_pool_content.allocations { @@ -452,29 +522,29 @@ async fn add_genesis_native_token_allocation_supply<'a>( // at genesis, assume a 1:1 ratio between delegation amount and native token amount. for val in &content.stake_content.validators { let val = Validator::try_from(val.clone())?; - let delegation_amount = allos.get(&val.token().id()).cloned().unwrap_or_default(); - - let val_id = add_validator(dbtx, &val.identity_key).await?; - - sqlx::query("INSERT INTO supply_total_staked (height, validator_id, um, del_um, rate_bps2) VALUES ($1, $2, $3, $4, $5)") - .bind(0i64) - .bind(val_id) - .bind(delegation_amount.value() as i64) - .bind(delegation_amount.value() as i64) - .bind(BPS_SQUARED as i64) - .execute(dbtx.as_mut()) - .await?; + let delegation_amount: i64 = allos + .get(&val.token().id()) + .cloned() + .unwrap_or_default() + .value() + .try_into()?; + + let val_id = delegated_supply::define_validator(dbtx, &val.identity_key).await?; + delegated_supply::modify(dbtx, val_id, 0, |_| { + delegated_supply::Supply::default().add_um(delegation_amount) + }) + .await?; } Ok(()) } #[derive(Debug)] -pub struct Supply { +pub struct Component { event_strings: HashSet<&'static str>, } -impl Supply { +impl Component { pub fn new() -> Self { let event_strings = Event::NAMES.into_iter().collect(); Self { event_strings } @@ -482,48 +552,14 @@ impl Supply { } #[async_trait] -impl AppView for Supply { +impl AppView for Component { async fn init_chain( &self, dbtx: &mut PgTransaction, app_state: &serde_json::Value, ) -> Result<(), anyhow::Error> { - sqlx::query( - // table name is module path + struct name - " -CREATE TABLE IF NOT EXISTS supply_total_unstaked ( - height BIGINT PRIMARY KEY, - um BIGINT NOT NULL -); -", - ) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - // table name is module path + struct name - " -CREATE TABLE IF NOT EXISTS supply_validators ( - id SERIAL PRIMARY KEY, - identity_key TEXT NOT NULL -); -", - ) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " -CREATE TABLE IF NOT EXISTS supply_total_staked ( - validator_id INT REFERENCES supply_validators(id), - height BIGINT NOT NULL, - um BIGINT NOT NULL, - del_um BIGINT NOT NULL, - rate_bps2 BIGINT NOT NULL, - PRIMARY KEY (validator_id, height) -); -", - ) - .execute(dbtx.as_mut()) - .await?; + unstaked_supply::init_db(dbtx).await?; + delegated_supply::init_db(dbtx).await?; // decode the initial supply from the genesis // initial app state is not recomputed from events, because events are not emitted in init_chain.