Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: evict excess of liquidity positions #4511

Merged
merged 10 commits into from
Jun 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Context, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
#[cfg(feature = "component")]
use penumbra_dex::component::{StateReadExt, SwapManager as _};
use penumbra_dex::component::SwapDataRead;
use penumbra_fee::component::StateReadExt as _;
use penumbra_governance::StateReadExt as _;
use penumbra_proto::DomainType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use penumbra_proto::StateWriteProto;
use penumbra_sct::component::source::SourceContext;

use crate::{
component::{position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager},
component::{InternalDexWrite, StateReadExt, SwapDataRead, SwapDataWrite, SwapManager},
event,
swap::{proof::SwapProofPublic, Swap},
};
Expand Down
7 changes: 2 additions & 5 deletions crates/core/component/dex/src/component/arb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ use penumbra_sct::component::clock::EpochRead;
use tracing::instrument;

use crate::{
component::{ExecutionCircuitBreaker, ValueCircuitBreaker},
component::{ExecutionCircuitBreaker, InternalDexWrite, ValueCircuitBreaker},
event, SwapExecution,
};

use super::{
router::{RouteAndFill, RoutingParams},
StateWriteExt,
};
use super::router::{RouteAndFill, RoutingParams};

#[async_trait]
pub trait Arbitrage: StateWrite + Sized {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/chandelier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ mod tests {
use crate::{
component::{
router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _,
StateReadExt as _, StateWriteExt as _,
SwapDataRead, SwapDataWrite,
},
DirectedUnitPair,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod tests {

use crate::component::position_manager::price_index::PositionByPriceIndex;
use crate::component::router::HandleBatchSwaps as _;
use crate::component::{StateReadExt as _, StateWriteExt as _};
use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite};
use crate::lp::plan::PositionWithdrawPlan;
use crate::{
component::{router::create_buy, tests::TempStorageExt},
Expand Down
164 changes: 86 additions & 78 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeSet;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use cnidarium::{StateRead, StateWrite};
use cnidarium_component::Component;
use penumbra_asset::asset;
use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID};
use penumbra_proto::{StateReadProto, StateWriteProto};
use tendermint::v0_37::abci;
use tracing::instrument;

use crate::state_key::block_scoped;
use crate::{
component::flow::SwapFlow, event, genesis, state_key, BatchSwapOutputData, DexParameters,
DirectedTradingPair, SwapExecution, TradingPair,
component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key,
BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair,
};

use super::eviction_manager::EvictionManager;
use super::{
chandelier::Chandelier,
router::{HandleBatchSwaps, RoutingParams},
Expand All @@ -29,9 +33,7 @@ impl Component for Dex {
#[instrument(name = "dex", skip(state, app_state))]
async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
match app_state {
None => {
// Checkpoint -- no-op
}
None => { /* no-op */ }
Some(app_state) => {
state.put_dex_params(app_state.dex_params.clone());
}
Expand All @@ -54,7 +56,6 @@ impl Component for Dex {
// This has already happened in the action handlers for each `PositionOpen` action.

// 2. For each batch swap during the block, calculate clearing prices and set in the JMT.

let routing_params = state.routing_params().await.expect("dex params are set");

for (trading_pair, swap_flows) in state.swap_flows() {
Expand Down Expand Up @@ -113,7 +114,15 @@ impl Component for Dex {
Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"),
}

// 4. Close all positions queued for closure at the end of the block.
// 4. Inspect trading pairs that saw new position opened during this block, and
// evict their excess LPs if any are found.
Arc::get_mut(state)
.expect("state should be uniquely referenced after batch swaps complete")
.evict_positions()
.await
.expect("MERGEBLOCK(erwan): remove this");

erwanor marked this conversation as resolved.
Show resolved Hide resolved
erwanor marked this conversation as resolved.
Show resolved Hide resolved
// 5. Close all positions queued for closure at the end of the block.
// It's important to do this after execution, to allow block-scoped JIT liquidity.
Arc::get_mut(state)
.expect("state should be uniquely referenced after batch swaps complete")
Expand All @@ -135,9 +144,26 @@ impl Component for Dex {
}
}

/// Extension trait providing read access to dex data.
/// Provides public read access to DEX data.
#[async_trait]
pub trait StateReadExt: StateRead {
/// Gets the DEX parameters from the state.
async fn get_dex_params(&self) -> Result<DexParameters> {
self.get(state_key::config::dex_params())
.await?
.ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
}

/// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
async fn routing_params(&self) -> Result<RoutingParams> {
let dex_params = self.get_dex_params().await?;
Ok(RoutingParams {
max_hops: dex_params.max_hops as usize,
fixed_candidates: Arc::new(dex_params.fixed_candidates),
price_limit: None,
})
}

erwanor marked this conversation as resolved.
Show resolved Hide resolved
async fn output_data(
&self,
height: u64,
Expand All @@ -160,47 +186,67 @@ pub trait StateReadExt: StateRead {
self.get(&state_key::arb_execution(height)).await
}

/// Get the swap flow for the given trading pair accumulated in this block so far.
fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
self.swap_flows().get(pair).cloned().unwrap_or_default()
}

fn swap_flows(&self) -> BTreeMap<TradingPair, SwapFlow> {
self.object_get::<BTreeMap<TradingPair, SwapFlow>>(state_key::swap_flows())
/// Return a set of [`TradingPair`]s for which liquidity positions were opened
/// during this block.
fn get_active_trading_pairs_in_block(&self) -> BTreeSet<TradingPair> {
self.object_get(block_scoped::active::trading_pairs())
.unwrap_or_default()
}

fn pending_batch_swap_outputs(&self) -> im::OrdMap<TradingPair, BatchSwapOutputData> {
self.object_get(state_key::pending_outputs())
.unwrap_or_default()
}

/// Gets the DEX parameters from the state.
async fn get_dex_params(&self) -> Result<DexParameters> {
self.get(state_key::config::dex_params())
.await?
.ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
}

/// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
async fn routing_params(&self) -> Result<RoutingParams> {
let dex_params = self.get_dex_params().await?;
Ok(RoutingParams {
max_hops: dex_params.max_hops as usize,
fixed_candidates: Arc::new(dex_params.fixed_candidates),
price_limit: None,
})
}
}

impl<T: StateRead + ?Sized> StateReadExt for T {}

/// Extension trait providing write access to dex data.
#[async_trait]
pub trait StateWriteExt: StateWrite + StateReadExt {
pub trait StateWriteExt: StateWrite {
fn put_dex_params(&mut self, params: DexParameters) {
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
self.put(state_key::config::dex_params().to_string(), params);
}
}

impl<T: StateWrite + ?Sized> StateWriteExt for T {}

/// The maximum number of "hot" asset identifiers to track for this block.
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;

/// Provide write access to internal dex data.
pub(crate) trait InternalDexWrite: StateWrite {
/// Adds an asset ID to the list of recently accessed assets,
/// making it a candidate for the current block's arbitrage routing.
///
/// This ensures that assets associated with recently active positions
/// will be eligible for arbitrage if mispriced positions are opened.
#[tracing::instrument(level = "debug", skip_all)]
fn add_recently_accessed_asset(
&mut self,
asset_id: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) {
let mut assets = self.recently_accessed_assets();

// Limit the number of recently accessed assets to prevent blowing
// up routing time.
if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
return;
}

// If the asset is already in the fixed candidate list, don't insert it.
if fixed_candidates.contains(&asset_id) {
return;
}

assets.insert(asset_id);
self.object_put(state_key::recently_accessed_assets(), assets);
}
erwanor marked this conversation as resolved.
Show resolved Hide resolved

/// Mark a [`TradingPair`] as active during this block.
fn mark_trading_pair_as_active(&mut self, pair: TradingPair) {
let mut active_pairs = self.get_active_trading_pairs_in_block();

if active_pairs.insert(pair) {
self.object_put(block_scoped::active::trading_pairs(), active_pairs)
}
}

async fn set_output_data(
&mut self,
Expand Down Expand Up @@ -258,44 +304,6 @@ pub trait StateWriteExt: StateWrite + StateReadExt {
fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
self.put(state_key::arb_execution(height), execution);
}

async fn put_swap_flow(
&mut self,
trading_pair: &TradingPair,
swap_flow: SwapFlow,
) -> Result<()> {
// Credit the DEX for the swap inflows.
//
// Note that we credit the DEX for _all_ inflows, since we don't know
// how much will eventually be filled.
self.dex_vcb_credit(Value {
amount: swap_flow.0,
asset_id: trading_pair.asset_1,
})
.await?;
self.dex_vcb_credit(Value {
amount: swap_flow.1,
asset_id: trading_pair.asset_2,
})
.await?;

// TODO: replace with IM struct later
let mut swap_flows = self.swap_flows();
swap_flows.insert(*trading_pair, swap_flow);
self.object_put(state_key::swap_flows(), swap_flows);

Ok(())
}

fn put_swap_execution_at_height(
&mut self,
height: u64,
pair: DirectedTradingPair,
swap_execution: SwapExecution,
) {
let path = state_key::swap_execution(height, pair);
self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution);
}
}

impl<T: StateWrite + ?Sized> StateWriteExt for T {}
impl<T: StateWrite + ?Sized> InternalDexWrite for T {}
Loading
Loading