Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: evict excess of liquidity positions #4511

Merged
merged 10 commits into from
Jun 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Context, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
#[cfg(feature = "component")]
use penumbra_dex::component::{StateReadExt, SwapManager as _};
use penumbra_dex::component::SwapDataRead;
use penumbra_fee::component::StateReadExt as _;
use penumbra_governance::StateReadExt as _;
use penumbra_proto::DomainType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use penumbra_proto::StateWriteProto;
use penumbra_sct::component::source::SourceContext;

use crate::{
component::{position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager},
component::{InternalDexWrite, StateReadExt, SwapDataRead, SwapDataWrite, SwapManager},
event,
swap::{proof::SwapProofPublic, Swap},
};
Expand Down
7 changes: 2 additions & 5 deletions crates/core/component/dex/src/component/arb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ use penumbra_sct::component::clock::EpochRead;
use tracing::instrument;

use crate::{
component::{ExecutionCircuitBreaker, ValueCircuitBreaker},
component::{ExecutionCircuitBreaker, InternalDexWrite, ValueCircuitBreaker},
event, SwapExecution,
};

use super::{
router::{RouteAndFill, RoutingParams},
StateWriteExt,
};
use super::router::{RouteAndFill, RoutingParams};

#[async_trait]
pub trait Arbitrage: StateWrite + Sized {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/chandelier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ mod tests {
use crate::{
component::{
router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _,
StateReadExt as _, StateWriteExt as _,
SwapDataRead, SwapDataWrite,
},
DirectedUnitPair,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod tests {

use crate::component::position_manager::price_index::PositionByPriceIndex;
use crate::component::router::HandleBatchSwaps as _;
use crate::component::{StateReadExt as _, StateWriteExt as _};
use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite};
use crate::lp::plan::PositionWithdrawPlan;
use crate::{
component::{router::create_buy, tests::TempStorageExt},
Expand Down
159 changes: 81 additions & 78 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeSet;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use cnidarium::{StateRead, StateWrite};
use cnidarium_component::Component;
use penumbra_asset::asset;
use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID};
use penumbra_proto::{StateReadProto, StateWriteProto};
use tendermint::v0_37::abci;
use tracing::instrument;

use crate::state_key::block_scoped;
use crate::{
component::flow::SwapFlow, event, genesis, state_key, BatchSwapOutputData, DexParameters,
DirectedTradingPair, SwapExecution, TradingPair,
component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key,
BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair,
};

use super::eviction_manager::EvictionManager;
use super::{
chandelier::Chandelier,
router::{HandleBatchSwaps, RoutingParams},
Expand All @@ -29,9 +33,7 @@ impl Component for Dex {
#[instrument(name = "dex", skip(state, app_state))]
async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
match app_state {
None => {
// Checkpoint -- no-op
}
None => { /* no-op */ }
Some(app_state) => {
state.put_dex_params(app_state.dex_params.clone());
}
Expand All @@ -54,7 +56,6 @@ impl Component for Dex {
// This has already happened in the action handlers for each `PositionOpen` action.

// 2. For each batch swap during the block, calculate clearing prices and set in the JMT.

let routing_params = state.routing_params().await.expect("dex params are set");

for (trading_pair, swap_flows) in state.swap_flows() {
Expand Down Expand Up @@ -113,7 +114,15 @@ impl Component for Dex {
Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"),
}

// 4. Close all positions queued for closure at the end of the block.
// 4. Inspect trading pairs that saw new position opened during this block, and
// evict their excess LPs if any are found.
let _ = Arc::get_mut(state)
.expect("state should be uniquely referenced after batch swaps complete")
.evict_positions()
.await
.map_err(|e| tracing::error!(?e, "error evicting positions, skipping"));

// 5. Close all positions queued for closure at the end of the block.
// It's important to do this after execution, to allow block-scoped JIT liquidity.
Arc::get_mut(state)
.expect("state should be uniquely referenced after batch swaps complete")
Expand All @@ -135,9 +144,21 @@ impl Component for Dex {
}
}

/// Extension trait providing read access to dex data.
/// Provides public read access to DEX data.
#[async_trait]
pub trait StateReadExt: StateRead {
/// Gets the DEX parameters from the state.
async fn get_dex_params(&self) -> Result<DexParameters> {
self.get(state_key::config::dex_params())
.await?
.ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
}

/// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
async fn routing_params(&self) -> Result<RoutingParams> {
self.get_dex_params().await.map(RoutingParams::from)
}

async fn output_data(
&self,
height: u64,
Expand All @@ -160,47 +181,67 @@ pub trait StateReadExt: StateRead {
self.get(&state_key::arb_execution(height)).await
}

/// Get the swap flow for the given trading pair accumulated in this block so far.
fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
self.swap_flows().get(pair).cloned().unwrap_or_default()
}

fn swap_flows(&self) -> BTreeMap<TradingPair, SwapFlow> {
self.object_get::<BTreeMap<TradingPair, SwapFlow>>(state_key::swap_flows())
/// Return a set of [`TradingPair`]s for which liquidity positions were opened
/// during this block.
fn get_active_trading_pairs_in_block(&self) -> BTreeSet<TradingPair> {
self.object_get(block_scoped::active::trading_pairs())
.unwrap_or_default()
}

fn pending_batch_swap_outputs(&self) -> im::OrdMap<TradingPair, BatchSwapOutputData> {
self.object_get(state_key::pending_outputs())
.unwrap_or_default()
}

/// Gets the DEX parameters from the state.
async fn get_dex_params(&self) -> Result<DexParameters> {
self.get(state_key::config::dex_params())
.await?
.ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
}

/// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
async fn routing_params(&self) -> Result<RoutingParams> {
let dex_params = self.get_dex_params().await?;
Ok(RoutingParams {
max_hops: dex_params.max_hops as usize,
fixed_candidates: Arc::new(dex_params.fixed_candidates),
price_limit: None,
})
}
}

impl<T: StateRead + ?Sized> StateReadExt for T {}

/// Extension trait providing write access to dex data.
#[async_trait]
pub trait StateWriteExt: StateWrite + StateReadExt {
pub trait StateWriteExt: StateWrite {
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
fn put_dex_params(&mut self, params: DexParameters) {
self.put(state_key::config::dex_params().to_string(), params);
}
}

impl<T: StateWrite + ?Sized> StateWriteExt for T {}

/// The maximum number of "hot" asset identifiers to track for this block.
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;

/// Provide write access to internal dex data.
pub(crate) trait InternalDexWrite: StateWrite {
/// Adds an asset ID to the list of recently accessed assets,
/// making it a candidate for the current block's arbitrage routing.
///
/// This ensures that assets associated with recently active positions
/// will be eligible for arbitrage if mispriced positions are opened.
#[tracing::instrument(level = "debug", skip_all)]
fn add_recently_accessed_asset(
&mut self,
asset_id: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) {
let mut assets = self.recently_accessed_assets();

// Limit the number of recently accessed assets to prevent blowing
// up routing time.
if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
return;
}

// If the asset is already in the fixed candidate list, don't insert it.
if fixed_candidates.contains(&asset_id) {
return;
}

assets.insert(asset_id);
self.object_put(state_key::recently_accessed_assets(), assets);
erwanor marked this conversation as resolved.
Show resolved Hide resolved
}

/// Mark a [`TradingPair`] as active during this block.
fn mark_trading_pair_as_active(&mut self, pair: TradingPair) {
let mut active_pairs = self.get_active_trading_pairs_in_block();

if active_pairs.insert(pair) {
self.object_put(block_scoped::active::trading_pairs(), active_pairs)
}
}

async fn set_output_data(
&mut self,
Expand Down Expand Up @@ -258,44 +299,6 @@ pub trait StateWriteExt: StateWrite + StateReadExt {
fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
self.put(state_key::arb_execution(height), execution);
}

async fn put_swap_flow(
&mut self,
trading_pair: &TradingPair,
swap_flow: SwapFlow,
) -> Result<()> {
// Credit the DEX for the swap inflows.
//
// Note that we credit the DEX for _all_ inflows, since we don't know
// how much will eventually be filled.
self.dex_vcb_credit(Value {
amount: swap_flow.0,
asset_id: trading_pair.asset_1,
})
.await?;
self.dex_vcb_credit(Value {
amount: swap_flow.1,
asset_id: trading_pair.asset_2,
})
.await?;

// TODO: replace with IM struct later
let mut swap_flows = self.swap_flows();
swap_flows.insert(*trading_pair, swap_flow);
self.object_put(state_key::swap_flows(), swap_flows);

Ok(())
}

fn put_swap_execution_at_height(
&mut self,
height: u64,
pair: DirectedTradingPair,
swap_execution: SwapExecution,
) {
let path = state_key::swap_execution(height, pair);
self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution);
}
}

impl<T: StateWrite + ?Sized> StateWriteExt for T {}
impl<T: StateWrite + ?Sized> InternalDexWrite for T {}
112 changes: 112 additions & 0 deletions crates/core/component/dex/src/component/eviction_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use crate::component::StateReadExt;
use crate::{component::position_manager::counter::PositionCounterRead, lp::position};
use futures::{StreamExt as _, TryStreamExt};
use std::collections::BTreeSet;

use crate::state_key::eviction_queue;
use anyhow::Result;
use cnidarium::StateWrite;
use tracing::instrument;

use crate::{component::PositionManager, DirectedTradingPair, TradingPair};

pub(crate) trait EvictionManager: StateWrite {
/// Evict liquidity positions that are in excess of the trading pair limit.
///
/// # Overview
/// This method enforce the approximate limit on the number of
/// positions that can be active for a given trading pair, as defined
/// by [`max_positions_per_pair`](DexParameters#max_positions_per_pair).
///
/// # Mechanism
erwanor marked this conversation as resolved.
Show resolved Hide resolved
///
/// The eviction mechanism functions by inspecting every trading pair which
/// had LP opened during the block. For each of them, it computes the "excess"
/// amount of positions `M`, defined as follow:
/// `M = N - N_max` where N is the number of positions,
/// and N_max is a chain parameter.
///
/// Since a [`TradingPair`] defines two possible directed pairs, we need
/// to ensure that we don't evict LPs if they provide important liquidity
/// to at least one direction of the pair.
///
/// To do this effectively, we maintain a liquidity index which orders LPs
/// by ascending liquidity for each direction of a trading pair. This allow
/// us to easily fetch the worst M positions for each index, and only evict
/// overlapping LPs.
///
/// This approach sidestep the problem of adjudicating which position deserve
/// to be preserved depending on the trading direction, and limit the number
/// of reads to 2*M. On the other hand, it means that the actual maximum number
/// of positions per pair is 2*N_max.
///
/// ## Diagram
///
/// Q_1: A -> B
/// ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
/// ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃
/// ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
/// ▽▼▼▼▼▼▼▼▽▽▼▽
/// ┌─▶ find overlap
/// │ ▲▲▲▲△△△▲▲▲▲▲ Q_2: B -> A
/// │ ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
/// │ ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃
/// │ ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
/// │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▶
/// │ Ordered by inventory
/// │
/// │ ▽▼▼▼▼▼▼▼▽▽▼▽
/// └───────▶ ∩ Across both indices, the
/// ▲▲▲▲△△△▲▲▲▲▲ bottom M positions overlap
/// k times where 0 <= k <= M
/// for N < 2*N_max
///
#[instrument(skip_all, err, level = "trace")]
async fn evict_positions(&mut self) -> Result<()> {
let hot_pairs: BTreeSet<TradingPair> = self.get_active_trading_pairs_in_block();
let max_positions_per_pair = self.get_dex_params().await?.max_positions_per_pair;

for pair in hot_pairs.iter() {
let total_positions = self.get_position_count(pair).await;
let overhead_size = total_positions.saturating_sub(max_positions_per_pair);
if overhead_size == 0 {
continue;
}

let pair_ab = DirectedTradingPair::new(pair.asset_1(), pair.asset_2());
let pair_ba = pair_ab.flip();
let key_ab = eviction_queue::inventory_index::by_trading_pair(&pair_ab);
let key_ba = eviction_queue::inventory_index::by_trading_pair(&pair_ba);

let stream_ab = self.nonverifiable_prefix_raw(&key_ab).boxed();
let stream_ba = self.nonverifiable_prefix_raw(&key_ba).boxed();

let overhead_ab = stream_ab
.take(overhead_size as usize)
.and_then(|(k, _)| async move {
let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?;
Ok(position::Id(raw_id))
})
.try_collect::<BTreeSet<position::Id>>()
.await?;

let overhead_ba = stream_ba
.take(overhead_size as usize)
.and_then(|(k, _)| async move {
let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?;
Ok(position::Id(raw_id))
})
.try_collect::<BTreeSet<position::Id>>()
.await?;

let overlap = overhead_ab.intersection(&overhead_ba);

for id in overlap {
self.close_position_by_id(id).await?;
}
}
Ok(())
}
erwanor marked this conversation as resolved.
Show resolved Hide resolved
}

impl<T: StateWrite + ?Sized> EvictionManager for T {}
Loading
Loading