Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ocean: Catchup to tip on startup #3105

Merged
merged 13 commits into from
Nov 5, 2024
4 changes: 2 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ async fn list_pool_swaps_verbose(
_ => true,
})
.map(|item| async {
let (_, swap) = item?;
let (key, swap) = item?;
let from = find_swap_from(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &key, &swap).await?;

let swap_type = check_swap_type(&ctx, &swap).await?;

Expand Down
16 changes: 14 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
NotFoundKind, OtherSnafu,
},
indexer::PoolSwapAggregatedInterval,
model::{PoolSwap, PoolSwapAggregatedAggregated},
model::{PoolSwap, PoolSwapAggregatedAggregated, PoolSwapKey},
storage::{RepositoryOps, SecondaryIndex, SortOrder},
Result,
};
Expand Down Expand Up @@ -673,6 +673,7 @@ pub async fn find_swap_from(

pub async fn find_swap_to(
ctx: &Arc<AppContext>,
swap_key: &PoolSwapKey,
swap: &PoolSwap,
) -> Result<Option<PoolSwapFromToData>> {
let PoolSwap {
Expand All @@ -689,9 +690,20 @@ pub async fn find_swap_to(

let display_symbol = parse_display_symbol(&to_token);

// TODO Index to_amount if missing
if to_amount.is_none() {
let amount = 0;
let swap = PoolSwap {
to_amount: Some(amount),
..swap.clone()
};
ctx.services.pool.by_id.put(swap_key, &swap)?;
}

Ok(Some(PoolSwapFromToData {
address: to_address,
amount: Decimal::new(to_amount.to_owned(), 8).to_string(),
// amount: Decimal::new(to_amount.to_owned(), 8).to_string(), // Need fallback
amount: Decimal::new(to_amount.to_owned().unwrap_or_default(), 8).to_string(),
symbol: to_token.symbol,
display_symbol,
}))
Expand Down
10 changes: 10 additions & 0 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,14 @@ fn invalidate_script_activity_vout(
Ok(())
}

pub fn get_block_height(services: &Arc<Services>) -> Result<u32> {
Ok(services
.block
.by_height
.get_highest()?
.map_or(0, |block| block.height))
}

pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
trace!("[index_block] Indexing block...");
let start = Instant::now();
Expand Down Expand Up @@ -601,6 +609,7 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
DfTx::SetLoanToken(data) => data.index(services, ctx)?,
DfTx::CompositeSwap(data) => data.index(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Indexed dftx");
Expand Down Expand Up @@ -712,6 +721,7 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
DfTx::SetLoanToken(data) => data.invalidate(services, ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Invalidate dftx");
Expand Down
33 changes: 14 additions & 19 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ impl Index for RemoveOracle {
let oracle_id = ctx.tx.txid;
services.oracle.by_id.delete(&oracle_id)?;

let (_, previous) = get_previous_oracle(services, oracle_id)?;

for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
price_feed.currency.to_owned(),
oracle_id,
))?;
if let Ok((_, previous)) = get_previous_oracle(services, oracle_id) {
for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
price_feed.currency.to_owned(),
oracle_id,
))?;
}
}

Ok(())
Expand Down Expand Up @@ -262,17 +262,15 @@ fn map_price_aggregated(
)),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == token.clone() && k.1 == currency.clone(),
_ => true,
})
.take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency))
.flatten()
.collect::<Vec<_>>();

let mut aggregated_total = Decimal::zero();
let mut aggregated_count = Decimal::zero();
let mut aggregated_weightage = Decimal::zero();

let base_id = Txid::from_byte_array([0xffu8; 32]);
let oracles_len = oracles.len();
for (id, oracle) in oracles {
if oracle.weightage == 0 {
Expand All @@ -283,10 +281,7 @@ fn map_price_aggregated(
let feed = services
.oracle_price_feed
.by_id
.list(
Some((id.0, id.1, id.2, Txid::from_byte_array([0xffu8; 32]))),
SortOrder::Descending,
)?
.list(Some((id.0, id.1, id.2, base_id)), SortOrder::Descending)?
.next()
.transpose()?;

Expand Down Expand Up @@ -364,8 +359,8 @@ fn index_set_oracle_data(
let key = (
price_aggregated.aggregated.oracles.total,
price_aggregated.block.height,
token.clone(),
currency.clone(),
token,
currency,
);
ticker_repo.by_key.put(&key, pair)?;
ticker_repo.by_id.put(
Expand Down Expand Up @@ -542,7 +537,7 @@ pub fn index_interval_mapper(

if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}
};

forward_aggregate(services, previous, aggregated)?;

Expand Down
104 changes: 84 additions & 20 deletions lib/ain-ocean/src/indexer/poolswap.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::{str::FromStr, sync::Arc};

use ain_cpp_imports::PoolPairCreationHeight;
use ain_dftx::{pool::*, COIN};
use bitcoin::Txid;
use log::trace;
use parking_lot::RwLock;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use snafu::OptionExt;

use super::{Context, IndexBlockStart};
use crate::{
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu},
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind},
indexer::{tx_result, Index, Result},
model::{
self, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated, PoolSwapResult,
Expand Down Expand Up @@ -182,7 +184,7 @@ fn create_new_bucket(

impl IndexBlockStart for PoolSwap {
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
let mut pool_pairs = services.pool_pair_cache.get();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
Expand Down Expand Up @@ -245,7 +247,7 @@ impl IndexBlockStart for PoolSwap {

impl Index for PoolSwap {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
trace!("[Poolswap] Indexing...");
trace!("[Poolswap] Indexing {self:?}...");
let txid = ctx.tx.txid;
let idx = ctx.tx_idx;
let from = self.from_script;
Expand All @@ -254,17 +256,28 @@ impl Index for PoolSwap {
let from_amount = self.from_amount;
let to_token_id = self.to_token_id.0;

let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) =
services.result.get(&txid)?
else {
// TODO: Commenting out for now, fallback should only be introduced for supporting back CLI indexing
return Err("Missing swap result".into());
// let pair = find_pair(from_token_id, to_token_id);
// if pair.is_none() {
// return Err(format_err!("Pool not found by {from_token_id}-{to_token_id} or {to_token_id}-{from_token_id}").into());
// }
// let pair = pair.unwrap();
// (None, pair.id)
let (to_amount, pool_id) = match services.result.get(&txid)? {
Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => {
(Some(to_amount), pool_id)
}
_ => {
let poolpairs = services.pool_pair_cache.get();

let pool_id = poolpairs
.into_iter()
.find(|pp| {
(pp.id_token_a == self.from_token_id.0 as u32
&& pp.id_token_b == self.to_token_id.0 as u32)
|| (pp.id_token_a == self.to_token_id.0 as u32
&& pp.id_token_b == self.from_token_id.0 as u32)
})
.map(|pp| pp.id)
.ok_or(Error::NotFound {
kind: NotFoundKind::PoolPair,
})?;

(None, pool_id)
}
};

let swap: model::PoolSwap = model::PoolSwap {
Expand Down Expand Up @@ -317,17 +330,31 @@ impl Index for PoolSwap {

impl Index for CompositeSwap {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
trace!("[CompositeSwap] Indexing...");
trace!("[CompositeSwap] Indexing {self:?}...");
let txid = ctx.tx.txid;
let from_token_id = self.pool_swap.from_token_id.0;
let from_amount = self.pool_swap.from_amount;
let to_token_id = self.pool_swap.to_token_id.0;

let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) =
services.result.get(&txid)?
else {
trace!("Missing swap result for {}", txid.to_string());
return Err("Missing swap result".into());
let (to_amount, pool_id) = match services.result.get(&txid)? {
Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => {
(Some(to_amount), Some(pool_id))
}
_ => {
let poolpairs = services.pool_pair_cache.get();

let pool_id = poolpairs
.into_iter()
.find(|pp| {
(pp.id_token_a == self.pool_swap.from_token_id.0 as u32
&& pp.id_token_b == self.pool_swap.to_token_id.0 as u32)
|| (pp.id_token_a == self.pool_swap.to_token_id.0 as u32
&& pp.id_token_b == self.pool_swap.from_token_id.0 as u32)
})
.map(|pp| pp.id);

(None, pool_id)
}
};

let from = self.pool_swap.from_script;
Expand All @@ -336,6 +363,9 @@ impl Index for CompositeSwap {

let pool_ids = if pools.is_empty() {
// the pool_id from finals wap is the only swap while pools is empty
let pool_id = pool_id.ok_or(Error::NotFound {
kind: NotFoundKind::PoolPair,
})?;
Vec::from([pool_id])
} else {
pools.iter().map(|pool| pool.id.0 as u32).collect()
Expand Down Expand Up @@ -382,3 +412,37 @@ impl Index for CompositeSwap {
tx_result::invalidate(services, &ctx.tx.txid)
}
}

#[derive(Default)]
pub struct PoolPairCache {
cache: RwLock<Option<Vec<PoolPairCreationHeight>>>,
}

impl PoolPairCache {
pub fn new() -> Self {
Self {
cache: RwLock::new(None),
}
}

pub fn get(&self) -> Vec<PoolPairCreationHeight> {
{
let guard = self.cache.read();
if let Some(poolpairs) = guard.as_ref() {
return poolpairs.clone();
}
}

let poolpairs = ain_cpp_imports::get_pool_pairs();

let mut guard = self.cache.write();
*guard = Some(poolpairs.clone());

poolpairs
}

pub fn invalidate(&self) {
let mut guard = self.cache.write();
*guard = None;
}
}
5 changes: 4 additions & 1 deletion lib/ain-ocean/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use std::{path::PathBuf, sync::Arc};

pub use api::ocean_router;
use error::Error;
use indexer::poolswap::PoolPairCache;
pub use indexer::{
index_block, invalidate_block,
get_block_height, index_block, invalidate_block,
oracle::invalidate_oracle_interval,
transaction::{index_transaction, invalidate_transaction},
tx_result,
Expand Down Expand Up @@ -138,6 +139,7 @@ pub struct Services {
pub script_unspent: ScriptUnspentService,
pub token_graph: Arc<Mutex<UnGraphMap<u32, String>>>,
pub store: Arc<OceanStore>,
pub pool_pair_cache: PoolPairCache,
}

impl Services {
Expand Down Expand Up @@ -208,6 +210,7 @@ impl Services {
},
token_graph: Arc::new(Mutex::new(UnGraphMap::new())),
store: Arc::clone(&store),
pool_pair_cache: PoolPairCache::new(),
}
}
}
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/model/poolswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct PoolSwap {
pub pool_id: u32,
pub from_amount: i64,
pub from_token_id: u64,
pub to_amount: i64,
pub to_amount: Option<i64>,
pub to_token_id: u64,
pub from: ScriptBuf,
pub to: ScriptBuf,
Expand Down
1 change: 1 addition & 0 deletions lib/ain-rs-exports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub mod ffi {

fn evm_try_flush_db(result: &mut CrossBoundaryResult);

fn ocean_get_block_height(result: &mut CrossBoundaryResult) -> u32;
fn ocean_index_block(result: &mut CrossBoundaryResult, block_str: String);
fn ocean_invalidate_block(result: &mut CrossBoundaryResult, block: String);

Expand Down
5 changes: 5 additions & 0 deletions lib/ain-rs-exports/src/ocean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use crate::{
prelude::{cross_boundary_error_return, cross_boundary_success_return},
};

#[ffi_fallible]
pub fn ocean_get_block_height() -> Result<u32> {
ain_ocean::get_block_height(&ain_ocean::SERVICES)
}

#[ffi_fallible]
pub fn ocean_index_block(block_str: String) -> Result<()> {
let block: Block<Transaction> = serde_json::from_str(&block_str)?;
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ DEFI_CORE_H = \
index/blockfilterindex.h \
index/txindex.h \
indirectmap.h \
ocean.h \
init.h \
interfaces/chain.h \
interfaces/handler.h \
Expand Down Expand Up @@ -410,6 +411,7 @@ libdefi_server_a_SOURCES = \
index/blockfilterindex.cpp \
index/txindex.cpp \
interfaces/chain.cpp \
ocean.cpp \
init.cpp \
dbwrapper.cpp \
ffi/ffiexports.cpp \
Expand Down
Loading
Loading