diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index a02dae2b84..bb53133d72 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -24,5 +24,5 @@ penumbra-asset = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} tokio = {workspace = true, features = ["full"]} serde_json = {workspace = true} -sqlx = { workspace = true, features = ["bigdecimal", "chrono", "postgres"] } +sqlx = { workspace = true, features = ["bigdecimal", "chrono", "postgres", "json"] } tracing = {workspace = true} diff --git a/crates/bin/pindexer/src/block_events.rs b/crates/bin/pindexer/src/block_events.rs new file mode 100644 index 0000000000..173f53f363 --- /dev/null +++ b/crates/bin/pindexer/src/block_events.rs @@ -0,0 +1,165 @@ +use anyhow::Result; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_proto::{ + core::component::{ + auction::v1 as auction_pb, dex::v1 as dex_pb, sct::v1 as sct_pb, stake::v1 as stake_pb, + }, + event::ProtoEvent, +}; + +#[derive(Debug)] +pub struct BlockEvents {} + +#[async_trait] +impl AppView for BlockEvents { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + sqlx::query( + "CREATE TABLE IF NOT EXISTS block_events ( + id SERIAL PRIMARY KEY, + height BIGINT UNIQUE, + events JSONB NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_height ON block_events(height DESC);") + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + // Current known set of ABCI events emitted that can be block events + match type_str { + "block" => true, + "penumbra.core.component.auction.v1.EventDutchAuctionEnded" => true, + "penumbra.core.component.auction.v1.EventDutchAuctionUpdated" => true, + "penumbra.core.component.auction.v1.EventValueCircuitBreakerCredit" => true, + "penumbra.core.component.auction.v1.EventValueCircuitBreakerDebit" => true, + "penumbra.core.component.dex.v1.EventArbExecution" => true, + "penumbra.core.component.dex.v1.EventBatchSwap" => true, + "penumbra.core.component.dex.v1.EventPositionClose" => true, + "penumbra.core.component.dex.v1.EventPositionExecution" => true, + "penumbra.core.component.dex.v1.EventPositionOpen" => true, + "penumbra.core.component.dex.v1.EventPositionWithdraw" => true, + "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit" => true, + "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit" => true, + "penumbra.core.component.sct.v1.EventAnchor" => true, + "penumbra.core.component.sct.v1.EventBlockRoot" => true, + "penumbra.core.component.sct.v1.EventCommitment" => true, + "penumbra.core.component.sct.v1.EventEpochRoot" => true, + "penumbra.core.component.stake.v1.EventTombstoneValidator" => true, + _ => false, + } + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + _src_db: &sqlx::PgPool, + ) -> Result<(), anyhow::Error> { + // Transaction Event, not a Block Event. + if event.tx_hash.is_some() { + return Ok(()); + } + + match event.event.kind.as_str() { + // This event type isn't real as far as I can tell. Not sure what to do with it. + "block" => {} + // EventBlockRoot should always be first... Right? + "penumbra.core.component.sct.v1.EventBlockRoot" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.sct.v1.EventAnchor" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.sct.v1.EventCommitment" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.sct.v1.EventEpochRoot" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.auction.v1.EventDutchAuctionEnded" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.auction.v1.EventDutchAuctionUpdated" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.auction.v1.EventValueCircuitBreakerCredit" => { + handle_block_event::(dbtx, event) + .await? + } + "penumbra.core.component.auction.v1.EventValueCircuitBreakerDebit" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventArbExecution" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventBatchSwap" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventPositionClose" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventPositionExecution" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventPositionOpen" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventPositionWithdraw" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit" => { + handle_block_event::(dbtx, event).await? + } + "penumbra.core.component.stake.v1.EventTombstoneValidator" => { + handle_block_event::(dbtx, event).await? + } + _ => {} + } + Ok(()) + } +} + +async fn handle_block_event<'a, E: ProtoEvent>( + dbtx: &mut PgTransaction<'a>, + event: &ContextualizedEvent, +) -> Result<()> { + let height = event.block_height; + let pe = E::from_event(event.as_ref())?; + // Create a json of the form { "PROTOBUF_EVENT_SCHEMA_URI": "PROTOBUF_EVENT_JSON_STRING" } + let json_event = serde_json::json!({ + event.event.kind.as_str(): &pe + }); + let affected = sqlx::query( + " + INSERT INTO block_events(height, events) + VALUES ($1, JSONB_BUILD_ARRAY($2)) + ON CONFLICT(height) + DO UPDATE + SET + events = JSONB_INSERT(EXCLUDED.events, '{0}', $2) + ", + ) + .bind(height as i64) + .bind(&json_event) + .execute(dbtx.as_mut()) + .await? + .rows_affected(); + + if affected == 0 { + anyhow::bail!("No block found for this event!"); + } + + Ok(()) +} diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index 9c443724d8..2132ae5d27 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -3,6 +3,7 @@ pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool mod indexer_ext; pub use indexer_ext::IndexerExt; pub mod block; +pub mod block_events; pub mod dex; pub mod shielded_pool; mod sql; diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 71989105ff..72854e305c 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,6 +1,8 @@ use anyhow::Result; use clap::Parser as _; -use pindexer::block::Block; +// TODO: Fix timestamp extraction. Currently throws a hard panic because one is never found +// use pindexer::block::Block; +use pindexer::block_events::BlockEvents; use pindexer::{Indexer, IndexerExt as _, Options}; #[tokio::main] @@ -8,7 +10,8 @@ async fn main() -> Result<()> { Indexer::new(Options::parse()) .with_default_tracing() .with_default_penumbra_app_views() - .with_index(Block {}) + // .with_index(Block {}) + .with_index(BlockEvents {}) .run() .await?;