Skip to content

Commit

Permalink
Drive pindexer forward with staking example (#4707)
Browse files Browse the repository at this point in the history
## Describe your changes

This PR drives the `pindexer` development forward with worked examples
of staking-related data -- using https://penumbra.today as an example
use case. With these changes, it should be possible for all of the data
on that site to be rendered directly out of Postgres, without any RPC
calls.

We should figure out how to test this -- it should be amenable to
testing since we have a good source of event data (mainnet) and
expectations about what it should look like -- but this is less critical
at this instant.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > Only changes to code outside `pindexer` are convenience methods.
  • Loading branch information
hdevalence authored Jul 14, 2024
1 parent c1d7a84 commit 5902460
Show file tree
Hide file tree
Showing 19 changed files with 804 additions and 15 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ publish = false
[dependencies]
cometindex = {workspace = true}
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
penumbra-num = {workspace = true, default-features = false}
penumbra-asset = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
tokio = {workspace = true, features = ["full"]}
anyhow = {workspace = true}
serde_json = {workspace = true}
tracing = {workspace = true}
5 changes: 5 additions & 0 deletions crates/bin/pindexer/src/indexer_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,10 @@ pub trait IndexerExt: Sized {
impl IndexerExt for cometindex::Indexer {
fn with_default_penumbra_app_views(self) -> Self {
self.with_index(crate::shielded_pool::fmd::ClueSet {})
.with_index(crate::stake::ValidatorSet {})
.with_index(crate::stake::Slashings {})
.with_index(crate::stake::MissedBlocks {})
.with_index(crate::stake::DelegationTxs {})
.with_index(crate::stake::UndelegationTxs {})
}
}
4 changes: 4 additions & 0 deletions crates/bin/pindexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
pub use cometindex::{AppView, Indexer};

mod indexer_ext;
pub use indexer_ext::IndexerExt;

pub mod shielded_pool;

pub mod stake;
4 changes: 2 additions & 2 deletions crates/bin/pindexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use pindexer::{shielded_pool::fmd::ClueSet, Indexer};
use pindexer::{Indexer, IndexerExt as _};

#[tokio::main]
async fn main() -> Result<()> {
Indexer::new()
.with_default_tracing()
.with_index(ClueSet {})
.with_default_penumbra_app_views()
.run()
.await?;

Expand Down
6 changes: 5 additions & 1 deletion crates/bin/pindexer/src/shielded_pool/fmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ pub struct ClueSet {}

#[async_trait]
impl AppView for ClueSet {
async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_app_state: &serde_json::Value,
) -> Result<(), anyhow::Error> {
sqlx::query(
// table name is module path + struct name
"
Expand Down
11 changes: 11 additions & 0 deletions crates/bin/pindexer/src/stake.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pub use delegation_txs::DelegationTxs;
pub use missed_blocks::MissedBlocks;
pub use slashings::Slashings;
pub use undelegation_txs::UndelegationTxs;
pub use validator_set::ValidatorSet;

mod delegation_txs;
mod missed_blocks;
mod slashings;
mod undelegation_txs;
mod validator_set;
82 changes: 82 additions & 0 deletions crates/bin/pindexer/src/stake/delegation_txs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use anyhow::{anyhow, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use penumbra_num::Amount;
use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

#[derive(Debug)]
pub struct DelegationTxs {}

#[async_trait]
impl AppView for DelegationTxs {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_app_state: &serde_json::Value,
) -> Result<()> {
// Create the table
sqlx::query(
"CREATE TABLE stake_delegation_txs (
id SERIAL PRIMARY KEY,
validator_ik BYTEA NOT NULL,
amount BIGINT NOT NULL,
height BIGINT NOT NULL,
tx_hash BYTEA NOT NULL
);",
)
.execute(dbtx.as_mut())
.await?;

// Create index on validator_ik
sqlx::query("CREATE INDEX idx_stake_delegation_txs_validator_ik ON stake_delegation_txs(validator_ik);")
.execute(dbtx.as_mut())
.await?;

// Create descending index on height
sqlx::query(
"CREATE INDEX idx_stake_delegation_txs_height ON stake_delegation_txs(height DESC);",
)
.execute(dbtx.as_mut())
.await?;

// Create composite index on validator_ik and height (descending)
sqlx::query("CREATE INDEX idx_stake_delegation_txs_validator_ik_height ON stake_delegation_txs(validator_ik, height DESC);")
.execute(dbtx.as_mut())
.await?;

Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
type_str == "penumbra.core.component.stake.v1.EventDelegate"
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<()> {
let pe = pb::EventDelegate::from_event(event.as_ref())?;

let ik_bytes = pe
.identity_key
.ok_or_else(|| anyhow::anyhow!("missing ik in event"))?
.ik;

let amount = Amount::try_from(
pe.amount
.ok_or_else(|| anyhow::anyhow!("missing amount in event"))?,
)?;

sqlx::query(
"INSERT INTO stake_delegation_txs (validator_ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)"
)
.bind(&ik_bytes)
.bind(amount.value() as i64)
.bind(event.block_height as i64)
.bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}
72 changes: 72 additions & 0 deletions crates/bin/pindexer/src/stake/missed_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use anyhow::Result;
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

#[derive(Debug)]
pub struct MissedBlocks {}

#[async_trait]
impl AppView for MissedBlocks {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_app_state: &serde_json::Value,
) -> Result<(), anyhow::Error> {
// Create the table
sqlx::query(
"CREATE TABLE stake_missed_blocks (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
ik BYTEA NOT NULL
);",
)
.execute(dbtx.as_mut())
.await?;

// Create descending index on height
sqlx::query(
"CREATE INDEX idx_stake_missed_blocks_height ON stake_missed_blocks(height DESC);",
)
.execute(dbtx.as_mut())
.await?;

// Create index on ik
sqlx::query("CREATE INDEX idx_stake_missed_blocks_ik ON stake_missed_blocks(ik);")
.execute(dbtx.as_mut())
.await?;

// Create composite index on height (descending) and ik
sqlx::query("CREATE INDEX idx_stake_missed_blocks_height_ik ON stake_missed_blocks(height DESC, ik);")
.execute(dbtx.as_mut())
.await?;

Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
type_str == "penumbra.core.component.stake.v1.EventValidatorMissedBlock"
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<(), anyhow::Error> {
let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?;
let ik_bytes = pe
.identity_key
.ok_or_else(|| anyhow::anyhow!("missing ik in event"))?
.ik;

let height = event.block_height;

sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)")
.bind(height as i64)
.bind(ik_bytes)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}
83 changes: 83 additions & 0 deletions crates/bin/pindexer/src/stake/slashings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use anyhow::{anyhow, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
use penumbra_stake::IdentityKey;

#[derive(Debug)]
pub struct Slashings {}

#[async_trait]
impl AppView for Slashings {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_app_state: &serde_json::Value,
) -> Result<(), anyhow::Error> {
sqlx::query(
"CREATE TABLE stake_slashings (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
ik BYTEA NOT NULL,
epoch_index BIGINT NOT NULL,
penalty TEXT NOT NULL
);",
)
.execute(dbtx.as_mut())
.await?;

sqlx::query("CREATE INDEX idx_stake_slashings_height ON stake_slashings(height);")
.execute(dbtx.as_mut())
.await?;

sqlx::query("CREATE INDEX idx_stake_slashings_ik ON stake_slashings(ik);")
.execute(dbtx.as_mut())
.await?;

sqlx::query("CREATE INDEX idx_stake_slashings_height_ik ON stake_slashings(height, ik);")
.execute(dbtx.as_mut())
.await?;

Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
match type_str {
"penumbra.core.component.stake.v1.EventSlashingPenaltyApplied" => true,
_ => false,
}
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<(), anyhow::Error> {
let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?;
let ik = IdentityKey::try_from(
pe.identity_key
.ok_or_else(|| anyhow!("missing ik in event"))?,
)?;

let height = event.block_height;
let epoch_index = pe.epoch_index;

let penalty_json = serde_json::to_string(
&pe.new_penalty
.ok_or_else(|| anyhow!("missing new_penalty"))?,
)?;

sqlx::query(
"INSERT INTO stake_slashings (height, ik, epoch_index, penalty)
VALUES ($1, $2, $3, $4)",
)
.bind(height as i64)
.bind(ik.to_bytes())
.bind(epoch_index as i64)
.bind(penalty_json)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}
Loading

0 comments on commit 5902460

Please sign in to comment.