Skip to content

Commit

Permalink
feat: swaps indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
vacekj committed Jul 29, 2024
1 parent 47a1ba1 commit 477964a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 24 deletions.
61 changes: 38 additions & 23 deletions crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,55 @@ DROP TYPE IF EXISTS Value CASCADE;
DROP DOMAIN IF EXISTS Amount;

CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL;
CREATE TYPE Value AS (
amount Amount,
asset BYTEA
CREATE TYPE Value AS
(
amount Amount,
asset BYTEA
);


-- Keeps track of changes to the dex's value circuit breaker.
CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change (
-- The asset being moved into or out of the dex.
asset_id BYTEA NOT NULL,
-- The flow, either positive, or negative, into the dex via this particular asset.
--
-- Because we're dealing with arbitrary assets, we need to use something which can store u128
flow Amount
CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change
(
-- The asset being moved into or out of the dex.
asset_id BYTEA NOT NULL,
-- The flow, either positive, or negative, into the dex via this particular asset.
--
-- Because we're dealing with arbitrary assets, we need to use something which can store u128
flow Amount
);

-- One step of an execution trace.
CREATE TABLE IF NOT EXISTS trace_step (
id SERIAL PRIMARY KEY,
value Value
CREATE TABLE IF NOT EXISTS trace_step
(
id SERIAL PRIMARY KEY,
value Value
);

-- A single trace, showing what a small amount of an input asset was exchanged for.
CREATE TABLE IF NOT EXISTS trace (
id SERIAL PRIMARY KEY,
step_start INTEGER REFERENCES trace_step(id),
step_end INTEGER REFERENCES trace_step(id)
CREATE TABLE IF NOT EXISTS trace
(
id SERIAL PRIMARY KEY,
step_start INTEGER REFERENCES trace_step (id),
step_end INTEGER REFERENCES trace_step (id)
);

--- Represents instances where arb executions happened.
CREATE TABLE IF NOT EXISTS arb (
height BIGINT PRIMARY KEY,
input Value,
output Value,
trace_start INTEGER REFERENCES trace(id),
trace_end INTEGER REFERENCES trace(id)
CREATE TABLE IF NOT EXISTS arb
(
height BIGINT PRIMARY KEY,
input Value,
output Value,
trace_start INTEGER REFERENCES trace (id),
trace_end INTEGER REFERENCES trace (id)
);

--- Represents instances where arb executions happened.
CREATE TABLE IF NOT EXISTS swap
(
height BIGINT PRIMARY KEY,
input Value,
output Value,
trace_start INTEGER REFERENCES trace (id),
trace_end INTEGER REFERENCES trace (id)
);
62 changes: 61 additions & 1 deletion crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_dex::SwapExecution;
use penumbra_num::Amount;
use penumbra_proto::core::component::dex::v1::BatchSwapOutputData;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};

Expand All @@ -31,13 +32,19 @@ enum Event {
height: u64,
execution: SwapExecution,
},
/// A parsed version of [pb::EventBatchSwap]
Swap {
height: u64,
execution: SwapExecution,
},
}

impl Event {
const NAMES: [&'static str; 3] = [
const NAMES: [&'static str; 4] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
"penumbra.core.component.dex.v1.EventArbExecution",
"penumbra.core.component.dex.v1.EventBatchSwap",
];

/// Index this event, using the handle to the postgres transaction.
Expand Down Expand Up @@ -130,6 +137,49 @@ impl Event {
.await?;
Ok(())
}
Event::Swap { height, execution } => {
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#,
)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(execution.input.amount.to_string())
.bind(Sql::from(execution.input.asset_id))
.bind(execution.output.amount.to_string())
.bind(Sql::from(execution.output.asset_id))
.bind(trace_start)
.bind(trace_end)
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}
Expand Down Expand Up @@ -183,6 +233,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
.try_into()?;
Ok(Self::ArbExecution { height, execution })
}
// Batch Swap
x if x == Event::NAMES[3] => {
let pe = pb::EventBatchSwap::from_event(event.as_ref())?;
let height = event.block_height;
let execution = pe
.swap_execution_1_for_2
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
Ok(Self::Swap { height, execution })
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
Expand Down

0 comments on commit 477964a

Please sign in to comment.