diff --git a/Cargo.lock b/Cargo.lock index 65451b53d4..b2ce05e88f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,7 @@ dependencies = [ "num-bigint", "penumbra-app", "penumbra-asset", + "penumbra-dex", "penumbra-num", "penumbra-proto", "penumbra-shielded-pool", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 92a3d75445..a02dae2b84 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -18,6 +18,7 @@ num-bigint = { version = "0.4" } penumbra-shielded-pool = {workspace = true, default-features = false} penumbra-stake = {workspace = true, default-features = false} penumbra-app = {workspace = true, default-features = false} +penumbra-dex = {workspace = true, default-features = false} penumbra-num = {workspace = true, default-features = false} penumbra-asset = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index e1e93b11ee..72ff858a6e 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -9,6 +9,13 @@ -- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter -- tables, rather than needing to do a join with another table. +CREATE DOMAIN IF NOT EXISTS Amount AS NUMERIC(39, 0) NOT NULL; + +CREATE TYPE Value AS ( + amount Amount, + asset BYTEA NOT NULL +); + -- Keeps track of changes to the dex's value circuit breaker. CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( -- The asset being moved into or out of the dex. @@ -16,5 +23,27 @@ CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( -- The flow, either positive, or negative, into the dex via this particular asset. -- -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow NUMERIC(39, 0) NOT NULL + flow Amount +); + +-- One step of an execution trace. +CREATE TABLE IF NOT EXISTS trace_step ( + id SERIAL PRIMARY KEY, + value Value, +); + +-- A single trace, showing what a small amount of an input asset was exchanged for. +CREATE TABLE IF NOT EXISTS trace ( + id SERIAL PRIMARY KEY, + step_start INTEGER REFERENCES trace_step(id), + step_end INTEGER REFERENCES trace_step(id), +); + +--- Represents instances where arb executions happened. +CREATE TABLE IF NOT EXISTS arb ( + height BIGINT PRIMARY KEY, + input Value, + output Value, + trace_start INTEGER REFERENCES arb_traces(id), + trace_end INTEGER REFERENCES arb_traces(id), ); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index e854130375..1d1910c78a 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use anyhow::anyhow; use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; +use penumbra_dex::SwapExecution; use penumbra_num::Amount; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -11,7 +12,7 @@ use crate::sql::Sql; use crate::{AppView, ContextualizedEvent, PgTransaction}; /// One of the possible events that we care about. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] enum Event { /// A parsed version of [pb::EventValueCircuitBreakerCredit]. CircuitBreakerCredit { @@ -25,17 +26,23 @@ enum Event { previous_balance: Amount, new_balance: Amount, }, + /// A parsed version of [pb::EventArbExecution] + ArbExecution { + height: u64, + execution: SwapExecution, + }, } impl Event { - const NAMES: [&'static str; 2] = [ + const NAMES: [&'static str; 3] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", + "penumbra.core.component.dex.v1.EventArbExecution", ]; /// Index this event, using the handle to the postgres transaction. async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> { - match *self { + match self { Event::CircuitBreakerCredit { asset_id, previous_balance, @@ -52,7 +59,7 @@ impl Event { VALUES ($1, $2); "#, ) - .bind(Sql::from(asset_id)) + .bind(Sql::from(*asset_id)) .bind(Sql::from(amount)) .execute(dbtx.as_mut()) .await?; @@ -74,12 +81,55 @@ impl Event { VALUES ($1, -$2); "#, ) - .bind(Sql::from(asset_id)) + .bind(Sql::from(*asset_id)) .bind(Sql::from(amount)) .execute(dbtx.as_mut()) .await?; Ok(()) } + Event::ArbExecution { height, execution } => { + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i64,) = sqlx::query_as( + r#"INSERT INTO trace_step VALUES (DEFAULT, ($1, $2)) RETURNING id;"#, + ) + .bind(Sql::from(step.amount)) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i64,) = sqlx::query_as( + r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, + ) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + sqlx::query(r#"INSERT INTO arb VALUES ($1, ($2, $3), ($4, $5), $6, $7);"#) + .bind(i64::try_from(*height)?) + .bind(Sql::from(execution.input.amount)) + .bind(Sql::from(execution.input.asset_id)) + .bind(Sql::from(execution.output.amount)) + .bind(Sql::from(execution.output.asset_id)) + .bind(trace_start) + .bind(trace_end) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -123,6 +173,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { new_balance, }) } + // Arb + x if x == Event::NAMES[2] => { + let pe = pb::EventArbExecution::from_event(event.as_ref())?; + let height = pe.height; + let execution = pe + .swap_execution + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; + Ok(Self::ArbExecution { height, execution }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } } @@ -157,7 +217,7 @@ impl AppView for Component { self.event_strings.contains(type_str) } - #[tracing::instrument(skip_all, fields(height = event.block_height))] + #[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))] async fn index_event( &self, dbtx: &mut PgTransaction,