From bd594c174785b7fe13f7e68950ad0d39fe54f155 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Tue, 6 Jun 2023 15:19:38 -0500 Subject: [PATCH 01/11] Delete rust test runner --- orchestrator/Cargo.lock | 32 -- orchestrator/Cargo.toml | 4 +- orchestrator/README.md | 9 +- orchestrator/test_runner/Cargo.toml | 41 -- .../test_runner/src/arbitrary_logic.rs | 17 - orchestrator/test_runner/src/bootstrapping.rs | 117 ----- orchestrator/test_runner/src/happy_path.rs | 472 ------------------ orchestrator/test_runner/src/happy_path_v2.rs | 205 -------- orchestrator/test_runner/src/main.rs | 237 --------- .../test_runner/src/orch_keys_update.rs | 126 ----- .../src/transaction_stress_test.rs | 241 --------- orchestrator/test_runner/src/utils.rs | 186 ------- orchestrator/test_runner/src/valset_stress.rs | 14 - orchestrator/test_runner/startup.sh | 6 - orchestrator/testnet.Dockerfile | 33 -- 15 files changed, 3 insertions(+), 1737 deletions(-) delete mode 100644 orchestrator/test_runner/Cargo.toml delete mode 100644 orchestrator/test_runner/src/arbitrary_logic.rs delete mode 100644 orchestrator/test_runner/src/bootstrapping.rs delete mode 100644 orchestrator/test_runner/src/happy_path.rs delete mode 100644 orchestrator/test_runner/src/happy_path_v2.rs delete mode 100644 orchestrator/test_runner/src/main.rs delete mode 100644 orchestrator/test_runner/src/orch_keys_update.rs delete mode 100644 orchestrator/test_runner/src/transaction_stress_test.rs delete mode 100644 orchestrator/test_runner/src/utils.rs delete mode 100644 orchestrator/test_runner/src/valset_stress.rs delete mode 100644 orchestrator/test_runner/startup.sh delete mode 100644 orchestrator/testnet.Dockerfile diff --git a/orchestrator/Cargo.lock b/orchestrator/Cargo.lock index e9382e7ba..4896d27e8 100644 --- a/orchestrator/Cargo.lock +++ b/orchestrator/Cargo.lock @@ -2339,7 +2339,6 @@ dependencies = [ "orchestrator", "register_delegate_keys", "relayer", - "test_runner", ] [[package]] @@ -4736,37 +4735,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "test_runner" -version = "0.1.0" -dependencies = [ - "actix", - "actix-rt 2.5.0", - "actix-web", - "clarity", - "cosmos_gravity", - "deep_space 2.4.7", - "docopt", - "env_logger", - "ethereum_gravity", - "ethers", - "futures", - "gravity_abi", - "gravity_proto", - "gravity_utils", - "hex", - "lazy_static", - "log", - "orchestrator", - "rand 0.8.4", - "serde", - "serde_derive", - "tokio 1.28.0", - "tonic", - "url", - "web30", -] - [[package]] name = "textwrap" version = "0.14.2" diff --git a/orchestrator/Cargo.toml b/orchestrator/Cargo.toml index 686b4f2c6..8518fc237 100644 --- a/orchestrator/Cargo.toml +++ b/orchestrator/Cargo.toml @@ -12,14 +12,13 @@ description = """ """ [workspace] -default-members = ["gorc", "orchestrator", "test_runner"] +default-members = ["gorc", "orchestrator"] members = [ "orchestrator", "cosmos_gravity", "ethereum_gravity", "gravity_utils", "gravity_proto_build", - "test_runner", "gravity_proto", "relayer", "register_delegate_keys", @@ -34,7 +33,6 @@ cosmos_gravity = { path = "./cosmos_gravity" } ethereum_gravity = { path = "./ethereum_gravity" } gravity_utils = { path = "./gravity_utils" } gravity_proto_build = { path = "./gravity_proto_build" } -test_runner = { path = "./test_runner" } gravity_proto = { path = "./gravity_proto" } register_delegate_keys = { path = "./register_delegate_keys" } gorc = { path = "./gorc" } diff --git a/orchestrator/README.md b/orchestrator/README.md index 2c806b834..79abd6d43 100644 --- a/orchestrator/README.md +++ b/orchestrator/README.md @@ -51,10 +51,6 @@ This is to build the relayer logic (i.e. cosmos to ethereum) as a seperate binar Supporting bash scripts for this library -### test_runner/ - -A binary which runs tests against a cosmos chain - ## CLI @@ -67,7 +63,6 @@ client deploy-erc20-representation --cosmos-grpc= --cosmos-prefix= orchestrator --cosmos-phrase= --ethereum-key= --cosmos-grpc= --address-prefix= --ethereum-rpc= --fees= --contract-address= register-delegate-key --validator-phrase= --address-prefix= [--cosmos-phrase=] [--ethereum-key=] --cosmos-grpc= --fees= relayer --ethereum-key= --cosmos-grpc= --address-prefix= --ethereum-rpc= --contract-address= --cosmos-grpc= -test_runner ``` ## PROPOSED @@ -105,7 +100,7 @@ gorc update [name] [new-name] list show [name] - cosmos + cosmos add [name] import [name] [mnemnoic] delete [name] @@ -117,7 +112,7 @@ gorc ```json [gravity] contract = "0x6b175474e89094c44da98b954eedeac495271d0f" - + [ethereum] key = "testkey" rpc = "http://localhost:8545" diff --git a/orchestrator/test_runner/Cargo.toml b/orchestrator/test_runner/Cargo.toml deleted file mode 100644 index abed6c5ec..000000000 --- a/orchestrator/test_runner/Cargo.toml +++ /dev/null @@ -1,41 +0,0 @@ -[package] -name = "test_runner" -version = "0.1.0" -authors = ["Justin Kilpatrick "] -edition = "2018" - -# only becuase I like - more in names -# [[bin]] -# name = "test-runner" -# path = "src/main.rs" - -[dependencies] -ethereum_gravity = {path = "../ethereum_gravity"} -cosmos_gravity = {path = "../cosmos_gravity"} -gravity_abi = { path = "../gravity_abi" } -gravity_utils = {path = "../gravity_utils"} -gravity_proto = {path = "../gravity_proto/"} -orchestrator = {path = "../orchestrator/"} - -deep_space = { git = "https://github.com/iqlusioninc/deep_space/", branch = "master" } -serde_derive = "1.0" -clarity = "0.4.11" -docopt = "1" -ethers = { git = "https://github.com/iqlusioninc/ethers-rs.git", branch = "zaki/error_abi_support", features = ["abigen"] } -serde = "1.0" -actix = "0.12" -actix-web = {version = "3", features=["openssl"]} -actix-rt = "2.5" -lazy_static = "1" -url = "2" -web30 = "0.15.4" -log = "0.4" -env_logger = "0.8" -tokio = "1.4.0" -rand = "0.8" -tonic = "0.4" -futures = "0.3.18" -hex = "0.4.3" - -[features] -ethermint = ["cosmos_gravity/ethermint", "orchestrator/ethermint"] diff --git a/orchestrator/test_runner/src/arbitrary_logic.rs b/orchestrator/test_runner/src/arbitrary_logic.rs deleted file mode 100644 index 6c071d24c..000000000 --- a/orchestrator/test_runner/src/arbitrary_logic.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! This is the testing module for arbitrary logic functionality. This is where instead of managing transfers directly the bridge simply passes an -//! arbitrary call to an arbitrary sub contract along with a specific amount of funds, allowing for execution of whatever command is required - -use crate::TOTAL_TIMEOUT; -use deep_space::Contact; -use ethers::prelude::*; -use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; -use tokio::time::sleep as delay_for; -use tonic::transport::Channel; - -pub async fn arbitrary_logic_test( - _eth_provider: &Provider, - _grpc_client: GravityQueryClient, - _contact: &Contact, -) { - delay_for(TOTAL_TIMEOUT).await; -} diff --git a/orchestrator/test_runner/src/bootstrapping.rs b/orchestrator/test_runner/src/bootstrapping.rs deleted file mode 100644 index ffb2788ac..000000000 --- a/orchestrator/test_runner/src/bootstrapping.rs +++ /dev/null @@ -1,117 +0,0 @@ -use crate::utils::ValidatorKeys; -use cosmos_gravity::crypto::PrivateKey as CosmosPrivateKey; -use ethers::core::k256::ecdsa::SigningKey; -use ethers::types::Address as EthAddress; -use std::fs::File; -use std::io::{BufRead, BufReader, Read}; - -pub fn parse_ethereum_keys() -> Vec { - let filename = "/testdata/validator-eth-keys"; - let file = File::open(filename).expect("Failed to find eth keys"); - let reader = BufReader::new(file); - let mut ret = Vec::new(); - - for line in reader.lines() { - let line = line.expect("Error reading eth-keys file!"); - let key_hex = hex::decode(line.strip_prefix("0x").unwrap()).unwrap(); - let key: SigningKey = SigningKey::from_bytes(&key_hex).unwrap(); - ret.push(key); - } - ret -} - -pub fn parse_validator_keys() -> Vec { - let filename = "/testdata/validator-phrases"; - parse_phrases(filename) -} - -/// Orchestrator private keys are generated via the gravity key add -/// command just like the validator keys themselves and stored in a -/// similar file /orchestrator-phrases -pub fn parse_orchestrator_keys() -> Vec { - let filename = "/testdata/orchestrator-phrases"; - parse_phrases(filename) -} - -fn parse_phrases(filename: &str) -> Vec { - let file = File::open(filename).expect("Failed to find phrases"); - let reader = BufReader::new(file); - let mut ret = Vec::new(); - - for line in reader.lines() { - let phrase = line.expect("Error reading phrase file!"); - let key = CosmosPrivateKey::from_phrase(&phrase, "").expect("Bad phrase!"); - ret.push(key); - } - ret -} - -pub fn get_keys() -> Vec { - let cosmos_keys = parse_validator_keys(); - let orch_keys = parse_orchestrator_keys(); - let eth_keys = parse_ethereum_keys(); - let mut ret = Vec::new(); - for ((c_key, o_key), e_key) in cosmos_keys.into_iter().zip(orch_keys).zip(eth_keys) { - ret.push(ValidatorKeys { - eth_key: e_key, - validator_key: c_key, - orch_key: o_key, - }) - } - ret -} - -pub struct BootstrapContractAddresses { - pub gravity_contract: EthAddress, - pub erc20_addresses: Vec, - pub uniswap_liquidity_address: Option, -} - -/// Parses the ERC20 and Gravity contract addresses from the file created -/// in deploy_contracts() -pub fn parse_contract_addresses() -> BootstrapContractAddresses { - let mut file = - File::open("/testdata/contracts").expect("Failed to find contracts! did they not deploy?"); - let mut output = String::new(); - file.read_to_string(&mut output).unwrap(); - let mut maybe_gravity_address = None; - let mut erc20_addresses = Vec::new(); - let mut uniswap_liquidity = None; - for line in output.lines() { - if line.contains("Gravity deployed at Address -") { - let address_string = line.split('-').last().unwrap(); - maybe_gravity_address = Some(address_string.trim().parse().unwrap()); - } else if line.contains("ERC20 deployed at Address -") { - let address_string = line.split('-').last().unwrap(); - erc20_addresses.push(address_string.trim().parse().unwrap()); - } else if line.contains("Uniswap Liquidity test deployed at Address - ") { - let address_string = line.split('-').last().unwrap(); - uniswap_liquidity = Some(address_string.trim().parse().unwrap()); - } - } - let gravity_contract: EthAddress = maybe_gravity_address.unwrap(); - BootstrapContractAddresses { - gravity_contract, - erc20_addresses, - uniswap_liquidity_address: uniswap_liquidity, - } -} - -// fn all_paths_exist(input: &[&str]) -> bool { -// for i in input { -// if !Path::new(i).exists() { -// return false; -// } -// } -// true -// } - -// fn return_existing<'a>(a: [&'a str; 2], b: [&'a str; 2]) -> [&'a str; 2] { -// if all_paths_exist(&a) { -// a -// } else if all_paths_exist(&b) { -// b -// } else { -// panic!("No paths exist!") -// } -// } diff --git a/orchestrator/test_runner/src/happy_path.rs b/orchestrator/test_runner/src/happy_path.rs deleted file mode 100644 index 21c1e257d..000000000 --- a/orchestrator/test_runner/src/happy_path.rs +++ /dev/null @@ -1,472 +0,0 @@ -// use crate::get_chain_id; -use crate::get_fee; -use crate::get_gas_price; -use crate::utils::*; -use crate::MINER_ADDRESS; -use crate::MINER_CLIENT; -use crate::OPERATION_TIMEOUT; -use crate::TOTAL_TIMEOUT; -use clarity::Uint256; -use cosmos_gravity::crypto::PrivateKey as CosmosPrivateKey; -use cosmos_gravity::send::send_to_eth; -use cosmos_gravity::{build, query::get_oldest_unsigned_transaction_batch, send}; -use deep_space::address::Address as CosmosAddress; -use deep_space::coin::Coin; -use deep_space::Contact; -use ethereum_gravity::erc20_utils::get_erc20_balance; -use ethereum_gravity::utils::get_valset_nonce; -use ethereum_gravity::{send_to_cosmos::send_to_cosmos, utils::get_tx_batch_nonce}; -use ethers::core::k256::ecdsa::SigningKey; -use ethers::prelude::*; -use ethers::types::Address as EthAddress; -use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; -use gravity_utils::types::SendToCosmosEvent; -use rand::Rng; -use std::str::FromStr; -use std::time::Duration; -use tokio::time::sleep as delay_for; -use tonic::transport::Channel; - -pub async fn happy_path_test( - grpc_client: GravityQueryClient, - contact: &Contact, - keys: Vec, - gravity_address: EthAddress, - erc20_address: EthAddress, - validator_out: bool, -) { - let mut grpc_client = grpc_client; - - // bootstrapping tests finish here and we move into operational tests - - // send 3 valset updates to make sure the process works back to back - // don't do this in the validator out test because it changes powers - // randomly and may actually make it impossible for that test to pass - // by random re-allocation of powers. If we had 5 or 10 validators - // instead of 3 this wouldn't be a problem. But with 3 not even 1% of - // power can be reallocated to the down validator before things stop - // working. We'll settle for testing that the initial valset (generated - // with the first block) is successfully updated - - if !validator_out { - for _ in 0u32..2 { - test_valset_update(contact, &keys, gravity_address).await; - } - } else { - wait_for_nonzero_valset(gravity_address).await; - } - - // generate an address for coin sending tests, this ensures test imdepotency - let mut rng = rand::thread_rng(); - let secret: [u8; 32] = rng.gen(); - let dest_cosmos_private_key = CosmosPrivateKey::from_secret(&secret); - let dest_cosmos_address = dest_cosmos_private_key - .to_address(CosmosAddress::DEFAULT_PREFIX) - .unwrap(); - let dest_eth_private_key = SigningKey::from_bytes(&secret).unwrap(); - let dest_eth_wallet = LocalWallet::from(dest_eth_private_key.clone()); - let dest_eth_address = dest_eth_wallet.address(); - - // the denom and amount of the token bridged from Ethereum -> Cosmos - // so the denom is the gravity token name - // Send a token 3 times - for _ in 0u32..3 { - test_erc20_deposit( - contact, - dest_cosmos_address, - gravity_address, - erc20_address, - 100u64.into(), - ) - .await; - } - - // We are going to submit a duplicate tx with nonce 1 - // This had better not increase the balance again - // this test may have false positives if the timeout is not - // long enough. TODO check for an error on the cosmos send response - submit_duplicate_erc20_send( - 1u64.into(), - contact, - erc20_address, - 1u64.into(), - dest_cosmos_address, - &keys, - ) - .await; - - // we test a batch by sending a transaction - test_batch( - contact, - &mut grpc_client, - dest_eth_address, - gravity_address, - keys[0].validator_key, - dest_cosmos_private_key, - erc20_address, - ) - .await; -} - -pub async fn wait_for_nonzero_valset(gravity_address: EthAddress) { - match tokio::time::timeout(TOTAL_TIMEOUT, async { - let mut current_eth_valset_nonce = - get_valset_nonce(gravity_address, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth valset"); - - while 0 == current_eth_valset_nonce { - info!("Validator set is not yet updated to >0, waiting"); - current_eth_valset_nonce = get_valset_nonce(gravity_address, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth valset"); - delay_for(Duration::from_secs(4)).await; - } - }) - .await - { - Ok(_) => { - println!("Success") - } - Err(_) => { - panic!("Failed to update validator set"); - } - } -} - -pub async fn test_valset_update( - contact: &Contact, - keys: &[ValidatorKeys], - gravity_address: EthAddress, -) { - // if we don't do this the orchestrators may run ahead of us and we'll be stuck here after - // getting credit for two loops when we did one - let starting_eth_valset_nonce = get_valset_nonce(gravity_address, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get starting eth valset"); - // this is hacky and not really a good way to test validator set updates in a highly - // repeatable fashion. What we really need to do is be aware of the total staking state - // and manipulate the validator set very intentionally rather than kinda blindly like - // we are here. For example the more your run this function the less this fixed amount - // makes any difference, eventually it will fail because the change to the total staked - // percentage is too small. - let mut rng = rand::thread_rng(); - let keys_to_change = rng.gen_range(0..keys.len()); - let keys_to_change = &keys[keys_to_change]; - - let validator_to_change = keys_to_change.validator_key; - let delegate_address = validator_to_change - .to_address("cosmosvaloper") - .unwrap() - .to_string(); - - // should be about 4% of the total power to start - // let amount = crate::STARTING_STAKE_PER_VALIDATOR / 4; // 12.5B - let amount = crate::STAKE_SUPPLY_PER_VALIDATOR / 4; // 25B - let amount = deep_space::Coin { - amount: amount.into(), - denom: "stake".to_string(), - }; - match tokio::time::timeout(TOTAL_TIMEOUT, async { - // now we send a valset request that the orchestrators will pick up on - // in this case we send it as the first validator because they can pay the fee - info!( - "Sending in valset request (starting_eth_valset_nonce {})", - starting_eth_valset_nonce - ); - - info!( - "Delegating {} to {} in order to generate a validator set update", - amount, delegate_address - ); - loop { - let res = contact - .delegate_to_validator( - delegate_address.parse().unwrap(), - amount.clone(), - get_fee(), - keys_to_change.orch_key.into(), - Some(OPERATION_TIMEOUT), - ) - .await; - - if res.is_err() { - warn!("Delegate to validator failed (will retry) {:?}", res); - continue; // retry - } - break; - } - }) - .await - { - Ok(_) => { - info!("Delegated {} to {}", amount, delegate_address); - } - Err(_) => { - panic!("Delegate to validator timed out."); - } - } - - let mut current_eth_valset_nonce = get_valset_nonce(gravity_address, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth valset"); - - match tokio::time::timeout(TOTAL_TIMEOUT, async { - while starting_eth_valset_nonce == current_eth_valset_nonce { - info!( - "Validator set is not yet updated to >{}, waiting", - starting_eth_valset_nonce - ); - current_eth_valset_nonce = get_valset_nonce(gravity_address, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth valset"); - delay_for(Duration::from_secs(4)).await; - } - }) - .await - { - Ok(_) => { - assert!(starting_eth_valset_nonce != current_eth_valset_nonce); - info!("Validator set successfully updated!"); - } - Err(_) => { - panic!("Failed to update validator set"); - } - } -} - -/// this function tests Ethereum -> Cosmos -async fn test_erc20_deposit( - contact: &Contact, - dest: CosmosAddress, - gravity_address: EthAddress, - erc20_address: EthAddress, - amount: U256, -) { - let amount_uint256 = Uint256::from_str(amount.to_string().as_str()).unwrap(); - let start_coin = check_cosmos_balance("gravity", dest, contact).await; - info!( - "Sending to Cosmos from {} to {} with amount {}", - *MINER_ADDRESS, dest, amount - ); - // we send some erc20 tokens to the gravity contract to register a deposit - let tx_id = send_to_cosmos( - erc20_address, - gravity_address, - amount, - dest, - Some(TOTAL_TIMEOUT), - (*MINER_CLIENT).clone(), - ) - .await - .expect("Failed to send tokens to Cosmos"); - info!("Send to Cosmos txid: {:#066x}", tx_id); - - match tokio::time::timeout(TOTAL_TIMEOUT, async { - match ( - start_coin.clone(), - check_cosmos_balance("gravity", dest, contact).await, - ) { - (Some(start_coin), Some(end_coin)) => { - if start_coin.amount + amount_uint256.clone() == end_coin.amount - && start_coin.denom == end_coin.denom - { - info!( - "Successfully bridged ERC20 {}{} to Cosmos! Balance is now {}{}", - amount, start_coin.denom, end_coin.amount, end_coin.denom - ); - } - } - (None, Some(end_coin)) => { - if amount_uint256 == end_coin.amount { - info!( - "Successfully bridged ERC20 {}{} to Cosmos! Balance is now {}{}", - amount, end_coin.denom, end_coin.amount, end_coin.denom - ); - } else { - panic!("Failed to bridge ERC20!") - } - } - _ => {} - } - }) - .await - { - Ok(_) => { - info!("Waiting for ERC20 deposit"); - contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); - } - Err(_) => { - panic!("Failed to bridge ERC20!"); - } - } -} - -#[allow(clippy::too_many_arguments)] -async fn test_batch( - contact: &Contact, - grpc_client: &mut GravityQueryClient, - dest_eth_address: EthAddress, - gravity_address: EthAddress, - requester_cosmos_private_key: CosmosPrivateKey, - dest_cosmos_private_key: CosmosPrivateKey, - erc20_contract: EthAddress, -) { - let dest_cosmos_address = dest_cosmos_private_key - .to_address(&contact.get_prefix()) - .unwrap(); - let coin = check_cosmos_balance("gravity", dest_cosmos_address, contact) - .await - .unwrap(); - let token_name = coin.denom; - let amount = coin.amount; - - let bridge_denom_fee = Coin { - denom: token_name.clone(), - amount: 1u64.into(), - }; - let amount = amount - 5u64.into(); - info!( - "Sending {}{} from {} on Cosmos back to Ethereum", - amount, token_name, dest_cosmos_address - ); - let res = send_to_eth( - dest_cosmos_private_key, - dest_eth_address, - Coin { - denom: token_name.clone(), - amount: amount.clone(), - }, - bridge_denom_fee.clone(), - (10f64, "footoken".to_string()), - contact, - 1.0, - ) - .await - .unwrap(); - info!("Sent tokens to Ethereum with {:?}", res); - - contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); - let requester_address = requester_cosmos_private_key - .to_address(&contact.get_prefix()) - .unwrap(); - get_oldest_unsigned_transaction_batch(grpc_client, requester_address) - .await - .expect("Failed to get batch to sign"); - - let mut current_eth_batch_nonce = - get_tx_batch_nonce(gravity_address, erc20_contract, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth valset"); - let starting_batch_nonce = current_eth_batch_nonce; - - match tokio::time::timeout(TOTAL_TIMEOUT, async { - while starting_batch_nonce == current_eth_batch_nonce { - info!( - "Batch is not yet submitted {}>, waiting", - starting_batch_nonce - ); - current_eth_batch_nonce = - get_tx_batch_nonce(gravity_address, erc20_contract, (*MINER_CLIENT).clone()) - .await - .expect("Failed to get current eth tx batch nonce"); - delay_for(Duration::from_secs(4)).await; - } - }) - .await - { - Ok(_) => { - println!("Submitted transaction batch set"); - } - Err(_) => { - panic!("Failed to submit transaction batch set"); - } - } - - let eth_client = (*MINER_CLIENT).clone(); - let tx = TransactionRequest { - from: Some(eth_client.address()), - to: Some(NameOrAddress::Address(dest_eth_address)), - gas: None, - gas_price: None, - value: Some(1_000_000_000_000_000_000u128.into()), - data: Some(Vec::new().into()), - nonce: None, - }; - - let pending_tx = eth_client.send_transaction(tx, None).await.unwrap(); - pending_tx.await.unwrap(); - - let amount_u256 = U256::from_str(amount.to_string().as_str()).unwrap(); - - // we have to send this address one eth so that it can perform contract calls - send_one_eth(dest_eth_address, eth_client.clone()).await; - assert_eq!( - get_erc20_balance(erc20_contract, dest_eth_address, eth_client.clone()) - .await - .unwrap(), - amount_u256 - ); - info!( - "Successfully updated txbatch nonce to {} and sent {}{} tokens to Ethereum!", - current_eth_batch_nonce, amount, token_name - ); -} - -// this function submits a EthereumBridgeDepositClaim to the module with a given nonce. This can be set to be a nonce that has -// already been submitted to test the nonce functionality. -#[allow(clippy::too_many_arguments)] -async fn submit_duplicate_erc20_send( - nonce: U256, - contact: &Contact, - erc20_address: EthAddress, - amount: U256, - receiver: CosmosAddress, - keys: &[ValidatorKeys], -) { - let start_coin = check_cosmos_balance("gravity", receiver, contact) - .await - .expect("Did not find coins!"); - - let ethereum_sender = "0x912fd21d7a69678227fe6d08c64222db41477ba0" - .parse() - .unwrap(); - let event = SendToCosmosEvent { - event_nonce: nonce, - block_height: 500u16.into(), - erc20: erc20_address, - sender: ethereum_sender, - destination: receiver, - amount, - }; - - // iterate through all validators and try to send an event with duplicate nonce - for k in keys.iter() { - let cosmos_key = k.validator_key; - - let messages = build::ethereum_event_messages( - contact, - cosmos_key, - vec![event.clone()], - vec![], - vec![], - vec![], - vec![], - ); - - let gas_price = get_gas_price(); - let res = send::send_messages(contact, cosmos_key, gas_price, messages, 1.0).await; - let res = res.unwrap(); - trace!("Submitted duplicate sendToCosmos event: {:?}", res); - } - - if let Some(end_coin) = check_cosmos_balance("gravity", receiver, contact).await { - if start_coin.amount == end_coin.amount && start_coin.denom == end_coin.denom { - info!("Successfully failed to duplicate ERC20!"); - } else { - panic!("Duplicated ERC20!") - } - } else { - panic!("Duplicate test failed for unknown reasons!"); - } -} diff --git a/orchestrator/test_runner/src/happy_path_v2.rs b/orchestrator/test_runner/src/happy_path_v2.rs deleted file mode 100644 index 8421da3ac..000000000 --- a/orchestrator/test_runner/src/happy_path_v2.rs +++ /dev/null @@ -1,205 +0,0 @@ -//! This is the happy path test for Cosmos to Ethereum asset transfers, meaning assets originated on Cosmos -use crate::utils::get_user_key; -use crate::utils::send_one_eth; -use crate::MINER_CLIENT; -use crate::TOTAL_TIMEOUT; -use crate::{get_fee, utils::ValidatorKeys}; -use clarity::Uint256; -use cosmos_gravity::send::send_to_eth; -use deep_space::coin::Coin; -use deep_space::Contact; -use ethereum_gravity::erc20_utils::get_erc20_balance; -use ethereum_gravity::{deploy_erc20::deploy_erc20, utils::get_event_nonce}; -use ethers::prelude::*; -use ethers::types::Address as EthAddress; -use gravity_proto::gravity::{ - query_client::QueryClient as GravityQueryClient, DenomToErc20Request, -}; -use gravity_utils::ethereum::downcast_to_u64; -use std::str::FromStr; -use std::sync::Arc; -use tonic::transport::Channel; - -pub async fn happy_path_test_v2( - eth_provider: &Provider, - grpc_client: GravityQueryClient, - contact: &Contact, - keys: Vec, - gravity_address: EthAddress, -) { - let mut grpc_client = grpc_client; - let eth_wallet = LocalWallet::from(keys[0].eth_key.clone()); - let provider = eth_provider.clone(); - let chain_id = provider - .get_chainid() - .await - .expect("Could not retrieve chain ID"); - let chain_id = downcast_to_u64(chain_id).expect("Chain ID overflowed when downcasting to u64"); - let eth_client = Arc::new(SignerMiddleware::new( - provider, - eth_wallet.with_chain_id(chain_id), - )); - let starting_event_nonce = get_event_nonce(gravity_address, eth_client.clone()) - .await - .unwrap(); - - let token_to_send_to_eth = "footoken".to_string(); - let token_to_send_to_eth_display_name = "mfootoken".to_string(); - - deploy_erc20( - token_to_send_to_eth.clone(), - token_to_send_to_eth_display_name.clone(), - token_to_send_to_eth_display_name.clone(), - 6, - gravity_address, - Some(TOTAL_TIMEOUT), - 1.0, - eth_client.clone(), - ) - .await - .unwrap(); - let ending_event_nonce = get_event_nonce(gravity_address, eth_client.clone()) - .await - .unwrap(); - - assert!(starting_event_nonce != ending_event_nonce); - info!( - "Successfully deployed new ERC20 representing FooToken on Cosmos with event nonce {}", - ending_event_nonce - ); - // the erc20 representing the cosmos asset on Ethereum - let erc20_contract = match tokio::time::timeout(TOTAL_TIMEOUT, async { - loop { - let res = grpc_client - .denom_to_erc20(DenomToErc20Request { - denom: token_to_send_to_eth.clone(), - }) - .await; - - if let Ok(res) = res { - break res; - } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }) - .await - { - Ok(res) => { - let erc20 = res.into_inner().erc20; - info!( - "Successfully adopted {} token contract of {}", - token_to_send_to_eth, erc20 - ); - Some(erc20) - } - Err(_) => { - panic!( - "Cosmos did not adopt the ERC20 contract for {} it must be invalid in some way", - token_to_send_to_eth - ); - } - }; - - let erc20_contract: EthAddress = erc20_contract.unwrap().parse().unwrap(); - - // one foo token - let amount_to_bridge: Uint256 = 1_000_000u64.into(); - let send_to_user_coin = Coin { - denom: token_to_send_to_eth.clone(), - amount: amount_to_bridge.clone() + 100u8.into(), - }; - let send_to_eth_coin = Coin { - denom: token_to_send_to_eth.clone(), - amount: amount_to_bridge.clone(), - }; - - let user = get_user_key(); - // send the user some footoken - contact - .send_tokens( - send_to_user_coin.clone(), - Some(get_fee()), - user.cosmos_address, - keys[0].validator_key.into(), - Some(TOTAL_TIMEOUT), - ) - .await - .unwrap(); - - let balances = contact.get_balances(user.cosmos_address).await.unwrap(); - let mut found = false; - for coin in balances { - if coin.denom == token_to_send_to_eth.clone() { - found = true; - break; - } - } - if !found { - panic!( - "Failed to send {} to the user address", - token_to_send_to_eth - ); - } - info!( - "Sent some {} to user address {}", - token_to_send_to_eth, user.cosmos_address - ); - // send the user some eth, they only need this to check their - // erc20 balance, so a pretty minor usecase - send_one_eth(user.eth_address, (*MINER_CLIENT).clone()).await; - info!("Sent 1 eth to user address {}", user.eth_address); - - let res = send_to_eth( - user.cosmos_key, - user.eth_address, - send_to_eth_coin, - get_fee(), - (10f64, "footoken".to_string()), - contact, - 1.0, - ) - .await - .unwrap(); - info!("Send to eth res {:?}", res); - info!( - "Locked up {} {} to send to Cosmos", - amount_to_bridge, token_to_send_to_eth - ); - - info!("Waiting for batch to be signed and relayed to Ethereum"); - match tokio::time::timeout(TOTAL_TIMEOUT, async { - loop { - let balance = - get_erc20_balance(erc20_contract, user.eth_address, (*MINER_CLIENT).clone()).await; - if balance.is_err() { - continue; - } - let balance = balance.unwrap(); - let balance = Uint256::from_str(balance.to_string().as_str()).unwrap(); - if balance == amount_to_bridge { - break; - } else if balance != 0u8.into() { - panic!( - "Expected {} {} but got {} instead", - amount_to_bridge, token_to_send_to_eth, balance - ); - } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }) - .await - { - Ok(_) => { - info!( - "Successfully bridged {} Cosmos asset {} to Ethereum!", - amount_to_bridge, token_to_send_to_eth - ); - } - Err(_) => { - panic!( - "An error occured while bridging {} Cosmos asset {} to Ethereum!", - amount_to_bridge, token_to_send_to_eth - ); - } - } -} diff --git a/orchestrator/test_runner/src/main.rs b/orchestrator/test_runner/src/main.rs deleted file mode 100644 index 3e1795389..000000000 --- a/orchestrator/test_runner/src/main.rs +++ /dev/null @@ -1,237 +0,0 @@ -//! this crate, namely runs all up integration tests of the Gravity code against -//! several scenarios, happy path and non happy path. This is essentially meant -//! to be executed in our specific CI docker container and nowhere else. If you -//! find some function useful pull it up into the more general gravity_utils or the like - -#[macro_use] -extern crate log; -#[macro_use] -extern crate lazy_static; - -use crate::bootstrapping::*; -use crate::utils::*; -use arbitrary_logic::arbitrary_logic_test; -use clarity::Uint256; -use cosmos_gravity::utils::wait_for_cosmos_online; -use deep_space::coin::Coin; -use deep_space::Address as CosmosAddress; -use deep_space::Contact; -use ethereum_gravity::types::EthClient; -use ethers::core::k256::ecdsa::SigningKey; -use ethers::prelude::*; -use ethers::providers::Provider; -use ethers::types::Address as EthAddress; -use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; -use gravity_utils::ethereum::hex_str_to_bytes; -use happy_path::happy_path_test; -use happy_path_v2::happy_path_test_v2; -use orch_keys_update::orch_keys_update; -use std::convert::TryFrom; -use std::{env, sync::Arc, time::Duration}; -use transaction_stress_test::transaction_stress_test; -use valset_stress::validator_set_stress_test; - -mod arbitrary_logic; -mod bootstrapping; -mod happy_path; -mod happy_path_v2; -mod orch_keys_update; -mod transaction_stress_test; -mod utils; -mod valset_stress; - -/// the timeout for individual requests -const OPERATION_TIMEOUT: Duration = Duration::from_secs(30); -/// the timeout for the total system -const TOTAL_TIMEOUT: Duration = Duration::from_secs(300); - -lazy_static! { - static ref COSMOS_NODE_GRPC: String = - env::var("COSMOS_NODE_GRPC").unwrap_or_else(|_| "http://localhost:9090".to_owned()); - static ref COSMOS_NODE_ABCI: String = - env::var("COSMOS_NODE_ABCI").unwrap_or_else(|_| "http://localhost:26657".to_owned()); - static ref ETH_NODE: String = - env::var("ETH_NODE").unwrap_or_else(|_| "http://localhost:8545".to_owned()); -} - -/// this value reflects the contents of /tests/container-scripts/setup-validator.sh -/// and is used to compute if a stake change is big enough to trigger a validator set -/// update since we want to make several such changes intentionally -pub const STAKE_SUPPLY_PER_VALIDATOR: u128 = 100000000000; // 100B -/// this is the amount each validator bonds at startup -pub const STARTING_STAKE_PER_VALIDATOR: u128 = STAKE_SUPPLY_PER_VALIDATOR / 2; // 50B - -lazy_static! { - // this key is the private key for the public key defined in tests/assets/ETHGenesis.json - // where the full node / miner sends its rewards. Therefore it's always going - // to have a lot of ETH to pay for things like contract deployments - static ref MINER_PRIVATE_KEY: SigningKey = - SigningKey::from_bytes(hex_str_to_bytes( - "0xb1bab011e03a9862664706fc3bbaa1b16651528e5f0e7fbfcbfdd8be302a13e7").unwrap().as_slice() - ).unwrap(); - static ref MINER_WALLET: LocalWallet = LocalWallet::from((*MINER_PRIVATE_KEY).clone()); - static ref MINER_ADDRESS: EthAddress = (*MINER_WALLET).address(); - static ref MINER_PROVIDER: Provider = Provider::::try_from((*ETH_NODE).clone()).unwrap(); - static ref MINER_SIGNER: SignerMiddleware, LocalWallet> = - SignerMiddleware::new((*MINER_PROVIDER).clone(), (*MINER_WALLET).clone()); - static ref MINER_CLIENT: EthClient = Arc::new((*MINER_SIGNER).clone()); - -} - -/// Gets the standard non-token fee for the testnet. We deploy the test chain with STAKE -/// and FOOTOKEN balances by default, one footoken is sufficient for any Cosmos tx fee except -/// fees for send_to_eth messages which have to be of the same bridged denom so that the relayers -/// on the Ethereum side can be paid in that token. -pub fn get_fee() -> Coin { - Coin { - denom: get_test_token_name(), - amount: 1u32.into(), - } -} - -pub fn get_gas_price() -> (f64, String) { - (1f64, get_test_token_name()) -} - -pub fn get_test_token_name() -> String { - "footoken".to_string() -} - -pub fn get_chain_id() -> String { - "gravity-test".to_string() -} - -pub fn one_eth() -> U256 { - 1000000000000000000u128.into() -} - -pub fn one_hundred_eth() -> U256 { - (1000000000000000000u128 * 100).into() -} - -pub fn one_hundred_eth_uint256() -> Uint256 { - (1000000000000000000u128 * 100).into() -} - -// pub fn should_deploy_contracts() -> bool { -// match env::var("DEPLOY_CONTRACTS") { -// Ok(s) => s == "1" || s.to_lowercase() == "yes" || s.to_lowercase() == "true", -// _ => false, -// } -// } - -#[tokio::main] -pub async fn main() { - env_logger::init(); - - info!("Staring Gravity test-runner"); - - let contact = Contact::new( - COSMOS_NODE_GRPC.as_str(), - OPERATION_TIMEOUT, - CosmosAddress::DEFAULT_PREFIX, - ) - .unwrap(); - info!("COSMOS_NODE_GRPC {}", COSMOS_NODE_GRPC.as_str()); - info!("Waiting for Cosmos chain to come online"); - wait_for_cosmos_online(&contact, TOTAL_TIMEOUT).await; - - let grpc_client = GravityQueryClient::connect(COSMOS_NODE_GRPC.as_str()) - .await - .unwrap(); - let eth_provider = Provider::::try_from((*ETH_NODE).clone()).unwrap(); - let keys = get_keys(); - - // // if we detect this env var we are only deploying contracts, do that then exit. - // if should_deploy_contracts() { - // info!("test-runner in contract deploying mode, deploying contracts, then exiting"); - // deploy_contracts(&contact).await; - // return; - // } - - let contracts = parse_contract_addresses(); - // the address of the deployed Gravity contract - let gravity_address = contracts.gravity_contract; - // addresses of deployed ERC20 token contracts to be used for testing - let erc20_addresses = contracts.erc20_addresses; - - assert!(check_cosmos_balance( - &get_test_token_name(), - keys[0] - .validator_key - .to_address(&contact.get_prefix()) - .unwrap(), - &contact - ) - .await - .is_some()); - - // This segment contains optional tests, by default we run a happy path test - // this tests all major functionality of Gravity once or twice. - // VALSET_STRESS sends in 1k valsets to sign and update - // BATCH_STRESS fills several batches and executes an out of order batch - // VALIDATOR_OUT simulates a validator not participating in the happy path test - // V2_HAPPY_PATH runs the happy path tests but focusing on moving Cosmos assets to Ethereum - // ARBITRARY_LOGIC tests the arbitrary logic functionality, where an arbitrary contract call - // is created and deployed vai the bridge. - let test_type = env::var("TEST_TYPE"); - info!("Starting tests with {:?}", test_type); - if let Ok(test_type) = test_type { - if test_type == "VALIDATOR_OUT" { - info!("Starting Validator out test"); - happy_path_test( - grpc_client, - &contact, - keys, - gravity_address, - erc20_addresses[0], - true, - ) - .await; - return; - } else if test_type == "BATCH_STRESS" { - info!("Starting batch stress test"); - let contact = Contact::new( - COSMOS_NODE_GRPC.as_str(), - TOTAL_TIMEOUT, - CosmosAddress::DEFAULT_PREFIX, - ) - .unwrap(); - transaction_stress_test( - ð_provider, - &contact, - keys, - gravity_address, - erc20_addresses, - ) - .await; - return; - } else if test_type == "VALSET_STRESS" { - info!("Starting valset stress test"); - validator_set_stress_test(&contact, keys, gravity_address).await; - return; - } else if test_type == "V2_HAPPY_PATH" { - info!("Starting happy path for Gravity v2"); - happy_path_test_v2(ð_provider, grpc_client, &contact, keys, gravity_address).await; - return; - } else if test_type == "ARBITRARY_LOGIC" { - info!("Starting arbitrary logic tests!"); - arbitrary_logic_test(ð_provider, grpc_client, &contact).await; - return; - } else if test_type == "ORCHESTRATOR_KEYS" { - info!("Starting orchestrator key update tests!"); - orch_keys_update(grpc_client, &contact, keys).await; - return; - } - } - info!("Starting Happy path test"); - happy_path_test( - grpc_client, - &contact, - keys, - gravity_address, - erc20_addresses[0], - false, - ) - .await; -} diff --git a/orchestrator/test_runner/src/orch_keys_update.rs b/orchestrator/test_runner/src/orch_keys_update.rs deleted file mode 100644 index 623957ff3..000000000 --- a/orchestrator/test_runner/src/orch_keys_update.rs +++ /dev/null @@ -1,126 +0,0 @@ -//! This test verifies that live updating of orchestrator keys works correctly - -use crate::utils::ValidatorKeys; -use cosmos_gravity::crypto::PrivateKey as CosmosPrivateKey; -use cosmos_gravity::send::update_gravity_delegate_addresses; -use deep_space::address::Address as CosmosAddress; -use deep_space::Contact; -use ethers::types::Address as EthAddress; -use ethers::{core::k256::ecdsa::SigningKey, prelude::*}; -use gravity_proto::gravity::{ - query_client::QueryClient as GravityQueryClient, DelegateKeysByEthereumSignerRequest, - DelegateKeysByOrchestratorRequest, -}; -use gravity_utils::ethereum::format_eth_address; -use rand::Rng; -use std::time::Duration; -use tonic::transport::Channel; - -const BLOCK_TIMEOUT: Duration = Duration::from_secs(30); - -pub async fn orch_keys_update( - grpc_client: GravityQueryClient, - contact: &Contact, - keys: Vec, -) { - let mut keys = keys; - let mut grpc_client = grpc_client; - // just to test that we have the right keys from the gentx - info!("About to check already set delegate addresses"); - for k in keys.iter() { - let eth_address = LocalWallet::from(k.eth_key.clone()).address(); - let orch_address = k.orch_key.to_address(&contact.get_prefix()).unwrap(); - let eth_response = grpc_client - .delegate_keys_by_ethereum_signer(DelegateKeysByEthereumSignerRequest { - ethereum_signer: format_eth_address(eth_address), - }) - .await - .unwrap() - .into_inner(); - - let parsed_response_orch_address: CosmosAddress = - eth_response.orchestrator_address.parse().unwrap(); - assert_eq!(parsed_response_orch_address, orch_address); - - let orchestrator_response = grpc_client - .delegate_keys_by_orchestrator(DelegateKeysByOrchestratorRequest { - orchestrator_address: orch_address.to_string(), - }) - .await - .unwrap() - .into_inner(); - - let parsed_response_eth_address: EthAddress = - orchestrator_response.ethereum_signer.parse().unwrap(); - assert_eq!(parsed_response_eth_address, eth_address); - } - - info!("Starting with {:?}", keys); - - // now we change them all - for k in keys.iter_mut() { - let mut rng = rand::thread_rng(); - let secret: [u8; 32] = rng.gen(); - // generate some new keys to replace the old ones - let ethereum_key = SigningKey::from_bytes(&secret).unwrap(); - let ethereum_wallet = LocalWallet::from(ethereum_key.clone()); - let cosmos_key = CosmosPrivateKey::from_secret(&secret); - // update the keys in the key list - k.eth_key = ethereum_key; - k.orch_key = cosmos_key; - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); - - info!( - "Signing and submitting Delegate addresses {} for validator {}", - format_eth_address(ethereum_wallet.address()), - cosmos_address, - ); - // send in the new delegate keys signed by the validator address - update_gravity_delegate_addresses( - contact, - ethereum_wallet.address(), - cosmos_address, - k.validator_key, - ethereum_wallet, - (0f64, "".to_string()), - 2.0, - ) - .await - .expect("Failed to set delegate addresses!"); - } - - contact.wait_for_next_block(BLOCK_TIMEOUT).await.unwrap(); - - // TODO registering is too unreliable right now for confusing reasons, revisit with prototx - - // info!("About to check changed delegate addresses"); - // // verify that the change has taken place - // for k in keys.iter() { - // let eth_address = k.eth_key.to_public_key().unwrap(); - // let orch_address = k.orch_key.to_public_key().unwrap().to_address(); - - // let orchestrator_response = grpc_client - // .get_delegate_key_by_orchestrator(QueryDelegateKeysByOrchestratorAddress { - // orchestrator_address: orch_address.to_string(), - // }) - // .await - // .unwrap() - // .into_inner(); - - // let parsed_response_eth_address: EthAddress = - // orchestrator_response.eth_address.parse().unwrap(); - // assert_eq!(parsed_response_eth_address, eth_address); - - // let eth_response = grpc_client - // .get_delegate_key_by_eth(QueryDelegateKeysByEthAddress { - // eth_address: eth_address.to_string(), - // }) - // .await - // .unwrap() - // .into_inner(); - - // let parsed_response_orch_address: CosmosAddress = - // eth_response.orchestrator_address.parse().unwrap(); - // assert_eq!(parsed_response_orch_address, orch_address); - // } -} diff --git a/orchestrator/test_runner/src/transaction_stress_test.rs b/orchestrator/test_runner/src/transaction_stress_test.rs deleted file mode 100644 index c7252787a..000000000 --- a/orchestrator/test_runner/src/transaction_stress_test.rs +++ /dev/null @@ -1,241 +0,0 @@ -use crate::{ - one_eth, one_hundred_eth, one_hundred_eth_uint256, utils::*, MINER_CLIENT, TOTAL_TIMEOUT, -}; -use clarity::Uint256; -use cosmos_gravity::send::send_to_eth; -use deep_space::coin::Coin; -use deep_space::Contact; -use ethereum_gravity::{ - erc20_utils::get_erc20_balance, send_to_cosmos::send_to_cosmos, utils::get_tx_batch_nonce, -}; -use ethers::prelude::*; -use ethers::types::Address as EthAddress; -use futures::future::join_all; -use gravity_utils::ethereum::downcast_to_u64; -use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration}; - -const TIMEOUT: Duration = Duration::from_secs(120); - -/// The number of users we will be simulating for this test, each user -/// will get one token from each token type in erc20_addresses and send it -/// across the bridge to Cosmos as a deposit and then send it back to a different -/// Ethereum address in a transaction batch -/// So the total number of -/// Ethereum sends = (2 * NUM_USERS) -/// ERC20 sends = (erc20_addresses.len() * NUM_USERS) -/// Gravity Deposits = (erc20_addresses.len() * NUM_USERS) -/// Batches executed = erc20_addresses.len() * (NUM_USERS / 100) -const NUM_USERS: usize = 100; - -/// Perform a stress test by sending thousands of -/// transactions and producing large batches -#[allow(clippy::too_many_arguments)] -pub async fn transaction_stress_test( - eth_provider: &Provider, - contact: &Contact, - keys: Vec, - gravity_address: EthAddress, - erc20_addresses: Vec, -) { - // Generate 100 user keys to send ETH and multiple types of tokens - let mut user_keys = Vec::new(); - for _ in 0..NUM_USERS { - user_keys.push(get_user_key()); - } - // the sending eth addresses need Ethereum to send ERC20 tokens to the bridge - let sending_eth_addresses: Vec = user_keys.iter().map(|i| i.eth_address).collect(); - // the destination eth addresses need Ethereum to perform a contract call and get their erc20 balances - let dest_eth_addresses: Vec = - user_keys.iter().map(|i| i.eth_dest_address).collect(); - let mut eth_destinations = Vec::new(); - eth_destinations.extend(sending_eth_addresses.clone()); - eth_destinations.extend(dest_eth_addresses); - send_eth_bulk(one_eth(), ð_destinations, (*MINER_CLIENT).clone()).await; - info!("Sent {} addresses 1 ETH", NUM_USERS); - - // now we need to send all the sending eth addresses erc20's to send - for token in erc20_addresses.iter() { - send_erc20_bulk( - one_hundred_eth(), - *token, - &sending_eth_addresses, - (*MINER_CLIENT).clone(), - ) - .await; - info!("Sent {} addresses 100 {}", NUM_USERS, token); - } - for token in erc20_addresses.iter() { - let mut sends = Vec::new(); - for keys in user_keys.iter() { - let eth_wallet = LocalWallet::from(keys.eth_key.clone()); - let provider = eth_provider.clone(); - let chain_id = provider - .get_chainid() - .await - .expect("Could not retrieve chain ID"); - let chain_id = - downcast_to_u64(chain_id).expect("Chain ID overflowed when downcasting to u64"); - let eth_client = Arc::new(SignerMiddleware::new( - provider, - eth_wallet.with_chain_id(chain_id), - )); - let fut = send_to_cosmos( - *token, - gravity_address, - one_hundred_eth(), - keys.cosmos_address, - Some(TIMEOUT), - eth_client.clone(), - ); - sends.push(fut); - } - let results = join_all(sends).await; - for result in results { - result.unwrap(); - } - info!( - "Locked 100 {} from {} into the Gravity Ethereum Contract", - token, NUM_USERS - ); - } - - let mut good = true; - match tokio::time::timeout(TOTAL_TIMEOUT, async { - loop { - good = true; - for keys in user_keys.iter() { - let c_addr = keys.cosmos_address; - let balances = contact.get_balances(c_addr).await.unwrap(); - for token in erc20_addresses.iter() { - let mut found = false; - for balance in balances.iter() { - if balance.denom.contains(&token.to_string()) - && balance.amount == one_hundred_eth_uint256() - { - found = true; - } - } - if !found { - good = false; - } - } - } - if good { - break; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - }) - .await - { - Ok(_) => { - info!( - "All {} deposits bridged to Cosmos successfully!", - user_keys.len() * erc20_addresses.len() - ); - } - Err(_) => { - panic!( - "Failed to perform all {} deposits to Cosmos!", - user_keys.len() * erc20_addresses.len() - ); - } - } - - let send_amount = one_hundred_eth_uint256() - 500u16.into(); - - let mut denoms = HashSet::new(); - for token in erc20_addresses.iter() { - let mut futs = Vec::new(); - for keys in user_keys.iter() { - let c_addr = keys.cosmos_address; - let c_key = keys.cosmos_key; - let e_dest_addr = keys.eth_dest_address; - let balances = contact.get_balances(c_addr).await.unwrap(); - // this way I don't have to hardcode a denom and we can change the way denoms are formed - // without changing this test. - let mut send_coin = None; - for balance in balances { - if balance.denom.contains(&token.to_string()) { - send_coin = Some(balance.clone()); - denoms.insert(balance.denom); - } - } - let mut send_coin = send_coin.unwrap(); - send_coin.amount = send_amount.clone(); - let send_fee = Coin { - denom: send_coin.denom.clone(), - amount: 1u8.into(), - }; - let res = send_to_eth( - c_key, - e_dest_addr, - send_coin, - send_fee, - (0f64, "".to_string()), - contact, - 1.0, - ); - futs.push(res); - } - let results = join_all(futs).await; - for result in results { - let result = result.unwrap(); - trace!("SendToEth result {:?}", result); - } - info!( - "Successfully placed {} {} into the tx pool", - NUM_USERS, token - ); - } - - match tokio::time::timeout(TOTAL_TIMEOUT, async { - loop { - good = true; - for keys in user_keys.iter() { - let e_dest_addr = keys.eth_dest_address; - for token in erc20_addresses.iter() { - let bal = get_erc20_balance(*token, e_dest_addr, (*MINER_CLIENT).clone()) - .await - .unwrap(); - let bal = Uint256::from_str(bal.to_string().as_str()).unwrap(); - if bal != send_amount.clone() { - good = false; - } - } - } - if good { - break; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - }) - .await - { - Ok(_) => { - info!( - "All {} withdraws to Ethereum bridged successfully!", - NUM_USERS * erc20_addresses.len() - ); - } - Err(_) => { - panic!( - "Failed to perform all {} withdraws to Ethereum!", - NUM_USERS * erc20_addresses.len() - ); - } - } - - // we should find a batch nonce greater than zero since all the batches - // executed - let eth_wallet = LocalWallet::from(keys[0].eth_key.clone()); - let eth_client = Arc::new(SignerMiddleware::new(eth_provider.clone(), eth_wallet)); - for token in erc20_addresses { - assert!( - get_tx_batch_nonce(gravity_address, token, eth_client.clone()) - .await - .unwrap() - > 0 - ) - } -} diff --git a/orchestrator/test_runner/src/utils.rs b/orchestrator/test_runner/src/utils.rs deleted file mode 100644 index d90df7815..000000000 --- a/orchestrator/test_runner/src/utils.rs +++ /dev/null @@ -1,186 +0,0 @@ -use crate::one_eth; -use cosmos_gravity::crypto::PrivateKey as CosmosPrivateKey; -use deep_space::address::Address as CosmosAddress; -use deep_space::coin::Coin; -use deep_space::Contact; -use ethereum_gravity::{erc20_utils::get_erc20_balance, types::EthClient}; -use ethers::core::k256::ecdsa::SigningKey; -use ethers::prelude::*; -use ethers::types::Address as EthAddress; -use futures::future::join_all; -use gravity_abi::erc20::ERC20; -use rand::Rng; - -pub async fn send_one_eth(dest: EthAddress, eth_client: EthClient) { - let tx = TransactionRequest { - from: Some(eth_client.address()), - to: Some(NameOrAddress::Address(dest)), - gas: None, - gas_price: None, - value: Some(one_eth()), - data: Some(Vec::new().into()), - nonce: None, - }; - - let pending_tx = eth_client.send_transaction(tx, None).await.unwrap(); - pending_tx.await.unwrap(); -} - -pub async fn check_cosmos_balance( - denom: &str, - address: CosmosAddress, - contact: &Contact, -) -> Option { - let account_info = contact.get_balances(address).await.unwrap(); - trace!("Cosmos balance {:?}", account_info); - for coin in account_info { - // make sure the name and amount is correct - if coin.denom.starts_with(denom) { - return Some(coin); - } - } - None -} - -/// This function efficiently distributes ERC20 tokens to a large number of provided Ethereum addresses -/// the real problem here is that you can't do more than one send operation at a time from a -/// single address without your sequence getting out of whack. By manually setting the nonce -/// here we can send thousands of transactions in only a few blocks -pub async fn send_erc20_bulk( - amount: U256, - erc20: EthAddress, - destinations: &[EthAddress], - eth_client: EthClient, -) { - let miner_balance = get_erc20_balance(erc20, eth_client.address(), eth_client.clone()) - .await - .unwrap(); - assert!(miner_balance > amount.checked_mul(destinations.len().into()).unwrap()); - - let mut nonce = eth_client - .get_transaction_count(eth_client.address(), None) - .await - .unwrap(); - let mut transactions = Vec::new(); - - for address in destinations { - let data = ERC20::new(erc20, eth_client.clone()) - .transfer(*address, amount) - .calldata() - .unwrap(); - - let tx = TransactionRequest { - from: Some(eth_client.address()), - to: Some(NameOrAddress::Address(erc20)), - gas: Some(100_000u32.into()), - gas_price: None, - value: Some(0u32.into()), - data: Some(data), - nonce: Some(nonce), - }; - - let tx = eth_client.send_transaction(tx, None); - transactions.push(tx); - nonce += 1u64.into(); - } - - let pending_tx_results = join_all(transactions).await; - let mut pending_txs = Vec::new(); - for pending_tx_result in pending_tx_results { - let pending_tx = pending_tx_result.unwrap(); - pending_txs.push(pending_tx); - } - join_all(pending_txs).await; - - for address in destinations { - let new_balance = get_erc20_balance(erc20, *address, eth_client.clone()) - .await - .unwrap(); - assert!(new_balance >= amount); - } -} - -/// This function efficiently distributes ETH to a large number of provided Ethereum addresses -/// the real problem here is that you can't do more than one send operation at a time from a -/// single address without your sequence getting out of whack. By manually setting the nonce -/// here we can quickly send thousands of transactions in only a few blocks -pub async fn send_eth_bulk(amount: U256, destinations: &[EthAddress], eth_client: EthClient) { - let mut nonce = eth_client - .get_transaction_count(eth_client.address(), None) - .await - .unwrap(); - let mut transactions = Vec::new(); - - for address in destinations { - let tx = TransactionRequest { - from: Some(eth_client.address()), - to: Some(NameOrAddress::Address(*address)), - gas: Some(24_000u64.into()), - gas_price: Some(1_000_000_000u64.into()), - value: Some(amount), - data: Some(Vec::new().into()), - nonce: Some(nonce), - }; - - let tx = eth_client.send_transaction(tx, None); - transactions.push(tx); - nonce += 1u64.into(); - } - - let pending_tx_results = join_all(transactions).await; - let mut pending_txs = Vec::new(); - for pending_tx_result in pending_tx_results { - let pending_tx = pending_tx_result.unwrap(); - pending_txs.push(pending_tx); - } - join_all(pending_txs).await; -} - -pub fn get_user_key() -> BridgeUserKey { - let mut rng = rand::thread_rng(); - let secret: [u8; 32] = rng.gen(); - // the starting location of the funds - let eth_key = SigningKey::from_bytes(&secret).unwrap(); - let eth_address = LocalWallet::from(eth_key.clone()).address(); - // the destination on cosmos that sends along to the final ethereum destination - let cosmos_key = CosmosPrivateKey::from_secret(&secret); - let cosmos_address = cosmos_key - .to_address(CosmosAddress::DEFAULT_PREFIX) - .unwrap(); - let mut rng = rand::thread_rng(); - let secret: [u8; 32] = rng.gen(); - // the final destination of the tokens back on Ethereum - let eth_dest_key = SigningKey::from_bytes(&secret).unwrap(); - let eth_dest_address = LocalWallet::from(eth_dest_key.clone()).address(); - BridgeUserKey { - eth_address, - eth_key, - cosmos_address, - cosmos_key, - eth_dest_key, - eth_dest_address, - } -} -#[derive(Debug)] -pub struct BridgeUserKey { - // the starting addresses that get Eth balances to send across the bridge - pub eth_address: EthAddress, - pub eth_key: SigningKey, - // the cosmos addresses that get the funds and send them on to the dest eth addresses - pub cosmos_address: CosmosAddress, - pub cosmos_key: CosmosPrivateKey, - // the location tokens are sent back to on Ethereum - pub eth_dest_address: EthAddress, - pub eth_dest_key: SigningKey, -} - -#[derive(Debug, Clone)] -pub struct ValidatorKeys { - /// The Ethereum key used by this validator to sign Gravity bridge messages - pub eth_key: SigningKey, - /// The Orchestrator key used by this validator to submit oracle messages and signatures - /// to the cosmos chain - pub orch_key: CosmosPrivateKey, - /// The validator key used by this validator to actually sign and produce blocks - pub validator_key: CosmosPrivateKey, -} diff --git a/orchestrator/test_runner/src/valset_stress.rs b/orchestrator/test_runner/src/valset_stress.rs deleted file mode 100644 index 215b5e28e..000000000 --- a/orchestrator/test_runner/src/valset_stress.rs +++ /dev/null @@ -1,14 +0,0 @@ -use crate::happy_path::test_valset_update; -use crate::utils::ValidatorKeys; -use deep_space::Contact; -use ethers::types::Address as EthAddress; - -pub async fn validator_set_stress_test( - contact: &Contact, - keys: Vec, - gravity_address: EthAddress, -) { - for _ in 0u32..10 { - test_valset_update(contact, &keys, gravity_address).await; - } -} diff --git a/orchestrator/test_runner/startup.sh b/orchestrator/test_runner/startup.sh deleted file mode 100644 index ee295e388..000000000 --- a/orchestrator/test_runner/startup.sh +++ /dev/null @@ -1,6 +0,0 @@ -validator_address=$(getent hosts gravity0 | awk '{ print $1 }') -abci="http://$validator_address:26657" -grpc="http://$validator_address:9090" -ethrpc="http://$(getent hosts ethereum | awk '{ print $1 }'):8545" - -COSMOS_NODE_GRPC="$grpc" COSMOS_NODE_ABCI="$abci" ETH_NODE="$ethrpc" PATH=$PATH:$HOME/.cargo/bin test_runner \ No newline at end of file diff --git a/orchestrator/testnet.Dockerfile b/orchestrator/testnet.Dockerfile deleted file mode 100644 index 9caeb9a24..000000000 --- a/orchestrator/testnet.Dockerfile +++ /dev/null @@ -1,33 +0,0 @@ -# Reference: https://www.lpalmieri.com/posts/fast-rust-docker-builds/ - -FROM rust:1.56 as cargo-chef-rust -RUN apt-get install bash -RUN cargo install cargo-chef - -FROM cargo-chef-rust as planner -WORKDIR app -# We only pay the installation cost once, -# it will be cached from the second build onwards -# To ensure a reproducible build consider pinning -# the cargo-chef version with `--version X.X.X` -COPY . . -RUN cargo chef prepare --recipe-path recipe.json - -FROM cargo-chef-rust as cacher -WORKDIR app -COPY --from=planner /app/recipe.json recipe.json -RUN cargo chef cook --release --recipe-path recipe.json - -FROM cargo-chef-rust as builder -WORKDIR app -COPY . . -# Copy over the cached dependencies -COPY --from=cacher /app/target target -COPY --from=cacher /usr/local/cargo /usr/local/cargo -RUN cargo build --manifest-path=test_runner/Cargo.toml --release --bin test_runner - -FROM cargo-chef-rust as runtime -WORKDIR app -COPY test_runner/startup.sh startup.sh -COPY --from=builder /app/target/release/test_runner /usr/local/bin -CMD sh startup.sh \ No newline at end of file From 9dc72476cec221eba04de595b3a6b20dbe5853fb Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Wed, 7 Jun 2023 22:00:00 -0500 Subject: [PATCH 02/11] Task keep-alive loops --- orchestrator/cosmos_gravity/src/send.rs | 27 ++++++- .../gorc/src/commands/orchestrator/start.rs | 6 +- orchestrator/orchestrator/src/main_loop.rs | 74 ++++++++++++++++--- orchestrator/relayer/src/main_loop.rs | 34 +++++++-- 4 files changed, 120 insertions(+), 21 deletions(-) diff --git a/orchestrator/cosmos_gravity/src/send.rs b/orchestrator/cosmos_gravity/src/send.rs index 054958523..fce278f2a 100644 --- a/orchestrator/cosmos_gravity/src/send.rs +++ b/orchestrator/cosmos_gravity/src/send.rs @@ -149,11 +149,11 @@ pub async fn send_messages( Ok(contact.wait_for_tx(response, TIMEOUT).await?) } -pub async fn send_main_loop( +pub async fn run_sender( contact: &Contact, cosmos_key: CosmosPrivateKey, gas_price: (f64, String), - mut rx: tokio::sync::mpsc::Receiver>, + rx: &mut tokio::sync::mpsc::Receiver>, gas_adjustment: f64, msg_batch_size: usize, ) { @@ -198,6 +198,29 @@ pub async fn send_main_loop( } } +pub async fn send_main_loop( + contact: &Contact, + cosmos_key: CosmosPrivateKey, + gas_price: (f64, String), + mut rx: tokio::sync::mpsc::Receiver>, + gas_adjustment: f64, + msg_batch_size: usize, +) { + loop { + info!("starting cosmos sender"); + run_sender( + contact, + cosmos_key, + gas_price.to_owned(), + &mut rx, + gas_adjustment, + msg_batch_size, + ).await; + + warn!("cosmos sender exited unexpectedly. restarting!") + } +} + fn log_send_error(messages: &Vec, err: GravityError) { let msg_types = messages .iter() diff --git a/orchestrator/gorc/src/commands/orchestrator/start.rs b/orchestrator/gorc/src/commands/orchestrator/start.rs index 740282c2c..661c90773 100644 --- a/orchestrator/gorc/src/commands/orchestrator/start.rs +++ b/orchestrator/gorc/src/commands/orchestrator/start.rs @@ -114,7 +114,11 @@ impl Runnable for StartCommand { self.orchestrator_only, config.cosmos.msg_batch_size, ) - .await; + .await + .unwrap_or_else(|e| { + error!("orchestrator exited with error: {}", e); + std::process::exit(1); + }); }) .unwrap_or_else(|e| { status_err!("executor exited with error: {}", e); diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 928c213e5..45c737f1f 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -25,6 +25,7 @@ use ethereum_gravity::types::EthClient; use ethereum_gravity::utils::get_gravity_id; use ethers::{prelude::*, types::Address as EthAddress}; use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; +use gravity_utils::error::GravityError; use gravity_utils::ethereum::bytes_to_hex_str; use relayer::main_loop::relayer_main_loop; use std::convert::TryInto; @@ -60,8 +61,9 @@ pub async fn orchestrator_main_loop( gas_adjustment: f64, relayer_opt_out: bool, cosmos_msg_batch_size: u32, -) { - let (tx, rx) = tokio::sync::mpsc::channel(1); +) -> Result<(), GravityError> { + let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await?; + let (tx, rx) = tokio::sync::mpsc::channel(10); let a = send_main_loop( &contact, @@ -83,6 +85,7 @@ pub async fn orchestrator_main_loop( ); let c = eth_signer_main_loop( + gravity_id.clone(), cosmos_key, contact.clone(), eth_client.clone(), @@ -95,6 +98,7 @@ pub async fn orchestrator_main_loop( if !relayer_opt_out { let e = relayer_main_loop( + gravity_id, eth_client.clone(), grpc_client.clone(), gravity_contract_address, @@ -105,6 +109,8 @@ pub async fn orchestrator_main_loop( } else { futures::future::join4(a, b, c, d).await; } + + Ok(()) } // the amount of time to wait when encountering error conditions @@ -113,10 +119,37 @@ const DELAY: Duration = Duration::from_secs(5); // the number of loop iterations to wait between sending height update messages const HEIGHT_UPDATE_INTERVAL: u32 = 50; +#[allow(unused_variables)] +pub async fn eth_oracle_main_loop( + cosmos_key: CosmosPrivateKey, + contact: Contact, + eth_client: EthClient, + grpc_client: GravityQueryClient, + gravity_contract_address: EthAddress, + blocks_to_search: u64, + msg_sender: tokio::sync::mpsc::Sender>, +) { + loop { + info!("starting oracle"); + run_oracle( + cosmos_key, + contact.clone(), + eth_client.clone(), + grpc_client.clone(), + gravity_contract_address, + blocks_to_search, + msg_sender.clone(), + ) + .await; + + warn!("oracle exited unexpectedly. restarting!"); + } +} + /// This function is responsible for making sure that Ethereum events are retrieved from the Ethereum blockchain /// and ferried over to Cosmos where they will be used to issue tokens or process batches. #[allow(unused_variables)] -pub async fn eth_oracle_main_loop( +pub async fn run_oracle( cosmos_key: CosmosPrivateKey, contact: Contact, eth_client: EthClient, @@ -245,6 +278,7 @@ pub async fn eth_oracle_main_loop( /// valid and signed off on. #[allow(unused_variables)] pub async fn eth_signer_main_loop( + gravity_id: String, cosmos_key: CosmosPrivateKey, contact: Contact, eth_client: EthClient, @@ -252,15 +286,35 @@ pub async fn eth_signer_main_loop( contract_address: EthAddress, msg_sender: tokio::sync::mpsc::Sender>, ) { - let our_cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); - let mut grpc_client = grpc_client; + loop { + info!("starting ethereum signer"); + run_signer( + gravity_id.clone(), + cosmos_key, + contact.clone(), + eth_client.clone(), + grpc_client.clone(), + contract_address, + msg_sender.clone(), + ) + .await; - let gravity_id = get_gravity_id(contract_address, eth_client.clone()).await; - if gravity_id.is_err() { - error!("Failed to get GravityID, check your Eth node"); - return; + warn!("signer exited unexpectedly. restarting!"); } - let gravity_id = gravity_id.unwrap(); +} + +#[allow(unused_variables)] +pub async fn run_signer( + gravity_id: String, + cosmos_key: CosmosPrivateKey, + contact: Contact, + eth_client: EthClient, + grpc_client: GravityQueryClient, + contract_address: EthAddress, + msg_sender: tokio::sync::mpsc::Sender>, +) { + let our_cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); + let mut grpc_client = grpc_client; loop { let (async_resp, _) = tokio::join!( diff --git a/orchestrator/relayer/src/main_loop.rs b/orchestrator/relayer/src/main_loop.rs index 2794d5edf..7f14ce188 100644 --- a/orchestrator/relayer/src/main_loop.rs +++ b/orchestrator/relayer/src/main_loop.rs @@ -11,23 +11,41 @@ use tonic::transport::Channel; pub const LOOP_SPEED: Duration = Duration::from_secs(17); pub const PENDING_TX_TIMEOUT: Duration = Duration::from_secs(120); -/// This function contains the orchestrator primary loop, it is broken out of the main loop so that -/// it can be called in the test runner for easier orchestration of multi-node tests #[allow(unused_variables)] pub async fn relayer_main_loop( + gravity_id: String, eth_client: EthClient, grpc_client: GravityQueryClient, gravity_contract_address: EthAddress, eth_gas_price_multiplier: f32, eth_gas_multiplier: f32, ) { - let mut grpc_client = grpc_client; - let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await; - if gravity_id.is_err() { - error!("Failed to get GravityID, check your Eth node"); - return; + loop { + info!("starting relayer"); + run_relayer( + gravity_id.clone(), + eth_client.clone(), + grpc_client.clone(), + gravity_contract_address, + eth_gas_price_multiplier, + eth_gas_multiplier, + ) + .await; + + warn!("relayer exited unexpectedly. restarting!"); } - let gravity_id = gravity_id.unwrap(); +} + +#[allow(unused_variables)] +pub async fn run_relayer( + gravity_id: String, + eth_client: EthClient, + grpc_client: GravityQueryClient, + gravity_contract_address: EthAddress, + eth_gas_price_multiplier: f32, + eth_gas_multiplier: f32, +) { + let mut grpc_client = grpc_client; let mut logic_call_skips = LogicCallSkips::new(); loop { From e31df4f2ca3bfacbb033eaf967d433fa19c248ac Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 09:41:08 -0500 Subject: [PATCH 03/11] Move cosmos address derivation to beginning of orchestrator startup except for sender --- orchestrator/cosmos_gravity/src/build.rs | 27 +++--------- .../src/ethereum_event_watcher.rs | 13 +++--- orchestrator/orchestrator/src/main_loop.rs | 44 ++++++++----------- 3 files changed, 30 insertions(+), 54 deletions(-) diff --git a/orchestrator/cosmos_gravity/src/build.rs b/orchestrator/cosmos_gravity/src/build.rs index 731e632b8..b834c4599 100644 --- a/orchestrator/cosmos_gravity/src/build.rs +++ b/orchestrator/cosmos_gravity/src/build.rs @@ -1,5 +1,4 @@ -use deep_space::Contact; -use deep_space::Msg; +use deep_space::{Address as CosmosAddress, Msg}; use ethereum_gravity::types::EthClient; use ethers::prelude::*; use ethers::utils::keccak256; @@ -14,20 +13,16 @@ use lazy_static::lazy_static; use regex::Regex; use std::collections::BTreeMap; -use crate::crypto::PrivateKey as CosmosPrivateKey; - lazy_static! { static ref DENOM_REGEX: Regex = Regex::new("^[a-zA-Z][a-zA-Z0-9/-]{2,127}$").unwrap(); } pub async fn signer_set_tx_confirmation_messages( - contact: &Contact, + cosmos_address: CosmosAddress, eth_client: EthClient, valsets: Vec, - cosmos_key: CosmosPrivateKey, gravity_id: String, ) -> Vec { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); let ethereum_address = eth_client.address(); let mut msgs = Vec::new(); @@ -52,13 +47,11 @@ pub async fn signer_set_tx_confirmation_messages( } pub async fn batch_tx_confirmation_messages( - contact: &Contact, + cosmos_address: CosmosAddress, eth_client: EthClient, batches: Vec, - cosmos_key: CosmosPrivateKey, gravity_id: String, ) -> Vec { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); let ethereum_address = eth_client.address(); let mut msgs = Vec::new(); @@ -84,13 +77,11 @@ pub async fn batch_tx_confirmation_messages( } pub async fn contract_call_tx_confirmation_messages( - contact: &Contact, + cosmos_address: CosmosAddress, eth_client: EthClient, logic_calls: Vec, - cosmos_key: CosmosPrivateKey, gravity_id: String, ) -> Vec { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); let ethereum_address = eth_client.address(); let mut msgs = Vec::new(); @@ -117,12 +108,9 @@ pub async fn contract_call_tx_confirmation_messages( } pub async fn ethereum_vote_height_messages( - contact: &Contact, - cosmos_key: CosmosPrivateKey, + cosmos_address: CosmosAddress, ethereum_height: U64, ) -> Vec { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); - let msg = proto::MsgEthereumHeightVote { ethereum_height: ethereum_height.as_u64(), signer: cosmos_address.to_string(), @@ -136,16 +124,13 @@ pub async fn ethereum_vote_height_messages( } pub fn ethereum_event_messages( - contact: &Contact, - cosmos_key: CosmosPrivateKey, + cosmos_address: CosmosAddress, deposits: Vec, batches: Vec, erc20_deploys: Vec, logic_calls: Vec, valsets: Vec, ) -> Vec { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); - // This sorts oracle messages by event nonce before submitting them. It's not a pretty implementation because // we're missing an intermediary layer of abstraction. We could implement 'EventTrait' and then implement sort // for it, but then when we go to transform 'EventTrait' objects into GravityMsg enum values we'll have all sorts diff --git a/orchestrator/orchestrator/src/ethereum_event_watcher.rs b/orchestrator/orchestrator/src/ethereum_event_watcher.rs index 6251c56a0..4b25c30cc 100644 --- a/orchestrator/orchestrator/src/ethereum_event_watcher.rs +++ b/orchestrator/orchestrator/src/ethereum_event_watcher.rs @@ -5,9 +5,8 @@ use crate::get_with_retry::get_block_number_with_retry; use crate::get_with_retry::get_chain_id_with_retry; use crate::metrics; use cosmos_gravity::build; -use cosmos_gravity::crypto::PrivateKey as CosmosPrivateKey; use cosmos_gravity::query::get_last_event_nonce; -use deep_space::{Contact, Msg}; +use deep_space::{Address as CosmosAddress, Contact, Msg}; use ethereum_gravity::types::EthClient; use ethers::prelude::*; use ethers::types::Address as EthAddress; @@ -29,18 +28,17 @@ use tonic::transport::Channel; #[allow(clippy::too_many_arguments)] pub async fn check_for_events( + cosmos_address: CosmosAddress, eth_client: EthClient, contact: &Contact, grpc_client: &mut GravityQueryClient, gravity_contract_address: EthAddress, - cosmos_key: CosmosPrivateKey, starting_block: U64, blocks_to_search: U64, block_delay: U64, msg_sender: tokio::sync::mpsc::Sender>, ) -> Result { let prefix = contact.get_prefix(); - let our_cosmos_address = cosmos_key.to_address(&prefix).unwrap(); let latest_block = get_block_number_with_retry(eth_client.clone()).await; let latest_block = latest_block - block_delay; @@ -110,7 +108,7 @@ pub async fn check_for_events( // block, so we also need this routine so make sure we don't send in the first event in this hypothetical // multi event block again. In theory we only send all events for every block and that will pass of fail // atomicly but lets not take that risk. - let last_event_nonce = get_last_event_nonce(grpc_client, our_cosmos_address).await?; + let last_event_nonce = get_last_event_nonce(grpc_client, cosmos_address).await?; metrics::set_cosmos_last_event_nonce(last_event_nonce); let erc20_deployed_events: Vec = @@ -182,8 +180,7 @@ pub async fn check_for_events( || !valset_updated_events.is_empty() { let messages = build::ethereum_event_messages( - contact, - cosmos_key, + cosmos_address, send_to_cosmos_events.to_owned(), transaction_batch_events.to_owned(), erc20_deployed_events.to_owned(), @@ -229,7 +226,7 @@ pub async fn check_for_events( // TODO(bolten): we are only waiting one block, is it possible if we are sending multiple // events via the sender, they could be received over the block boundary and thus our new // event nonce does not reflect full processing of the above events? - let new_event_nonce = get_last_event_nonce(grpc_client, our_cosmos_address).await?; + let new_event_nonce = get_last_event_nonce(grpc_client, cosmos_address).await?; if new_event_nonce == last_event_nonce { return Err(GravityError::InvalidBridgeStateError( format!("Claims did not process, trying to update but still on {}, trying again in a moment", last_event_nonce), diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 45c737f1f..39ab809c0 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -20,7 +20,7 @@ use cosmos_gravity::{ }; use deep_space::client::ChainStatus; use deep_space::error::CosmosGrpcError; -use deep_space::{Contact, Msg}; +use deep_space::{Address as CosmosAddress, Contact, Msg}; use ethereum_gravity::types::EthClient; use ethereum_gravity::utils::get_gravity_id; use ethers::{prelude::*, types::Address as EthAddress}; @@ -62,6 +62,7 @@ pub async fn orchestrator_main_loop( relayer_opt_out: bool, cosmos_msg_batch_size: u32, ) -> Result<(), GravityError> { + let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?; let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await?; let (tx, rx) = tokio::sync::mpsc::channel(10); @@ -75,7 +76,7 @@ pub async fn orchestrator_main_loop( ); let b = eth_oracle_main_loop( - cosmos_key, + cosmos_address.clone(), contact.clone(), eth_client.clone(), grpc_client.clone(), @@ -85,8 +86,8 @@ pub async fn orchestrator_main_loop( ); let c = eth_signer_main_loop( + cosmos_address, gravity_id.clone(), - cosmos_key, contact.clone(), eth_client.clone(), grpc_client.clone(), @@ -121,7 +122,7 @@ const HEIGHT_UPDATE_INTERVAL: u32 = 50; #[allow(unused_variables)] pub async fn eth_oracle_main_loop( - cosmos_key: CosmosPrivateKey, + cosmos_address: CosmosAddress, contact: Contact, eth_client: EthClient, grpc_client: GravityQueryClient, @@ -132,7 +133,7 @@ pub async fn eth_oracle_main_loop( loop { info!("starting oracle"); run_oracle( - cosmos_key, + cosmos_address, contact.clone(), eth_client.clone(), grpc_client.clone(), @@ -150,7 +151,7 @@ pub async fn eth_oracle_main_loop( /// and ferried over to Cosmos where they will be used to issue tokens or process batches. #[allow(unused_variables)] pub async fn run_oracle( - cosmos_key: CosmosPrivateKey, + cosmos_address: CosmosAddress, contact: Contact, eth_client: EthClient, grpc_client: GravityQueryClient, @@ -158,7 +159,6 @@ pub async fn run_oracle( blocks_to_search: u64, msg_sender: tokio::sync::mpsc::Sender>, ) { - let our_cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); let block_delay = match get_block_delay(eth_client.clone()).await { Ok(block_delay) => block_delay, Err(e) => { @@ -171,7 +171,7 @@ pub async fn run_oracle( }; let mut last_checked_block = get_last_checked_block( grpc_client.clone(), - our_cosmos_address, + cosmos_address, gravity_contract_address, eth_client.clone(), blocks_to_search, @@ -201,8 +201,7 @@ pub async fn run_oracle( // more confidence we are attesting to a height that has not been re-orged if loop_count % HEIGHT_UPDATE_INTERVAL == 0 { let messages = build::ethereum_vote_height_messages( - &contact, - cosmos_key, + cosmos_address, latest_eth_block - block_delay, ) .await; @@ -241,11 +240,11 @@ pub async fn run_oracle( // Relays events from Ethereum -> Cosmos match check_for_events( + cosmos_address, eth_client.clone(), &contact, &mut grpc_client, gravity_contract_address, - cosmos_key, last_checked_block, blocks_to_search.into(), block_delay, @@ -278,8 +277,8 @@ pub async fn run_oracle( /// valid and signed off on. #[allow(unused_variables)] pub async fn eth_signer_main_loop( + cosmos_address: CosmosAddress, gravity_id: String, - cosmos_key: CosmosPrivateKey, contact: Contact, eth_client: EthClient, grpc_client: GravityQueryClient, @@ -289,8 +288,8 @@ pub async fn eth_signer_main_loop( loop { info!("starting ethereum signer"); run_signer( + cosmos_address, gravity_id.clone(), - cosmos_key, contact.clone(), eth_client.clone(), grpc_client.clone(), @@ -305,17 +304,15 @@ pub async fn eth_signer_main_loop( #[allow(unused_variables)] pub async fn run_signer( + cosmos_address: CosmosAddress, gravity_id: String, - cosmos_key: CosmosPrivateKey, contact: Contact, eth_client: EthClient, grpc_client: GravityQueryClient, contract_address: EthAddress, msg_sender: tokio::sync::mpsc::Sender>, ) { - let our_cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); let mut grpc_client = grpc_client; - loop { let (async_resp, _) = tokio::join!( async { @@ -358,7 +355,7 @@ pub async fn run_signer( } // sign the last unsigned valsets - match get_oldest_unsigned_valsets(&mut grpc_client, our_cosmos_address).await { + match get_oldest_unsigned_valsets(&mut grpc_client, cosmos_address.clone()).await { Ok(valsets) => { if valsets.is_empty() { trace!("No validator sets to sign, node is caught up!") @@ -369,10 +366,9 @@ pub async fn run_signer( valsets[0].nonce ); let messages = build::signer_set_tx_confirmation_messages( - &contact, + cosmos_address, eth_client.clone(), valsets, - cosmos_key, gravity_id.clone(), ) .await; @@ -392,7 +388,7 @@ pub async fn run_signer( } // sign the last unsigned batch, TODO check if we already have signed this - match get_oldest_unsigned_transaction_batch(&mut grpc_client, our_cosmos_address) + match get_oldest_unsigned_transaction_batch(&mut grpc_client, cosmos_address.clone()) .await { Ok(Some(last_unsigned_batch)) => { @@ -405,10 +401,9 @@ pub async fn run_signer( ); let transaction_batches = vec![last_unsigned_batch]; let messages = build::batch_tx_confirmation_messages( - &contact, + cosmos_address.clone(), eth_client.clone(), transaction_batches, - cosmos_key, gravity_id.clone(), ) .await; @@ -428,7 +423,7 @@ pub async fn run_signer( } let logic_calls = - get_oldest_unsigned_logic_call(&mut grpc_client, our_cosmos_address).await; + get_oldest_unsigned_logic_call(&mut grpc_client, cosmos_address.clone()).await; if let Ok(logic_calls) = logic_calls { for logic_call in logic_calls { info!( @@ -438,10 +433,9 @@ pub async fn run_signer( ); let logic_calls = vec![logic_call]; let messages = build::contract_call_tx_confirmation_messages( - &contact, + cosmos_address, eth_client.clone(), logic_calls, - cosmos_key, gravity_id.clone(), ) .await; From 180e699c63d2618b0d3ceaf7b06b5abe8945267c Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 09:41:37 -0500 Subject: [PATCH 04/11] Gravity ID retrieval in relayer binary startup --- orchestrator/relayer/src/main.rs | 4 +++- orchestrator/relayer/src/main_loop.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/orchestrator/relayer/src/main.rs b/orchestrator/relayer/src/main.rs index f8b15dc08..aed55bf49 100644 --- a/orchestrator/relayer/src/main.rs +++ b/orchestrator/relayer/src/main.rs @@ -4,6 +4,7 @@ use crate::main_loop::relayer_main_loop; use crate::main_loop::LOOP_SPEED; use docopt::Docopt; use env_logger::Env; +use ethereum_gravity::utils::get_gravity_id; use ethers::prelude::*; use ethers::signers::LocalWallet as EthWallet; use ethers::types::Address as EthAddress; @@ -92,7 +93,7 @@ async fn main() { let eth_client = SignerMiddleware::new(provider, ethereum_wallet.clone().with_chain_id(chain_id)); let eth_client = Arc::new(eth_client); - + let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await.expect("failed to get Gravity ID. check your ethereum node connection"); let public_eth_key = eth_client.address(); info!("Starting Gravity Relayer"); info!("Ethereum Address: {}", format_eth_address(public_eth_key)); @@ -106,6 +107,7 @@ async fn main() { check_for_eth(public_eth_key, eth_client.clone()).await; relayer_main_loop( + gravity_id, eth_client, connections.grpc.unwrap(), gravity_contract_address, diff --git a/orchestrator/relayer/src/main_loop.rs b/orchestrator/relayer/src/main_loop.rs index 7f14ce188..573a1eaae 100644 --- a/orchestrator/relayer/src/main_loop.rs +++ b/orchestrator/relayer/src/main_loop.rs @@ -2,7 +2,7 @@ use crate::{ batch_relaying::relay_batches, find_latest_valset::find_latest_valset, logic_call_relaying::relay_logic_calls, valset_relaying::relay_valsets, }; -use ethereum_gravity::{logic_call::LogicCallSkips, types::EthClient, utils::get_gravity_id}; +use ethereum_gravity::{logic_call::LogicCallSkips, types::EthClient}; use ethers::types::Address as EthAddress; use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; use std::time::Duration; From 15a327fc4fe0b35ff3df5bfdb71ec9bdba1a8189 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 09:49:53 -0500 Subject: [PATCH 05/11] Change batch size config value to usize to avoid fallable conversion --- orchestrator/gorc/src/config.rs | 2 +- orchestrator/orchestrator/src/main_loop.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/orchestrator/gorc/src/config.rs b/orchestrator/gorc/src/config.rs index 076980071..dedc2a77d 100644 --- a/orchestrator/gorc/src/config.rs +++ b/orchestrator/gorc/src/config.rs @@ -97,7 +97,7 @@ pub struct CosmosSection { pub grpc: String, pub prefix: String, pub gas_adjustment: f64, - pub msg_batch_size: u32, + pub msg_batch_size: usize, pub gas_price: GasPrice, } diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 39ab809c0..4bf8b19f2 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -28,7 +28,6 @@ use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; use gravity_utils::error::GravityError; use gravity_utils::ethereum::bytes_to_hex_str; use relayer::main_loop::relayer_main_loop; -use std::convert::TryInto; use std::process::exit; use std::{net, time::Duration}; use tokio::time::sleep as delay_for; @@ -60,7 +59,7 @@ pub async fn orchestrator_main_loop( blocks_to_search: u64, gas_adjustment: f64, relayer_opt_out: bool, - cosmos_msg_batch_size: u32, + cosmos_msg_batch_size: usize, ) -> Result<(), GravityError> { let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?; let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await?; @@ -72,7 +71,7 @@ pub async fn orchestrator_main_loop( gas_price, rx, gas_adjustment, - cosmos_msg_batch_size.try_into().unwrap(), + cosmos_msg_batch_size, ); let b = eth_oracle_main_loop( From 8ef1c96ce75cd77936bc2b78da60a2c1940eefe1 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 10:01:46 -0500 Subject: [PATCH 06/11] Remove unused function check_for_eth --- orchestrator/orchestrator/src/main_loop.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 4bf8b19f2..b533d1f28 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -455,14 +455,3 @@ pub async fn run_signer( ); } } - -pub async fn check_for_eth(orchestrator_address: EthAddress, eth_client: EthClient) { - let balance = eth_client - .get_balance(orchestrator_address, None) - .await - .unwrap(); - if balance == 0u8.into() { - warn!("You don't have any Ethereum! You will need to send some to {} for this program to work. Dust will do for basic operations, more info about average relaying costs will be presented as the program runs", orchestrator_address); - } - metrics::set_ethereum_bal(balance); -} From 7298f80241166f389044daae31cda447d0e451b5 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 12:53:44 -0500 Subject: [PATCH 07/11] Move block_delay query to startup --- orchestrator/orchestrator/src/main_loop.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index b533d1f28..70f890d1a 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -28,7 +28,6 @@ use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; use gravity_utils::error::GravityError; use gravity_utils::ethereum::bytes_to_hex_str; use relayer::main_loop::relayer_main_loop; -use std::process::exit; use std::{net, time::Duration}; use tokio::time::sleep as delay_for; use tonic::transport::Channel; @@ -62,6 +61,7 @@ pub async fn orchestrator_main_loop( cosmos_msg_batch_size: usize, ) -> Result<(), GravityError> { let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?; + let block_delay = get_block_delay(eth_client.clone()).await?; let gravity_id = get_gravity_id(gravity_contract_address, eth_client.clone()).await?; let (tx, rx) = tokio::sync::mpsc::channel(10); @@ -81,6 +81,7 @@ pub async fn orchestrator_main_loop( grpc_client.clone(), gravity_contract_address, blocks_to_search, + block_delay, tx.clone(), ); @@ -156,18 +157,9 @@ pub async fn run_oracle( grpc_client: GravityQueryClient, gravity_contract_address: EthAddress, blocks_to_search: u64, + block_delay: U64, msg_sender: tokio::sync::mpsc::Sender>, ) { - let block_delay = match get_block_delay(eth_client.clone()).await { - Ok(block_delay) => block_delay, - Err(e) => { - error!( - "Error encountered when retrieving block delay, cannot continue: {}", - e - ); - exit(1); - } - }; let mut last_checked_block = get_last_checked_block( grpc_client.clone(), cosmos_address, From ba9c55a2d1b3d99f4076f05f488cee8e64db8164 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 12:54:37 -0500 Subject: [PATCH 08/11] Replace unwrap() with ? in signer and oracle loops where needed --- orchestrator/cosmos_gravity/src/send.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/orchestrator/cosmos_gravity/src/send.rs b/orchestrator/cosmos_gravity/src/send.rs index fce278f2a..06620c54b 100644 --- a/orchestrator/cosmos_gravity/src/send.rs +++ b/orchestrator/cosmos_gravity/src/send.rs @@ -90,7 +90,7 @@ pub async fn send_to_eth( ))); } - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); + let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?; let msg = proto::MsgSendToEthereum { sender: cosmos_address.to_string(), @@ -103,13 +103,13 @@ pub async fn send_to_eth( } pub async fn send_messages( - contact: &Contact, + contact: Contact, cosmos_key: CosmosPrivateKey, gas_price: (f64, String), messages: Vec, gas_adjustment: f64, ) -> Result { - let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap(); + let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?; let fee_amount = Coin { denom: gas_price.1.clone(), From ae54a0c35b130def2b07b6d445e91d4ad20af637 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 12:55:10 -0500 Subject: [PATCH 09/11] Add panic-handling and task restart to sender, signer, and oracle threads --- orchestrator/cosmos_gravity/src/send.rs | 46 ++++++++------ .../gorc/src/commands/cosmos_to_eth.rs | 2 +- orchestrator/orchestrator/src/main_loop.rs | 63 +++++++++++-------- 3 files changed, 64 insertions(+), 47 deletions(-) diff --git a/orchestrator/cosmos_gravity/src/send.rs b/orchestrator/cosmos_gravity/src/send.rs index 06620c54b..25a35ca2e 100644 --- a/orchestrator/cosmos_gravity/src/send.rs +++ b/orchestrator/cosmos_gravity/src/send.rs @@ -14,8 +14,10 @@ use gravity_proto::gravity as proto; use gravity_utils::error::GravityError; use gravity_utils::ethereum::format_eth_address; use prost::Message; +use tokio::sync::Mutex; use std::cmp; use std::collections::HashSet; +use std::sync::Arc; use std::{result::Result, time::Duration}; use crate::crypto::PrivateKey as CosmosPrivateKey; @@ -67,7 +69,7 @@ pub async fn update_gravity_delegate_addresses( }; let msg = Msg::new("/gravity.v1.MsgDelegateKeys", msg); - send_messages(contact, cosmos_key, gas_price, vec![msg], gas_adjustment).await + send_messages(contact.to_owned(), cosmos_key, gas_price, vec![msg], gas_adjustment).await } /// Sends tokens from Cosmos to Ethereum. These tokens will not be sent immediately instead @@ -78,7 +80,7 @@ pub async fn send_to_eth( amount: Coin, bridge_fee: Coin, gas_price: (f64, String), - contact: &Contact, + contact: Contact, gas_adjustment: f64, ) -> Result { if amount.denom != bridge_fee.denom { @@ -150,18 +152,18 @@ pub async fn send_messages( } pub async fn run_sender( - contact: &Contact, + contact: Contact, cosmos_key: CosmosPrivateKey, gas_price: (f64, String), - rx: &mut tokio::sync::mpsc::Receiver>, + rx: Arc>>>, gas_adjustment: f64, msg_batch_size: usize, ) { - while let Some(messages) = rx.recv().await { + while let Some(messages) = rx.lock().await.recv().await { for msg_chunk in messages.chunks(msg_batch_size) { let batch = msg_chunk.to_vec(); match send_messages( - contact, + contact.clone(), cosmos_key, gas_price.to_owned(), msg_chunk.to_vec(), @@ -180,7 +182,7 @@ pub async fn run_sender( for msg in batch { let msg_vec = vec![msg]; match send_messages( - contact, + contact.clone(), cosmos_key, gas_price.to_owned(), msg_vec.clone(), @@ -198,26 +200,32 @@ pub async fn run_sender( } } +/// manages the cosmos message sender task. `run_sender` should never return, but in case there are system-caused +/// panics, we want to make sure we attempt to restart it. if this function ever returns, the receiver will be dropped +/// and all tasks containing senders will panic. pub async fn send_main_loop( - contact: &Contact, + contact: Contact, cosmos_key: CosmosPrivateKey, gas_price: (f64, String), - mut rx: tokio::sync::mpsc::Receiver>, + rx: tokio::sync::mpsc::Receiver>, gas_adjustment: f64, msg_batch_size: usize, ) { + let rx = Arc::new(Mutex::new(rx)); loop { info!("starting cosmos sender"); - run_sender( - contact, - cosmos_key, - gas_price.to_owned(), - &mut rx, - gas_adjustment, - msg_batch_size, - ).await; - - warn!("cosmos sender exited unexpectedly. restarting!") + if let Err(err) = tokio::task::spawn( + run_sender( + contact.clone(), + cosmos_key, + gas_price.to_owned(), + rx.clone(), + gas_adjustment, + msg_batch_size, + ) + ).await { + error!("cosmos sender failed with: {:?}", err); + } } } diff --git a/orchestrator/gorc/src/commands/cosmos_to_eth.rs b/orchestrator/gorc/src/commands/cosmos_to_eth.rs index 2dae2a2c3..63ed02a7c 100644 --- a/orchestrator/gorc/src/commands/cosmos_to_eth.rs +++ b/orchestrator/gorc/src/commands/cosmos_to_eth.rs @@ -136,7 +136,7 @@ impl Runnable for CosmosToEthCmd { amount.clone(), bridge_fee.clone(), config.cosmos.gas_price.as_tuple(), - &contact, + contact, 1.0 ) .await; diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 70f890d1a..54a0c4ff9 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -66,7 +66,7 @@ pub async fn orchestrator_main_loop( let (tx, rx) = tokio::sync::mpsc::channel(10); let a = send_main_loop( - &contact, + contact.clone(), cosmos_key, gas_price, rx, @@ -120,6 +120,8 @@ const DELAY: Duration = Duration::from_secs(5); // the number of loop iterations to wait between sending height update messages const HEIGHT_UPDATE_INTERVAL: u32 = 50; +/// manages the ethereum oracle thread. `run_oracle` should never return, but in case there are system-caused +/// panics, we want to make sure we attempt to restart it. #[allow(unused_variables)] pub async fn eth_oracle_main_loop( cosmos_address: CosmosAddress, @@ -128,22 +130,25 @@ pub async fn eth_oracle_main_loop( grpc_client: GravityQueryClient, gravity_contract_address: EthAddress, blocks_to_search: u64, + block_delay: U64, msg_sender: tokio::sync::mpsc::Sender>, ) { loop { info!("starting oracle"); - run_oracle( - cosmos_address, - contact.clone(), - eth_client.clone(), - grpc_client.clone(), - gravity_contract_address, - blocks_to_search, - msg_sender.clone(), - ) - .await; - - warn!("oracle exited unexpectedly. restarting!"); + if let Err(err) = tokio::task::spawn( + run_oracle( + cosmos_address, + contact.clone(), + eth_client.clone(), + grpc_client.clone(), + gravity_contract_address, + blocks_to_search, + block_delay, + msg_sender.clone(), + ) + ).await { + error!("oracle exited unexpectedly: {:?}", err); + } } } @@ -263,9 +268,8 @@ pub async fn run_oracle( } } -/// The eth_signer simply signs off on any batches or validator sets provided by the validator -/// since these are provided directly by a trusted Cosmsos node they can simply be assumed to be -/// valid and signed off on. +/// manages the ethereum signer task. `run_signer` should never return, but in case there are system-caused +/// panics, we want to make sure we attempt to restart it. #[allow(unused_variables)] pub async fn eth_signer_main_loop( cosmos_address: CosmosAddress, @@ -278,21 +282,26 @@ pub async fn eth_signer_main_loop( ) { loop { info!("starting ethereum signer"); - run_signer( - cosmos_address, - gravity_id.clone(), - contact.clone(), - eth_client.clone(), - grpc_client.clone(), - contract_address, - msg_sender.clone(), + if let Err(err) = tokio::task::spawn( + run_signer( + cosmos_address, + gravity_id.clone(), + contact.clone(), + eth_client.clone(), + grpc_client.clone(), + contract_address, + msg_sender.clone(), + ) ) - .await; - - warn!("signer exited unexpectedly. restarting!"); + .await { + error!("eth signer failed with {:?}", err); + } } } +/// simply signs off on any batches or validator sets provided by the validator since these are +/// provided directly by a trusted Cosmsos node they can simply be assumed to be valid and signed +/// off on. #[allow(unused_variables)] pub async fn run_signer( cosmos_address: CosmosAddress, From c7d1a56901757c48e6a469746b24a6c28ff819e5 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Thu, 8 Jun 2023 12:57:10 -0500 Subject: [PATCH 10/11] Handle relayer panics, make error messages consistent --- orchestrator/cosmos_gravity/src/send.rs | 2 +- orchestrator/orchestrator/src/main_loop.rs | 4 ++-- orchestrator/relayer/src/main_loop.rs | 22 ++++++++++++---------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/orchestrator/cosmos_gravity/src/send.rs b/orchestrator/cosmos_gravity/src/send.rs index 25a35ca2e..cc9b85e1c 100644 --- a/orchestrator/cosmos_gravity/src/send.rs +++ b/orchestrator/cosmos_gravity/src/send.rs @@ -224,7 +224,7 @@ pub async fn send_main_loop( msg_batch_size, ) ).await { - error!("cosmos sender failed with: {:?}", err); + error!("cosmos sender failed with: {err:?}"); } } } diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 54a0c4ff9..3da407297 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -147,7 +147,7 @@ pub async fn eth_oracle_main_loop( msg_sender.clone(), ) ).await { - error!("oracle exited unexpectedly: {:?}", err); + error!("oracle failed with: {err:?}"); } } } @@ -294,7 +294,7 @@ pub async fn eth_signer_main_loop( ) ) .await { - error!("eth signer failed with {:?}", err); + error!("eth signer failed with {err:?}"); } } } diff --git a/orchestrator/relayer/src/main_loop.rs b/orchestrator/relayer/src/main_loop.rs index 573a1eaae..19a461aa1 100644 --- a/orchestrator/relayer/src/main_loop.rs +++ b/orchestrator/relayer/src/main_loop.rs @@ -22,17 +22,19 @@ pub async fn relayer_main_loop( ) { loop { info!("starting relayer"); - run_relayer( - gravity_id.clone(), - eth_client.clone(), - grpc_client.clone(), - gravity_contract_address, - eth_gas_price_multiplier, - eth_gas_multiplier, + if let Err(err) = tokio::task::spawn( + run_relayer( + gravity_id.clone(), + eth_client.clone(), + grpc_client.clone(), + gravity_contract_address, + eth_gas_price_multiplier, + eth_gas_multiplier, + ) ) - .await; - - warn!("relayer exited unexpectedly. restarting!"); + .await { + error!("relayer failed with: {err:?}"); + } } } From 3f5ed472937f7a0b7ef4ea1c79b8762bbea73510 Mon Sep 17 00:00:00 2001 From: Collin Brittain Date: Tue, 1 Aug 2023 15:57:20 -0500 Subject: [PATCH 11/11] Remove unecessary clone of cosmos address --- orchestrator/orchestrator/src/main_loop.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/orchestrator/orchestrator/src/main_loop.rs b/orchestrator/orchestrator/src/main_loop.rs index 3da407297..c3942d5ae 100644 --- a/orchestrator/orchestrator/src/main_loop.rs +++ b/orchestrator/orchestrator/src/main_loop.rs @@ -75,7 +75,7 @@ pub async fn orchestrator_main_loop( ); let b = eth_oracle_main_loop( - cosmos_address.clone(), + cosmos_address, contact.clone(), eth_client.clone(), grpc_client.clone(), @@ -355,7 +355,7 @@ pub async fn run_signer( } // sign the last unsigned valsets - match get_oldest_unsigned_valsets(&mut grpc_client, cosmos_address.clone()).await { + match get_oldest_unsigned_valsets(&mut grpc_client, cosmos_address).await { Ok(valsets) => { if valsets.is_empty() { trace!("No validator sets to sign, node is caught up!") @@ -388,7 +388,7 @@ pub async fn run_signer( } // sign the last unsigned batch, TODO check if we already have signed this - match get_oldest_unsigned_transaction_batch(&mut grpc_client, cosmos_address.clone()) + match get_oldest_unsigned_transaction_batch(&mut grpc_client, cosmos_address) .await { Ok(Some(last_unsigned_batch)) => { @@ -401,7 +401,7 @@ pub async fn run_signer( ); let transaction_batches = vec![last_unsigned_batch]; let messages = build::batch_tx_confirmation_messages( - cosmos_address.clone(), + cosmos_address, eth_client.clone(), transaction_batches, gravity_id.clone(), @@ -423,7 +423,7 @@ pub async fn run_signer( } let logic_calls = - get_oldest_unsigned_logic_call(&mut grpc_client, cosmos_address.clone()).await; + get_oldest_unsigned_logic_call(&mut grpc_client, cosmos_address).await; if let Ok(logic_calls) = logic_calls { for logic_call in logic_calls { info!(