Skip to content

Commit

Permalink
pindexer: Implement indexing of arb executions (#4754)
Browse files Browse the repository at this point in the history
Merge #4753 first.

## Describe your changes

This implements indexing for arb executions in pindexer.

## Issue ticket number and link

Closes #4736.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > indexing changes only

---------

Co-authored-by: vacekj <[email protected]>
Co-authored-by: Atris <[email protected]>
  • Loading branch information
3 people authored Jul 24, 2024
1 parent fbdd570 commit 53b8338
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ num-bigint = { version = "0.4" }
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
penumbra-dex = {workspace = true, default-features = false}
penumbra-num = {workspace = true, default-features = false}
penumbra-asset = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
Expand Down
31 changes: 30 additions & 1 deletion crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,41 @@
-- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter
-- tables, rather than needing to do a join with another table.

CREATE DOMAIN IF NOT EXISTS Amount AS NUMERIC(39, 0) NOT NULL;

CREATE TYPE Value AS (
amount Amount,
asset BYTEA NOT NULL
);

-- Keeps track of changes to the dex's value circuit breaker.
CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change (
-- The asset being moved into or out of the dex.
asset_id BYTEA NOT NULL,
-- The flow, either positive, or negative, into the dex via this particular asset.
--
-- Because we're dealing with arbitrary assets, we need to use something which can store u128
flow NUMERIC(39, 0) NOT NULL
flow Amount
);

-- One step of an execution trace.
CREATE TABLE IF NOT EXISTS trace_step (
id SERIAL PRIMARY KEY,
value Value,
);

-- A single trace, showing what a small amount of an input asset was exchanged for.
CREATE TABLE IF NOT EXISTS trace (
id SERIAL PRIMARY KEY,
step_start INTEGER REFERENCES trace_step(id),
step_end INTEGER REFERENCES trace_step(id),
);

--- Represents instances where arb executions happened.
CREATE TABLE IF NOT EXISTS arb (
height BIGINT PRIMARY KEY,
input Value,
output Value,
trace_start INTEGER REFERENCES arb_traces(id),
trace_end INTEGER REFERENCES arb_traces(id),
);
72 changes: 66 additions & 6 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use anyhow::anyhow;
use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_dex::SwapExecution;
use penumbra_num::Amount;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};
Expand All @@ -11,7 +12,7 @@ use crate::sql::Sql;
use crate::{AppView, ContextualizedEvent, PgTransaction};

/// One of the possible events that we care about.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Debug)]
enum Event {
/// A parsed version of [pb::EventValueCircuitBreakerCredit].
CircuitBreakerCredit {
Expand All @@ -25,17 +26,23 @@ enum Event {
previous_balance: Amount,
new_balance: Amount,
},
/// A parsed version of [pb::EventArbExecution]
ArbExecution {
height: u64,
execution: SwapExecution,
},
}

impl Event {
const NAMES: [&'static str; 2] = [
const NAMES: [&'static str; 3] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
"penumbra.core.component.dex.v1.EventArbExecution",
];

/// Index this event, using the handle to the postgres transaction.
async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
match *self {
match self {
Event::CircuitBreakerCredit {
asset_id,
previous_balance,
Expand All @@ -52,7 +59,7 @@ impl Event {
VALUES ($1, $2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(*asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Expand All @@ -74,12 +81,55 @@ impl Event {
VALUES ($1, -$2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(*asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
Event::ArbExecution { height, execution } => {
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i64,) = sqlx::query_as(
r#"INSERT INTO trace_step VALUES (DEFAULT, ($1, $2)) RETURNING id;"#,
)
.bind(Sql::from(step.amount))
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i64,) = sqlx::query_as(
r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#,
)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
sqlx::query(r#"INSERT INTO arb VALUES ($1, ($2, $3), ($4, $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(Sql::from(execution.input.amount))
.bind(Sql::from(execution.input.asset_id))
.bind(Sql::from(execution.output.amount))
.bind(Sql::from(execution.output.asset_id))
.bind(trace_start)
.bind(trace_end)
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}
Expand Down Expand Up @@ -123,6 +173,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
new_balance,
})
}
// Arb
x if x == Event::NAMES[2] => {
let pe = pb::EventArbExecution::from_event(event.as_ref())?;
let height = pe.height;
let execution = pe
.swap_execution
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
Ok(Self::ArbExecution { height, execution })
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
Expand Down Expand Up @@ -157,7 +217,7 @@ impl AppView for Component {
self.event_strings.contains(type_str)
}

#[tracing::instrument(skip_all, fields(height = event.block_height))]
#[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))]
async fn index_event(
&self,
dbtx: &mut PgTransaction,
Expand Down

0 comments on commit 53b8338

Please sign in to comment.