diff --git a/README.md b/README.md index 7d3e9f50d..0e1ecc6eb 100644 --- a/README.md +++ b/README.md @@ -208,7 +208,7 @@ Debug information is logged to the DEBUG level, including: * Profit levels for considered trades * Intermediate debug logs -The log level may be set via the `LOGLEVEL` environment variable. Possible values are: `INFO`, `DEBUG`, or `ERROR`. +The log level may be set via the `LOGLEVEL` environment variable. Possible values are: `info`, `debug`, or `error`. An example output is as follows: diff --git a/local-interchaintest/Cargo.lock b/local-interchaintest/Cargo.lock index dfe203736..90ba8fc31 100644 --- a/local-interchaintest/Cargo.lock +++ b/local-interchaintest/Cargo.lock @@ -1207,6 +1207,7 @@ dependencies = [ "serde", "serde_json", "shared_child", + "sqlite", ] [[package]] @@ -1968,6 +1969,34 @@ dependencies = [ "der", ] +[[package]] +name = "sqlite" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfe6fb16f2bee6452feeb4d12bfa404fbcd3cfc121b2950e501d1ae9cae718e" +dependencies = [ + "sqlite3-sys", +] + +[[package]] +name = "sqlite3-src" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "174d4a6df77c27db281fb23de1a6d968f3aaaa4807c2a1afa8056b971f947b4a" +dependencies = [ + "cc", + "pkg-config", +] + +[[package]] +name = "sqlite3-sys" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3901ada7090c3c3584dc92ec7ef1b7091868d13bfe6d7de9f0bcaffee7d0ade5" +dependencies = [ + "sqlite3-src", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/local-interchaintest/Cargo.toml b/local-interchaintest/Cargo.toml index da3bd8725..f8b0761fb 100644 --- a/local-interchaintest/Cargo.toml +++ b/local-interchaintest/Cargo.toml @@ -15,3 +15,4 @@ itertools = "0.13.0" shared_child = "1.0.0" clap = { version = "4.5.8", features = ["derive"] } derive_builder = "0.20.0" +sqlite = { version = "0.36.1" } \ No newline at end of file diff --git a/local-interchaintest/chains/neutron_osmosis_gaia.json b/local-interchaintest/chains/neutron_osmosis_gaia.json index e3aebf1b9..6b5402560 100644 --- a/local-interchaintest/chains/neutron_osmosis_gaia.json +++ b/local-interchaintest/chains/neutron_osmosis_gaia.json @@ -167,7 +167,7 @@ "binary": "osmosisd", "bech32_prefix": "osmo", "docker_image": { - "version": "v25.0.4", + "version": "v26.0.2", "repository": "ghcr.io/strangelove-ventures/heighliner/osmosis" }, "gas_prices": "0.0025%DENOM%", diff --git a/local-interchaintest/src/main.rs b/local-interchaintest/src/main.rs index 20ea2497a..367740945 100644 --- a/local-interchaintest/src/main.rs +++ b/local-interchaintest/src/main.rs @@ -15,7 +15,7 @@ mod util; const TEST_MNEMONIC: &str = "decorate bright ozone fork gallery riot bus exhaust worth way bone indoor calm squirrel merry zero scheme cotton until shop any excess stage laundry"; /// Path to a file where found arbs are stored -const ARBFILE_PATH: &str = "../arbs.json"; +const ARBFILE_PATH: &str = "../arbs.db"; /// The address that should principally own all contracts const OWNER_ADDR: &str = "neutron1hj5fveer5cjtn4wd6wstzugjfdxzl0xpznmsky"; @@ -183,7 +183,56 @@ fn main() -> Result<(), Box> { .with_test(Box::new(tests::test_unprofitable_arb) as TestFn) .build()?, )? 
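> Reviewer note: the `ARBFILE_PATH` change above moves arb history from `arbs.json` to a SQLite database, `arbs.db`. Below is a minimal sketch of inspecting that file from Python, assuming the schema that `src/db.py` (later in this diff) creates — an `orders` table whose `realized_profit` column is stored as TEXT. The db path is illustrative.

```python
import sqlite3

def total_realized_profit(db_path: str = "arbs.db") -> int:
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(
            "SELECT SUM(o.realized_profit) AS total_profit FROM orders o"
        )
        (total,) = cur.fetchone()
        # SUM coerces the TEXT values numerically; it is NULL on an empty table.
        return int(total) if total is not None else 0
    finally:
        conn.close()

if __name__ == "__main__":
    print("ARB BOT PROFIT:", total_realized_profit())
```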
-        /*
+        // Test case unprofitable osmo arb:
+        //
+        // - Astro: untrn-bruhtoken @ 1.5 bruhtoken/untrn
+        // - Osmo: bruhtoken-uosmo @ 0.001 uosmo/bruhtoken
+        // - Astro: uosmo-untrn @ 1 untrn/uosmo
+        .run(
+            TestBuilder::default()
+                .with_name("Osmosis Arb")
+                .with_description("The arbitrage bot should not execute an unprofitable arb on Osmosis")
+                .with_denom(untrn_osmo.clone(), 100000000000)
+                .with_denom(uosmo.clone(), 100000000000)
+                .with_denom(bruhtoken.clone(), 100000000000)
+                .with_denom(untrn.clone(), 100000000000)
+                .with_denom(bruhtoken_osmo.clone(), 100000000000)
+                .with_pool(
+                    untrn.clone(),
+                    uosmo_ntrn.clone(),
+                    Pool::Astroport(
+                        AstroportPoolBuilder::default()
+                            .with_balance_asset_a(10000000u128)
+                            .with_balance_asset_b(15000000u128)
+                            .build()?,
+                    ),
+                )
+                .with_pool(
+                    uosmo.clone(),
+                    bruhtoken_osmo.clone(),
+                    Pool::Osmosis(
+                        OsmosisPoolBuilder::default()
+                            .with_funds(bruhtoken_osmo.clone(), 100000000u128)
+                            .with_funds(uosmo.clone(), 100000u128)
+                            .with_weight(bruhtoken_osmo.clone(), 100u128)
+                            .with_weight(uosmo.clone(), 1u128)
+                            .build(),
+                    ),
+                )
+                .with_pool(
+                    untrn.clone(),
+                    bruhtoken.clone(),
+                    Pool::Auction(
+                        AuctionPoolBuilder::default()
+                            .with_balance_offer_asset(10000000u128)
+                            .with_price(Decimal::percent(10))
+                            .build()?,
+                    ),
+                )
+                .with_arbbot()
+                .with_test(Box::new(tests::test_unprofitable_osmo_arb) as TestFn)
+                .build()?,
+        )?
         // Test case (astro -> osmo arb):
         //
         // - Astro: untrn-bruhtoken @ 1.5 bruhtoken/untrn
         // - Osmo: bruhtoken-uosmo @ 0.001 uosmo/bruhtoken
         // - Astro: uosmo-untrn @ 1 untrn/uosmo
@@ -213,7 +262,7 @@ fn main() -> Result<(), Box<dyn Error>> {
                     bruhtoken_osmo.clone(),
                     Pool::Osmosis(
                         OsmosisPoolBuilder::default()
-                            .with_funds(bruhtoken_osmo.clone(), 10000000u128)
+                            .with_funds(bruhtoken_osmo.clone(), 100000000u128)
                             .with_funds(uosmo.clone(), 10000000u128)
                             .with_weight(bruhtoken_osmo.clone(), 1u128)
                             .with_weight(uosmo.clone(), 1u128)
                             .build(),
                     ),
                 )
@@ -223,16 +272,16 @@
                 .with_pool(
                     untrn.clone(),
                     bruhtoken.clone(),
-                    Pool::Astroport(
-                        AstroportPoolBuilder::default()
-                            .with_balance_asset_a(10000000u128)
-                            .with_balance_asset_b(10000000u128)
+                    Pool::Auction(
+                        AuctionPoolBuilder::default()
+                            .with_balance_offer_asset(10000000u128)
+                            .with_price(Decimal::percent(10))
                             .build()?,
                     ),
                 )
                 .with_arbbot()
                 .with_test(Box::new(tests::test_osmo_arb) as TestFn)
                 .build()?,
-        )?*/
+        )?
         .join()
 }
diff --git a/local-interchaintest/src/setup.rs b/local-interchaintest/src/setup.rs
index a7e5ed36d..f88690ff9 100644
--- a/local-interchaintest/src/setup.rs
+++ b/local-interchaintest/src/setup.rs
@@ -1,5 +1,5 @@
 use super::{
-    util::{self, DenomMapEntry},
+    util::{self, BidirectionalDenomRouteLeg, ChainInfo, DenomFile, DenomRouteLeg},
     ARBFILE_PATH, OSMO_OWNER_ADDR, OWNER_ADDR, TEST_MNEMONIC,
 };
 use clap::Parser;
@@ -7,8 +7,8 @@ use cosmwasm_std::Decimal;
 use derive_builder::Builder;
 use localic_utils::{types::contract::MinAmount, utils::test_context::TestContext};
 use notify::{Event, EventKind, RecursiveMode, Result as NotifyResult, Watcher};
-use serde_json::Value;
 use shared_child::SharedChild;
+use sqlite::State;
 use std::{
     borrow::BorrowMut,
     collections::{HashMap, HashSet},
@@ -21,11 +21,15 @@ use std::{
         atomic::{AtomicBool, Ordering},
         mpsc, Arc, Mutex,
     },
+    thread,
+    time::Duration,
 };

 const EXIT_STATUS_SUCCESS: i32 = 9;
 const EXIT_STATUS_SIGKILL: i32 = 9;

+const EMPTY_ARB_DB_SIZE: u64 = 10000;
+
 /// A lazily evaluated denom hash,
 /// based on an src chain, a dest chain
 /// and a base denom. If the dest chain
@@ -53,10 +57,41 @@ impl Denom {
         &self,
         amount: u128,
         ctx: &mut TestContext,
-    ) -> Result<(String, Option<(DenomMapEntry, DenomMapEntry)>), Box<dyn Error>>
-    {
+    ) -> Result<BidirectionalDenomRouteLeg, Box<dyn Error>> {
         match self {
-            Self::Local { base_denom, .. } => Ok((base_denom.to_owned(), None)),
+            Self::Local {
+                base_denom,
+                base_chain,
+            } => {
+                let src_chain = ctx.get_chain(&base_chain);
+
+                let chain_info = ChainInfo {
+                    chain_name: src_chain.chain_name.clone(),
+                    chain_id: src_chain.rb.chain_id.clone(),
+                    pfm_enabled: true,
+                    supports_memo: true,
+                    bech32_prefix: src_chain.chain_prefix.clone(),
+                    fee_asset: src_chain.native_denom.clone(),
+                    chain_type: String::from("cosmos"),
+                    pretty_name: src_chain.chain_name.clone(),
+                };
+
+                let leg = DenomRouteLeg {
+                    src_chain: base_chain.to_owned(),
+                    dest_chain: base_chain.to_owned(),
+                    src_denom: base_denom.to_owned(),
+                    dest_denom: base_denom.to_owned(),
+                    from_chain: chain_info.clone(),
+                    to_chain: chain_info,
+                    port: String::from("transfer"),
+                    channel: String::new(),
+                };
+
+                Ok(BidirectionalDenomRouteLeg {
+                    src_to_dest: leg.clone(),
+                    dest_to_src: leg,
+                })
+            }
             Self::Interchain {
                 base_denom,
                 base_chain,
@@ -87,23 +122,49 @@
                 let src_chain = ctx.get_chain(&base_chain);
                 let dest_chain = ctx.get_chain(&dest_chain);

-                Ok((
-                    ibc_denom_a.clone(),
-                    Some((
-                        DenomMapEntry {
-                            chain_id: dest_chain.rb.chain_id.clone(),
-                            denom: ibc_denom_a.clone(),
-                            channel_id: trace_a.to_owned(),
-                            port_id: "transfer".to_owned(),
-                        },
-                        DenomMapEntry {
-                            chain_id: src_chain.rb.chain_id.clone(),
-                            denom: base_denom.to_string(),
-                            channel_id: trace_a_counter.to_owned(),
-                            port_id: "transfer".to_owned(),
-                        },
-                    )),
-                ))
+                let chain_a_info = ChainInfo {
+                    chain_name: src_chain.chain_name.clone(),
+                    chain_id: src_chain.rb.chain_id.clone(),
+                    pfm_enabled: true,
+                    supports_memo: true,
+                    bech32_prefix: src_chain.chain_prefix.clone(),
+                    fee_asset: src_chain.native_denom.clone(),
+                    chain_type: String::from("cosmos"),
+                    pretty_name: src_chain.chain_name.clone(),
+                };
+                let chain_b_info = ChainInfo {
+                    chain_name: dest_chain.chain_name.clone(),
+                    chain_id: dest_chain.rb.chain_id.clone(),
+                    pfm_enabled: true,
+                    supports_memo: true,
+                    bech32_prefix: dest_chain.chain_prefix.clone(),
+                    fee_asset: dest_chain.native_denom.clone(),
+                    chain_type: String::from("cosmos"),
+                    pretty_name: dest_chain.chain_name.clone(),
+                };
+
+                Ok(BidirectionalDenomRouteLeg {
+                    src_to_dest: DenomRouteLeg {
+                        src_chain: src_chain.rb.chain_id.clone(),
+                        dest_chain: dest_chain.rb.chain_id.clone(),
+                        src_denom: base_denom.clone(),
+                        dest_denom: ibc_denom_a.clone(),
+                        channel: trace_a.to_owned(),
+                        port: "transfer".to_owned(),
+                        from_chain: chain_a_info.clone(),
+                        to_chain: chain_b_info.clone(),
+                    },
+                    dest_to_src: DenomRouteLeg {
+                        src_chain: dest_chain.rb.chain_id.clone(),
+                        dest_chain: src_chain.rb.chain_id.clone(),
+                        src_denom: ibc_denom_a,
+                        dest_denom: base_denom.clone(),
+                        channel: trace_a_counter.to_owned(),
+                        port: "transfer".to_owned(),
+                        from_chain: chain_b_info,
+                        to_chain: chain_a_info,
+                    },
+                })
             }
         }
     }
@@ -137,8 +198,7 @@ pub struct Args {
 pub struct TestRunner<'a> {
     test_statuses: Arc<Mutex<HashMap<(String, String), TestResult>>>,
     cli_args: Args,
-    /// Mapping from (src_denom, dest_chain) -> dest_denom
-    denom_map: HashMap<(String, String), DenomMapEntry>,
+    denom_file: DenomFile,
     created_denoms: HashSet<String>,
     test_ctx: &'a mut TestContext,
 }
@@ -148,7 +208,7 @@ impl<'a> TestRunner<'a> {
         Self {
             test_statuses: Default::default(),
             cli_args,
-            denom_map: Default::default(),
+            denom_file: Default::default(),
             created_denoms: Default::default(),
             test_ctx: ctx,
        }
    }

@@ -195,7 +255,7 @@ impl<'a> TestRunner<'a> {
         // Perform hot start setup
         // Mapping of denoms to their matching denoms, chain id's, channel id's, and ports
-        self.denom_map = Default::default();
+        self.denom_file = Default::default();

         let ctx = &mut self.test_ctx;

@@ -231,7 +291,7 @@ impl<'a> TestRunner<'a> {
         ctx.build_tx_create_price_oracle().send()?;
         ctx.build_tx_update_auction_oracle().send()?;

-        test.setup(&mut self.denom_map, ctx)?;
+        test.setup(&mut self.denom_file, ctx)?;

         let ntrn_to_osmo = ctx
             .transfer_channel_ids
@@ -260,15 +320,15 @@ impl<'a> TestRunner<'a> {
             .expect("Failed to create deployments file");
         util::create_arbs_file().expect("Failed to create arbs file");
         util::create_netconfig().expect("Failed to create net config");
-        util::create_denom_file(&self.denom_map).expect("Failed to create denom file");
+        util::create_denom_file(&self.denom_file).expect("Failed to create denom file");

         let statuses = self.test_statuses.clone();

         if test.run_arbbot {
-            with_arb_bot_output(Arc::new(Box::new(move |arbfile: Option<Value>| {
+            with_arb_bot_output(Arc::new(Box::new(move || {
                 statuses.lock().expect("Failed to lock statuses").insert(
                     (test.name.clone(), test.description.clone()),
-                    (*test.test)(arbfile),
+                    (*test.test)(),
                 );

                 Ok(())
@@ -279,7 +339,7 @@ impl<'a> TestRunner<'a> {

         statuses.lock().expect("Failed to lock statuses").insert(
             (test.name.clone(), test.description.clone()),
-            (*test.test)(None),
+            (*test.test)(),
         );

         Ok(self)
@@ -321,7 +381,7 @@ impl<'a> TestRunner<'a> {
 }

 /// A test that receives arb bot executable output.
-pub type TestFn = Box<dyn Fn(Option<Value>) -> TestResult + Send + Sync>;
+pub type TestFn = Box<dyn Fn() -> TestResult + Send + Sync>;
 pub type OwnedTestFn = Arc<TestFn>;
 pub type TestResult = Result<(), Box<dyn Error + Send + Sync>>;

@@ -359,7 +419,7 @@ pub struct Test {
 impl Test {
     pub fn setup(
         &mut self,
-        denom_map: &mut HashMap<(String, String), DenomMapEntry>,
+        denom_file: &mut DenomFile,
         ctx: &mut TestContext,
     ) -> Result<&mut Self, Box<dyn Error>> {
         self.tokenfactory_token_balances_acc0.iter().try_for_each(
@@ -421,32 +481,19 @@
                     let funds_b = spec.balance_asset_b;

                     // Create the osmo pool and join it
-                    let (norm_denom_a, denom_map_ent_1) =
-                        denom_a.normalize(funds_a, ctx).unwrap();
-                    let (norm_denom_b, denom_map_ent_2) =
-                        denom_b.normalize(funds_b, ctx).unwrap();
-
-                    if let Some((map_ent_a_1, map_ent_a_2)) = denom_map_ent_1 {
-                        // (denom, neutron) -> denom'
-                        // (denom', osmo) -> denom
-                        denom_map.insert((denom_a.to_string(), "neutron".into()), map_ent_a_1);
-                        denom_map.insert((norm_denom_a.clone(), "osmosis".into()), map_ent_a_2);
-                    }
-
-                    if let Some((map_ent_b_1, map_ent_b_2)) = denom_map_ent_2 {
-                        // (denom, neutron) -> denom'
-                        // (denom', osmo) -> denom
-                        denom_map.insert((denom_b.to_string(), "neutron".into()), map_ent_b_1);
-                        denom_map.insert((norm_denom_b.clone(), "osmosis".into()), map_ent_b_2);
-                    }
+                    let route_leg_a = denom_a.normalize(funds_a, ctx).unwrap();
+                    let route_leg_b = denom_b.normalize(funds_b, ctx).unwrap();
+
+                    denom_file.push_denom(route_leg_a.clone());
+                    denom_file.push_denom(route_leg_b.clone());

                     ctx.build_tx_create_pool()
-                        .with_denom_a(&norm_denom_a)
-                        .with_denom_b(&norm_denom_b)
+                        .with_denom_a(&route_leg_a.src_to_dest.dest_denom)
+                        .with_denom_b(&route_leg_b.src_to_dest.dest_denom)
                         .send()?;
                     ctx.build_tx_fund_pool()
-                        .with_denom_a(&norm_denom_a)
-                        .with_denom_b(&norm_denom_b)
+                        .with_denom_a(&route_leg_a.src_to_dest.dest_denom)
+                        .with_denom_b(&route_leg_b.src_to_dest.dest_denom)
                         .with_amount_denom_a(spec.balance_asset_a)
                         .with_amount_denom_b(spec.balance_asset_b)
.with_liq_token_receiver(OWNER_ADDR) @@ -460,39 +507,41 @@ impl Test { let weight_b = spec.denom_weights.get(denom_b).unwrap_or(&0); // Create the osmo pool and join it - let (norm_denom_a, denom_map_ent_1) = - denom_a.normalize(*funds_a, ctx).unwrap(); - let (norm_denom_b, denom_map_ent_2) = - denom_b.normalize(*funds_b, ctx).unwrap(); - - if let Some((map_ent_a_1, map_ent_a_2)) = denom_map_ent_1 { - // (denom, neutron) -> denom' - // (denom', osmo) -> denom - denom_map.insert((denom_a.to_string(), "osmosis".into()), map_ent_a_1); - denom_map.insert((norm_denom_a.clone(), "neutron".into()), map_ent_a_2); - } - - if let Some((map_ent_b_1, map_ent_b_2)) = denom_map_ent_2 { - // (denom, neutron) -> denom' - // (denom', osmo) -> denom - denom_map.insert((denom_b.to_string(), "osmosis".into()), map_ent_b_1); - denom_map.insert((norm_denom_b.clone(), "neutron".into()), map_ent_b_2); - } + let route_leg_a = denom_a.normalize(*funds_a, ctx).unwrap(); + let route_leg_b = denom_b.normalize(*funds_b, ctx).unwrap(); + + denom_file.push_denom(route_leg_a.clone()); + denom_file.push_denom(route_leg_b.clone()); ctx.build_tx_create_osmo_pool() - .with_weight(&norm_denom_a, *weight_a as u64) - .with_weight(&norm_denom_b, *weight_b as u64) - .with_initial_deposit(&norm_denom_a, *funds_a as u64) - .with_initial_deposit(&norm_denom_b, *funds_b as u64) + .with_weight(&route_leg_a.src_to_dest.dest_denom, *weight_a as u64) + .with_weight(&route_leg_b.src_to_dest.dest_denom, *weight_b as u64) + .with_initial_deposit( + &route_leg_a.src_to_dest.dest_denom, + *funds_a as u64, + ) + .with_initial_deposit( + &route_leg_b.src_to_dest.dest_denom, + *funds_b as u64, + ) .send()?; - let pool_id = ctx.get_osmo_pool(&norm_denom_a, &norm_denom_b)?; + let pool_id = ctx.get_osmo_pool( + &route_leg_a.src_to_dest.dest_denom, + &route_leg_b.src_to_dest.dest_denom, + )?; // Fund the pool ctx.build_tx_fund_osmo_pool() .with_pool_id(pool_id) - .with_max_amount_in(&norm_denom_a, *funds_a as u64) - .with_max_amount_in(&norm_denom_b, *funds_b as u64) + .with_max_amount_in( + &route_leg_a.src_to_dest.dest_denom, + *funds_a as u64, + ) + .with_max_amount_in( + &route_leg_b.src_to_dest.dest_denom, + *funds_b as u64, + ) .with_share_amount_out(1000000000000) .send() } @@ -517,7 +566,7 @@ impl Test { ctx.build_tx_start_auction() .with_offer_asset(&denom_a.to_string()) .with_ask_asset(&denom_b.to_string()) - .with_end_block_delta(10000) + .with_end_block_delta(1000000000000000000) .send() } }) @@ -641,13 +690,13 @@ pub fn with_arb_bot_output(test: OwnedTestFn) -> TestResult { let proc_handle = Arc::new(proc); let proc_handle_watcher = proc_handle.clone(); let (tx_res, rx_res) = mpsc::channel(); - let mut finished = AtomicBool::new(false); + let finished = AtomicBool::new(false); let test_handle = test.clone(); - // Wait until the arbs.json file has been produced + // Wait until the arbs.db file has been produced let mut watcher = notify::recommended_watcher(move |res: NotifyResult| { - let e = res.expect("failed to watch arbs.json"); + let e = res.expect("failed to watch arbs.db"); // An arb was found if let EventKind::Modify(_) = e.kind { @@ -661,21 +710,33 @@ pub fn with_arb_bot_output(test: OwnedTestFn) -> TestResult { let f = OpenOptions::new() .read(true) .open(ARBFILE_PATH) - .expect("failed to open arbs.json"); + .expect("failed to open arbs.db"); - if f.metadata().expect("can't get arbs metadata").len() == 0 { + if f.metadata().expect("can't get arbs metadata").len() < EMPTY_ARB_DB_SIZE { return; } - let arbfile: Value = - 
serde_json::from_reader(&f).expect("failed to deserialize arbs.json");
-
-                let res = test_handle(Some(arbfile));
+                thread::sleep(Duration::from_secs(1));

                 proc_handle_watcher.kill().expect("failed to kill arb bot");

-                tx_res.send(res).expect("failed to send test results");
-                finished.store(true, Ordering::SeqCst);
+                thread::sleep(Duration::from_secs(2));
+
+                let conn = sqlite::open(ARBFILE_PATH).expect("failed to open db");
+
+                let query = "SELECT COUNT(*) AS cnt FROM orders";
+                let mut statement = conn.prepare(query).unwrap();
+
+                if let Ok(State::Row) = statement.next() {
+                    // The db is committed, we can run the tests now
+                    if statement.read::<i64, _>("cnt").unwrap() > 0 {
+                        let res = test_handle();
+
+                        tx_res.send(res).expect("failed to send test results");
+
+                        finished.store(true, Ordering::SeqCst);
+                    }
+                }
             }
         })?;
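> Reviewer note: the watcher above sleeps, kills the bot, sleeps again, and only runs the tests once `SELECT COUNT(*)` on `orders` succeeds — i.e. once the bot's first SQLite transaction is committed. A hedged Python equivalent of that polling loop (path and timings illustrative):

```python
import sqlite3
import time

def wait_for_arbs(db_path: str = "arbs.db", timeout: float = 60.0) -> bool:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            conn = sqlite3.connect(db_path)
            try:
                # Fails until the bot has created and committed the orders table.
                (count,) = conn.execute("SELECT COUNT(*) FROM orders").fetchone()
                if count > 0:
                    return True
            finally:
                conn.close()
        except sqlite3.OperationalError:
            # Table not created yet, or the db is mid-write.
            pass
        time.sleep(1.0)
    return False
```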
diff --git a/local-interchaintest/src/tests.rs b/local-interchaintest/src/tests.rs
index 9c75b16fb..1cb09b300 100644
--- a/local-interchaintest/src/tests.rs
+++ b/local-interchaintest/src/tests.rs
@@ -1,12 +1,7 @@
-use super::util;
-use serde_json::Value;
+use super::{util, ARBFILE_PATH};
 use std::{error::Error, process::Command};

-const ERROR_MARGIN_PROFIT: u64 = 50000;
-
-pub fn test_transfer_osmosis(
-    _: Option<Value>,
-) -> Result<(), Box<dyn Error + Send + Sync>> {
+pub fn test_transfer_osmosis() -> Result<(), Box<dyn Error + Send + Sync>> {
     Command::new("python")
         .current_dir("tests")
         .arg("transfer_osmosis.py")
@@ -15,9 +10,7 @@
     Ok(())
 }

-pub fn test_transfer_neutron(
-    _: Option<Value>,
-) -> Result<(), Box<dyn Error + Send + Sync>> {
+pub fn test_transfer_neutron() -> Result<(), Box<dyn Error + Send + Sync>> {
     Command::new("python")
         .current_dir("tests")
         .arg("transfer_neutron.py")
@@ -26,124 +19,168 @@
     Ok(())
 }

-pub fn test_profitable_arb(
-    arbfile: Option<Value>,
-) -> Result<(), Box<dyn Error + Send + Sync>> {
-    let arbfile = arbfile.unwrap();
-    let arbs = arbfile.as_array().expect("no arbs in arbfile");
-
-    util::assert_err("!arbs.is_empty()", arbs.is_empty(), false)?;
-
-    let profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter_map(|arb_str| {
-            serde_json::from_str::<Value>(arb_str)
-                .ok()?
-                .get("realized_profit")?
-                .as_number()?
-                .as_u64()
-        })
-        .sum();
-    let auction_profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter(|arb_str| arb_str.contains("auction"))
-        .filter_map(|arb_str| serde_json::from_str::<Value>(arb_str).ok())
-        .filter_map(|arb| arb.get("realized_profit")?.as_number()?.as_u64())
-        .sum();
+pub fn test_profitable_arb() -> Result<(), Box<dyn Error + Send + Sync>> {
+    let conn = sqlite::open(ARBFILE_PATH).expect("failed to open db");
+
+    let profit = {
+        let query = "SELECT SUM(o.realized_profit) AS total_profit FROM orders o";
+
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let auction_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    println!("ARB BOT PROFIT: {profit}");
+    println!("AUCTION BOT PROFIT: {auction_profit}");
+
+    util::assert_err("profit > 0", profit > 0, true)?;
+    util::assert_err("auction_profit > 0", auction_profit > 0, true)?;
+
+    Ok(())
+}
+
+pub fn test_unprofitable_arb() -> Result<(), Box<dyn Error + Send + Sync>> {
+    let conn = sqlite::open(ARBFILE_PATH).expect("failed to open db");
+
+    let profit = {
+        let query = "SELECT SUM(o.realized_profit) AS total_profit FROM orders o";
+
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let auction_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };

     println!("ARB BOT PROFIT: {profit}");
     println!("AUCTION BOT PROFIT: {auction_profit}");

-    util::assert_err(
-        "200000 + PROFIT_MARGIN > profit > 200000 - PROFIT_MARGIN",
-        200000 + ERROR_MARGIN_PROFIT > profit && profit > 200000 - ERROR_MARGIN_PROFIT,
-        true,
-    )?;
-    util::assert_err(
-        "200000 + PROFIT_MARGIN > auction_profit > 200000 - PROFIT_MARGIN",
-        200000 + ERROR_MARGIN_PROFIT > auction_profit
-            && auction_profit > 200000 - ERROR_MARGIN_PROFIT,
-        true,
-    )?;
+    util::assert_err("profit == 0", profit == 0, true)?;
+    util::assert_err("auction_profit == 0", auction_profit == 0, true)?;

     Ok(())
 }

-pub fn test_unprofitable_arb(
-    arbfile: Option<Value>,
-) -> Result<(), Box<dyn Error + Send + Sync>> {
-    let arbfile = arbfile.unwrap();
-    let arbs = arbfile.as_array().expect("no arbs in arbfile");
-
-    util::assert_err("!arbs.is_empty()", arbs.is_empty(), false)?;
-
-    let profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter_map(|arb_str| {
-            serde_json::from_str::<Value>(arb_str)
-                .ok()?
-                .get("realized_profit")?
-                .as_number()?
-                .as_u64()
-        })
-        .sum();
-    let auction_profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter(|arb_str| arb_str.contains("auction"))
-        .filter_map(|arb_str| serde_json::from_str::<Value>(arb_str).ok())
-        .filter_map(|arb| arb.get("realized_profit")?.as_number()?.as_u64())
-        .sum();
+pub fn test_unprofitable_osmo_arb() -> Result<(), Box<dyn Error + Send + Sync>> {
+    let conn = sqlite::open(ARBFILE_PATH).expect("failed to open db");
+
+    let profit = {
+        let query = "SELECT SUM(o.realized_profit) AS total_profit FROM orders o";
+
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let auction_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid WHERE l.kind = 'auction' GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let osmo_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid WHERE l.kind = 'osmosis' GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };

     println!("ARB BOT PROFIT: {profit}");
     println!("AUCTION BOT PROFIT: {auction_profit}");
+    println!("OSMOSIS BOT PROFIT: {osmo_profit}");

-    util::assert_err("profit == 0", profit, 0)?;
-    util::assert_err("auction_profit == 0", auction_profit, 0)?;
+    util::assert_err("profit == 0", profit == 0, true)?;
+    util::assert_err("osmo_profit == 0", osmo_profit == 0, true)?;
+    util::assert_err("auction_profit == 0", auction_profit == 0, true)?;

     Ok(())
 }

-pub fn test_osmo_arb(arbfile: Option<Value>) -> Result<(), Box<dyn Error + Send + Sync>> {
-    let arbfile = arbfile.unwrap();
-    let arbs = arbfile.as_array().expect("no arbs in arbfile");
-
-    util::assert_err("!arbs.is_empty()", arbs.is_empty(), false)?;
-
-    let profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter_map(|arb_str| {
-            serde_json::from_str::<Value>(arb_str)
-                .ok()?
-                .get("realized_profit")?
-                .as_number()?
-                .as_u64()
-        })
-        .sum();
-    let auction_profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter(|arb_str| arb_str.contains("auction"))
-        .filter_map(|arb_str| serde_json::from_str::<Value>(arb_str).ok())
-        .filter_map(|arb| arb.get("realized_profit")?.as_number()?.as_u64())
-        .sum();
-    let osmo_profit: u64 = arbs
-        .iter()
-        .filter_map(|arb_str| arb_str.as_str())
-        .filter(|arb_str| arb_str.contains("osmosis"))
-        .filter_map(|arb_str| serde_json::from_str::<Value>(arb_str).ok())
-        .filter_map(|arb| arb.get("realized_profit")?.as_number()?.as_u64())
-        .sum();
+pub fn test_osmo_arb() -> Result<(), Box<dyn Error + Send + Sync>> {
+    let conn = sqlite::open(ARBFILE_PATH).expect("failed to open db");
+
+    let profit = {
+        let query = "SELECT SUM(o.realized_profit) AS total_profit FROM orders o";
+
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let auction_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid WHERE l.kind = 'auction' GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };
+
+    let osmo_profit = {
+        let query = "SELECT SUM(order_profit) AS total_profit FROM (SELECT MAX(o.realized_profit) AS order_profit FROM orders o INNER JOIN legs l ON o.uid = l.order_uid WHERE l.kind = 'osmosis' GROUP BY o.uid)";
+        let mut statement = conn.prepare(query).unwrap();
+
+        statement
+            .next()
+            .ok()
+            .and_then(|_| statement.read::<i64, _>("total_profit").ok())
+            .unwrap_or_default()
+    };

     println!("ARB BOT PROFIT: {profit}");
     println!("AUCTION BOT PROFIT: {auction_profit}");
     println!("OSMOSIS BOT PROFIT: {osmo_profit}");

-    util::assert_err("osmo_profit == 0", osmo_profit, 0)?;
+    util::assert_err("profit > 0", profit > 0, true)?;
+    util::assert_err("osmo_profit > 0", osmo_profit > 0, true)?;
+    util::assert_err("auction_profit > 0", auction_profit > 0, true)?;

     Ok(())
 }
diff --git a/local-interchaintest/src/util.rs b/local-interchaintest/src/util.rs
index 1eb0c298b..471a21443 100644
--- a/local-interchaintest/src/util.rs
+++ b/local-interchaintest/src/util.rs
@@ -1,12 +1,89 @@
 use serde::Serialize;
 use std::{collections::HashMap, error::Error, fmt::Debug, fs::OpenOptions, io::Write};

+#[derive(Clone, Serialize)]
+pub(crate) struct ChainInfo {
+    pub(crate) chain_name: String,
+    pub(crate) chain_id: String,
+    pub(crate) pfm_enabled: bool,
+    pub(crate) supports_memo: bool,
+    pub(crate) bech32_prefix: String,
+    pub(crate) fee_asset: String,
+    pub(crate) chain_type: String,
+    pub(crate) pretty_name: String,
+}
+
 #[derive(Serialize)]
-pub struct DenomMapEntry {
-    pub chain_id: String,
-    pub denom: String,
-    pub channel_id: String,
-    pub port_id: String,
+pub(crate) struct DenomChainInfo {
+    pub(crate) denom: String,
+    pub(crate) src_chain_id: String,
+    pub(crate) dest_chain_id: String,
+}
+
+#[derive(Serialize, Clone)]
+pub(crate) struct BidirectionalDenomRouteLeg {
+    pub(crate) src_to_dest: DenomRouteLeg,
+    pub(crate) dest_to_src: DenomRouteLeg,
+}
+
+#[derive(Serialize, Clone)]
+pub(crate) struct DenomRouteLeg {
+    pub(crate) src_chain: String,
+    pub(crate) dest_chain: String,
+    pub(crate) src_denom: String,
+    pub(crate) dest_denom: String,
+    pub(crate) from_chain: ChainInfo,
+    pub(crate) to_chain: ChainInfo,
+    pub(crate) port: String,
+    pub(crate) channel: String,
+}
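> Reviewer note: for reference, this is roughly the shape of the denoms.json document that `create_denom_file` (further down in this diff) serializes from `DenomFile`. All concrete values here (chains, denoms, channel) are illustrative placeholders, not from the source.

```python
import json

chain_info_neutron = {
    "chain_name": "neutron",
    "chain_id": "neutron-1",
    "pfm_enabled": True,
    "supports_memo": True,
    "bech32_prefix": "neutron",
    "fee_asset": "untrn",
    "chain_type": "cosmos",
    "pretty_name": "neutron",
}

leg = {
    "src_chain": "neutron-1",
    "dest_chain": "osmosis-1",
    "src_denom": "untrn",
    "dest_denom": "ibc/DEADBEEF",
    "from_chain": chain_info_neutron,
    "to_chain": dict(chain_info_neutron, chain_id="osmosis-1", chain_name="osmosis"),
    "port": "transfer",
    "channel": "channel-0",
}

denom_file = {
    # denom -> matching denoms on other chains (DenomChainInfo)
    "denom_map": {
        "untrn": [
            {
                "denom": "ibc/DEADBEEF",
                "src_chain_id": "neutron-1",
                "dest_chain_id": "osmosis-1",
            }
        ]
    },
    # src denom -> dest denom -> route legs (DenomRouteLeg)
    "denom_routes": {"untrn": {"ibc/DEADBEEF": [leg]}},
    # chain id -> ChainInfo
    "chain_info": {"neutron-1": chain_info_neutron},
}

print(json.dumps(denom_file, indent=2))
```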
String, +} + +impl From for DenomChainInfo { + fn from(v: DenomRouteLeg) -> Self { + Self { + denom: v.dest_denom, + src_chain_id: v.src_chain, + dest_chain_id: v.dest_chain, + } + } +} + +#[derive(Serialize, Default)] +pub(crate) struct DenomFile { + pub(crate) denom_map: HashMap>, + pub(crate) denom_routes: HashMap>>, + pub(crate) chain_info: HashMap, +} + +impl DenomFile { + /// Registers a denom in the denom file by adding its + /// matching denoms on all respective chains, + /// and the routes needed to get here. + pub fn push_denom(&mut self, v: BidirectionalDenomRouteLeg) -> &mut Self { + self.denom_map + .entry(v.src_to_dest.src_denom.clone()) + .or_default() + .push(v.src_to_dest.clone().into()); + self.denom_routes + .entry(v.src_to_dest.src_denom.clone()) + .or_default() + .entry(v.src_to_dest.dest_denom.clone()) + .or_default() + .push(v.src_to_dest); + + self.denom_map + .entry(v.dest_to_src.src_denom.clone()) + .or_default() + .push(v.dest_to_src.clone().into()); + self.denom_routes + .entry(v.dest_to_src.src_denom.clone()) + .or_default() + .entry(v.dest_to_src.clone().dest_denom) + .or_default() + .push(v.dest_to_src); + + self + } } /// Creates an error representing a failed assertion. @@ -98,9 +175,9 @@ pub(crate) fn create_arbs_file() -> Result<(), Box> { .create(true) .truncate(true) .write(true) - .open("../arbs.json")?; + .open("../arbs.db")?; - f.write_all(serde_json::json!([]).to_string().as_bytes())?; + f.write_all(&[])?; Ok(()) } @@ -130,24 +207,14 @@ pub(crate) fn create_netconfig() -> Result<(), Box> { Ok(()) } -pub(crate) fn create_denom_file( - denoms: &HashMap<(String, String), DenomMapEntry>, -) -> Result<(), Box> { +pub(crate) fn create_denom_file(file: &DenomFile) -> Result<(), Box> { let mut f = OpenOptions::new() .create(true) .truncate(true) .write(true) .open("../denoms.json")?; - let denoms_for_src = - denoms - .iter() - .fold(HashMap::new(), |mut acc, ((src_denom, _), dest_denom)| { - acc.insert(src_denom, vec![dest_denom]); - acc - }); - - f.write_all(serde_json::to_string(&denoms_for_src)?.as_bytes())?; + f.write_all(serde_json::to_string(file)?.as_bytes())?; Ok(()) } diff --git a/local-interchaintest/tests/transfer_neutron.py b/local-interchaintest/tests/transfer_neutron.py index 78fbda183..539ce5a44 100644 --- a/local-interchaintest/tests/transfer_neutron.py +++ b/local-interchaintest/tests/transfer_neutron.py @@ -1,8 +1,13 @@ +from sqlite3 import connect import json import asyncio +from asyncio import Semaphore from typing import Any from src.strategies.util import transfer_raw -from src.scheduler import Ctx +from src.scheduler import ( + Ctx, + MAX_SKIP_CONCURRENT_CALLS, +) from src.util import try_multiple_clients from src.util import custom_neutron_network_config import aiohttp @@ -46,6 +51,10 @@ async def main() -> None: [], {}, denoms, + {}, + {}, + Semaphore(MAX_SKIP_CONCURRENT_CALLS), + connect("test_db.db"), ) await transfer_raw( diff --git a/local-interchaintest/tests/transfer_osmosis.py b/local-interchaintest/tests/transfer_osmosis.py index 7e609781c..03bc07cc1 100644 --- a/local-interchaintest/tests/transfer_osmosis.py +++ b/local-interchaintest/tests/transfer_osmosis.py @@ -1,8 +1,13 @@ +from sqlite3 import connect import json +from asyncio import Semaphore import asyncio from typing import Any from src.strategies.util import transfer_raw -from src.scheduler import Ctx +from src.scheduler import ( + Ctx, + MAX_SKIP_CONCURRENT_CALLS, +) from src.util import try_multiple_clients from src.util import custom_neutron_network_config import 
aiohttp @@ -46,6 +51,10 @@ async def main() -> None: [], {}, denoms, + {}, + {}, + Semaphore(MAX_SKIP_CONCURRENT_CALLS), + connect("test_db.db"), ) await transfer_raw( diff --git a/main.py b/main.py index c0c9195ed..507c59904 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,9 @@ Implements a command-line interface for running arbitrage strategies. """ +from contextlib import closing +from sqlite3 import connect +from asyncio import Semaphore import traceback import asyncio from multiprocessing import Process @@ -13,17 +16,23 @@ import sys from os import path import os -from typing import Any, cast, Optional +from typing import Any, cast from cosmpy.aerial.client import LedgerClient from cosmpy.aerial.wallet import LocalWallet -from src.scheduler import Scheduler, Ctx +from src.scheduler import ( + Scheduler, + Ctx, + MAX_SKIP_CONCURRENT_CALLS, +) from src.util import ( custom_neutron_network_config, DISCOVERY_CONCURRENCY_FACTOR, + load_denom_chain_info, + load_denom_route_leg, + load_chain_info, ) from src.contracts.pool.osmosis import OsmosisPoolDirectory from src.contracts.pool.astroport import NeutronAstroportPoolDirectory -from src.contracts.route import Status from src.strategies.naive import strategy from dotenv import load_dotenv import aiohttp @@ -69,12 +78,13 @@ async def main() -> None: "-l", "--log_file", ) - parser.add_argument("-hf", "--history_file", default="arbs.json") + parser.add_argument("-hf", "--history_db", default="arbs.db") parser.add_argument("-c", "--net_config", default="net_conf.json") parser.add_argument( "-df", "--deployments_file", default="contracts/deployments.json" ) parser.add_argument("-rt", "--rebalance_threshold", default=1000) + parser.add_argument("-lr", "--log_rebalancing", default=False) parser.add_argument("cmd", nargs="*", default=None) args = parser.parse_args() @@ -84,7 +94,7 @@ async def main() -> None: format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", filename=args.log_file, - level=os.environ.get("LOGLEVEL", "INFO").upper(), + level=os.environ.get("LOGLEVEL", "info").upper(), ) else: @@ -92,25 +102,19 @@ async def main() -> None: format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", stream=sys.stdout, - level=os.environ.get("LOGLEVEL", "INFO").upper(), + level=os.environ.get("LOGLEVEL", "info").upper(), ) - # Always make sure the history file exists - if args.history_file is not None and not path.isfile(args.history_file): - logger.info("Creating pool file") - - with open(args.history_file, "w+", encoding="utf-8") as f: - json.dump( - [], - f, - ) - - denom_map: Optional[dict[str, list[dict[str, str]]]] = None + denom_file: dict[str, Any] = { + "denom_map": {}, + "denom_routes": {}, + "chain_info": {}, + } # If the user has specified a denom map, use that instead of skip if args.denom_file is not None and path.isfile(args.denom_file): with open(args.denom_file, "r", encoding="utf-8") as f: - denom_map = json.load(f) + denom_file = json.load(f) # If the user specified a poolfile, create the poolfile if it is empty if args.pool_file is not None and not path.isfile(args.pool_file): @@ -147,284 +151,159 @@ async def main() -> None: connector=aiohttp.TCPConnector( force_close=True, limit_per_host=DISCOVERY_CONCURRENCY_FACTOR ), - timeout=aiohttp.ClientTimeout(total=30), + timeout=aiohttp.ClientTimeout(total=60), ) as session: - ctx: Ctx[Any] = Ctx( - { - chain_id: [ - LedgerClient( - custom_neutron_network_config(endpoint, chain_id=chain_id) - ) - for endpoint in endpoints["grpc"] - ] - for 
chain_id, endpoints in endpoints.items() - }, - endpoints, - LocalWallet.from_mnemonic( - os.environ.get("WALLET_MNEMONIC"), prefix="neutron" - ), - { - "pool_file": args.pool_file, - "poll_interval": int(args.poll_interval), - "hops": int(args.hops), - "pools": int(args.pools) if args.pools else None, - "require_leg_types": args.require_leg_types, - "base_denom": args.base_denom, - "profit_margin": int(args.profit_margin), - "rebalance_threshold": int(args.rebalance_threshold), - "wallet_mnemonic": os.environ.get("WALLET_MNEMONIC"), - "cmd": args.cmd, - "net_config": args.net_config, - "log_file": args.log_file, - "history_file": args.history_file, - "skip_api_key": ( - os.environ.get("SKIP_API_KEY") - if "SKIP_API_KEY" in os.environ - else None + with closing(connect(args.history_db)) as conn: + ctx: Ctx[Any] = Ctx( + { + chain_id: [ + LedgerClient( + custom_neutron_network_config( + endpoint, chain_id=chain_id + ) + ) + for endpoint in endpoints["grpc"] + ] + for chain_id, endpoints in endpoints.items() + }, + endpoints, + LocalWallet.from_mnemonic( + os.environ.get("WALLET_MNEMONIC"), prefix="neutron" ), - }, - None, - False, - session, - [], - cast(dict[str, Any], json.load(f)), - denom_map, - ).recover_history() - sched = Scheduler(ctx, strategy) - - # Register Osmosis and Astroport providers - osmosis = OsmosisPoolDirectory( - ctx.deployments, - ctx.http_session, - poolfile_path=args.pool_file, - endpoints=endpoints[ - list(ctx.deployments["pools"]["osmosis"].keys())[0] - ], - ) - astros = [ - NeutronAstroportPoolDirectory( + { + "pool_file": args.pool_file, + "poll_interval": int(args.poll_interval), + "hops": int(args.hops), + "pools": int(args.pools) if args.pools else None, + "require_leg_types": args.require_leg_types, + "base_denom": args.base_denom, + "profit_margin": int(args.profit_margin), + "rebalance_threshold": int(args.rebalance_threshold), + "wallet_mnemonic": os.environ.get("WALLET_MNEMONIC"), + "cmd": args.cmd, + "net_config": args.net_config, + "log_file": args.log_file, + "history_db": args.history_db, + "skip_api_key": ( + os.environ.get("SKIP_API_KEY") + if "SKIP_API_KEY" in os.environ + else None + ), + "log_rebalancing": args.log_rebalancing, + }, + None, + False, + session, + [], + cast(dict[str, Any], json.load(f)), + { + denom: [load_denom_chain_info(info) for info in infos] + for (denom, infos) in denom_file["denom_map"].items() + }, + { + src_denom: { + dest_denom: [ + load_denom_route_leg(route) for route in routes + ] + for (dest_denom, routes) in dest_denom_routes.items() + } + for (src_denom, dest_denom_routes) in denom_file[ + "denom_routes" + ].items() + }, + { + chain_id: load_chain_info(info) + for (chain_id, info) in denom_file["chain_info"].items() + }, + Semaphore(MAX_SKIP_CONCURRENT_CALLS), + conn, + ) + sched = Scheduler(ctx, strategy) + + # Register Osmosis and Astroport providers + osmosis = OsmosisPoolDirectory( ctx.deployments, - chain_id, ctx.http_session, - [ - ( - grpc.aio.secure_channel( - endpoint.split("grpc+https://")[1], - grpc.ssl_channel_credentials(), - ) - if "https" in endpoint - else grpc.aio.insecure_channel( - endpoint.split("grpc+http://")[1], - ) - ) - for endpoint in endpoints[chain_id]["grpc"] - ], poolfile_path=args.pool_file, - endpoints=endpoints[chain_id], + endpoints=endpoints[ + list(ctx.deployments["pools"]["osmosis"].keys())[0] + ], ) - for chain_id in ctx.deployments["pools"]["astroport"].keys() - if chain_id in endpoints - ] - - osmo_pools = await osmosis.pools() - astros_pools = [await astro.pools() for 
astro in astros] - - if args.cmd is not None and len(args.cmd) > 0 and args.cmd[0] == "hist": - # The user wnats to see a specific route - if len(args.cmd) == 3 and args.cmd[1] == "show": - order_id = int(args.cmd[2]) - - if order_id < 0 or order_id >= len(ctx.order_history): - logger.critical("Route does not exist.") - - sys.exit(1) - - logger.info("%s", ctx.order_history[order_id].fmt_pretty()) - - logger.info("Execution trace:") - - for log in ctx.order_history[order_id].logs: - logger.info("%s", log) - else: - for order in ctx.order_history: - logger.info( - "%s (%s) expected ROI: %d, realized P/L: %d, status: %s, is_osmo: %s, is_valence: %s", - order, - order.time_created, - order.expected_profit, - order.realized_profit if order.realized_profit else 0, - order.status, - any([leg.kind == "osmosis" for leg in order.route]), - any([leg.kind == "auction" for leg in order.route]), - ) - - # Print a profit summary - logger.info( - "Summary - total routes attepmted: %d, total routes completed: %d, min P/L: %d, max P/L: %d, total P/L: %d", - len(ctx.order_history), - len( - [ - order - for order in ctx.order_history - if order.status == Status.EXECUTED - ] - ), - min( - [ - order.realized_profit - for order in ctx.order_history - if order.realized_profit - ], - default=0, - ), - max( - [ - order.realized_profit - for order in ctx.order_history - if order.realized_profit - ], - default=0, - ), - sum( - [ - order.realized_profit - for order in ctx.order_history - if order.realized_profit - ] - ), - ) - - atomic_orders = [ - order - for order in ctx.order_history - if all( - [ - leg.kind == "astroport" or leg.kind == "auction" - for leg in order.route - ] - ) - ] - - ibc_orders = [ - order - for order in ctx.order_history - if any([leg.kind == "osmosis" for leg in order.route]) - ] - - logger.info( - "Summary - total atomic routes attepmted: %d, total atomic routes completed: %d, min P/L: %d, max P/L: %d, total atomic P/L: %d", - len(atomic_orders), - len( - [ - order - for order in atomic_orders - if order.status == Status.EXECUTED - ] - ), - min( - [ - order.realized_profit - for order in atomic_orders - if order.realized_profit - ], - default=0, - ), - max( - [ - order.realized_profit - for order in atomic_orders - if order.realized_profit - ], - default=0, - ), - sum( - [ - order.realized_profit - for order in atomic_orders - if order.realized_profit - ] - ), - ) - logger.info( - "Summary - total IBC routes attepmted: %d, total IBC routes completed: %d, min P/L: %d, max P/L: %d, total IBC P/L: %d", - len(ibc_orders), - len( - [ - order - for order in ibc_orders - if order.status == Status.EXECUTED - ] - ), - min( - [ - order.realized_profit - for order in atomic_orders - if order.realized_profit - ], - default=0, - ), - max( - [ - order.realized_profit - for order in atomic_orders - if order.realized_profit - ], - default=0, - ), - sum( - [ - order.realized_profit - for order in ibc_orders - if order.realized_profit - ] - ), + astros = [ + NeutronAstroportPoolDirectory( + ctx.deployments, + chain_id, + ctx.http_session, + [ + ( + grpc.aio.secure_channel( + endpoint.split("grpc+https://")[1], + grpc.ssl_channel_credentials(), + ) + if "https" in endpoint + else grpc.aio.insecure_channel( + endpoint.split("grpc+http://")[1], + ) + ) + for endpoint in endpoints[chain_id]["grpc"] + ], + poolfile_path=args.pool_file, + endpoints=endpoints[chain_id], ) + for chain_id in ctx.deployments["pools"]["astroport"].keys() + if chain_id in endpoints + ] + + osmo_pools = await osmosis.pools() + 
astros_pools = [await astro.pools() for astro in astros] + + for osmo_base in osmo_pools.values(): + for osmo_pool in osmo_base.values(): + sched.register_provider(osmo_pool) + + for astro_pools in astros_pools: + for astro_base in astro_pools.values(): + for astro_pool in astro_base.values(): + sched.register_provider(astro_pool) + + await sched.register_auctions() + + # Calculate the number of pools by summing up the number of pools for a particular base + # in Osmosis and Astroport + n_pools: int = sum( + map(lambda base: len(base.values()), osmo_pools.values()) + ) + sum(map(lambda base: len(base.values()), astro_pools.values())) + + logger.info("Built pool catalogue with %d pools", n_pools) + + async def event_loop() -> None: + while True: + try: + async with asyncio.timeout(args.poll_interval): + await sched.poll() + except Exception: + logger.info( + "Arbitrage round failed: %s", traceback.format_exc() + ) - return - - for osmo_base in osmo_pools.values(): - for osmo_pool in osmo_base.values(): - sched.register_provider(osmo_pool) - - for astro_pools in astros_pools: - for astro_base in astro_pools.values(): - for astro_pool in astro_base.values(): - sched.register_provider(astro_pool) - - await sched.register_auctions() - - # Calculate the number of pools by summing up the number of pools for a particular base - # in Osmosis and Astroport - n_pools: int = sum( - map(lambda base: len(base.values()), osmo_pools.values()) - ) + sum(map(lambda base: len(base.values()), astro_pools.values())) - - logger.info("Built pool catalogue with %d pools", n_pools) - - async def event_loop() -> None: - while True: - try: - async with asyncio.timeout(args.poll_interval): - await sched.poll() - except Exception: - logger.info( - "Arbitrage round failed: %s", traceback.format_exc() - ) - - continue + continue - def daemon() -> None: - loop = asyncio.get_event_loop() - loop.run_until_complete(event_loop()) + def daemon() -> None: + loop = asyncio.get_event_loop() + loop.run_until_complete(event_loop()) - # Save pools to the specified file if the user wants to dump pools - if args.cmd is not None and len(args.cmd) > 0 and args.cmd[0] == "daemon": - Process(target=daemon, args=[]).run() - logger.info("Spawned searcher daemon") + # Save pools to the specified file if the user wants to dump pools + if ( + args.cmd is not None + and len(args.cmd) > 0 + and args.cmd[0] == "daemon" + ): + Process(target=daemon, args=[]).run() + logger.info("Spawned searcher daemon") - return + return - await event_loop() + await event_loop() if __name__ == "__main__": diff --git a/src/contracts/pool/astroport.py b/src/contracts/pool/astroport.py index 0171883a8..8754b934e 100644 --- a/src/contracts/pool/astroport.py +++ b/src/contracts/pool/astroport.py @@ -31,7 +31,7 @@ from cosmpy.aerial.tx import Transaction -MAX_SPREAD = "0.05" +MAX_SPREAD = "0.1" @dataclass @@ -254,7 +254,7 @@ async def __balance(self, asset: Token | NativeToken) -> int: balance = next(b for b in balances if b["info"] == token_to_asset_info(asset)) - if not balance: + if balance is None: return 0 return int(balance["amount"]) diff --git a/src/contracts/route.py b/src/contracts/route.py index c52cc8bb4..3eb6520c6 100644 --- a/src/contracts/route.py +++ b/src/contracts/route.py @@ -1,7 +1,8 @@ +from datetime import datetime import json from enum import Enum from dataclasses import dataclass -from typing import Union, Callable, Optional +from typing import Union, Callable, Optional, Any from src.contracts.auction import AuctionProvider from 
src.contracts.pool.provider import PoolProvider
@@ -39,6 +40,7 @@ class LegRepr:
     out_asset: str
     kind: str
     executed: bool
+    execution_height: Optional[int]

     def __str__(self) -> str:
         return f"{self.kind}: {self.in_asset} -> {self.out_asset}"
@@ -53,13 +55,15 @@ class Route:

     uid: int
     route: list[LegRepr]
+    legs: list[Leg]
     theoretical_profit: int
     expected_profit: int
     realized_profit: Optional[int]
     quantities: list[int]
     status: Status
-    time_created: str
+    time_created: datetime
     logs: list[str]
+    logs_enabled: bool

     def __hash__(self) -> int:
         return hash(self.uid)
@@ -67,10 +71,63 @@ def __hash__(self) -> int:
     def __str__(self) -> str:
         return f"r{self.uid}"

-    def fmt_pretty(self) -> str:
-        route_fmt = " -> ".join(map(lambda route_leg: str(route_leg), self.route))
-
-        return f"{str(self)} ({self.time_created}) expected ROI: {self.expected_profit}, realized P/L: {self.realized_profit}, status: {self.status}, path: {route_fmt}, execution plan: {self.quantities}"
+    def db_row(self) -> list[Any]:
+        """
+        Creates a db row representing the route's metadata for
+        persistence purposes.
+        Expects insertion columns (theoretical_profit,
+        expected_profit,
+        realized_profit,
+        status,
+        time_created,
+        logs_enabled)
+        """
+
+        return [
+            str(self.theoretical_profit),
+            str(self.expected_profit),
+            str(self.realized_profit),
+            str(self.status),
+            self.time_created,
+            self.logs_enabled,
+        ]
+
+    def legs_db_rows(self, order_uid: int) -> list[list[Any]]:
+        """
+        Creates a db row for each leg in the route.
+        Expects insertion columns (route_index, order_uid, in_amount, out_amount,
+        in_asset, out_asset, kind, executed, execution_height)
+        """
+
+        legs = []
+
+        for i, (leg, leg_repr, in_amount) in enumerate(
+            zip(self.legs, self.route, self.quantities)
+        ):
+            out_amount = self.quantities[i + 1]
+
+            legs.append(
+                [
+                    i,
+                    order_uid,
+                    str(in_amount),
+                    str(out_amount),
+                    leg.in_asset(),
+                    leg.out_asset(),
+                    leg_repr.kind,
+                    leg_repr.executed,
+                    leg_repr.execution_height,
+                ]
+            )
+
+        return legs
+
+    def logs_db_rows(self, order_uid: int) -> list[tuple[int, int, str]]:
+        """
+        Creates a db row for each log in the route.
+        Expects insertion columns (log_index, order_uid, contents)
+        """
+
+        return [(i, order_uid, log) for (i, log) in enumerate(self.logs)]

     def dumps(self) -> str:
         return json.dumps(
@@ -96,27 +153,3 @@ def dumps(self) -> str:
                 "logs": self.logs,
             }
         )
-
-
-def load_route(s: str) -> Route:
-    loaded = json.loads(s)
-
-    return Route(
-        loaded["uid"],
-        [load_leg_repr(json_leg) for json_leg in loaded["route"]],
-        loaded["theoretical_profit"],
-        loaded["expected_profit"],
-        loaded["realized_profit"],
-        loaded["quantities"],
-        Status[loaded["status"].split(".")[1]],
-        loaded["time_created"],
-        loaded["logs"],
-    )
-
-
-def load_leg_repr(s: str) -> LegRepr:
-    loaded = json.loads(s)
-
-    return LegRepr(
-        loaded["in_asset"], loaded["out_asset"], loaded["kind"], loaded["executed"]
-    )
diff --git a/src/db.py b/src/db.py
new file mode 100644
index 000000000..d2adebcd6
--- /dev/null
+++ b/src/db.py
@@ -0,0 +1,111 @@
+from sqlite3 import Cursor
+from typing import Any
+
+
+def order_row_count(cur: Cursor) -> int:
+    """
+    Gets the number of orders in the database.
+    """
+
+    res = cur.execute("SELECT COUNT(*) AS cnt FROM orders")
+
+    (cnt,) = res.fetchone()
+
+    return int(cnt)
+
+
+def insert_order_rows(cur: Cursor, rows: list[list[Any]]) -> None:
+    """
+    Inserts a new row into orders for each order row.
+    Does not commit the transaction.
+ """ + + cur.executemany( + """INSERT INTO orders( + theoretical_profit, + expected_profit, + realized_profit, + status, + time_created, + logs_enabled + ) VALUES (?, ?, ?, ?, ?, ?)""", + rows, + ) + + +def insert_legs_rows(cur: Cursor, rows: list[list[Any]]) -> None: + """ + Inserts a new row into legs for each leg row. + Does not commit the transaction. + """ + + cur.executemany( + """INSERT INTO legs( + route_index, + order_uid, + in_amount, + out_amount, + in_asset, + out_asset, + kind, + executed, + execution_height + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + rows, + ) + + +def insert_logs_rows(cur: Cursor, rows: list[tuple[int, int, str]]) -> None: + """ + Inserts a new row for each log row. + Does not commit the transaction. + """ + + cur.executemany( + "INSERT INTO logs(log_index, order_uid, contents) VALUES (?, ?, ?)", rows + ) + + +def migrate(cur: Cursor) -> None: + """ + Creates requisite tables in the on-disk db in case they do not already exist. + Does not commit the transaction. + """ + + cur.execute( + """CREATE TABLE IF NOT EXISTS orders( + uid INTEGER NOT NULL PRIMARY KEY, + theoretical_profit TEXT NOT NULL, + expected_profit TEXT NOT NULL, + realized_profit TEXT, + status TEXT NOT NULL, + time_created DATETIME NOT NULL, + logs_enabled BOOL NOT NULL + )""" + ) + + cur.execute( + """CREATE TABLE IF NOT EXISTS legs( + route_index INTEGER NOT NULL, + order_uid INTEGER NOT NULL, + in_amount TEXT NOT NULL, + out_amount TEXT, + in_asset TEXT NOT NULL, + out_asset TEXT NOT NULL, + kind TEXT NOT NULL, + executed BOOL NOT NULL, + execution_height INTEGER, + PRIMARY KEY (route_index, order_uid), + FOREIGN KEY(order_uid) REFERENCES orders(uid) + )""" + ) + + cur.execute( + """CREATE TABLE IF NOT EXISTS logs( + log_index INTEGER NOT NULL, + order_uid INTEGER NOT NULL, + contents TEXT NOT NULL, + PRIMARY KEY (log_index, order_uid), + FOREIGN KEY(order_uid) REFERENCES orders(uid) + )""" + ) diff --git a/src/scheduler.py b/src/scheduler.py index 298cbd428..9542b7799 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -2,16 +2,32 @@ Implements a strategy runner with an arbitrary provider set in an event-loop style. 
""" +from sqlite3 import Connection +from asyncio import Semaphore import logging from datetime import datetime -import json from typing import Callable, List, Self, Optional, Awaitable, Any, TypeVar, Generic from dataclasses import dataclass from cosmpy.aerial.client import LedgerClient +from cosmpy.crypto.address import Address from cosmpy.aerial.wallet import LocalWallet from src.contracts.auction import AuctionDirectory, AuctionProvider -from src.contracts.route import Route, load_route, LegRepr, Status, Leg +from src.contracts.route import Route, LegRepr, Status, Leg from src.contracts.pool.provider import PoolProvider +from src.db import ( + migrate, + insert_legs_rows, + insert_logs_rows, + insert_order_rows, + order_row_count, +) +from src.util import ( + try_multiple_clients, + DenomRouteLeg, + DenomRouteQuery, + ChainInfo, + DenomChainInfo, +) import aiohttp import grpc @@ -21,6 +37,15 @@ MAX_ROUTE_HISTORY_LEN = 200000 +# The maximum number of concurrent connections +# that can be open to +MAX_SKIP_CONCURRENT_CALLS = 5 + + +# Length to truncate denoms in balance logs to +DENOM_BALANCE_PREFIX_MAX_DENOM_LEN = 12 + + TState = TypeVar("TState") @@ -41,7 +66,11 @@ class Ctx(Generic[TState]): http_session: aiohttp.ClientSession order_history: list[Route] deployments: dict[str, Any] - denom_map: Optional[dict[str, list[dict[str, str]]]] + denom_map: dict[str, list[DenomChainInfo]] + denom_routes: dict[str, dict[str, list[DenomRouteLeg]]] + chain_info: dict[str, ChainInfo] + http_session_lock: Semaphore + db_connection: Connection def with_state(self, state: Any) -> Self: """ @@ -58,22 +87,32 @@ def commit_history(self) -> Self: Commits the order history to disk. """ - with open(self.cli_args["history_file"], "w", encoding="utf-8") as f: - f.seek(0) - json.dump([order.dumps() for order in self.order_history], f) + cur = self.db_connection.cursor() - return self + migrate(cur) - def recover_history(self) -> Self: - """ - Retrieves the order history from disk - """ + starting_uid = order_row_count(cur) + + route_rows = [route.db_row() for route in self.order_history] + leg_rows = [ + leg_row + for (i, route) in enumerate(self.order_history) + for leg_row in route.legs_db_rows(starting_uid + i) + ] + log_rows = [ + log_row + for (i, route) in enumerate(self.order_history) + for log_row in route.logs_db_rows(starting_uid + i) + ] + + insert_order_rows(cur, route_rows) + insert_legs_rows(cur, leg_rows) + insert_logs_rows(cur, log_rows) - with open(self.cli_args["history_file"], "r", encoding="utf-8") as f: - f.seek(0) - self.order_history = [ - load_route(json_route) for json_route in json.load(f) - ][:-MAX_ROUTE_HISTORY_LEN] + cur.close() + self.db_connection.commit() + + self.order_history = [] return self @@ -92,6 +131,7 @@ def queue_route( theoretical_profit: int, expected_profit: int, quantities: list[int], + enable_logs: bool = True, ) -> Route: """ Creates a new identified route, inserting it into the order history, @@ -101,17 +141,20 @@ def queue_route( r = Route( len(self.order_history), [ - LegRepr(leg.in_asset(), leg.out_asset(), leg.backend.kind, False) + LegRepr(leg.in_asset(), leg.out_asset(), leg.backend.kind, False, None) for leg in route ], + route, theoretical_profit, expected_profit, None, quantities, Status.QUEUED, - datetime.now().strftime("%Y-%m-%d @ %H:%M:%S"), + datetime.now(), [], + enable_logs, ) + self.order_history.append(r) return r @@ -133,14 +176,57 @@ def log_route( Writes a log to the standard logger and to the log file of a route. 
""" - route.logs.append(f"{log_level.upper()} {fmt_string % tuple(args)}") + if not route.logs_enabled: + return + + def asset_balance_prefix(leg: Leg, asset: str) -> Optional[str]: + balance_resp_asset = self.query_denom_balance( + asset, leg.backend.chain_id, leg.backend.chain_prefix + ) + + if balance_resp_asset is None or not isinstance(balance_resp_asset, int): + return None + + return f"balance[{leg.backend.chain_id}]({asset[:DENOM_BALANCE_PREFIX_MAX_DENOM_LEN]}): {balance_resp_asset}" + + def leg_balance_prefixes(leg: Leg) -> list[str]: + """ + Get the chain, denom, and denom balance for the in and out assets in the leg. + """ + + assets = [leg.in_asset(), leg.out_asset()] + + return [ + x for x in (asset_balance_prefix(leg, asset) for asset in assets) if x + ] + + # Log all in and out asset balances for each leg in the route, + # removing any duplicate prefixes using dict.fromkeys + prefix = "" + + if log_level == "debug": + prefix = " ".join( + list( + dict.fromkeys( + [ + prefix + for leg_prefixes in [ + leg_balance_prefixes(leg) for leg in route.legs + ] + for prefix in leg_prefixes + ] + ) + ) + ) + + route.logs.append(f"{log_level.upper()} {prefix} {fmt_string % tuple(args)}") if route.uid >= len(self.order_history) or route.uid < 0: return self.order_history[route.uid] = route - fmt_string = f"%s- {fmt_string}" + fmt_string = f"{prefix} %s- {fmt_string}" if log_level == "info": logger.info(fmt_string, str(route), *args) @@ -155,6 +241,197 @@ def log_route( if log_level == "debug": logger.debug(fmt_string, str(route), *args) + def query_denom_balance(self, denom: str, chain_id: str, chain_prefix: str) -> int: + """ + Gets the balance of the denom on the given chain. + """ + + balance_resp_asset = try_multiple_clients( + self.clients[chain_id], + lambda client: client.query_bank_balance( + Address( + self.wallet.public_key(), + prefix=chain_prefix, + ), + denom, + ), + ) + + if balance_resp_asset is None or not isinstance(balance_resp_asset, int): + return 0 + + return int(balance_resp_asset) + + async def query_denom_route( + self, query: DenomRouteQuery + ) -> Optional[list[DenomRouteLeg]]: + if ( + self.denom_routes + and query.src_denom in self.denom_routes + and query.dest_denom in self.denom_routes[query.src_denom] + ): + return self.denom_routes[query.src_denom][query.dest_denom] + + head = {"accept": "application/json", "content-type": "application/json"} + + async with self.http_session_lock: + async with self.http_session.post( + "https://api.skip.money/v2/fungible/route", + headers=head, + json={ + "amount_in": "1", + "source_asset_denom": query.src_denom, + "source_asset_chain_id": query.src_chain, + "dest_asset_denom": query.dest_denom, + "dest_asset_chain_id": query.dest_chain, + "allow_multi_tx": True, + "allow_unsafe": False, + "bridges": ["IBC"], + }, + ) as resp: + if resp.status != 200: + return None + + ops = (await resp.json())["operations"] + + # The transfer includes a swap or some other operation + # we can't handle + if any(("transfer" not in op for op in ops)): + return None + + transfer_info = ops[0]["transfer"] + + from_chain_info = await self.query_chain_info( + transfer_info["from_chain_id"] + ) + to_chain_info = await self.query_chain_info( + transfer_info["to_chain_id"] + ) + + if not from_chain_info or not to_chain_info: + return None + + route = [ + DenomRouteLeg( + src_chain=query.src_chain, + dest_chain=query.dest_chain, + src_denom=query.src_denom, + dest_denom=query.dest_denom, + from_chain=from_chain_info, + to_chain=to_chain_info, + 
port=op["transfer"]["port"],
+ channel=op["transfer"]["channel"],
+ )
+ for op in ops
+ ]
+
+ self.denom_routes.setdefault(query.src_denom, {})[query.dest_denom] = route
+
+ return route
+
+ async def query_chain_info(
+ self,
+ chain_id: str,
+ ) -> Optional[ChainInfo]:
+ """
+ Gets basic information about a cosmos chain.
+ """
+
+ if chain_id in self.chain_info:
+ return self.chain_info[chain_id]
+
+ head = {"accept": "application/json", "content-type": "application/json"}
+
+ async with self.http_session_lock:
+ async with self.http_session.get(
+ f"https://api.skip.money/v2/info/chains?chain_ids={chain_id}",
+ headers=head,
+ ) as resp:
+ if resp.status != 200:
+ return None
+
+ chains = (await resp.json())["chains"]
+
+ if len(chains) == 0:
+ return None
+
+ chain = chains[0]
+
+ chain_info = ChainInfo(
+ chain_name=chain["chain_name"],
+ chain_id=chain["chain_id"],
+ pfm_enabled=chain["pfm_enabled"],
+ supports_memo=chain["supports_memo"],
+ bech32_prefix=chain["bech32_prefix"],
+ fee_asset=chain["fee_assets"][0]["denom"],
+ chain_type=chain["chain_type"],
+ pretty_name=chain["pretty_name"],
+ )
+
+ self.chain_info[chain_id] = chain_info
+
+ return chain_info
+
+ async def query_denom_info_on_chain(
+ self,
+ src_chain: str,
+ src_denom: str,
+ dest_chain: str,
+ ) -> Optional[DenomChainInfo]:
+ """
+ Gets a denom's corresponding denom on another specific chain,
+ if a mapping exists.
+ """
+
+ infos = await self.query_denom_info(src_chain, src_denom)
+
+ return next(
+ (info for info in infos if info.dest_chain_id == dest_chain), None
+ )
+
+ async def query_denom_info(
+ self,
+ src_chain: str,
+ src_denom: str,
+ ) -> list[DenomChainInfo]:
+ """
+ Gets a denom's corresponding denoms on other chains,
+ consulting the in-memory denom map first.
+ """
+
+ if src_denom in self.denom_map:
+ return self.denom_map[src_denom]
+
+ head = {"accept": "application/json", "content-type": "application/json"}
+
+ async with self.http_session_lock:
+ async with self.http_session.post(
+ "https://api.skip.money/v2/fungible/assets_from_source",
+ headers=head,
+ json={
+ "allow_multi_tx": False,
+ "include_cw20_assets": True,
+ "source_asset_denom": src_denom,
+ "source_asset_chain_id": src_chain,
+ "client_id": "timewave-arb-bot",
+ },
+ ) as resp:
+ if resp.status != 200:
+ return []
+
+ dests = (await resp.json())["dest_assets"]
+
+ def chain_info(chain_id: str, info: dict[str, Any]) -> DenomChainInfo:
+ info = info["assets"][0]
+
+ return DenomChainInfo(
+ src_chain_id=src_chain,
+ denom=info["denom"],
+ dest_chain_id=chain_id,
+ )
+
+ infos = [chain_info(chain_id, info) for chain_id, info in dests.items()]
+
+ self.denom_map[src_denom] = infos
+
+ return infos
+

 class Scheduler(Generic[TState]):
 """
diff --git a/src/strategies/bellman_ford.py b/src/strategies/bellman_ford.py
index d70ed918e..fdc4ff408 100644
--- a/src/strategies/bellman_ford.py
+++ b/src/strategies/bellman_ford.py
@@ -4,7 +4,6 @@

 from functools import cache
 import traceback
-import random
 import logging
 from decimal import Decimal
 import asyncio
@@ -21,7 +20,6 @@
 quantities_for_route_profit,
 )
 from src.util import (
- denom_info_on_chain,
 try_multiple_clients,
 )
 from cosmpy.crypto.address import Address
@@ -291,7 +289,7 @@ async def strategy(
 ),
 )

- if not balance_resp:
+ if balance_resp is None or not isinstance(balance_resp, int):
 return ctx

 profit = await route_base_denom_profit(balance_resp, route)
@@ -392,7 +390,7 @@ async def route_bellman_ford(
 }

- if ctx.cli_args["pools"]:
- vertices = set(random.sample(list(vertices), ctx.cli_args["pools"] - 1))
+ # NOTE: random sampling of pools has been removed; all vertices
+ # are now considered

 # How far a given denom is
from the `src` denom
 distances: dict[str, Decimal] = {}
@@ -409,17 +407,15 @@
 for edge_a, edge_b in ctx.state.weights.values():

 async def relax_edge(edge: Edge) -> None:
- in_asset_infos = await denom_info_on_chain(
+ in_asset_infos = await ctx.query_denom_info_on_chain(
 edge.backend.backend.chain_id,
 edge.backend.in_asset(),
 "neutron-1",
- ctx.http_session,
 )
- out_asset_infos = await denom_info_on_chain(
+ out_asset_infos = await ctx.query_denom_info_on_chain(
 edge.backend.backend.chain_id,
 edge.backend.out_asset(),
 "neutron-1",
- ctx.http_session,
 )

 if not in_asset_infos:
@@ -428,8 +424,8 @@ async def relax_edge(edge: Edge) -> None:
 if not out_asset_infos:
 return

- in_asset = in_asset_infos[0].denom
- out_asset = out_asset_infos[0].denom
+ in_asset = in_asset_infos.denom
+ out_asset = out_asset_infos.denom

 if (
 (
@@ -507,40 +503,36 @@ def check_cycle(edge: Edge) -> Optional[list[str]]:
 # If this trade doesn't start and end with USDC
 # construct it to do so
 if legs[0].in_asset() != src or legs[-1].out_asset() != src:
- in_denom = await denom_info_on_chain(
+ in_denom = await ctx.query_denom_info_on_chain(
 "neutron-1",
 src,
 legs[0].backend.chain_id,
- ctx.http_session,
 )

 if not in_denom:
 return None

- out_denom = await denom_info_on_chain(
+ out_denom = await ctx.query_denom_info_on_chain(
 "neutron-1",
 src,
 legs[-1].backend.chain_id,
- ctx.http_session,
 )

 if not out_denom:
 return None

 in_legs: list[Union[PoolProvider, AuctionProvider]] = list(
- pools.get(in_denom[0].denom, {}).get(legs[0].in_asset(), [])
+ pools.get(in_denom.denom, {}).get(legs[0].in_asset(), [])
 )
- in_auction = auctions.get(in_denom[0].denom, {}).get(legs[0].in_asset(), None)
+ in_auction = auctions.get(in_denom.denom, {}).get(legs[0].in_asset(), None)

 if in_auction:
 in_legs.append(in_auction)

 out_legs: list[Union[PoolProvider, AuctionProvider]] = list(
- pools.get(legs[-1].out_asset(), {}).get(out_denom[0].denom, [])
- )
- out_auction = auctions.get(legs[-1].out_asset(), {}).get(
- out_denom[0].denom, None
+ pools.get(legs[-1].out_asset(), {}).get(out_denom.denom, [])
 )
+ out_auction = auctions.get(legs[-1].out_asset(), {}).get(out_denom.denom, None)

 if out_auction:
 out_legs.append(out_auction)
@@ -556,12 +548,12 @@ def check_cycle(edge: Edge) -> Optional[list[str]]:
 Leg(
 (
 in_leg.asset_a
- if in_leg.asset_a() == in_denom[0].denom
+ if in_leg.asset_a() == in_denom.denom
 else in_leg.asset_b
 ),
 (
 in_leg.asset_b
- if in_leg.asset_a() == in_denom[0].denom
+ if in_leg.asset_a() == in_denom.denom
 else in_leg.asset_a
 ),
 in_leg,
@@ -572,12 +564,12 @@ def check_cycle(edge: Edge) -> Optional[list[str]]:
 Leg(
 (
 out_leg.asset_b
- if out_leg.asset_a() == out_denom[0].denom
+ if out_leg.asset_a() == out_denom.denom
 else out_leg.asset_a
 ),
 (
 out_leg.asset_a
- if out_leg.asset_a() == out_denom[0].denom
+ if out_leg.asset_a() == out_denom.denom
 else out_leg.asset_b
 ),
 out_leg,
diff --git a/src/strategies/naive.py b/src/strategies/naive.py
index 9bdecc877..e58c836a6 100644
--- a/src/strategies/naive.py
+++ b/src/strategies/naive.py
@@ -63,8 +63,7 @@ def poll(
 ),
 )

- if balance_resp:
- self.balance = balance_resp
+ self.balance = balance_resp

 return self
@@ -113,7 +112,7 @@ async def strategy(
 ):
 ctx.log_route(r, "info", "Route queued: %s", [fmt_route(route)])

- if not state.balance:
+ if state.balance is None:
 return ctx

 ctx.log_route(
@@ -233,7 +232,7 @@ async def eval_route(
 if not state:
 return None

- if not state.balance:
+ if state.balance is None or not isinstance(state.balance, int):
 logger.error(
 "Failed to fetch bot wallet balance for account %s",
 str(Address(ctx.wallet.public_key(), prefix="neutron")),
diff --git a/src/strategies/util.py b/src/strategies/util.py
index efae0dfe5..e53a93f9f 100644
--- a/src/strategies/util.py
+++ b/src/strategies/util.py
@@ -2,7 +2,6 @@
 Defines common utilities shared across arbitrage strategies.
 """

-import random
 import traceback
 import asyncio
 from itertools import groupby
@@ -21,17 +20,13 @@
 NeutronAstroportPoolProvider,
 )
 from src.util import (
- chain_info,
 IBC_TRANSFER_TIMEOUT_SEC,
 IBC_TRANSFER_POLL_INTERVAL_SEC,
 try_multiple_rest_endpoints,
 try_multiple_clients_fatal,
 try_multiple_clients,
- DENOM_QUANTITY_ABORT_ARB,
- denom_route,
- denom_info_on_chain,
- denom_info,
- DenomChainInfo,
+ DenomRouteQuery,
+ fmt_denom_route_leg,
 )
 from src.scheduler import Ctx
 from cosmos.base.v1beta1 import coin_pb2
@@ -47,6 +42,16 @@

 MAX_POOL_LIQUIDITY_TRADE = Decimal("0.05")

+
+# The quantity of a denom below which
+# it is no longer worthwhile checking for profit
+DENOM_QUANTITY_ABORT_ARB = 500
+
+
+"""
+Prevent routes from being evaluated excessively when binary search fails.
+"""
+MAX_EVAL_PROBES = 2**6
+
+
 """
 The amount of the summed gas limit that will be consumed if messages
 are batched together.
@@ -112,6 +117,14 @@ def collapse_route(
 ]


+def expand_route(route_sublegs: list[list[tuple[Leg, int]]]) -> list[tuple[Leg, int]]:
+ """
+ Flattens a route that was grouped into consecutive sublegs back
+ into a flat list of legs and quantities.
+ """
+
+ return [leg for sublegs in route_sublegs for leg in sublegs]
+
+
 def build_atomic_arb(
 sublegs: list[tuple[Leg, int]], wallet: LocalWallet
 ) -> Transaction:
@@ -138,6 +151,28 @@

 return tx


+def denom_balance_on_chain(
+ provider: Union[AuctionProvider, PoolProvider], denom: str, ctx: Ctx[Any]
+) -> int:
+ """
+ Gets the wallet's balance of a denom on the provider's chain,
+ returning 0 if it cannot be fetched.
+ """
+
+ balance_resp = try_multiple_clients(
+ ctx.clients[provider.chain_id],
+ lambda client: client.query_bank_balance(
+ Address(ctx.wallet.public_key(), prefix=provider.chain_prefix),
+ denom,
+ ),
+ )
+
+ if isinstance(balance_resp, int):
+ return balance_resp
+
+ return 0
+
+
 async def exec_arb(
 route_ent: Route,
 profit: int,
@@ -187,9 +222,72 @@
 ],
 )

- for sublegs in to_execute:
+ for i, sublegs in enumerate(to_execute):
+ (leg, predicted_to_swap) = sublegs[0]
+
+ # The execution plan must have a sufficient balance
+ # in order to execute the step.
+ # Otherwise, the step's quantity should be set to the balance
+ # and the plan should be updated and reevaluated for profit.
+ # If the route is no longer profitable, it should be canceled
+
+ to_swap = min(
+ predicted_to_swap,
+ denom_balance_on_chain(
+ prev_leg.backend if prev_leg else leg.backend,
+ prev_leg.out_asset() if prev_leg else leg.in_asset(),
+ ctx,
+ ),
+ )
+
+ ctx.log_route(
+ route_ent,
+ "info",
+ "Execution plan for leg %s requires %d, and maximum spendable for leg is %d",
+ [fmt_route([leg]), predicted_to_swap, to_swap],
+ )
+
+ # Recalculate execution plan and update all legs,
+ # or abort if the route is no longer profitable
+ if to_swap != predicted_to_swap:
+ ctx.log_route(
+ route_ent,
+ "info",
+ "Maximum spendable for leg %s (%d) is insufficient for execution plan (requires %d); reevaluating",
+ [fmt_route([leg]), to_swap, predicted_to_swap],
+ )
+
+ remaining_legs = expand_route(to_execute[i:])
+
+ _, new_execution_plan = await quantities_for_route_profit(
+ to_swap,
+ [leg for leg, _ in remaining_legs],
+ route_ent,
+ ctx,
+ )
+
+ # The execution plan was aborted
+ if len(new_execution_plan) < len(remaining_legs):
+ ctx.log_route(
+ route_ent,
+ "info",
+ "Insufficient execution planning (%d) for remaining legs (%d); skipping",
+ [len(new_execution_plan), len(remaining_legs)],
+ )
+
+ continue
+
+ # The execution plan indicates the trade is no longer profitable
+ if new_execution_plan[-1] < quantities[0]:
+ raise ValueError(
+ "Execution plan indicates arb is no longer profitable."
+ )
+
+ # Update the remaining execution plan with the recalculated
+ # quantities
+ to_execute[i:] = collapse_route(
+ iter(list(zip([leg for leg, _ in remaining_legs], new_execution_plan)))
+ )

- leg_to_swap: tuple[Leg, int] = sublegs[0]
- (leg, to_swap) = leg_to_swap

 # Log legs on the same chain
 if len(sublegs) > 1:
@@ -360,13 +458,21 @@ async def exec_arb(
 )

 for leg, _ in sublegs:
- next(
+ executed_leg = next(
 (
 leg_repr
 for leg_repr in route_ent.route
 if str(leg_repr) == str(leg)
 )
- ).executed = True
+ )
+
+ executed_leg.executed = True
+
+ # Update the execution height if it can be found
+ resp = tx.response
+
+ if resp:
+ executed_leg.execution_height = resp.height

 prev_leg = leg
@@ -430,9 +536,17 @@ async def exec_arb(
 ],
 )

- next(
+ executed_leg = next(
 (leg_repr for leg_repr in route_ent.route if str(leg_repr) == str(leg))
- ).executed = True
+ )
+
+ executed_leg.executed = True
+
+ # Update the execution height if it can be found
+ resp = tx.response
+
+ if resp:
+ executed_leg.execution_height = resp.height

 prev_leg = leg
@@ -458,7 +572,7 @@ async def rebalance_portfolio(
 for chain_id in ctx.endpoints.keys():
 logger.info("Rebalancing portfolio for chain %s", chain_id)

- chain_meta = await chain_info(chain_id, ctx.http_session)
+ chain_meta = await ctx.query_chain_info(chain_id)

 if not chain_meta:
 continue
@@ -499,67 +613,77 @@ async def eval_sell_denom(denom: str, sell_denom: str, balance: int) -> None:

 logger.info("Rebalancing %d %s", balance, denom)

- async for route_ent, route in listen_routes_with_depth_dfs(
- ctx.cli_args["hops"],
- denom,
- sell_denom,
- set(),
- pools,
- auctions,
- ctx,
- ):
- # For logging
- _, execution_plan = await quantities_for_route_profit(
- balance, route, route_ent, ctx, seek_profit=False
+ # None if no route could be discovered
+ route_res = await anext(
+ listen_routes_with_depth_dfs(
+ ctx.cli_args["hops"],
+ denom,
+ sell_denom,
+ set(),
+ pools,
+ auctions,
+ ctx,
+ ),
+ None,
 )

+ if route_res is None:
+ return
+
+ route_ent, route = route_res
+
+ ctx.log_route(
+ route_ent,
+ "info",
+ "Rebalancing route discovered: %s",
[fmt_route(route)], + ) - continue + route_ent.logs_enabled = ctx.cli_args["log_rebalancing"] - # Check that the execution plan results in a liquidatable quantity - if execution_plan[-1] < ctx.cli_args["rebalance_threshold"]: - ctx.log_route( - route_ent, - "info", - "Not enough funds for rebalancing %s; trying a different execution plan", - [denom], - ) + # For logging + _, execution_plan = await quantities_for_route_profit( + balance, route, route_ent, ctx, seek_profit=False + ) + + # The execution plan was aborted + if len(execution_plan) <= len(route): + ctx.log_route( + route_ent, + "info", + "Insufficient execution planning for rebalancing for %s; skipping", + [denom], + ) - continue + return + # Check that the execution plan results in a liquidatable quantity + if execution_plan[-1] < ctx.cli_args["rebalance_threshold"]: ctx.log_route( - route_ent, "info", "Executing rebalancing plan for %s", [denom] + route_ent, + "info", + "Not enough funds for rebalancing %s; trying a different execution plan", + [denom], ) - # Execute the plan - route_ent.quantities = execution_plan - ctx.update_route(route_ent) + return - try: - await exec_arb(route_ent, 0, execution_plan, route, ctx) + ctx.log_route( + route_ent, "info", "Executing rebalancing plan for %s", [denom] + ) - break - except Exception: - ctx.log_route( - route_ent, - "error", - "Arb failed - rebalancing of %s failed: %s", - [ - denom, - traceback.format_exc().replace( - "\n", - f"\n{route_ent.uid}- Arb failed - failed to rebalance funds: ", - ), - ], - ) + # Execute the plan + route_ent.quantities = execution_plan + ctx.update_route(route_ent) + + try: + await exec_arb(route_ent, 0, execution_plan, route, ctx) + except Exception: + ctx.log_route( + route_ent, + "error", + "Arb failed - rebalancing of %s failed: %s", + [ + denom, + traceback.format_exc().replace( + "\n", + f"\n{route_ent.uid}- Arb failed - failed to rebalance funds: ", + ), + ], + ) await asyncio.gather( *[ @@ -585,8 +709,6 @@ async def listen_routes_with_depth_dfs( ] ] = None, ) -> AsyncGenerator[tuple[Route, list[Leg]], None]: - denom_cache: dict[str, dict[str, list[str]]] = {} - start_pools: list[Union[AuctionProvider, PoolProvider]] = [ *auctions.get(src, {}).values(), *(pool for pool_set in pools.get(src, {}).values() for pool in pool_set), @@ -604,19 +726,21 @@ async def listen_routes_with_depth_dfs( async def next_legs( path: list[Leg], ) -> AsyncGenerator[tuple[Route, list[Leg]], None]: - nonlocal denom_cache nonlocal eval_profit - if len(path) >= 2 and not ( - path[-1].in_asset() == path[-2].out_asset() - or path[-1].in_asset() - in [ - denom - for denom_list in denom_cache[path[-2].out_asset()].values() - for denom in denom_list - ] - ): - return + if len(path) >= 2: + denom_infos = await ctx.query_denom_info( + path[-2].backend.chain_id, + path[-2].out_asset(), + ) + + matching_denoms = [info.denom for info in denom_infos] + + if not ( + path[-1].in_asset() == path[-2].out_asset() + or path[-1].in_asset() in matching_denoms + ): + return # Only find `limit` pools # with a depth less than `depth @@ -662,29 +786,7 @@ async def next_legs( # no more work to do end = prev_pool.out_asset() - if end not in denom_cache: - try: - denom_infos = await denom_info( - prev_pool.backend.chain_id, end, ctx.http_session, ctx.denom_map - ) - - denom_cache[end] = { - info[0].chain_id: [i.denom for i in info] - for info in ( - denom_infos - + [ - [ - DenomChainInfo( - denom=end, - chain_id=prev_pool.backend.chain_id, - ) - ] - ] - ) - if len(info) > 0 and info[0].chain_id - } 
- except asyncio.TimeoutError:
- return
+ denom_infos = await ctx.query_denom_info(prev_pool.backend.chain_id, end)

 # A pool is a candidate to be a next pool if it has a denom
- # contained in denom_cache[end] or one of its denoms *is* end
+ # contained in denom_infos or one of its denoms *is* end
@@ -721,38 +823,40 @@ async def next_legs(
 Leg(
 (
 auction.asset_a
- if auction.asset_a() == src or auction.asset_a() == denom
+ if auction.asset_a() == src
+ or auction.asset_a() == denom_info.denom
 else auction.asset_b
 ),
 (
 auction.asset_a
- if auction.asset_a() != src and auction.asset_a() != denom
+ if auction.asset_a() != src
+ and auction.asset_a() != denom_info.denom
 else auction.asset_b
 ),
 auction,
 )
- for denom_set in denom_cache[end].values()
- for denom in denom_set
- for auction in auctions.get(denom, {}).values()
+ for denom_info in denom_infos
+ for auction in auctions.get(denom_info.denom, {}).values()
 if auction.chain_id != prev_pool.backend.chain_id
 ),
 *(
 Leg(
 (
 pool.asset_a
- if pool.asset_a() == src or pool.asset_a() == denom
+ if pool.asset_a() == src
+ or pool.asset_a() == denom_info.denom
 else pool.asset_b
 ),
 (
 pool.asset_a
- if pool.asset_a() != src and pool.asset_a() != denom
+ if pool.asset_a() != src
+ and pool.asset_a() != denom_info.denom
 else pool.asset_b
 ),
 pool,
 )
- for denom_set in denom_cache[end].values()
- for denom in denom_set
- for pool_set in pools.get(denom, {}).values()
+ for denom_info in denom_infos
+ for pool_set in pools.get(denom_info.denom, {}).values()
 for pool in pool_set
 if pool.chain_id != prev_pool.backend.chain_id
 ),
@@ -763,8 +867,6 @@ async def next_legs(
 if len(next_pools) == 0:
 return

- random.shuffle(next_pools)

 routes = stream.merge(*[next_legs(path + [pool]) for pool in next_pools])

 async with routes.stream() as streamer:
@@ -814,7 +916,7 @@ async def recover_funds(
 ),
 )

- if not balance_resp:
+ if balance_resp is None or not isinstance(balance_resp, int):
 raise ValueError(f"Couldn't get balance for asset {curr_leg.in_asset()}.")

 if curr_leg.backend.chain_id != backtracked[0].backend.chain_id:
@@ -829,18 +931,14 @@ async def recover_funds(
 to_transfer,
 )

- resp = await quantities_for_route_profit(
- balance_resp, backtracked, r, ctx, seek_profit=False
- )
+ resp = await quantities_for_route_profit(balance_resp, backtracked, r, ctx)

 if not resp:
 raise ValueError("Couldn't get execution plan.")

- profit, quantities = resp
+ _, quantities = resp

- r = ctx.queue_route(
- backtracked, -r.theoretical_profit, -r.expected_profit, quantities
- )
+ r = ctx.queue_route(backtracked, 0, 0, quantities)

 ctx.log_route(r, "info", "Executing recovery", [])
@@ -862,12 +960,10 @@ async def transfer(
 succeeded.
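+
+ The transfer route is resolved through the context's cached Skip
+ API lookups (query_denom_route).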
""" - denom_infos_on_dest = await denom_info_on_chain( + denom_infos_on_dest = await ctx.query_denom_info_on_chain( prev_leg.backend.chain_id, denom, leg.backend.chain_id, - ctx.http_session, - ctx.denom_map, ) if not denom_infos_on_dest: @@ -875,17 +971,25 @@ async def transfer( f"Missing denom info for transfer {denom} ({prev_leg.backend.chain_id}) -> {leg.backend.chain_id}" ) - ibc_route = await denom_route( - prev_leg.backend.chain_id, - denom, - leg.backend.chain_id, - denom_infos_on_dest[0].denom, - ctx.http_session, + ibc_route = await ctx.query_denom_route( + DenomRouteQuery( + src_chain=prev_leg.backend.chain_id, + src_denom=denom, + dest_chain=leg.backend.chain_id, + dest_denom=denom_infos_on_dest.denom, + ) ) if not ibc_route or len(ibc_route) == 0: raise ValueError(f"No route from {denom} to {leg.backend.chain_id}") + ctx.log_route( + route, + "info", + "Got potential transfer route: %s", + [fmt_denom_route_leg(leg) for leg in ibc_route], + ) + src_channel_id = ibc_route[0].channel sender_addr = str( Address(ctx.wallet.public_key(), prefix=ibc_route[0].from_chain.bech32_prefix) diff --git a/src/util.py b/src/util.py index d36124d90..b61b45ce9 100644 --- a/src/util.py +++ b/src/util.py @@ -21,14 +21,9 @@ DENOM_RESOLVER_TIMEOUT_SEC = 5 -# The maximum number of concurrent connections -# that can be open to -MAX_SKIP_CONCURRENT_CALLS = 5 - - # Dictates the maximum number of concurrent calls to the skip # API in searching -DISCOVERY_CONCURRENCY_FACTOR = 20 +DISCOVERY_CONCURRENCY_FACTOR = 15 # Dictates the maximum number of concurrent calls to pool providers @@ -36,11 +31,6 @@ EVALUATION_CONCURRENCY_FACTOR = 10 -# The quantity of a denom below which -# it is no longer worthwhile checking for profit -DENOM_QUANTITY_ABORT_ARB = 500 - - NEUTRON_NETWORK_CONFIG = NetworkConfig( chain_id="neutron-1", url="grpc+http://grpc-kralum.neutron-1.neutron.org:80", @@ -255,6 +245,34 @@ class ChainInfo: pretty_name: str +def load_chain_info(obj: dict[str, Any]) -> ChainInfo: + return ChainInfo( + chain_name=obj["chain_name"], + chain_id=obj["chain_id"], + pfm_enabled=obj["pfm_enabled"], + supports_memo=obj["supports_memo"], + bech32_prefix=obj["bech32_prefix"], + fee_asset=obj["fee_asset"], + chain_type=obj["chain_type"], + pretty_name=obj["pretty_name"], + ) + + +@dataclass +class DenomRouteQuery: + """ + Information identifying a request for a denom route. 
+ """ + + src_chain: str + src_denom: str + dest_chain: str + dest_denom: str + + def __hash__(self) -> int: + return hash((self.src_chain, self.src_denom, self.dest_chain, self.dest_denom)) + + @dataclass class DenomRouteLeg: """ @@ -274,13 +292,27 @@ class DenomRouteLeg: from_chain: ChainInfo to_chain: ChainInfo - denom_in: str - denom_out: str - port: str channel: str +def load_denom_route_leg(obj: dict[str, Any]) -> DenomRouteLeg: + return DenomRouteLeg( + src_chain=obj["src_chain"], + dest_chain=obj["dest_chain"], + src_denom=obj["src_denom"], + dest_denom=obj["dest_denom"], + from_chain=load_chain_info(obj["from_chain"]), + to_chain=load_chain_info(obj["to_chain"]), + port=obj["port"], + channel=obj["channel"], + ) + + +def fmt_denom_route_leg(leg: DenomRouteLeg) -> str: + return f"{leg.src_denom} ({leg.src_chain}) -> {leg.dest_denom} ({leg.dest_chain}) via {leg.channel}/{leg.port}" + + @dataclass class DenomChainInfo: """ @@ -290,205 +322,16 @@ class DenomChainInfo: """ denom: str - chain_id: Optional[str] - - -async def denom_info( - src_chain: str, - src_denom: str, - session: aiohttp.ClientSession, - denom_map: Optional[dict[str, list[dict[str, str]]]] = None, -) -> list[list[DenomChainInfo]]: - """ - Gets a denom's denom and channel on/to other chains. - """ - - if denom_map: - return [ - [ - DenomChainInfo( - denom_info["denom"], - denom_info["chain_id"], - ) - ] - for denom_info in denom_map.get(src_denom, []) - ] - - head = {"accept": "application/json", "content-type": "application/json"} - - async with session.post( - "https://api.skip.money/v1/fungible/assets_from_source", - headers=head, - json={ - "allow_multi_tx": False, - "include_cw20_assets": True, - "source_asset_denom": src_denom, - "source_asset_chain_id": src_chain, - "client_id": "timewave-arb-bot", - }, - ) as resp: - if resp.status != 200: - return [] - - dests = (await resp.json())["dest_assets"] - - def chain_info(chain_id: str, info: dict[str, Any]) -> DenomChainInfo: - info = info["assets"][0] + src_chain_id: str + dest_chain_id: str - return DenomChainInfo(denom=info["denom"], chain_id=chain_id) - return [[chain_info(chain_id, info) for chain_id, info in dests.items()]] - - -async def denom_info_on_chain( - src_chain: str, - src_denom: str, - dest_chain: str, - session: aiohttp.ClientSession, - denom_map: Optional[dict[str, list[dict[str, str]]]] = None, -) -> Optional[list[DenomChainInfo]]: - """ - Gets a neutron denom's denom and channel on/to another chain. 
- """ - - if denom_map: - return [ - DenomChainInfo( - denom_info["denom"], - denom_info["chain_id"], - ) - for denom_info in denom_map.get(src_denom, []) - if denom_info["chain_id"] == dest_chain - ][:1] - - head = {"accept": "application/json", "content-type": "application/json"} - - async with session.post( - "https://api.skip.money/v1/fungible/assets_from_source", - headers=head, - json={ - "allow_multi_tx": False, - "include_cw20_assets": True, - "source_asset_denom": src_denom, - "source_asset_chain_id": src_chain, - "client_id": "timewave-arb-bot", - }, - ) as resp: - if resp.status != 200: - return None - - dests = (await resp.json())["dest_assets"] - - if dest_chain in dests: - info = dests[dest_chain]["assets"][0] - - return [DenomChainInfo(denom=info["denom"], chain_id=dest_chain)] - - return None - - -async def denom_route( - src_chain: str, - src_denom: str, - dest_chain: str, - dest_denom: str, - session: aiohttp.ClientSession, - denom_map: Optional[dict[str, list[dict[str, str]]]] = None, -) -> Optional[list[DenomRouteLeg]]: - """ - Gets a neutron denom's denom and channel on/to another chain. - """ - - head = {"accept": "application/json", "content-type": "application/json"} - - async with session.post( - "https://api.skip.money/v2/fungible/route", - headers=head, - json={ - "amount_in": "1", - "source_asset_denom": src_denom, - "source_asset_chain_id": src_chain, - "dest_asset_denom": dest_denom, - "dest_asset_chain_id": dest_chain, - "allow_multi_tx": True, - "allow_unsafe": False, - "bridges": ["IBC"], - }, - ) as resp: - if resp.status != 200: - return None - - ops = (await resp.json())["operations"] - - # The transfer includes a swap or some other operation - # we can't handle - if any(("transfer" not in op for op in ops)): - return None - - transfer_info = ops[0]["transfer"] - - from_chain_info = await chain_info( - transfer_info["from_chain_id"], session, denom_map - ) - to_chain_info = await chain_info( - transfer_info["to_chain_id"], session, denom_map - ) - - if not from_chain_info or not to_chain_info: - return None - - return [ - DenomRouteLeg( - src_chain=src_chain, - dest_chain=dest_chain, - src_denom=src_denom, - dest_denom=dest_denom, - from_chain=from_chain_info, - to_chain=to_chain_info, - denom_in=transfer_info["denom_in"], - denom_out=transfer_info["denom_out"], - port=transfer_info["port"], - channel=transfer_info["channel"], - ) - for op in ops - ] - - -async def chain_info( - chain_id: str, - session: aiohttp.ClientSession, - denom_map: Optional[dict[str, list[dict[str, str]]]] = None, -) -> Optional[ChainInfo]: - """ - Gets basic information about a cosmos chain. 
- """ - - head = {"accept": "application/json", "content-type": "application/json"} - - async with session.get( - f"https://api.skip.money/v2/info/chains?chain_ids={chain_id}", - headers=head, - ) as resp: - if resp.status != 200: - return None - - chains = (await resp.json())["chains"] - - if len(chains) == 0: - return None - - chain = chains[0] - - return ChainInfo( - chain_name=chain["chain_name"], - chain_id=chain["chain_id"], - pfm_enabled=chain["pfm_enabled"], - supports_memo=chain["supports_memo"], - bech32_prefix=chain["bech32_prefix"], - fee_asset=chain["fee_assets"][0]["denom"], - chain_type=chain["chain_type"], - pretty_name=chain["pretty_name"], - ) +def load_denom_chain_info(obj: dict[str, Any]) -> DenomChainInfo: + return DenomChainInfo( + denom=obj["denom"], + src_chain_id=obj["src_chain_id"], + dest_chain_id=obj["dest_chain_id"], + ) @dataclass diff --git a/tests/test_auction.py b/tests/test_auction.py index 5325ed4c6..3987ca4be 100644 --- a/tests/test_auction.py +++ b/tests/test_auction.py @@ -84,7 +84,6 @@ async def test_auction_provider() -> None: * price ) assert liq_estimate - liquidity < 5 - assert liquidity - liq_estimate < 100 @pytest.mark.asyncio diff --git a/tests/test_naive_strategy.py b/tests/test_naive_strategy.py index f24a57a4b..cbf792004 100644 --- a/tests/test_naive_strategy.py +++ b/tests/test_naive_strategy.py @@ -3,11 +3,7 @@ """ import typing -from typing import Any from dataclasses import dataclass -import json -from src.util import custom_neutron_network_config -from src.scheduler import Ctx from src.strategies.util import fmt_route_leg, IBC_TRANSFER_GAS from src.strategies.naive import State, route_gas from src.contracts.pool.osmosis import OsmosisPoolDirectory @@ -15,9 +11,7 @@ from src.contracts.pool.astroport import NeutronAstroportPoolDirectory from src.contracts.auction import AuctionDirectory from src.util import DISCOVERY_CONCURRENCY_FACTOR -from tests.util import deployments -from cosmpy.aerial.client import LedgerClient -from cosmpy.aerial.wallet import LocalWallet +from tests.util import deployments, ctx import pytest import aiohttp import grpc @@ -91,44 +85,9 @@ async def test_fmt_route_leg() -> None: @pytest.mark.asyncio async def test_state_poll() -> None: - net_config, deployments = (None, None) - - with open("net_conf.json", "r", encoding="utf-8") as nf: - net_config = json.load(nf) - - with open("contracts/deployments.json", "r", encoding="utf-8") as f: - deployments = json.load(f) - - async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector(force_close=True, limit_per_host=1), - timeout=aiohttp.ClientTimeout(total=30), - ) as session: - ctx: Ctx[Any] = Ctx( - { - chain_id: [ - LedgerClient( - custom_neutron_network_config(endpoint, chain_id=chain_id) - ) - for endpoint in endpoints["grpc"] - ] - for chain_id, endpoints in net_config.items() - }, - net_config, - LocalWallet.from_mnemonic( - "decorate bright ozone fork gallery riot bus exhaust worth way bone indoor calm squirrel merry zero scheme cotton until shop any excess stage laundry", - prefix="neutron", - ), - {"base_denom": "untrn"}, - None, - False, - session, - [], - deployments, - None, - ) - + async with ctx() as test_ctx: s = State(None) - s.poll(ctx, {}, {}) + s.poll(test_ctx, {}, {}) assert s.balance assert s.balance > 0 diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 9c59e56a6..62e943a87 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -2,42 +2,23 @@ Tests that the scheduler works as expected. 
""" -from dataclasses import dataclass -import json -from typing import List, cast, Any -from cosmpy.aerial.client import LedgerClient, NetworkConfig -from cosmpy.aerial.wallet import LocalWallet from src.scheduler import Scheduler, Ctx -from src.util import ( - NEUTRON_NETWORK_CONFIG, - DISCOVERY_CONCURRENCY_FACTOR, - custom_neutron_network_config, -) +from src.util import DISCOVERY_CONCURRENCY_FACTOR from src.contracts.pool.osmosis import OsmosisPoolDirectory from src.contracts.pool.astroport import NeutronAstroportPoolDirectory from src.contracts.pool.provider import PoolProvider from src.contracts.auction import AuctionProvider -from tests.util import deployments +from tests.util import deployments, ctx, State import aiohttp import pytest import grpc pytest_plugins = ("pytest_asyncio",) -# Note: this account has no funds and is not used for anything -TEST_WALLET_MNEMONIC = ( - "update armed valve web gate shiver birth exclude curtain cotton juice property" -) - - -@dataclass -class State: - balance: int - async def strategy( strat_ctx: Ctx[State], - _pools: dict[str, dict[str, List[PoolProvider]]], + _pools: dict[str, dict[str, list[PoolProvider]]], _auctions: dict[str, dict[str, AuctionProvider]], ) -> Ctx[State]: """ @@ -47,86 +28,14 @@ async def strategy( return strat_ctx -def ctx(session: aiohttp.ClientSession) -> Ctx[State]: - """ - Gets a default context for test schedulers. - """ - - endpoints: dict[str, dict[str, list[str]]] = { - "neutron-1": { - "http": ["https://neutron-rest.publicnode.com"], - "grpc": ["grpc+https://neutron-grpc.publicnode.com:443"], - }, - "osmosis-1": { - "http": ["https://lcd.osmosis.zone"], - "grpc": ["grpc+https://osmosis-grpc.publicnode.com:443"], - }, - } - - with open("contracts/deployments.json", encoding="utf-8") as f: - return Ctx( - { - "neutron-1": [ - LedgerClient(NEUTRON_NETWORK_CONFIG), - *[ - LedgerClient(custom_neutron_network_config(endpoint)) - for endpoint in endpoints["neutron-1"]["grpc"] - ], - ], - "osmosis-1": [ - *[ - LedgerClient( - NetworkConfig( - chain_id="osmosis-1", - url=endpoint, - fee_minimum_gas_price=0.0053, - fee_denomination="uosmo", - staking_denomination="uosmo", - ) - ) - for endpoint in endpoints["osmosis-1"]["grpc"] - ], - ], - }, - endpoints, - LocalWallet.from_mnemonic(TEST_WALLET_MNEMONIC, prefix="neutron"), - { - "pool_file": None, - "poll_interval": 120, - "hops": 3, - "pools": 100, - "require_leg_types": set(), - "base_denom": "", - "profit_margin": 100, - "wallet_mnemonic": "", - "cmd": "", - "net_config": "", - "log_file": "", - "history_file": "", - "skip_api_key": None, - }, - None, - False, - session, - [], - cast(dict[str, Any], json.load(f)), - None, - ).with_state(State(1000)) - - @pytest.mark.asyncio async def test_init() -> None: """ Test that a scheduler can be instantiated. 
""" - async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector( - force_close=True, limit_per_host=DISCOVERY_CONCURRENCY_FACTOR - ), - timeout=aiohttp.ClientTimeout(total=30), - ) as session: - sched = Scheduler(ctx(session), strategy) + async with ctx() as test_ctx: + sched = Scheduler(test_ctx, strategy) assert sched is not None @@ -145,16 +54,17 @@ async def test_register_provider() -> None: osmosis = OsmosisPoolDirectory(deployments(), session) pool = list(list((await osmosis.pools()).values())[0].values())[0] - sched = Scheduler(ctx(session), strategy) + async with ctx() as test_ctx: + sched = Scheduler(test_ctx, strategy) - directory = OsmosisPoolDirectory(deployments(), session) - pools = await directory.pools() + directory = OsmosisPoolDirectory(deployments(), session) + pools = await directory.pools() - for base in pools.values(): - for pool in base.values(): - sched.register_provider(pool) + for base in pools.values(): + for pool in base.values(): + sched.register_provider(pool) - assert len(sched.providers) > 0 + assert len(sched.providers) > 0 @pytest.mark.asyncio @@ -184,7 +94,7 @@ async def test_poll() -> None: async def simple_strategy( strat_ctx: Ctx[State], - pools: dict[str, dict[str, List[PoolProvider]]], + pools: dict[str, dict[str, list[PoolProvider]]], auctions: dict[str, dict[str, AuctionProvider]], ) -> Ctx[State]: assert len(pools) > 0 @@ -192,18 +102,19 @@ async def simple_strategy( return strat_ctx - sched = Scheduler(ctx(session), simple_strategy) + async with ctx() as test_ctx: + sched = Scheduler(test_ctx, simple_strategy) - await sched.register_auctions() - osmos_pools = await osmosis.pools() - astro_pools = await astroport.pools() + await sched.register_auctions() + osmos_pools = await osmosis.pools() + astro_pools = await astroport.pools() - for base in osmos_pools.values(): - for pool in base.values(): - sched.register_provider(pool) + for base in osmos_pools.values(): + for pool in base.values(): + sched.register_provider(pool) - for astro_base in astro_pools.values(): - for astro_pool in astro_base.values(): - sched.register_provider(astro_pool) + for astro_base in astro_pools.values(): + for astro_pool in astro_base.values(): + sched.register_provider(astro_pool) - await sched.poll() + await sched.poll() diff --git a/tests/test_strategy_util.py b/tests/test_strategy_util.py index e1a73be8f..1f4f984b7 100644 --- a/tests/test_strategy_util.py +++ b/tests/test_strategy_util.py @@ -3,7 +3,7 @@ from typing import Any from src.contracts.route import Leg from src.strategies.util import collapse_route, build_atomic_arb -from tests.test_scheduler import TEST_WALLET_MNEMONIC +from tests.util import TEST_WALLET_MNEMONIC from cosmpy.aerial.wallet import LocalWallet diff --git a/tests/test_util.py b/tests/test_util.py index 9f2d91ae4..cf9b1addd 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -2,9 +2,9 @@ Tests that the skip util methods work as expected. """ -from src.util import denom_info, denom_info_on_chain, denom_route +from src.util import DenomRouteQuery +from tests.util import ctx import pytest -import aiohttp pytest_plugins = ("pytest_asyncio",) @@ -15,19 +15,15 @@ async def test_denom_info() -> None: Tests that skip can fetch the destination chains for untrn. 
""" - async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector(force_close=True, limit_per_host=1), - timeout=aiohttp.ClientTimeout(total=30), - ) as session: - info = await denom_info("neutron-1", "untrn", session) + async with ctx() as test_ctx: + info = await test_ctx.query_denom_info("neutron-1", "untrn") assert info assert len(info) > 0 - assert len(info[0]) > 0 - assert info[0][0].chain_id == "archway-1" + assert info[0].dest_chain_id == "archway-1" assert ( - info[0][0].denom + info[0].denom == "ibc/9E3CDA65E02637E219B43802452D6B37D782F466CF76ECB9F47A2E00C07C4769" ) @@ -38,33 +34,29 @@ async def test_denom_info_on_chain() -> None: Tests that skip can fetch the osmosis destination chain info for untrn. """ - async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector(force_close=True, limit_per_host=1), - timeout=aiohttp.ClientTimeout(total=30), - ) as session: - info = await denom_info_on_chain("neutron-1", "untrn", "osmosis-1", session) + async with ctx() as test_ctx: + info = await test_ctx.query_denom_info_on_chain( + "neutron-1", "untrn", "osmosis-1" + ) assert info - assert len(info) > 0 assert ( - info[0].denom + info.denom == "ibc/126DA09104B71B164883842B769C0E9EC1486C0887D27A9999E395C2C8FB5682" ) - assert info[0].chain_id == "osmosis-1" + assert info.dest_chain_id == "osmosis-1" - info = await denom_info_on_chain( + info = await test_ctx.query_denom_info_on_chain( "neutron-1", "ibc/376222D6D9DAE23092E29740E56B758580935A6D77C24C2ABD57A6A78A1F3955", "osmosis-1", - session, ) assert info - assert len(info) > 0 - assert info[0].denom == "uosmo" - assert info[0].chain_id == "osmosis-1" + assert info.denom == "uosmo" + assert info.dest_chain_id == "osmosis-1" @pytest.mark.asyncio @@ -73,16 +65,15 @@ async def test_denom_route() -> None: Tests that skip can fetch the route for USDC from neutron to osmosis. 
""" - async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector(force_close=True, limit_per_host=1), - timeout=aiohttp.ClientTimeout(total=30), - ) as session: - info = await denom_route( - "neutron-1", - "ibc/B559A80D62249C8AA07A380E2A2BEA6E5CA9A6F079C912C3A9E9B494105E4F81", - "osmosis-1", - "ibc/498A0751C798A0D9A389AA3691123DADA57DAA4FE165D5C75894505B876BA6E4", - session, + async with ctx() as test_ctx: + + info = await test_ctx.query_denom_route( + DenomRouteQuery( + src_chain="neutron-1", + src_denom="ibc/376222D6D9DAE23092E29740E56B758580935A6D77C24C2ABD57A6A78A1F3955", + dest_chain="osmosis-1", + dest_denom="uosmo", + ) ) assert info @@ -90,12 +81,13 @@ async def test_denom_route() -> None: assert info - info = await denom_route( - "neutron-1", - "ibc/376222D6D9DAE23092E29740E56B758580935A6D77C24C2ABD57A6A78A1F3955", - "osmosis-1", - "uosmo", - session, + info = await test_ctx.query_denom_route( + DenomRouteQuery( + src_chain="neutron-1", + src_denom="ibc/376222D6D9DAE23092E29740E56B758580935A6D77C24C2ABD57A6A78A1F3955", + dest_chain="osmosis-1", + dest_denom="uosmo", + ) ) assert info diff --git a/tests/util.py b/tests/util.py index f48211edc..f3c539d82 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,5 +1,29 @@ -from typing import Any, cast +from sqlite3 import connect +from asyncio import Semaphore +from typing import Any, cast, AsyncIterator import json +import aiohttp +from dataclasses import dataclass +from contextlib import asynccontextmanager +from cosmpy.aerial.client import LedgerClient, NetworkConfig +from cosmpy.aerial.wallet import LocalWallet +from src.scheduler import Ctx, MAX_SKIP_CONCURRENT_CALLS +from src.util import ( + DISCOVERY_CONCURRENCY_FACTOR, + NEUTRON_NETWORK_CONFIG, + custom_neutron_network_config, +) + + +@dataclass +class State: + balance: int + + +# Note: this account has no funds and is not used for anything +TEST_WALLET_MNEMONIC = ( + "update armed valve web gate shiver birth exclude curtain cotton juice property" +) def deployments() -> dict[str, Any]: @@ -9,3 +33,83 @@ def deployments() -> dict[str, Any]: """ with open("contracts/deployments.json", encoding="utf-8") as f: return cast(dict[str, Any], json.load(f)) + + +@asynccontextmanager +async def ctx() -> AsyncIterator[Ctx[Any]]: + """ + Gets a default context for test schedulers. 
+ """ + + async with aiohttp.ClientSession( + connector=aiohttp.TCPConnector( + force_close=True, limit_per_host=DISCOVERY_CONCURRENCY_FACTOR + ), + timeout=aiohttp.ClientTimeout(total=30), + ) as session: + endpoints: dict[str, dict[str, list[str]]] = { + "neutron-1": { + "http": ["https://neutron-rest.publicnode.com"], + "grpc": ["grpc+https://neutron-grpc.publicnode.com:443"], + }, + "osmosis-1": { + "http": ["https://lcd.osmosis.zone"], + "grpc": ["grpc+https://osmosis-grpc.publicnode.com:443"], + }, + } + + with open("contracts/deployments.json", encoding="utf-8") as f: + yield Ctx( + clients={ + "neutron-1": [ + LedgerClient(NEUTRON_NETWORK_CONFIG), + *[ + LedgerClient(custom_neutron_network_config(endpoint)) + for endpoint in endpoints["neutron-1"]["grpc"] + ], + ], + "osmosis-1": [ + *[ + LedgerClient( + NetworkConfig( + chain_id="osmosis-1", + url=endpoint, + fee_minimum_gas_price=0.0053, + fee_denomination="uosmo", + staking_denomination="uosmo", + ) + ) + for endpoint in endpoints["osmosis-1"]["grpc"] + ], + ], + }, + endpoints=endpoints, + wallet=LocalWallet.from_mnemonic( + TEST_WALLET_MNEMONIC, prefix="neutron" + ), + cli_args={ + "pool_file": None, + "poll_interval": 120, + "hops": 3, + "pools": 100, + "require_leg_types": set(), + "base_denom": "", + "profit_margin": 100, + "wallet_mnemonic": "", + "cmd": "", + "net_config": "", + "log_file": "", + "history_db": "", + "skip_api_key": None, + }, + state=None, + terminated=False, + http_session=session, + db_connection=connect("test_db.db"), + order_history=[], + deployments=cast(dict[str, Any], json.load(f)), + denom_map={}, + denom_routes={}, + chain_info={}, + http_session_lock=Semaphore(MAX_SKIP_CONCURRENT_CALLS), + ).with_state(State(1000))