From be0ef35be99f9353badf13bd211bff37b0944346 Mon Sep 17 00:00:00 2001 From: Josef Date: Mon, 12 Aug 2024 17:42:59 +0200 Subject: [PATCH] feat: swaps indexing (#4776) ## Describe your changes implements indexing of swaps to satisfy the `/api/swaps` endpoint - [x] There's also an event for Swap, so we should name the event in the code to BatchSwap to avoid confusion. - [x] We probably want to record some more information in the swap table, like having the trading pair there, the total amount of inputs and outputs for each asset, the unfilled amounts, et.c ## Issue ticket number and link fixes #4744 ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: Indexer-only changes --------- Co-authored-by: Lucas Meier --- crates/bin/pindexer/src/dex/dex.sql | 33 +++++ crates/bin/pindexer/src/dex/mod.rs | 184 +++++++++++++++++++++++----- 2 files changed, 185 insertions(+), 32 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 8fbc2f764c..720dbce968 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -113,3 +113,36 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( -- The end asset for this execution. context_end BYTEA NOT NULL ); + +--- Represents instances where swap executions happened. +CREATE TABLE IF NOT EXISTS dex_batch_swap ( + height BIGINT PRIMARY KEY, + trace12_start INTEGER REFERENCES dex_trace (id), + trace12_end INTEGER REFERENCES dex_trace (id), + trace21_start INTEGER REFERENCES dex_trace (id), + trace21_end INTEGER REFERENCES dex_trace (id), + asset1 BYTEA NOT NULL, + asset2 BYTEA NOT NULL, + unfilled1 Amount NOT NULL, + unfilled2 Amount NOT NULL, + delta1 Amount NOT NULL, + delta2 Amount NOT NULL, + lambda1 Amount NOT NULL, + lambda2 Amount NOT NULL +); + +CREATE INDEX ON dex_batch_swap(height); +CREATE INDEX ON dex_batch_swap(asset1, height); +CREATE INDEX ON dex_batch_swap(asset2, height); + +-- Represents instances of invididual swaps into the batch. +CREATE TABLE IF NOT EXISTS dex_swap ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + value1 Value, + value2 Value +); + +CREATE INDEX ON dex_swap(height, id); +CREATE INDEX ON dex_swap(((value1).asset)); +CREATE INDEX ON dex_swap(((value2).asset)); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 03428bd448..f084a25d08 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -5,7 +5,7 @@ use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{DirectedTradingPair, SwapExecution}; +use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair}; use penumbra_num::Amount; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -13,6 +13,49 @@ use sqlx::{PgPool, Postgres, Transaction}; use crate::sql::Sql; use crate::{AppView, ContextualizedEvent, PgTransaction}; +/// Insert a swap execution into the database. +/// +/// This returns the start and end indices of its trace. +async fn insert_swap_execution<'d>( + dbtx: &mut Transaction<'d, Postgres>, + execution: Option<&SwapExecution>, +) -> anyhow::Result<(Option, Option)> { + let execution = match execution { + None => return Ok((None, None)), + Some(e) => e, + }; + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + ) + .bind(step.amount.to_string()) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i32,) = + sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + Ok((trace_start, trace_end)) +} + /// One of the possible events that we care about. #[derive(Clone, Debug)] enum Event { @@ -33,6 +76,13 @@ enum Event { height: u64, execution: SwapExecution, }, + /// A parsed version of [pb::EventBatchSwap] + BatchSwap { + height: u64, + execution12: Option, + execution21: Option, + output_data: BatchSwapOutputData, + }, /// A parsed version of [pb::EventPositionOpen] PositionOpen { height: u64, position: Position }, /// A parsed version of [pb::EventPositionWithdraw] @@ -55,10 +105,17 @@ enum Event { prev_reserves_2: Amount, context: DirectedTradingPair, }, + /// A parsed version of [pb::EventSwap] + Swap { + height: u64, + trading_pair: TradingPair, + delta_1_i: Amount, + delta_2_i: Amount, + }, } impl Event { - const NAMES: [&'static str; 7] = [ + const NAMES: [&'static str; 9] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", @@ -66,6 +123,8 @@ impl Event { "penumbra.core.component.dex.v1.EventPositionOpen", "penumbra.core.component.dex.v1.EventPositionClose", "penumbra.core.component.dex.v1.EventPositionExecution", + "penumbra.core.component.dex.v1.EventBatchSwap", + "penumbra.core.component.dex.v1.EventSwap", ]; /// Index this event, using the handle to the postgres transaction. @@ -116,36 +175,7 @@ impl Event { Ok(()) } Event::ArbExecution { height, execution } => { - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, - ) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } + let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?; sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) .bind(i64::try_from(*height)?) .bind(execution.input.amount.to_string()) @@ -328,6 +358,50 @@ impl Event { .await?; Ok(()) } + Event::BatchSwap { + height, + execution12, + execution21, + output_data, + } => { + let (trace12_start, trace12_end) = + insert_swap_execution(dbtx, execution12.as_ref()).await?; + let (trace21_start, trace21_end) = + insert_swap_execution(dbtx, execution21.as_ref()).await?; + sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) + .bind(i64::try_from(*height)?) + .bind(trace12_start) + .bind(trace12_end) + .bind(trace21_start) + .bind(trace21_end) + .bind(Sql::from(output_data.trading_pair.asset_1())) + .bind(Sql::from(output_data.trading_pair.asset_2())) + .bind(output_data.unfilled_1.to_string()) + .bind(output_data.unfilled_2.to_string()) + .bind(output_data.delta_1.to_string()) + .bind(output_data.delta_2.to_string()) + .bind(output_data.lambda_1.to_string()) + .bind(output_data.lambda_2.to_string()) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + Event::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + } => { + sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#) + .bind(i64::try_from(*height)?) + .bind(delta_1_i.to_string()) + .bind(Sql::from(trading_pair.asset_1())) + .bind(delta_2_i.to_string()) + .bind(Sql::from(trading_pair.asset_2())) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -467,6 +541,52 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { context, }) } + // Batch Swap + x if x == Event::NAMES[7] => { + let pe = pb::EventBatchSwap::from_event(event.as_ref())?; + let height = event.block_height; + let output_data = pe + .batch_swap_output_data + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; + let execution12 = pe + .swap_execution_1_for_2 + .map(|x| x.try_into()) + .transpose()?; + let execution21 = pe + .swap_execution_2_for_1 + .map(|x| x.try_into()) + .transpose()?; + Ok(Self::BatchSwap { + height, + execution12, + execution21, + output_data, + }) + } + // Swap + x if x == Event::NAMES[8] => { + let pe = pb::EventSwap::from_event(event.as_ref())?; + let height = event.block_height; + let trading_pair = pe + .trading_pair + .expect("trading_pair should be present") + .try_into()?; + let delta_1_i = pe + .delta_1_i + .expect("delta_1_i should be present") + .try_into()?; + let delta_2_i = pe + .delta_2_i + .expect("delta_2_i should be present") + .try_into()?; + Ok(Self::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } }