Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: swaps indexing #4776

Merged
33 changes: 33 additions & 0 deletions crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,36 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution (
-- The end asset for this execution.
context_end BYTEA NOT NULL
);

--- Represents instances where swap executions happened.
CREATE TABLE IF NOT EXISTS dex_batch_swap (
height BIGINT PRIMARY KEY,
trace12_start INTEGER REFERENCES dex_trace (id),
trace12_end INTEGER REFERENCES dex_trace (id),
trace21_start INTEGER REFERENCES dex_trace (id),
trace21_end INTEGER REFERENCES dex_trace (id),
asset1 BYTEA NOT NULL,
asset2 BYTEA NOT NULL,
unfilled1 Amount NOT NULL,
unfilled2 Amount NOT NULL,
delta1 Amount NOT NULL,
delta2 Amount NOT NULL,
lambda1 Amount NOT NULL,
lambda2 Amount NOT NULL
);

CREATE INDEX ON dex_batch_swap(height);
CREATE INDEX ON dex_batch_swap(asset1, height);
CREATE INDEX ON dex_batch_swap(asset2, height);

-- Represents instances of invididual swaps into the batch.
CREATE TABLE IF NOT EXISTS dex_swap (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
value1 Value,
value2 Value
);

CREATE INDEX ON dex_swap(height, id);
CREATE INDEX ON dex_swap(((value1).asset));
CREATE INDEX ON dex_swap(((value2).asset));
184 changes: 152 additions & 32 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,57 @@ use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_dex::lp::position::{Id, Position};
use penumbra_dex::lp::{self, TradingFunction};
use penumbra_dex::{DirectedTradingPair, SwapExecution};
use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair};
use penumbra_num::Amount;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};

use crate::sql::Sql;
use crate::{AppView, ContextualizedEvent, PgTransaction};

/// Insert a swap execution into the database.
///
/// This returns the start and end indices of its trace.
async fn insert_swap_execution<'d>(
dbtx: &mut Transaction<'d, Postgres>,
execution: Option<&SwapExecution>,
) -> anyhow::Result<(Option<i32>, Option<i32>)> {
let execution = match execution {
None => return Ok((None, None)),
Some(e) => e,
};
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i32,) =
sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
Ok((trace_start, trace_end))
}

/// One of the possible events that we care about.
#[derive(Clone, Debug)]
enum Event {
Expand All @@ -33,6 +76,13 @@ enum Event {
height: u64,
execution: SwapExecution,
},
/// A parsed version of [pb::EventBatchSwap]
BatchSwap {
height: u64,
execution12: Option<SwapExecution>,
execution21: Option<SwapExecution>,
output_data: BatchSwapOutputData,
},
/// A parsed version of [pb::EventPositionOpen]
PositionOpen { height: u64, position: Position },
/// A parsed version of [pb::EventPositionWithdraw]
Expand All @@ -55,17 +105,26 @@ enum Event {
prev_reserves_2: Amount,
context: DirectedTradingPair,
},
/// A parsed version of [pb::EventSwap]
Swap {
height: u64,
trading_pair: TradingPair,
delta_1_i: Amount,
delta_2_i: Amount,
},
}

impl Event {
const NAMES: [&'static str; 7] = [
const NAMES: [&'static str; 9] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
"penumbra.core.component.dex.v1.EventArbExecution",
"penumbra.core.component.dex.v1.EventPositionWithdraw",
"penumbra.core.component.dex.v1.EventPositionOpen",
"penumbra.core.component.dex.v1.EventPositionClose",
"penumbra.core.component.dex.v1.EventPositionExecution",
"penumbra.core.component.dex.v1.EventBatchSwap",
"penumbra.core.component.dex.v1.EventSwap",
];

/// Index this event, using the handle to the postgres transaction.
Expand Down Expand Up @@ -116,36 +175,7 @@ impl Event {
Ok(())
}
Event::ArbExecution { height, execution } => {
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#,
)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?;
sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(execution.input.amount.to_string())
Expand Down Expand Up @@ -328,6 +358,50 @@ impl Event {
.await?;
Ok(())
}
Event::BatchSwap {
height,
execution12,
execution21,
output_data,
} => {
let (trace12_start, trace12_end) =
insert_swap_execution(dbtx, execution12.as_ref()).await?;
let (trace21_start, trace21_end) =
insert_swap_execution(dbtx, execution21.as_ref()).await?;
sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#)
.bind(i64::try_from(*height)?)
.bind(trace12_start)
.bind(trace12_end)
.bind(trace21_start)
.bind(trace21_end)
.bind(Sql::from(output_data.trading_pair.asset_1()))
.bind(Sql::from(output_data.trading_pair.asset_2()))
.bind(output_data.unfilled_1.to_string())
.bind(output_data.unfilled_2.to_string())
.bind(output_data.delta_1.to_string())
.bind(output_data.delta_2.to_string())
.bind(output_data.lambda_1.to_string())
.bind(output_data.lambda_2.to_string())
.execute(dbtx.as_mut())
.await?;
Ok(())
}
Event::Swap {
height,
trading_pair,
delta_1_i,
delta_2_i,
} => {
sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#)
.bind(i64::try_from(*height)?)
.bind(delta_1_i.to_string())
.bind(Sql::from(trading_pair.asset_1()))
.bind(delta_2_i.to_string())
.bind(Sql::from(trading_pair.asset_2()))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}
Expand Down Expand Up @@ -467,6 +541,52 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
context,
})
}
// Batch Swap
x if x == Event::NAMES[7] => {
let pe = pb::EventBatchSwap::from_event(event.as_ref())?;
let height = event.block_height;
let output_data = pe
.batch_swap_output_data
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
let execution12 = pe
.swap_execution_1_for_2
.map(|x| x.try_into())
.transpose()?;
let execution21 = pe
.swap_execution_2_for_1
.map(|x| x.try_into())
.transpose()?;
Ok(Self::BatchSwap {
height,
execution12,
execution21,
output_data,
})
}
// Swap
x if x == Event::NAMES[8] => {
let pe = pb::EventSwap::from_event(event.as_ref())?;
let height = event.block_height;
let trading_pair = pe
.trading_pair
.expect("trading_pair should be present")
.try_into()?;
let delta_1_i = pe
.delta_1_i
.expect("delta_1_i should be present")
.try_into()?;
let delta_2_i = pe
.delta_2_i
.expect("delta_2_i should be present")
.try_into()?;
Ok(Self::Swap {
height,
trading_pair,
delta_1_i,
delta_2_i,
})
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
Expand Down
Loading