From 477964a8bc0467527c7e98928f28abe69895d55e Mon Sep 17 00:00:00 2001 From: Atris Date: Mon, 29 Jul 2024 17:35:20 +0200 Subject: [PATCH] feat: swaps indexing --- crates/bin/pindexer/src/dex/dex.sql | 61 +++++++++++++++++----------- crates/bin/pindexer/src/dex/mod.rs | 62 ++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 24 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 894cc3b506..2299ca6351 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -14,40 +14,55 @@ DROP TYPE IF EXISTS Value CASCADE; DROP DOMAIN IF EXISTS Amount; CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL; -CREATE TYPE Value AS ( - amount Amount, - asset BYTEA +CREATE TYPE Value AS +( + amount Amount, + asset BYTEA ); -- Keeps track of changes to the dex's value circuit breaker. -CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( - -- The asset being moved into or out of the dex. - asset_id BYTEA NOT NULL, - -- The flow, either positive, or negative, into the dex via this particular asset. - -- - -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow Amount +CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change +( + -- The asset being moved into or out of the dex. + asset_id BYTEA NOT NULL, + -- The flow, either positive, or negative, into the dex via this particular asset. + -- + -- Because we're dealing with arbitrary assets, we need to use something which can store u128 + flow Amount ); -- One step of an execution trace. -CREATE TABLE IF NOT EXISTS trace_step ( - id SERIAL PRIMARY KEY, - value Value +CREATE TABLE IF NOT EXISTS trace_step +( + id SERIAL PRIMARY KEY, + value Value ); -- A single trace, showing what a small amount of an input asset was exchanged for. -CREATE TABLE IF NOT EXISTS trace ( - id SERIAL PRIMARY KEY, - step_start INTEGER REFERENCES trace_step(id), - step_end INTEGER REFERENCES trace_step(id) +CREATE TABLE IF NOT EXISTS trace +( + id SERIAL PRIMARY KEY, + step_start INTEGER REFERENCES trace_step (id), + step_end INTEGER REFERENCES trace_step (id) ); --- Represents instances where arb executions happened. -CREATE TABLE IF NOT EXISTS arb ( - height BIGINT PRIMARY KEY, - input Value, - output Value, - trace_start INTEGER REFERENCES trace(id), - trace_end INTEGER REFERENCES trace(id) +CREATE TABLE IF NOT EXISTS arb +( + height BIGINT PRIMARY KEY, + input Value, + output Value, + trace_start INTEGER REFERENCES trace (id), + trace_end INTEGER REFERENCES trace (id) ); + +--- Represents instances where arb executions happened. +CREATE TABLE IF NOT EXISTS swap +( + height BIGINT PRIMARY KEY, + input Value, + output Value, + trace_start INTEGER REFERENCES trace (id), + trace_end INTEGER REFERENCES trace (id) +); \ No newline at end of file diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 1409f965c0..f15a9e0d2a 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -5,6 +5,7 @@ use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; use penumbra_dex::SwapExecution; use penumbra_num::Amount; +use penumbra_proto::core::component::dex::v1::BatchSwapOutputData; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -31,13 +32,19 @@ enum Event { height: u64, execution: SwapExecution, }, + /// A parsed version of [pb::EventBatchSwap] + Swap { + height: u64, + execution: SwapExecution, + }, } impl Event { - const NAMES: [&'static str; 3] = [ + const NAMES: [&'static str; 4] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", + "penumbra.core.component.dex.v1.EventBatchSwap", ]; /// Index this event, using the handle to the postgres transaction. @@ -130,6 +137,49 @@ impl Event { .await?; Ok(()) } + Event::Swap { height, execution } => { + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + ) + .bind(step.amount.to_string()) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, + ) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7);"#) + .bind(i64::try_from(*height)?) + .bind(execution.input.amount.to_string()) + .bind(Sql::from(execution.input.asset_id)) + .bind(execution.output.amount.to_string()) + .bind(Sql::from(execution.output.asset_id)) + .bind(trace_start) + .bind(trace_end) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -183,6 +233,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { .try_into()?; Ok(Self::ArbExecution { height, execution }) } + // Batch Swap + x if x == Event::NAMES[3] => { + let pe = pb::EventBatchSwap::from_event(event.as_ref())?; + let height = event.block_height; + let execution = pe + .swap_execution_1_for_2 + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; + Ok(Self::Swap { height, execution }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } }