Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Osmosis Integration Tests + Improvements #27

Open
wants to merge 46 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
752cb4d
Add balance logs to log_route.
dowlandaiello Oct 3, 2024
95a5eae
Make log balance prefix lowercase.
dowlandaiello Oct 3, 2024
8ebba6c
Fix balance log prefix base denom log.
dowlandaiello Oct 3, 2024
b1de2fb
Log next queued leg-related balances in route logs.
dowlandaiello Oct 3, 2024
2a174eb
Display balance information in log prefixes for all in/out per prov.
dowlandaiello Oct 4, 2024
82ad226
Update local-ic docker image.
dowlandaiello Oct 4, 2024
dc80f96
Fix failing tests.
dowlandaiello Oct 4, 2024
caf00b8
Reactive osmosis local-ic integration tests.
dowlandaiello Oct 4, 2024
e2ea392
Centralize all skip queries in context with caching.
dowlandaiello Oct 7, 2024
6e44e5e
Get osmo tests working.
dowlandaiello Oct 14, 2024
7cce893
Fix a small bug in route exec planning probing conditions.
dowlandaiello Oct 14, 2024
a57f3dd
Increase HTTP timeout.
dowlandaiello Oct 14, 2024
4c14798
Cache query_denom_info.
dowlandaiello Oct 14, 2024
f7d577f
Update integration tests target profit values.
dowlandaiello Oct 14, 2024
42da5b4
Update osmo integration test target profit again.
dowlandaiello Oct 14, 2024
fccaa4c
Add extra comments to one-liners in prefixing.
dowlandaiello Oct 14, 2024
ff18c79
Diable rebalancing logs by default.
dowlandaiello Oct 14, 2024
c27a4fc
Skip logging routes with logs disabled.
dowlandaiello Oct 14, 2024
8e39fa2
Increase http timeout again.
dowlandaiello Oct 14, 2024
2903e15
Use skip API v2.
dowlandaiello Oct 14, 2024
4005233
Lower concurrency factor.
dowlandaiello Oct 14, 2024
6a6bb11
Another route planning fix.
dowlandaiello Oct 14, 2024
00e2b26
Ratelimit skip requests, fix CI failing tests.
dowlandaiello Oct 14, 2024
94a188f
Lower HTTP client timeout.
dowlandaiello Oct 14, 2024
3ad374c
Remove extraneous sources of randomness from integration tests.
dowlandaiello Oct 14, 2024
3ba9498
Disable exhaustive rebalancing attempts.
dowlandaiello Oct 14, 2024
b36b319
See previous.
dowlandaiello Oct 14, 2024
454b858
Log rebalancing routes.
dowlandaiello Oct 14, 2024
70b020c
Put a hard stop on maximum iterations per route planning.
dowlandaiello Oct 14, 2024
f1cf81a
Addm ore debug logs to IBC transfers.
dowlandaiello Oct 14, 2024
107e1b4
See previous.
dowlandaiello Oct 14, 2024
8f9bfef
Allow more concurrent skip calls.
dowlandaiello Oct 14, 2024
260408a
Current status: debugging a blocking skip denom chain info call in tr…
dowlandaiello Oct 14, 2024
efb95e3
Fix dealocks, add caching in binary search.
dowlandaiello Oct 15, 2024
ea0d630
Migrate order history to sqlite.
dowlandaiello Oct 16, 2024
d99c185
Create db.py.
dowlandaiello Oct 16, 2024
2bbce5e
Remove autocommit arg to sqlite connect call.
dowlandaiello Oct 16, 2024
7486a56
Restore old route planning algorithm.
dowlandaiello Oct 16, 2024
ad52f28
See previous.
dowlandaiello Oct 16, 2024
b13e9b8
See previous.
dowlandaiello Oct 16, 2024
060385e
Gate log prefixes behind debug log level.
dowlandaiello Oct 17, 2024
93bc2fe
Remove denom balance prefix caching altogether.
dowlandaiello Oct 17, 2024
15f19a9
Track block heights at which legs in orders are submitted.
dowlandaiello Oct 17, 2024
6d05a26
Linter fixes.
dowlandaiello Oct 17, 2024
fda5eac
Add tests for unprofitable osmosis arbs.
dowlandaiello Oct 18, 2024
b1ab9cf
Use source channel endpoint for packet ack checks.
dowlandaiello Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion local-interchaintest/chains/neutron_osmosis_gaia.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
"binary": "osmosisd",
"bech32_prefix": "osmo",
"docker_image": {
"version": "v25.0.4",
"version": "v26.0.2",
"repository": "ghcr.io/strangelove-ventures/heighliner/osmosis"
},
"gas_prices": "0.0025%DENOM%",
Expand Down
13 changes: 6 additions & 7 deletions local-interchaintest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ fn main() -> Result<(), Box<dyn StdError + Send + Sync>> {
.with_test(Box::new(tests::test_unprofitable_arb) as TestFn)
.build()?,
)?
/*
// Test case (astro -> osmo arb):
//
// - Astro: untrn-bruhtoken @ 1.5 bruhtoken/untrn
Expand Down Expand Up @@ -213,7 +212,7 @@ fn main() -> Result<(), Box<dyn StdError + Send + Sync>> {
bruhtoken_osmo.clone(),
Pool::Osmosis(
OsmosisPoolBuilder::default()
.with_funds(bruhtoken_osmo.clone(), 10000000u128)
.with_funds(bruhtoken_osmo.clone(), 100000000u128)
.with_funds(uosmo.clone(), 10000000u128)
.with_weight(bruhtoken_osmo.clone(), 1u128)
.with_weight(uosmo.clone(), 1u128)
Expand All @@ -223,16 +222,16 @@ fn main() -> Result<(), Box<dyn StdError + Send + Sync>> {
.with_pool(
untrn.clone(),
bruhtoken.clone(),
Pool::Astroport(
AstroportPoolBuilder::default()
.with_balance_asset_a(10000000u128)
.with_balance_asset_b(10000000u128)
Pool::Auction(
AuctionPoolBuilder::default()
.with_balance_offer_asset(10000000u128)
.with_price(Decimal::percent(10))
.build()?,
),
)
.with_arbbot()
.with_test(Box::new(tests::test_osmo_arb) as TestFn)
.build()?,
)?*/
)?
.join()
}
2 changes: 1 addition & 1 deletion local-interchaintest/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ pub fn with_arb_bot_output(test: OwnedTestFn) -> TestResult {
let proc_handle = Arc::new(proc);
let proc_handle_watcher = proc_handle.clone();
let (tx_res, rx_res) = mpsc::channel();
let mut finished = AtomicBool::new(false);
let finished = AtomicBool::new(false);

let test_handle = test.clone();

Expand Down
2 changes: 2 additions & 0 deletions local-interchaintest/tests/transfer_neutron.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ async def main() -> None:
[],
{},
denoms,
{},
{},
)

await transfer_raw(
Expand Down
2 changes: 2 additions & 0 deletions local-interchaintest/tests/transfer_osmosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ async def main() -> None:
[],
{},
denoms,
{},
{},
)

await transfer_raw(
Expand Down
14 changes: 10 additions & 4 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import sys
from os import path
import os
from typing import Any, cast, Optional
from typing import Any, cast
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.wallet import LocalWallet
from src.scheduler import Scheduler, Ctx
Expand Down Expand Up @@ -105,12 +105,16 @@ async def main() -> None:
f,
)

denom_map: Optional[dict[str, list[dict[str, str]]]] = None
denom_file: dict[str, Any] = {
"denom_map": {},
"denom_routes": {},
"chain_info": {},
}

# If the user has specified a denom map, use that instead of skip
if args.denom_file is not None and path.isfile(args.denom_file):
with open(args.denom_file, "r", encoding="utf-8") as f:
denom_map = json.load(f)
denom_file = json.load(f)

# If the user specified a poolfile, create the poolfile if it is empty
if args.pool_file is not None and not path.isfile(args.pool_file):
Expand Down Expand Up @@ -188,7 +192,9 @@ async def main() -> None:
session,
[],
cast(dict[str, Any], json.load(f)),
denom_map,
denom_file["denom_map"],
denom_file["denom_routes"],
denom_file["chain_info"],
).recover_history()
sched = Scheduler(ctx, strategy)

Expand Down
2 changes: 2 additions & 0 deletions src/contracts/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Route:

uid: int
route: list[LegRepr]
legs: list[Leg]
theoretical_profit: int
expected_profit: int
realized_profit: Optional[int]
Expand Down Expand Up @@ -104,6 +105,7 @@ def load_route(s: str) -> Route:
return Route(
loaded["uid"],
[load_leg_repr(json_leg) for json_leg in loaded["route"]],
[],
loaded["theoretical_profit"],
loaded["expected_profit"],
loaded["realized_profit"],
Expand Down
210 changes: 207 additions & 3 deletions src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@
from typing import Callable, List, Self, Optional, Awaitable, Any, TypeVar, Generic
from dataclasses import dataclass
from cosmpy.aerial.client import LedgerClient
from cosmpy.crypto.address import Address
from cosmpy.aerial.wallet import LocalWallet
from src.contracts.auction import AuctionDirectory, AuctionProvider
from src.contracts.route import Route, load_route, LegRepr, Status, Leg
from src.contracts.pool.provider import PoolProvider
from src.util import (
try_multiple_clients,
DenomRouteLeg,
DenomRouteQuery,
ChainInfo,
DenomChainInfo,
)
import aiohttp
import grpc

Expand All @@ -21,6 +29,10 @@
MAX_ROUTE_HISTORY_LEN = 200000


# Length to truncate denoms in balance logs to
DENOM_BALANCE_PREFIX_MAX_DENOM_LEN = 12


TState = TypeVar("TState")


Expand All @@ -41,7 +53,9 @@ class Ctx(Generic[TState]):
http_session: aiohttp.ClientSession
order_history: list[Route]
deployments: dict[str, Any]
denom_map: Optional[dict[str, list[dict[str, str]]]]
denom_map: dict[str, list[DenomChainInfo]]
denom_routes: dict[DenomRouteQuery, list[DenomRouteLeg]]
chain_info: dict[str, ChainInfo]

def with_state(self, state: Any) -> Self:
"""
Expand Down Expand Up @@ -104,6 +118,7 @@ def queue_route(
LegRepr(leg.in_asset(), leg.out_asset(), leg.backend.kind, False)
for leg in route
],
route,
theoretical_profit,
expected_profit,
None,
Expand Down Expand Up @@ -133,14 +148,46 @@ def log_route(
Writes a log to the standard logger and to the log file of a route.
"""

route.logs.append(f"{log_level.upper()} {fmt_string % tuple(args)}")
def asset_balance_prefix(leg: Leg, asset: str) -> Optional[str]:
balance_resp_asset = try_multiple_clients(
self.clients[leg.backend.chain_id],
lambda client: client.query_bank_balance(
Address(
self.wallet.public_key(),
prefix=leg.backend.chain_prefix,
),
asset,
),
)

if not balance_resp_asset:
return None

return f"balance[{leg.backend.chain_id}]({asset[:DENOM_BALANCE_PREFIX_MAX_DENOM_LEN]}): {balance_resp_asset}"

def leg_balance_prefixes(leg: Leg) -> list[str]:
assets = [leg.in_asset(), leg.out_asset()]

return [
x for x in (asset_balance_prefix(leg, asset) for asset in assets) if x
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to get some comments around oneliners like this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

]

prefix = " ".join(
{
prefix
for leg_prefixes in [leg_balance_prefixes(leg) for leg in route.legs]
for prefix in leg_prefixes
}
)

route.logs.append(f"{log_level.upper()} {prefix} {fmt_string % tuple(args)}")

if route.uid >= len(self.order_history) or route.uid < 0:
return

self.order_history[route.uid] = route

fmt_string = f"%s- {fmt_string}"
fmt_string = f"{prefix} %s- {fmt_string}"

if log_level == "info":
logger.info(fmt_string, str(route), *args)
Expand All @@ -155,6 +202,163 @@ def log_route(
if log_level == "debug":
logger.debug(fmt_string, str(route), *args)

async def query_denom_route(
self, query: DenomRouteQuery
) -> Optional[list[DenomRouteLeg]]:
if self.denom_routes and query in self.denom_routes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we exit early here in case we already have a response stored? would be nice to add a comment

return self.denom_routes[query]

head = {"accept": "application/json", "content-type": "application/json"}

async with self.http_session.post(
"https://api.skip.money/v2/fungible/route",
headers=head,
json={
"amount_in": "1",
"source_asset_denom": query.src_denom,
"source_asset_chain_id": query.src_chain,
"dest_asset_denom": query.dest_denom,
"dest_asset_chain_id": query.dest_chain,
"allow_multi_tx": True,
"allow_unsafe": False,
"bridges": ["IBC"],
},
) as resp:
if resp.status != 200:
return None

ops = (await resp.json())["operations"]

# The transfer includes a swap or some other operation
# we can't handle
if any(("transfer" not in op for op in ops)):
return None

transfer_info = ops[0]["transfer"]

from_chain_info = await self.query_chain_info(
transfer_info["from_chain_id"]
)
to_chain_info = await self.query_chain_info(transfer_info["to_chain_id"])

if not from_chain_info or not to_chain_info:
return None

route = [
DenomRouteLeg(
src_chain=query.src_chain,
dest_chain=query.dest_chain,
src_denom=query.src_denom,
dest_denom=query.dest_denom,
from_chain=from_chain_info,
to_chain=to_chain_info,
denom_in=transfer_info["denom_in"],
denom_out=transfer_info["denom_out"],
port=transfer_info["port"],
channel=transfer_info["channel"],
)
for op in ops
]

self.denom_routes[query] = route

return route

async def query_chain_info(
self,
chain_id: str,
) -> Optional[ChainInfo]:
"""
Gets basic information about a cosmos chain.
"""

if chain_id in self.chain_info:
return self.chain_info[chain_id]

head = {"accept": "application/json", "content-type": "application/json"}

async with self.http_session.get(
f"https://api.skip.money/v2/info/chains?chain_ids={chain_id}",
headers=head,
) as resp:
if resp.status != 200:
return None

chains = (await resp.json())["chains"]

if len(chains) == 0:
return None

chain = chains[0]

chain_info = ChainInfo(
chain_name=chain["chain_name"],
chain_id=chain["chain_id"],
pfm_enabled=chain["pfm_enabled"],
supports_memo=chain["supports_memo"],
bech32_prefix=chain["bech32_prefix"],
fee_asset=chain["fee_assets"][0]["denom"],
chain_type=chain["chain_type"],
pretty_name=chain["pretty_name"],
)

self.chain_info[chain_id] = chain_info

return chain_info

async def query_denom_info_on_chain(
self,
src_chain: str,
src_denom: str,
dest_chain: str,
) -> Optional[DenomChainInfo]:
"""
Gets a neutron denom's denom and channel on/to another chain.
"""

infos = await self.query_denom_info(src_chain, src_denom)

return next((info for info in infos if info.dest_chain_id == dest_chain))

async def query_denom_info(
self,
src_chain: str,
src_denom: str,
) -> list[DenomChainInfo]:
"""
Gets a denom's denom and channel on/to other chains.
"""

if src_denom in self.denom_map:
return self.denom_map[src_denom]

head = {"accept": "application/json", "content-type": "application/json"}

async with self.http_session.post(
"https://api.skip.money/v1/fungible/assets_from_source",
headers=head,
json={
"allow_multi_tx": False,
"include_cw20_assets": True,
"source_asset_denom": src_denom,
"source_asset_chain_id": src_chain,
"client_id": "timewave-arb-bot",
},
) as resp:
if resp.status != 200:
return []

dests = (await resp.json())["dest_assets"]

def chain_info(chain_id: str, info: dict[str, Any]) -> DenomChainInfo:
info = info["assets"][0]

return DenomChainInfo(
src_chain_id=src_chain, denom=info["denom"], dest_chain_id=chain_id
)

return [chain_info(chain_id, info) for chain_id, info in dests.items()]


class Scheduler(Generic[TState]):
"""
Expand Down
Loading
Loading