Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pindexer: Implement indexing of arb executions #4754

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ num-bigint = { version = "0.4" }
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
penumbra-dex = {workspace = true, default-features = false}
penumbra-num = {workspace = true, default-features = false}
penumbra-asset = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
Expand Down
31 changes: 30 additions & 1 deletion crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,41 @@
-- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter
-- tables, rather than needing to do a join with another table.

CREATE DOMAIN IF NOT EXISTS Amount AS NUMERIC(39, 0) NOT NULL;

CREATE TYPE Value AS (
amount Amount,
asset BYTEA NOT NULL
);

-- Keeps track of changes to the dex's value circuit breaker.
CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change (
-- The asset being moved into or out of the dex.
asset_id BYTEA NOT NULL,
-- The flow, either positive, or negative, into the dex via this particular asset.
--
-- Because we're dealing with arbitrary assets, we need to use something which can store u128
flow NUMERIC(39, 0) NOT NULL
flow Amount
);

-- One step of an execution trace.
CREATE TABLE IF NOT EXISTS trace_step (
id SERIAL PRIMARY KEY,
value Value,
);

-- A single trace, showing what a small amount of an input asset was exchanged for.
CREATE TABLE IF NOT EXISTS trace (
id SERIAL PRIMARY KEY,
step_start INTEGER REFERENCES trace_step(id),
step_end INTEGER REFERENCES trace_step(id),
);

--- Represents instances where arb executions happened.
CREATE TABLE IF NOT EXISTS arb (
height BIGINT PRIMARY KEY,
input Value,
output Value,
trace_start INTEGER REFERENCES arb_traces(id),
trace_end INTEGER REFERENCES arb_traces(id),
);
72 changes: 66 additions & 6 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use anyhow::anyhow;
use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_dex::SwapExecution;
use penumbra_num::Amount;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};
Expand All @@ -11,7 +12,7 @@ use crate::sql::Sql;
use crate::{AppView, ContextualizedEvent, PgTransaction};

/// One of the possible events that we care about.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Debug)]
enum Event {
/// A parsed version of [pb::EventValueCircuitBreakerCredit].
CircuitBreakerCredit {
Expand All @@ -25,17 +26,23 @@ enum Event {
previous_balance: Amount,
new_balance: Amount,
},
/// A parsed version of [pb::EventArbExecution]
ArbExecution {
height: u64,
execution: SwapExecution,
},
}

impl Event {
const NAMES: [&'static str; 2] = [
const NAMES: [&'static str; 3] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
"penumbra.core.component.dex.v1.EventArbExecution",
];

/// Index this event, using the handle to the postgres transaction.
async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
match *self {
match self {
Event::CircuitBreakerCredit {
asset_id,
previous_balance,
Expand All @@ -52,7 +59,7 @@ impl Event {
VALUES ($1, $2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(*asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Expand All @@ -74,12 +81,55 @@ impl Event {
VALUES ($1, -$2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(*asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
Event::ArbExecution { height, execution } => {
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i64,) = sqlx::query_as(
r#"INSERT INTO trace_step VALUES (DEFAULT, ($1, $2)) RETURNING id;"#,
)
.bind(Sql::from(step.amount))
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i64,) = sqlx::query_as(
r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#,
)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
sqlx::query(r#"INSERT INTO arb VALUES ($1, ($2, $3), ($4, $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(Sql::from(execution.input.amount))
.bind(Sql::from(execution.input.asset_id))
.bind(Sql::from(execution.output.amount))
.bind(Sql::from(execution.output.asset_id))
.bind(trace_start)
.bind(trace_end)
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}
Expand Down Expand Up @@ -123,6 +173,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
new_balance,
})
}
// Arb
x if x == Event::NAMES[2] => {
let pe = pb::EventArbExecution::from_event(event.as_ref())?;
let height = pe.height;
let execution = pe
.swap_execution
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
Ok(Self::ArbExecution { height, execution })
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
Expand Down Expand Up @@ -157,7 +217,7 @@ impl AppView for Component {
self.event_strings.contains(type_str)
}

#[tracing::instrument(skip_all, fields(height = event.block_height))]
#[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))]
async fn index_event(
&self,
dbtx: &mut PgTransaction,
Expand Down
Loading