diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 8fbc2f764c..720dbce968 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -113,3 +113,36 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( -- The end asset for this execution. context_end BYTEA NOT NULL ); + +--- Represents instances where swap executions happened. +CREATE TABLE IF NOT EXISTS dex_batch_swap ( + height BIGINT PRIMARY KEY, + trace12_start INTEGER REFERENCES dex_trace (id), + trace12_end INTEGER REFERENCES dex_trace (id), + trace21_start INTEGER REFERENCES dex_trace (id), + trace21_end INTEGER REFERENCES dex_trace (id), + asset1 BYTEA NOT NULL, + asset2 BYTEA NOT NULL, + unfilled1 Amount NOT NULL, + unfilled2 Amount NOT NULL, + delta1 Amount NOT NULL, + delta2 Amount NOT NULL, + lambda1 Amount NOT NULL, + lambda2 Amount NOT NULL +); + +CREATE INDEX ON dex_batch_swap(height); +CREATE INDEX ON dex_batch_swap(asset1, height); +CREATE INDEX ON dex_batch_swap(asset2, height); + +-- Represents instances of invididual swaps into the batch. +CREATE TABLE IF NOT EXISTS dex_swap ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + value1 Value, + value2 Value +); + +CREATE INDEX ON dex_swap(height, id); +CREATE INDEX ON dex_swap(((value1).asset)); +CREATE INDEX ON dex_swap(((value2).asset)); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 03428bd448..f084a25d08 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -5,7 +5,7 @@ use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{DirectedTradingPair, SwapExecution}; +use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair}; use penumbra_num::Amount; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -13,6 +13,49 @@ use sqlx::{PgPool, Postgres, Transaction}; use crate::sql::Sql; use crate::{AppView, ContextualizedEvent, PgTransaction}; +/// Insert a swap execution into the database. +/// +/// This returns the start and end indices of its trace. +async fn insert_swap_execution<'d>( + dbtx: &mut Transaction<'d, Postgres>, + execution: Option<&SwapExecution>, +) -> anyhow::Result<(Option, Option)> { + let execution = match execution { + None => return Ok((None, None)), + Some(e) => e, + }; + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + ) + .bind(step.amount.to_string()) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i32,) = + sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + Ok((trace_start, trace_end)) +} + /// One of the possible events that we care about. #[derive(Clone, Debug)] enum Event { @@ -33,6 +76,13 @@ enum Event { height: u64, execution: SwapExecution, }, + /// A parsed version of [pb::EventBatchSwap] + BatchSwap { + height: u64, + execution12: Option, + execution21: Option, + output_data: BatchSwapOutputData, + }, /// A parsed version of [pb::EventPositionOpen] PositionOpen { height: u64, position: Position }, /// A parsed version of [pb::EventPositionWithdraw] @@ -55,10 +105,17 @@ enum Event { prev_reserves_2: Amount, context: DirectedTradingPair, }, + /// A parsed version of [pb::EventSwap] + Swap { + height: u64, + trading_pair: TradingPair, + delta_1_i: Amount, + delta_2_i: Amount, + }, } impl Event { - const NAMES: [&'static str; 7] = [ + const NAMES: [&'static str; 9] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", @@ -66,6 +123,8 @@ impl Event { "penumbra.core.component.dex.v1.EventPositionOpen", "penumbra.core.component.dex.v1.EventPositionClose", "penumbra.core.component.dex.v1.EventPositionExecution", + "penumbra.core.component.dex.v1.EventBatchSwap", + "penumbra.core.component.dex.v1.EventSwap", ]; /// Index this event, using the handle to the postgres transaction. @@ -116,36 +175,7 @@ impl Event { Ok(()) } Event::ArbExecution { height, execution } => { - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, - ) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } + let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?; sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) .bind(i64::try_from(*height)?) .bind(execution.input.amount.to_string()) @@ -328,6 +358,50 @@ impl Event { .await?; Ok(()) } + Event::BatchSwap { + height, + execution12, + execution21, + output_data, + } => { + let (trace12_start, trace12_end) = + insert_swap_execution(dbtx, execution12.as_ref()).await?; + let (trace21_start, trace21_end) = + insert_swap_execution(dbtx, execution21.as_ref()).await?; + sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) + .bind(i64::try_from(*height)?) + .bind(trace12_start) + .bind(trace12_end) + .bind(trace21_start) + .bind(trace21_end) + .bind(Sql::from(output_data.trading_pair.asset_1())) + .bind(Sql::from(output_data.trading_pair.asset_2())) + .bind(output_data.unfilled_1.to_string()) + .bind(output_data.unfilled_2.to_string()) + .bind(output_data.delta_1.to_string()) + .bind(output_data.delta_2.to_string()) + .bind(output_data.lambda_1.to_string()) + .bind(output_data.lambda_2.to_string()) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + Event::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + } => { + sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#) + .bind(i64::try_from(*height)?) + .bind(delta_1_i.to_string()) + .bind(Sql::from(trading_pair.asset_1())) + .bind(delta_2_i.to_string()) + .bind(Sql::from(trading_pair.asset_2())) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -467,6 +541,52 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { context, }) } + // Batch Swap + x if x == Event::NAMES[7] => { + let pe = pb::EventBatchSwap::from_event(event.as_ref())?; + let height = event.block_height; + let output_data = pe + .batch_swap_output_data + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; + let execution12 = pe + .swap_execution_1_for_2 + .map(|x| x.try_into()) + .transpose()?; + let execution21 = pe + .swap_execution_2_for_1 + .map(|x| x.try_into()) + .transpose()?; + Ok(Self::BatchSwap { + height, + execution12, + execution21, + output_data, + }) + } + // Swap + x if x == Event::NAMES[8] => { + let pe = pb::EventSwap::from_event(event.as_ref())?; + let height = event.block_height; + let trading_pair = pe + .trading_pair + .expect("trading_pair should be present") + .try_into()?; + let delta_1_i = pe + .delta_1_i + .expect("delta_1_i should be present") + .try_into()?; + let delta_2_i = pe + .delta_2_i + .expect("delta_2_i should be present") + .try_into()?; + Ok(Self::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } }