From 18bf50a0f3a29aea090e1bb2e13d4a67250068c1 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 01/10] dex: improve component layering / interface --- .../compact-block/src/component/manager.rs | 2 +- .../dex/src/component/action_handler/swap.rs | 2 +- .../core/component/dex/src/component/arb.rs | 7 +- .../component/dex/src/component/chandelier.rs | 2 +- .../src/component/circuit_breaker/value.rs | 2 +- .../core/component/dex/src/component/dex.rs | 135 ++++++++---------- .../core/component/dex/src/component/mod.rs | 10 +- .../dex/src/component/position_manager.rs | 34 +---- .../src/component/position_manager/counter.rs | 2 +- .../src/component/router/route_and_fill.rs | 2 +- .../dex/src/component/router/tests.rs | 2 + .../dex/src/component/swap_manager.rs | 90 ++++++++++-- .../core/component/dex/src/component/tests.rs | 1 + 13 files changed, 159 insertions(+), 132 deletions(-) diff --git a/crates/core/component/compact-block/src/component/manager.rs b/crates/core/component/compact-block/src/component/manager.rs index 8824a0c16a..2d38177ebe 100644 --- a/crates/core/component/compact-block/src/component/manager.rs +++ b/crates/core/component/compact-block/src/component/manager.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use cnidarium::StateWrite; #[cfg(feature = "component")] -use penumbra_dex::component::{StateReadExt, SwapManager as _}; +use penumbra_dex::component::SwapDataRead; use penumbra_fee::component::StateReadExt as _; use penumbra_governance::StateReadExt as _; use penumbra_proto::DomainType; diff --git a/crates/core/component/dex/src/component/action_handler/swap.rs b/crates/core/component/dex/src/component/action_handler/swap.rs index 8eb5e7a388..63fd35ca7c 100644 --- a/crates/core/component/dex/src/component/action_handler/swap.rs +++ b/crates/core/component/dex/src/component/action_handler/swap.rs @@ -9,7 +9,7 @@ use penumbra_proto::StateWriteProto; use penumbra_sct::component::source::SourceContext; use crate::{ - component::{position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager}, + component::{InternalDexWrite, StateReadExt, SwapDataRead, SwapDataWrite, SwapManager}, event, swap::{proof::SwapProofPublic, Swap}, }; diff --git a/crates/core/component/dex/src/component/arb.rs b/crates/core/component/dex/src/component/arb.rs index c67928f5a5..0aa06a26aa 100644 --- a/crates/core/component/dex/src/component/arb.rs +++ b/crates/core/component/dex/src/component/arb.rs @@ -10,14 +10,11 @@ use penumbra_sct::component::clock::EpochRead; use tracing::instrument; use crate::{ - component::{ExecutionCircuitBreaker, ValueCircuitBreaker}, + component::{ExecutionCircuitBreaker, InternalDexWrite, ValueCircuitBreaker}, event, SwapExecution, }; -use super::{ - router::{RouteAndFill, RoutingParams}, - StateWriteExt, -}; +use super::router::{RouteAndFill, RoutingParams}; #[async_trait] pub trait Arbitrage: StateWrite + Sized { diff --git a/crates/core/component/dex/src/component/chandelier.rs b/crates/core/component/dex/src/component/chandelier.rs index 966665c349..f8a819c650 100644 --- a/crates/core/component/dex/src/component/chandelier.rs +++ b/crates/core/component/dex/src/component/chandelier.rs @@ -271,7 +271,7 @@ mod tests { use crate::{ component::{ router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _, - StateReadExt as _, StateWriteExt as _, + SwapDataRead, SwapDataWrite, }, DirectedUnitPair, }; diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 46db88523e..5b75517ab0 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -64,7 +64,7 @@ mod tests { use crate::component::position_manager::price_index::PositionByPriceIndex; use crate::component::router::HandleBatchSwaps as _; - use crate::component::{StateReadExt as _, StateWriteExt as _}; + use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite}; use crate::lp::plan::PositionWithdrawPlan; use crate::{ component::{router::create_buy, tests::TempStorageExt}, diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 51cf8df30f..c695bfb4f9 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -1,17 +1,18 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; use cnidarium::{StateRead, StateWrite}; use cnidarium_component::Component; +use penumbra_asset::asset; use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID}; use penumbra_proto::{StateReadProto, StateWriteProto}; use tendermint::v0_37::abci; use tracing::instrument; use crate::{ - component::flow::SwapFlow, event, genesis, state_key, BatchSwapOutputData, DexParameters, - DirectedTradingPair, SwapExecution, TradingPair, + component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key, + BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair, }; use super::{ @@ -54,7 +55,6 @@ impl Component for Dex { // This has already happened in the action handlers for each `PositionOpen` action. // 2. For each batch swap during the block, calculate clearing prices and set in the JMT. - let routing_params = state.routing_params().await.expect("dex params are set"); for (trading_pair, swap_flows) in state.swap_flows() { @@ -135,9 +135,26 @@ impl Component for Dex { } } -/// Extension trait providing read access to dex data. +/// Provides public read access to DEX data. #[async_trait] pub trait StateReadExt: StateRead { + /// Gets the DEX parameters from the state. + async fn get_dex_params(&self) -> Result { + self.get(state_key::config::dex_params()) + .await? + .ok_or_else(|| anyhow::anyhow!("Missing DexParameters")) + } + + /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation. + async fn routing_params(&self) -> Result { + let dex_params = self.get_dex_params().await?; + Ok(RoutingParams { + max_hops: dex_params.max_hops as usize, + fixed_candidates: Arc::new(dex_params.fixed_candidates), + price_limit: None, + }) + } + async fn output_data( &self, height: u64, @@ -159,48 +176,50 @@ pub trait StateReadExt: StateRead { async fn arb_execution(&self, height: u64) -> Result> { self.get(&state_key::arb_execution(height)).await } - - /// Get the swap flow for the given trading pair accumulated in this block so far. - fn swap_flow(&self, pair: &TradingPair) -> SwapFlow { - self.swap_flows().get(pair).cloned().unwrap_or_default() - } - - fn swap_flows(&self) -> BTreeMap { - self.object_get::>(state_key::swap_flows()) - .unwrap_or_default() - } - - fn pending_batch_swap_outputs(&self) -> im::OrdMap { - self.object_get(state_key::pending_outputs()) - .unwrap_or_default() - } - - /// Gets the DEX parameters from the state. - async fn get_dex_params(&self) -> Result { - self.get(state_key::config::dex_params()) - .await? - .ok_or_else(|| anyhow::anyhow!("Missing DexParameters")) - } - - /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation. - async fn routing_params(&self) -> Result { - let dex_params = self.get_dex_params().await?; - Ok(RoutingParams { - max_hops: dex_params.max_hops as usize, - fixed_candidates: Arc::new(dex_params.fixed_candidates), - price_limit: None, - }) - } } impl StateReadExt for T {} /// Extension trait providing write access to dex data. #[async_trait] -pub trait StateWriteExt: StateWrite + StateReadExt { +pub trait StateWriteExt: StateWrite { fn put_dex_params(&mut self, params: DexParameters) { self.put(state_key::config::dex_params().to_string(), params); } +} + +impl StateWriteExt for T {} + +/// The maximum number of "hot" asset identifiers to track for this block. +const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; +pub(crate) trait InternalDexWrite: StateWrite { + /// Adds an asset ID to the list of recently accessed assets, + /// making it a candidate for the current block's arbitrage routing. + /// + /// This ensures that assets associated with recently active positions + /// will be eligible for arbitrage if mispriced positions are opened. + #[tracing::instrument(level = "debug", skip_all)] + fn add_recently_accessed_asset( + &mut self, + asset_id: asset::Id, + fixed_candidates: Arc>, + ) { + let mut assets = self.recently_accessed_assets(); + + // Limit the number of recently accessed assets to prevent blowing + // up routing time. + if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT { + return; + } + + // If the asset is already in the fixed candidate list, don't insert it. + if fixed_candidates.contains(&asset_id) { + return; + } + + assets.insert(asset_id); + self.object_put(state_key::recently_accessed_assets(), assets); + } async fn set_output_data( &mut self, @@ -258,44 +277,6 @@ pub trait StateWriteExt: StateWrite + StateReadExt { fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) { self.put(state_key::arb_execution(height), execution); } - - async fn put_swap_flow( - &mut self, - trading_pair: &TradingPair, - swap_flow: SwapFlow, - ) -> Result<()> { - // Credit the DEX for the swap inflows. - // - // Note that we credit the DEX for _all_ inflows, since we don't know - // how much will eventually be filled. - self.dex_vcb_credit(Value { - amount: swap_flow.0, - asset_id: trading_pair.asset_1, - }) - .await?; - self.dex_vcb_credit(Value { - amount: swap_flow.1, - asset_id: trading_pair.asset_2, - }) - .await?; - - // TODO: replace with IM struct later - let mut swap_flows = self.swap_flows(); - swap_flows.insert(*trading_pair, swap_flow); - self.object_put(state_key::swap_flows(), swap_flows); - - Ok(()) - } - - fn put_swap_execution_at_height( - &mut self, - height: u64, - pair: DirectedTradingPair, - swap_execution: SwapExecution, - ) { - let path = state_key::swap_execution(height, pair); - self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution); - } } -impl StateWriteExt for T {} +impl InternalDexWrite for T {} diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 888df993d6..a7936a021b 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -19,8 +19,14 @@ pub use self::metrics::register_metrics; pub(crate) use arb::Arbitrage; pub(crate) use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; +pub(crate) use dex::InternalDexWrite; pub use dex::{Dex, StateReadExt, StateWriteExt}; -pub use position_manager::{PositionManager, PositionRead}; -pub use swap_manager::SwapManager; +pub use position_manager::PositionManager; +pub(crate) use swap_manager::SwapDataWrite; +pub(crate) use swap_manager::SwapManager; + +// Read data from the Dex component; +pub use position_manager::PositionRead; +pub use swap_manager::SwapDataRead; #[cfg(test)] pub(crate) mod tests; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index c40da0a79e..7d3de73a4d 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -14,6 +14,7 @@ use tap::Tap; use tracing::instrument; use crate::component::{ + dex::InternalDexWrite, dex::StateReadExt as _, position_manager::{ base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, @@ -33,11 +34,10 @@ use crate::{event, state_key}; use super::chandelier::Chandelier; const DYNAMIC_ASSET_LIMIT: usize = 10; -const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; mod base_liquidity_index; -mod counter; -mod inventory_index; +pub(crate) mod counter; +pub(crate) mod inventory_index; pub(crate) mod price_index; #[async_trait] @@ -282,34 +282,6 @@ pub trait PositionManager: StateWrite + PositionRead { Ok(()) } - /// Adds an asset ID to the list of recently accessed assets, - /// making it a candidate for the current block's arbitrage routing. - /// - /// This ensures that assets associated with recently active positions - /// will be eligible for arbitrage if mispriced positions are opened. - #[tracing::instrument(level = "debug", skip_all)] - fn add_recently_accessed_asset( - &mut self, - asset_id: asset::Id, - fixed_candidates: Arc>, - ) { - let mut assets = self.recently_accessed_assets(); - - // Limit the number of recently accessed assets to prevent blowing - // up routing time. - if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT { - return; - } - - // If the asset is already in the fixed candidate list, don't insert it. - if fixed_candidates.contains(&asset_id) { - return; - } - - assets.insert(asset_id); - self.object_put(state_key::recently_accessed_assets(), assets); - } - /// Record execution against an opened position. /// /// IMPORTANT: This method can mutate its input state. diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 5185fe1f2d..023494f0dd 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -8,7 +8,7 @@ use crate::TradingPair; use anyhow::Result; #[async_trait] -pub(super) trait PositionCounterRead: StateRead { +pub(crate) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 { diff --git a/crates/core/component/dex/src/component/router/route_and_fill.rs b/crates/core/component/dex/src/component/router/route_and_fill.rs index 139dc68589..be45696c2d 100644 --- a/crates/core/component/dex/src/component/router/route_and_fill.rs +++ b/crates/core/component/dex/src/component/router/route_and_fill.rs @@ -13,7 +13,7 @@ use crate::{ chandelier::Chandelier, flow::SwapFlow, router::{FillRoute, PathSearch, RoutingParams}, - ExecutionCircuitBreaker, PositionManager, StateWriteExt, + ExecutionCircuitBreaker, InternalDexWrite, PositionManager, }, lp::position::MAX_RESERVE_AMOUNT, BatchSwapOutputData, SwapExecution, TradingPair, diff --git a/crates/core/component/dex/src/component/router/tests.rs b/crates/core/component/dex/src/component/router/tests.rs index 80c065845a..4f94abcbd1 100644 --- a/crates/core/component/dex/src/component/router/tests.rs +++ b/crates/core/component/dex/src/component/router/tests.rs @@ -8,6 +8,8 @@ use penumbra_num::{fixpoint::U128x128, Amount}; use rand_core::OsRng; use std::sync::Arc; +use crate::component::SwapDataRead; +use crate::component::SwapDataWrite; use crate::lp::SellOrder; use crate::DexParameters; use crate::{ diff --git a/crates/core/component/dex/src/component/swap_manager.rs b/crates/core/component/dex/src/component/swap_manager.rs index 901f432b84..f2f59f25bd 100644 --- a/crates/core/component/dex/src/component/swap_manager.rs +++ b/crates/core/component/dex/src/component/swap_manager.rs @@ -1,37 +1,105 @@ +use std::collections::BTreeMap; + use async_trait::async_trait; -use cnidarium::StateWrite; +use cnidarium::{StateRead, StateWrite}; +use penumbra_asset::Value; use penumbra_sct::{component::tree::SctManager, CommitmentSource}; use penumbra_tct as tct; use tracing::instrument; -use crate::{state_key, swap::SwapPayload}; +use crate::component::circuit_breaker::value::ValueCircuitBreaker; +use crate::BatchSwapOutputData; +use crate::SwapExecution; +use crate::{ + component::flow::SwapFlow, state_key, swap::SwapPayload, DirectedTradingPair, TradingPair, +}; +use anyhow::Result; +use penumbra_proto::StateWriteProto; /// Manages the addition of new notes to the chain state. #[async_trait] -pub trait SwapManager: StateWrite { +pub(crate) trait SwapManager: StateWrite { #[instrument(skip(self, swap), fields(commitment = ?swap.commitment))] async fn add_swap_payload(&mut self, swap: SwapPayload, source: CommitmentSource) { - tracing::debug!("adding swap payload"); - - // 0. Record an ABCI event for transaction indexing. - //self.record(event::state_payload(&payload)); + tracing::trace!("adding swap payload"); - // 1. Insert it into the SCT, recording its source + // Record the swap commitment and its metadata in the SCT let position = self.add_sct_commitment(swap.commitment, source.clone()) .await - // TODO: why? can't we exceed the number of state commitments in a block? + // TODO(erwan): Tracked in #830: we should handle this gracefully .expect("inserting into the state commitment tree should not fail because we should budget commitments per block (currently unimplemented)"); - // 3. Finally, record it to be inserted into the compact block: + // Record the payload in object-storage so that we can include it in this block's [`CompactBlock`]. let mut payloads = self.pending_swap_payloads(); payloads.push_back((position, swap, source)); self.object_put(state_key::pending_payloads(), payloads); } +} +impl SwapManager for T {} + +pub trait SwapDataRead: StateRead { fn pending_swap_payloads(&self) -> im::Vector<(tct::Position, SwapPayload, CommitmentSource)> { self.object_get(state_key::pending_payloads()) .unwrap_or_default() } + + /// Get the swap flow for the given trading pair accumulated in this block so far. + fn swap_flow(&self, pair: &TradingPair) -> SwapFlow { + self.swap_flows().get(pair).cloned().unwrap_or_default() + } + + fn swap_flows(&self) -> BTreeMap { + self.object_get::>(state_key::swap_flows()) + .unwrap_or_default() + } + + fn pending_batch_swap_outputs(&self) -> im::OrdMap { + self.object_get(state_key::pending_outputs()) + .unwrap_or_default() + } } -impl SwapManager for T {} +impl SwapDataRead for T {} + +pub(crate) trait SwapDataWrite: StateWrite { + async fn put_swap_flow( + &mut self, + trading_pair: &TradingPair, + swap_flow: SwapFlow, + ) -> Result<()> { + // Credit the DEX for the swap inflows. + // + // Note that we credit the DEX for _all_ inflows, since we don't know + // how much will eventually be filled. + self.dex_vcb_credit(Value { + amount: swap_flow.0, + asset_id: trading_pair.asset_1, + }) + .await?; + self.dex_vcb_credit(Value { + amount: swap_flow.1, + asset_id: trading_pair.asset_2, + }) + .await?; + + // TODO: replace with IM struct later + let mut swap_flows = self.swap_flows(); + swap_flows.insert(*trading_pair, swap_flow); + self.object_put(state_key::swap_flows(), swap_flows); + + Ok(()) + } + + fn put_swap_execution_at_height( + &mut self, + height: u64, + pair: DirectedTradingPair, + swap_execution: SwapExecution, + ) { + let path = state_key::swap_execution(height, pair); + self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution); + } +} + +impl SwapDataWrite for T {} diff --git a/crates/core/component/dex/src/component/tests.rs b/crates/core/component/dex/src/component/tests.rs index ae7bb3a527..1875c05861 100644 --- a/crates/core/component/dex/src/component/tests.rs +++ b/crates/core/component/dex/src/component/tests.rs @@ -8,6 +8,7 @@ use penumbra_asset::{asset, Value}; use penumbra_num::Amount; use rand_core::OsRng; +use crate::component::{SwapDataRead, SwapDataWrite}; use crate::lp::action::PositionOpen; use crate::lp::{position, SellOrder}; use crate::DexParameters; From 6b99f1cb1ffc6e61879f2cb0f388789f1c03b5e6 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 02/10] dex(eviction): scaffolding for end block impl --- .../dex/src/component/eviction_manager.rs | 108 ++++++++++++++++++ .../core/component/dex/src/component/mod.rs | 1 + 2 files changed, 109 insertions(+) create mode 100644 crates/core/component/dex/src/component/eviction_manager.rs diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs new file mode 100644 index 0000000000..a3f2faa50f --- /dev/null +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -0,0 +1,108 @@ +use crate::component::StateReadExt; +use crate::{component::position_manager::counter::PositionCounterRead, lp::position}; +use futures::{StreamExt as _, TryStreamExt}; +use std::collections::BTreeSet; +use tokio::task::JoinSet; + +use crate::state_key::eviction_queue; +use anyhow::Result; +use cnidarium::StateWrite; +use penumbra_proto::DomainType; +use tracing::instrument; + +use crate::{component::PositionManager, DirectedTradingPair, TradingPair}; + +pub(crate) trait EvictionManager: StateWrite { + /// Evict liquidity positions that are in excess of the trading pair limit. + /// + /// # Overview + /// This method enforce the approximate limit on the number of + /// positions that can be active for a given trading pair, as defined + /// by [`max_positions_per_pair`](DexParameters#max_positions_per_pair). + /// + /// # Mechanism + /// + /// The eviction mechanism functions by inspecting every trading pair which + /// had LP opened during the block. For each of them, it compute the "excess" + /// amount of positions `M`, defined as follow: + /// `M = N - N_max` where N is the number of positions, + /// and N_max is a chain parameter. + /// + /// Since a [`TradingPair`] defines two possible directed pairs, we need + /// to ensure that we don't evict LPs if they provide important liquidity + /// to at least one direction of the pair. + /// + /// To do this effectively, we maintain a liquidity index which orders LPs + /// by ascending liquidity for each direction of a trading pair. This allow + /// us to easily fetch the worse M positions for each index, and only evict + /// overlapping LPs. + /// + /// This approach sidestep the problem of adjudicating which position deserve + /// to be preserved depending on the trading direction, and limit the number + /// of reads to 2*M. On the other hand, it means that the actual maximum number + /// of positions per pair is 2*N_max. + /// + /// ## Diagram + /// + /// Q_1: A -> B + /// ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ + /// ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃ + /// ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + /// ▽▼▼▼▼▼▼▼▽▽▼▽ + /// ┌─▶ find overlap + /// │ ▲▲▲▲△△△▲▲▲▲▲ Q_2: B -> A + /// │ ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ + /// │ ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃ + /// │ ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + /// │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▶ + /// │ Ordered by inventory + /// │ + /// │ ▽▼▼▼▼▼▼▼▽▽▼▽ + /// └───────▶ ∩ Across both indices, the + /// ▲▲▲▲△△△▲▲▲▲▲ bottom M positions overlap + /// k times where 0 <= k <= M + /// for N < 2*N_max + /// + #[instrument(skip_all, err, level = "trace")] + async fn evict_positions(&mut self) -> Result<()> { + let hot_pairs: Vec = todo!(); + let max_positions_per_pair = self.get_dex_params().await?.max_positions_per_pair; + + for pair in hot_pairs.iter() { + let total_positions = self.get_position_count(pair).await; + let overhead_size = total_positions.saturating_sub(max_positions_per_pair); + if overhead_size == 0 { + continue; + } + + let pair_ab = DirectedTradingPair::new(pair.asset_1(), pair.asset_2()); + let pair_ba = pair_ab.flip(); + let key_ab = eviction_queue::inventory_index::by_trading_pair(&pair_ab); + let key_ba = eviction_queue::inventory_index::by_trading_pair(&pair_ba); + + let mut stream_ab = self.nonverifiable_prefix_raw(&key_ab).boxed(); + let mut stream_ba = self.nonverifiable_prefix_raw(&key_ba).boxed(); + + let overhead_ab = stream_ab + .take(overhead_size as usize) + .and_then(|(_, raw_id)| async move { position::Id::decode(&*raw_id) }) + .try_collect::>() + .await?; + + let overhead_ba = stream_ba + .take(overhead_size as usize) + .and_then(|(_, raw_id)| async move { position::Id::decode(&*raw_id) }) + .try_collect::>() + .await?; + + let overlap = overhead_ab.intersection(&overhead_ba); + + for id in overlap { + self.close_position_by_id(id).await?; + } + } + todo!() + } +} + +impl EvictionManager for T {} diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index a7936a021b..3afdf00add 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,6 +11,7 @@ mod arb; mod chandelier; pub(crate) mod circuit_breaker; mod dex; +mod eviction_manager; mod flow; mod position_manager; mod swap_manager; From 95b39d4f1c492b10d21b1be7ea2eec4ac323cb93 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 03/10] dex: rudimentary object tracking of active pairs --- .../core/component/dex/src/component/dex.rs | 24 ++++++++++++++++--- .../dex/src/component/eviction_manager.rs | 9 ++++--- .../core/component/dex/src/component/mod.rs | 15 +++++++----- crates/core/component/dex/src/state_key.rs | 8 +++++++ repro.sh | 6 +++++ 5 files changed, 48 insertions(+), 14 deletions(-) create mode 100755 repro.sh diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index c695bfb4f9..8aaa4882a4 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::sync::Arc; use anyhow::Result; @@ -10,6 +11,7 @@ use penumbra_proto::{StateReadProto, StateWriteProto}; use tendermint::v0_37::abci; use tracing::instrument; +use crate::state_key::block_scoped; use crate::{ component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key, BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair, @@ -30,9 +32,7 @@ impl Component for Dex { #[instrument(name = "dex", skip(state, app_state))] async fn init_chain(mut state: S, app_state: Option<&Self::AppState>) { match app_state { - None => { - // Checkpoint -- no-op - } + None => { /* no-op */ } Some(app_state) => { state.put_dex_params(app_state.dex_params.clone()); } @@ -176,6 +176,13 @@ pub trait StateReadExt: StateRead { async fn arb_execution(&self, height: u64) -> Result> { self.get(&state_key::arb_execution(height)).await } + + /// Return a set of [`TradingPair`]s for which liquidity positions were opened + /// during this block. + fn get_active_trading_pairs_in_block(&self) -> BTreeSet { + self.object_get(block_scoped::active::trading_pairs()) + .unwrap_or_default() + } } impl StateReadExt for T {} @@ -192,6 +199,8 @@ impl StateWriteExt for T {} /// The maximum number of "hot" asset identifiers to track for this block. const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; + +/// Provide write access to internal dex data. pub(crate) trait InternalDexWrite: StateWrite { /// Adds an asset ID to the list of recently accessed assets, /// making it a candidate for the current block's arbitrage routing. @@ -221,6 +230,15 @@ pub(crate) trait InternalDexWrite: StateWrite { self.object_put(state_key::recently_accessed_assets(), assets); } + /// Mark a [`TradingPair`] as active during this block. + fn mark_trading_pair_as_active(&mut self, pair: TradingPair) { + let mut active_pairs = self.get_active_trading_pairs_in_block(); + + if active_pairs.insert(pair) { + self.object_put(block_scoped::active::trading_pairs(), active_pairs) + } + } + async fn set_output_data( &mut self, output_data: BatchSwapOutputData, diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs index a3f2faa50f..bf9e96fdf6 100644 --- a/crates/core/component/dex/src/component/eviction_manager.rs +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -2,7 +2,6 @@ use crate::component::StateReadExt; use crate::{component::position_manager::counter::PositionCounterRead, lp::position}; use futures::{StreamExt as _, TryStreamExt}; use std::collections::BTreeSet; -use tokio::task::JoinSet; use crate::state_key::eviction_queue; use anyhow::Result; @@ -65,7 +64,7 @@ pub(crate) trait EvictionManager: StateWrite { /// #[instrument(skip_all, err, level = "trace")] async fn evict_positions(&mut self) -> Result<()> { - let hot_pairs: Vec = todo!(); + let hot_pairs: BTreeSet = self.get_active_trading_pairs_in_block(); let max_positions_per_pair = self.get_dex_params().await?.max_positions_per_pair; for pair in hot_pairs.iter() { @@ -80,8 +79,8 @@ pub(crate) trait EvictionManager: StateWrite { let key_ab = eviction_queue::inventory_index::by_trading_pair(&pair_ab); let key_ba = eviction_queue::inventory_index::by_trading_pair(&pair_ba); - let mut stream_ab = self.nonverifiable_prefix_raw(&key_ab).boxed(); - let mut stream_ba = self.nonverifiable_prefix_raw(&key_ba).boxed(); + let stream_ab = self.nonverifiable_prefix_raw(&key_ab).boxed(); + let stream_ba = self.nonverifiable_prefix_raw(&key_ba).boxed(); let overhead_ab = stream_ab .take(overhead_size as usize) @@ -101,7 +100,7 @@ pub(crate) trait EvictionManager: StateWrite { self.close_position_by_id(id).await?; } } - todo!() + Ok(()) } } diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 3afdf00add..286b7717d6 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -16,18 +16,21 @@ mod flow; mod position_manager; mod swap_manager; -pub use self::metrics::register_metrics; +pub use dex::{Dex, StateReadExt, StateWriteExt}; +pub use position_manager::PositionManager; + +// Read data from the Dex component; +pub use position_manager::PositionRead; +pub use swap_manager::SwapDataRead; + pub(crate) use arb::Arbitrage; pub(crate) use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub(crate) use dex::InternalDexWrite; -pub use dex::{Dex, StateReadExt, StateWriteExt}; -pub use position_manager::PositionManager; pub(crate) use swap_manager::SwapDataWrite; pub(crate) use swap_manager::SwapManager; -// Read data from the Dex component; -pub use position_manager::PositionRead; -pub use swap_manager::SwapDataRead; #[cfg(test)] pub(crate) mod tests; + +pub use self::metrics::register_metrics; diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 5be9bc1a5c..bedde3a7f6 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -60,6 +60,14 @@ pub mod candlesticks { } } +pub mod block_scoped { + pub mod active { + pub fn trading_pairs() -> &'static str { + "dex/block_scoped/active/trading_pairs" + } + } +} + pub fn output_data(height: u64, trading_pair: TradingPair) -> String { format!( "dex/output/{:020}/{}/{}", diff --git a/repro.sh b/repro.sh new file mode 100755 index 0000000000..f272016ab6 --- /dev/null +++ b/repro.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +cargo run --release --bin pcli -- --home localnet_config tx lp order sell 183.070325gn@1.000501gm +cargo run --release --bin pcli -- --home localnet_config tx lp order sell 200gn@1.000501gm +cargo run --release --bin pcli -- --home localnet_config tx lp order sell 2.013014gn@1.00502gm +cargo run --release --bin pcli -- --home localnet_config tx lp order sell 2.013014gn@3.19999penumbra +cargo run --release --bin pcli -- --home localnet_config tx lp order buy 2.013014gn@3.19999penumbra From c6afd148632f316e7410d4f0cbdf42111b5634d4 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 04/10] dex: process evictions in end block --- crates/core/component/dex/src/component/dex.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 8aaa4882a4..7fb2469d81 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -17,6 +17,7 @@ use crate::{ BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair, }; +use super::eviction_manager::EvictionManager; use super::{ chandelier::Chandelier, router::{HandleBatchSwaps, RoutingParams}, @@ -113,7 +114,15 @@ impl Component for Dex { Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"), } - // 4. Close all positions queued for closure at the end of the block. + // 4. Inspect trading pairs that saw new position opened during this block, and + // evict their excess LPs if any are found. + Arc::get_mut(state) + .expect("state should be uniquely referenced after batch swaps complete") + .evict_positions() + .await + .expect("MERGEBLOCK(erwan): remove this"); + + // 5. Close all positions queued for closure at the end of the block. // It's important to do this after execution, to allow block-scoped JIT liquidity. Arc::get_mut(state) .expect("state should be uniquely referenced after batch swaps complete") From c4497e1546cd8fdeab2339f5ee6078b6c97ae0f7 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 05/10] dex: mark pairs as active in `open_position` --- crates/core/component/dex/src/component/position_manager.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 7d3de73a4d..86e76d0d7a 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -274,6 +274,9 @@ pub trait PositionManager: StateWrite + PositionRead { position.phi.pair.asset_2(), routing_params.fixed_candidates, ); + // Mark the trading pair as active so that we can inspect it + // at the end of the block and garbage collect excess LPs. + self.mark_trading_pair_as_active(position.phi.pair); // Finally, record the new position state. self.record_proto(event::position_open(&position)); From f733274cb9195cf2531faae355bdb87cea96661a Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 May 2024 23:00:59 -0400 Subject: [PATCH 06/10] dex(eviction): parse position id from index key --- .../component/dex/src/component/eviction_manager.rs | 11 ++++++++--- crates/core/component/dex/src/state_key.rs | 7 +++++++ repro.sh | 6 ------ 3 files changed, 15 insertions(+), 9 deletions(-) delete mode 100755 repro.sh diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs index bf9e96fdf6..c4fa13f156 100644 --- a/crates/core/component/dex/src/component/eviction_manager.rs +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -6,7 +6,6 @@ use std::collections::BTreeSet; use crate::state_key::eviction_queue; use anyhow::Result; use cnidarium::StateWrite; -use penumbra_proto::DomainType; use tracing::instrument; use crate::{component::PositionManager, DirectedTradingPair, TradingPair}; @@ -84,13 +83,19 @@ pub(crate) trait EvictionManager: StateWrite { let overhead_ab = stream_ab .take(overhead_size as usize) - .and_then(|(_, raw_id)| async move { position::Id::decode(&*raw_id) }) + .and_then(|(k, _)| async move { + let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?; + Ok(position::Id(raw_id)) + }) .try_collect::>() .await?; let overhead_ba = stream_ba .take(overhead_size as usize) - .and_then(|(_, raw_id)| async move { position::Id::decode(&*raw_id) }) + .and_then(|(k, _)| async move { + let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?; + Ok(position::Id(raw_id)) + }) .try_collect::>() .await?; diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index bedde3a7f6..6f0747d8f8 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -222,6 +222,7 @@ pub(crate) mod eviction_queue { pub(crate) mod inventory_index { use crate::lp::position; use crate::DirectedTradingPair; + use anyhow::ensure; use penumbra_num::Amount; pub(crate) fn by_trading_pair(pair: &DirectedTradingPair) -> [u8; 107] { @@ -245,5 +246,11 @@ pub(crate) mod eviction_queue { full_key } + + pub(crate) fn parse_id_from_key(key: Vec) -> anyhow::Result<[u8; 32]> { + ensure!(key.len() == 155, "key must be 155 bytes"); + let k = &key[123..155]; + Ok(k.try_into()?) + } } } diff --git a/repro.sh b/repro.sh deleted file mode 100755 index f272016ab6..0000000000 --- a/repro.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash -cargo run --release --bin pcli -- --home localnet_config tx lp order sell 183.070325gn@1.000501gm -cargo run --release --bin pcli -- --home localnet_config tx lp order sell 200gn@1.000501gm -cargo run --release --bin pcli -- --home localnet_config tx lp order sell 2.013014gn@1.00502gm -cargo run --release --bin pcli -- --home localnet_config tx lp order sell 2.013014gn@3.19999penumbra -cargo run --release --bin pcli -- --home localnet_config tx lp order buy 2.013014gn@3.19999penumbra From 45bb38f24d031b0ea824e24796ed502957abfaca Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 31 May 2024 14:43:51 -0400 Subject: [PATCH 07/10] dex(eviction): s/compute/computes Co-authored-by: katelyn martin Signed-off-by: Erwan Or --- crates/core/component/dex/src/component/eviction_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs index c4fa13f156..ef005437ca 100644 --- a/crates/core/component/dex/src/component/eviction_manager.rs +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -21,7 +21,7 @@ pub(crate) trait EvictionManager: StateWrite { /// # Mechanism /// /// The eviction mechanism functions by inspecting every trading pair which - /// had LP opened during the block. For each of them, it compute the "excess" + /// had LP opened during the block. For each of them, it computes the "excess" /// amount of positions `M`, defined as follow: /// `M = N - N_max` where N is the number of positions, /// and N_max is a chain parameter. From e025b9334324dbf7ef1fa13157668ae63be19914 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 31 May 2024 14:46:44 -0400 Subject: [PATCH 08/10] dex(eviction): fix typo s/worse/worst Co-authored-by: katelyn martin Signed-off-by: Erwan Or --- crates/core/component/dex/src/component/eviction_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs index ef005437ca..59cea41da4 100644 --- a/crates/core/component/dex/src/component/eviction_manager.rs +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -32,7 +32,7 @@ pub(crate) trait EvictionManager: StateWrite { /// /// To do this effectively, we maintain a liquidity index which orders LPs /// by ascending liquidity for each direction of a trading pair. This allow - /// us to easily fetch the worse M positions for each index, and only evict + /// us to easily fetch the worst M positions for each index, and only evict /// overlapping LPs. /// /// This approach sidestep the problem of adjudicating which position deserve From 591aa94c8979cb0bb727ce006635d013e647a077 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 31 May 2024 14:49:01 -0400 Subject: [PATCH 09/10] dex: remove mergeblock --- crates/core/component/dex/src/component/dex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 7fb2469d81..13d1e88d42 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -116,11 +116,11 @@ impl Component for Dex { // 4. Inspect trading pairs that saw new position opened during this block, and // evict their excess LPs if any are found. - Arc::get_mut(state) + let _ = Arc::get_mut(state) .expect("state should be uniquely referenced after batch swaps complete") .evict_positions() .await - .expect("MERGEBLOCK(erwan): remove this"); + .map_err(|e| tracing::error!(?e, "error evicting positions, skipping")); // 5. Close all positions queued for closure at the end of the block. // It's important to do this after execution, to allow block-scoped JIT liquidity. From 309915470fae85430aa772a603d741157bc97f32 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 May 2024 00:00:00 +0000 Subject: [PATCH 10/10] dex: `RoutingParams: From` this is a small change. this is a nice property about our data model, i.e. that an owned copy of the dex parameters may be used to make routing parameters. let's make that relationship explicit by putting it into a `From` impl, and pulling it out of the `StateReadExt` logic. --- crates/core/component/dex/src/component/dex.rs | 7 +------ .../dex/src/component/router/params.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 13d1e88d42..3f3f544b2f 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -156,12 +156,7 @@ pub trait StateReadExt: StateRead { /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation. async fn routing_params(&self) -> Result { - let dex_params = self.get_dex_params().await?; - Ok(RoutingParams { - max_hops: dex_params.max_hops as usize, - fixed_candidates: Arc::new(dex_params.fixed_candidates), - price_limit: None, - }) + self.get_dex_params().await.map(RoutingParams::from) } async fn output_data( diff --git a/crates/core/component/dex/src/component/router/params.rs b/crates/core/component/dex/src/component/router/params.rs index 95b0022dfa..6d18a9c2c7 100644 --- a/crates/core/component/dex/src/component/router/params.rs +++ b/crates/core/component/dex/src/component/router/params.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use penumbra_asset::asset; use penumbra_num::fixpoint::U128x128; +use crate::DexParameters; + #[derive(Debug, Clone)] pub struct RoutingParams { pub price_limit: Option, @@ -37,3 +39,19 @@ impl RoutingParams { } } } + +impl From for RoutingParams { + fn from( + DexParameters { + fixed_candidates, + max_hops, + .. + }: DexParameters, + ) -> Self { + Self { + fixed_candidates: Arc::new(fixed_candidates), + max_hops: max_hops as usize, + price_limit: None, + } + } +}