diff --git a/Cargo.lock b/Cargo.lock index f27a67d84..0bf160cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1608,6 +1608,7 @@ version = "0.1.0" dependencies = [ "actix", "actix-rt", + "actix-web", "althea_kernel_interface", "althea_proto", "althea_types", @@ -1630,6 +1631,7 @@ dependencies = [ "num256", "petgraph", "rita_client", + "rita_client_registration", "rita_common", "rita_exit", "settings", @@ -2672,9 +2674,11 @@ dependencies = [ name = "rita_client_registration" version = "0.1.0" dependencies = [ + "actix", "althea_types", "awc", "clarity", + "futures 0.3.28", "lazy_static", "log", "phonenumber", diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 0b9487f0a..4fb51fb48 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -21,6 +21,7 @@ settings = { path = "../settings" } rita_client = { path = "../rita_client", features = ["dev_env"] } rita_common = { path = "../rita_common", features = ["integration_test"] } rita_exit = { path = "../rita_exit", features = ["dev_env"] } +rita_client_registration = { path = "../rita_client_registration" } ctrlc = { version = "3.2.1", features = ["termination"] } diesel = { version = "1.4", features = ["postgres", "r2d2"] } diesel_migrations = { version = "1.4", features = ["postgres"] } @@ -34,3 +35,6 @@ num256 = "0.5" num-traits = "0.2" web30 = "1.0" lazy_static = "1.4" +actix-web = { version = "4.3", default_features = false, features = [ + "openssl", +] } diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 2ddbd6a65..5284a2943 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -51,6 +51,17 @@ pub fn generate_rita_config_file(path: String) -> Result<(), KernelInterfaceErro .to_string(); lines.push(exit); + let contact_info = " + [exit_client.contact_info.number.national]\n + value = 7040000000\n + zeros = 0\n + [exit_client.contact_info.number.code]\n + source = \"plus\"\n + value = 1\n + " + .to_string(); + lines.push(contact_info); + let log = " [log]\n enabled = true\n" @@ -64,6 +75,7 @@ pub fn generate_exit_config_file(path: String) -> Result<(), KernelInterfaceErro let mut lines: Vec = Vec::new(); let desc = " db_uri = \"postgres://postgres@localhost/test\"\n + client_registration_url = \"https://7.7.7.1:40400/register_router\"\n workers = 1\n description = \"just a normal althea exit\"\n" .to_string(); @@ -115,6 +127,7 @@ pub fn generate_exit_config_file(path: String) -> Result<(), KernelInterfaceErro wg_public_key = \"H/ABwzXk834OwGYU8CZGfFxNZOd+BAJEaVDHiEiWWhU=\"\n wg_private_key = \"ALxcZm2r58gY0sB4vIfnjShc86qBoVK3f32H9VrwqWU=\"\n wg_private_key_path = \"/tmp/exit-priv\"\n + registered_users_contract_addr = \"0xb9b674D720F96995ca033ec347df080d500c2230\"\n pass = \"Some pass here\"\n" .to_string(); lines.push(exit_network); diff --git a/integration_tests/src/contract_test.rs b/integration_tests/src/contract_test.rs new file mode 100644 index 000000000..8f57d3c0c --- /dev/null +++ b/integration_tests/src/contract_test.rs @@ -0,0 +1,196 @@ +use std::{thread, time::Duration}; + +use althea_types::Identity; +use rita_client_registration::client_db::{ + add_client_to_registered_list, get_all_regsitered_clients, get_registered_client_using_ethkey, + get_registered_client_using_meship, get_registered_client_using_wgkey, +}; +use web30::client::Web3; + +use crate::{ + payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}, + utils::{get_altheadb_contract_addr, get_eth_node}, +}; + +pub async fn run_altheadb_contract_test() { + // Try adding a dummy entry and validating that we can retrive them + validate_contract_functionality().await; + + thread::sleep(Duration::from_secs(1000)); +} + +pub async fn validate_contract_functionality() { + let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); + + // Define the users + let user = Identity { + mesh_ip: "fd00::1337".parse().unwrap(), + eth_address: "0x02ad6b480DFeD806C63a0839C6f1f3136c5fD515" + .parse() + .unwrap(), + wg_public_key: "sPtNGQbyPpCsqSKD6PbnflB1lIUCd259Vhd0mJfJeGo=" + .parse() + .unwrap(), + nickname: None, + }; + + let user_2 = Identity { + mesh_ip: "fd00::1447:1447".parse().unwrap(), + eth_address: "0x1994A73F79F9648d4a8064D9C0F221fB1007Fd2F" + .parse() + .unwrap(), + wg_public_key: "Yhyj+CKZbyEKea/9hdIFje98yc5Cukt1Pbq0qWB4Aqw=" + .parse() + .unwrap(), + nickname: None, + }; + + let user_3 = Identity { + mesh_ip: "fd00::3000:1117".parse().unwrap(), + eth_address: "0x9c33D0dFdc9E3f7cC73bE3A575C31cfe3059C76a" + .parse() + .unwrap(), + wg_public_key: "fzOUfEqYzRE0MwfR5o7XV+MKZKj/qEfELRzQTRTKAB8=" + .parse() + .unwrap(), + nickname: None, + }; + + // Try requests when there are no users present + let res = get_all_regsitered_clients( + &contact, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + ) + .await + .unwrap(); + + assert!(res.is_empty()); + + let res = get_registered_client_using_wgkey( + user.wg_public_key, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await; + + assert!(res.is_err()); + + // Add the first user + let _res = add_client_to_registered_list( + &contact, + user, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(5)); + + // Try requesting some info that doesnt exist + let res = get_registered_client_using_ethkey( + "0x3d261902a988d94599d7f0Bd4c2e4514D73BB329" + .parse() + .unwrap(), + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await; + + assert!(res.is_err()); + + // Add the second user + let _res = add_client_to_registered_list( + &contact, + user_2, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(5)); + + // Add the third user + let _res = add_client_to_registered_list( + &contact, + user_3, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(10)); + + let res = get_all_regsitered_clients( + &contact, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + ) + .await; + + println!("All users are : {:?}", res); + + thread::sleep(Duration::from_secs(5)); + + let res = get_registered_client_using_wgkey( + user.wg_public_key, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user); + + let res = get_registered_client_using_ethkey( + user_2.eth_address, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user_2); + + let res = get_registered_client_using_meship( + user_3.mesh_ip, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user_3); +} diff --git a/integration_tests/src/five_nodes.rs b/integration_tests/src/five_nodes.rs index bd03702c5..bcd3b1a63 100644 --- a/integration_tests/src/five_nodes.rs +++ b/integration_tests/src/five_nodes.rs @@ -1,14 +1,16 @@ +use crate::payments_eth::{ONE_ETH, WEB3_TIMEOUT}; use crate::setup_utils::database::start_postgres; use crate::setup_utils::namespaces::*; use crate::setup_utils::rita::thread_spawner; use crate::utils::{ - get_default_settings, register_all_namespaces_to_exit, test_all_internet_connectivity, - test_reach_all, test_routes, + get_default_settings, register_all_namespaces_to_exit, send_eth_bulk, + test_all_internet_connectivity, test_reach_all, test_routes, }; use log::info; use std::collections::HashMap; use std::thread; use std::time::Duration; +use web30::client::Web3; /// Runs a five node fixed network map test scenario, this does basic network setup and tests reachability to /// all destinations @@ -27,7 +29,7 @@ pub async fn run_five_node_test_scenario() { let res = setup_ns(namespaces.clone()); info!("Namespaces setup: {res:?}"); - let _ = thread_spawner(namespaces.clone(), client_settings, exit_settings) + let rita_identities = thread_spawner(namespaces.clone(), client_settings, exit_settings) .expect("Could not spawn Rita threads"); info!("Thread Spawner: {res:?}"); @@ -38,11 +40,26 @@ pub async fn run_five_node_test_scenario() { test_routes(namespaces.clone(), expected_routes); + // Exits need to have funds to request a registered client list, which is needed for proper setup + info!("Topup exits with funds"); + let web3 = Web3::new("http://localhost:8545", WEB3_TIMEOUT); + let mut to_top_up = Vec::new(); + for c in rita_identities.client_identities { + to_top_up.push(c.eth_address); + } + for e in rita_identities.exit_identities { + to_top_up.push(e.eth_address) + } + + info!("Sending 50 eth to all routers"); + send_eth_bulk((ONE_ETH * 50).into(), &to_top_up, &web3).await; + info!("Registering routers to the exit"); register_all_namespaces_to_exit(namespaces.clone()).await; + thread::sleep(Duration::from_secs(10)); + info!("Checking for wg_exit tunnel setup"); - thread::sleep(Duration::from_secs(5)); test_all_internet_connectivity(namespaces.clone()); } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index b6653f52b..b0e86e794 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -4,11 +4,13 @@ extern crate lazy_static; use std::time::Duration; pub mod config; +pub mod contract_test; pub mod debts; pub mod five_nodes; pub mod mutli_exit; pub mod payments_althea; pub mod payments_eth; +pub mod registration_server; pub mod setup_utils; pub mod utils; diff --git a/integration_tests/src/registration_server.rs b/integration_tests/src/registration_server.rs new file mode 100644 index 000000000..da184a948 --- /dev/null +++ b/integration_tests/src/registration_server.rs @@ -0,0 +1,83 @@ +use std::thread; + +use actix_rt::System; +use actix_web::{ + web::{self, Json}, + App, HttpResponse, HttpServer, +}; +use althea_types::ExitClientIdentity; +use rita_client_registration::{ + client_conflict, handle_sms_registration, register_client_batch_loop, +}; +use web30::client::Web3; + +use crate::{ + payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}, + utils::{get_altheadb_contract_addr, get_eth_node, get_test_runner_magic_phone}, +}; +use log::{error, info}; + +pub const REGISTRATION_PORT_SERVER: u16 = 40400; + +pub fn start_registration_server() { + // Start the register loop + register_client_batch_loop( + get_eth_node(), + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + ); + + // Start endpoint listener + thread::spawn(move || { + let runner = System::new(); + runner.block_on(async move { + // Exit stuff, huge threadpool to offset Pgsql blocking + let _res = HttpServer::new(|| { + App::new() + .route("/register_router", web::post().to(register_router)) + .route("/test", web::get().to(test_endpoint)) + }) + .bind(format!("7.7.7.1:{}", REGISTRATION_PORT_SERVER)) + .unwrap() + .shutdown_timeout(0) + .run() + .await; + }); + }); +} + +async fn register_router(client: Json) -> HttpResponse { + let client = client.into_inner(); + info!("Attempting to register client: {}", client.global.mesh_ip); + let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); + + // Check for an existing client + let client = client; + if client_conflict( + &client, + &contact, + get_altheadb_contract_addr(), + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + ) + .await + { + error!("Found a client conflict! {}", client.global.mesh_ip); + return HttpResponse::Unauthorized().finish(); + } + + HttpResponse::Ok().json( + handle_sms_registration( + client, + "dummy key".to_string(), + Some(get_test_runner_magic_phone()), + ) + .await, + ) +} + +async fn test_endpoint() -> HttpResponse { + HttpResponse::Ok().finish() +} diff --git a/integration_tests/src/utils.rs b/integration_tests/src/utils.rs index 98258189a..1563cd118 100644 --- a/integration_tests/src/utils.rs +++ b/integration_tests/src/utils.rs @@ -17,7 +17,7 @@ use althea_proto::{ use althea_types::{ContactType, Denom, Identity, SystemChain, WgKey}; use awc::http::StatusCode; use babel_monitor::{open_babel_stream, parse_routes, structs::Route}; -use clarity::{Transaction, Uint256}; +use clarity::{Address, Transaction, Uint256}; use deep_space::{Address as AltheaAddress, Coin, Contact, CosmosPrivateKey, PrivateKey}; use futures::future::join_all; use ipnetwork::IpNetwork; @@ -36,6 +36,7 @@ use settings::{client::RitaClientSettings, exit::RitaExitSettingsStruct}; use std::{ collections::{HashMap, HashSet}, net::Ipv6Addr, + process::Command, str::from_utf8, sync::{Arc, RwLock}, thread, @@ -63,6 +64,9 @@ pub const STAKING_TOKEN: &str = "aalthea"; pub const MIN_GLOBAL_FEE_AMOUNT: u128 = 10; pub const TOTAL_TIMEOUT: Duration = Duration::from_secs(300); pub const DEBT_ACCURACY_THRES: u8 = 15; +pub const ETH_NODE: &str = "http://localhost:8545"; +pub const MINER_PRIVATE_KEY: &str = + "0x34d97aaf58b1a81d3ed3068a870d8093c6341cf5d1ef7e6efa03fe7f7fc2c3a8"; lazy_static! { pub static ref TEST_EXIT_DETAILS: HashMap = { @@ -131,6 +135,40 @@ pub fn get_eth_node() -> String { format!("http://{}:8545", NODE_IP) } +pub fn get_altheadb_contract_addr() -> Address { + "0xb9b674D720F96995ca033ec347df080d500c2230" + .parse() + .unwrap() +} + +pub fn get_test_runner_magic_phone() -> String { + "+17040000000".to_string() +} + +pub async fn deploy_contracts() { + let contact = Contact::new( + &get_althea_grpc(), + ALTHEA_CONTACT_TIMEOUT, + ALTHEA_CHAIN_PREFIX, + ) + .unwrap(); + // prevents the node deployer from failing (rarely) when the chain has not + // yet produced the next block after submitting each eth address + contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); + + let res = Command::new("npx") + .args([ + "ts-node", + "/althea_rs/solidity/contract-deployer.ts", + &format!("--eth-privkey={}", MINER_PRIVATE_KEY), + &format!("--eth-node={}", ETH_NODE), + ]) + .output() + .expect("Failed to deploy contracts!"); + + info!("Contract deploy returned {:?}", from_utf8(&res.stdout)); +} + /// Test pingability waiting and failing if it is not successful pub fn test_reach_all(nsinfo: NamespaceInfo) { let start = Instant::now(); @@ -349,9 +387,10 @@ pub fn get_default_settings( let mut exit = exit.clone(); let mut client = client.clone(); exit.network.mesh_ip = Some(cluster.root_ip); + exit.exit_network.cluster_exits = cluster_exits; client.exit_client.contact_info = Some( ContactType::Both { - number: "+11111111".parse().unwrap(), + number: get_test_runner_magic_phone().parse().unwrap(), email: "fake@fake.com".parse().unwrap(), sequence_number: Some(0), } @@ -422,7 +461,7 @@ pub async fn register_to_exit(namespace_name: String, exit_name: String) -> Stat let client = awc::Client::default(); let req = client .post(format!( - "http://localhost:4877/exits/{}/register", + "http://localhost:4877/exits/{}/verify/1111", exit_network.exit_name )) .send() @@ -1025,7 +1064,7 @@ pub async fn register_all_namespaces_to_exit(namespaces: NamespaceInfo) { break; } if Instant::now() - start > register_timeout { - panic!("Failed to register {} to exit", r.get_name()); + panic!("Failed to register {} to exit with {}", r.get_name(), res); } warn!("Failed {} registration to exit, trying again", r.get_name()); thread::sleep(Duration::from_secs(1)); diff --git a/rita_client/src/exit_manager/mod.rs b/rita_client/src/exit_manager/mod.rs index 9b65032a0..e584b5831 100644 --- a/rita_client/src/exit_manager/mod.rs +++ b/rita_client/src/exit_manager/mod.rs @@ -363,6 +363,8 @@ async fn send_exit_setup_request( ident: ExitClientIdentity, ) -> Result { let endpoint = format!("http://[{}]:{}/secure_setup", to.ip(), to.port()); + error!("Trying to hit endpoint: {:?}", endpoint); + let ident = encrypt_exit_client_id(&exit_pubkey.into(), ident); let client = awc::Client::default(); @@ -556,6 +558,7 @@ pub async fn exit_setup_request(exit: String, code: Option) -> Result<() ); let exit_response = send_exit_setup_request(exit_pubkey, endpoint, ident).await?; + let mut rita_client = settings::get_rita_client(); let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { diff --git a/rita_client_registration/Cargo.toml b/rita_client_registration/Cargo.toml index 9146eab91..29a652c50 100644 --- a/rita_client_registration/Cargo.toml +++ b/rita_client_registration/Cargo.toml @@ -15,3 +15,5 @@ phonenumber = "0.3" awc = "3.1" web30 = "1.0" tokio = { version = "1.21", features = ["macros", "time"] } +actix = "0.13" +futures = { version = "0.3", features = ["compat"] } diff --git a/rita_client_registration/src/lib.rs b/rita_client_registration/src/lib.rs index 3dd09bbf4..d80af01cd 100644 --- a/rita_client_registration/src/lib.rs +++ b/rita_client_registration/src/lib.rs @@ -1,20 +1,30 @@ #![deny(unused_crate_dependencies)] use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::{Arc, RwLock}, - time::Duration, + thread, + time::{Duration, Instant}, }; -use althea_types::{ExitClientIdentity, WgKey}; -use clarity::{Address, PrivateKey}; +use actix::System; +use althea_types::{ExitClientIdentity, Identity, WgKey}; +use clarity::{ + abi::{encode_call, AbiToken}, + Address, PrivateKey, Uint256, +}; +use futures::future::join_all; use phonenumber::PhoneNumber; use serde::{Deserialize, Serialize}; use tokio::join; -use web30::client::Web3; +use web30::{ + client::Web3, + jsonrpc::error::Web3Error, + types::{SendTxOption, TransactionResponse}, +}; use crate::client_db::{ - add_client_to_registered_list, get_registered_client_using_ethkey, - get_registered_client_using_meship, get_registered_client_using_wgkey, + get_registered_client_using_ethkey, get_registered_client_using_meship, + get_registered_client_using_wgkey, }; #[macro_use] @@ -27,8 +37,13 @@ pub mod client_db; lazy_static! { /// A map that stores number of texts sent to a client during registration static ref TEXTS_SENT: Arc>> = Arc::new(RwLock::new(HashMap::new())); + static ref TX_BATCH: Arc>> = Arc::new(RwLock::new(HashSet::new())); } +const REGISTRATION_LOOP_SPEED: Duration = Duration::from_secs(10); +const WEB3_TIMEOUT: Duration = Duration::from_secs(15); +pub const TX_TIMEOUT: Duration = Duration::from_secs(60); + /// Return struct from check_text and Send Text. Verified indicates status from api http req, /// bad phone number is an error parsing clients phone number /// Internal server error is an error while querying api endpoint @@ -64,6 +79,18 @@ fn get_texts_sent(key: WgKey) -> u8 { *TEXTS_SENT.read().unwrap().get(&key).unwrap_or(&0u8) } +fn add_client_to_reg_batch(id: Identity) { + TX_BATCH.write().unwrap().insert(id); +} + +fn remove_client_from_reg_batch(id: Identity) { + TX_BATCH.write().unwrap().remove(&id); +} + +fn get_reg_batch() -> Vec { + TX_BATCH.read().unwrap().clone().into_iter().collect() +} + #[derive(Serialize)] pub struct SmsCheck { api_key: String, @@ -132,7 +159,7 @@ async fn client_exists( match c_id { Ok(a) => client.global == a, Err(e) => { - error!( + warn!( "Error retrieving an identity with wg key {} with {}", client.global.wg_public_key, e ); @@ -146,15 +173,15 @@ pub async fn handle_sms_registration( client: ExitClientIdentity, api_key: String, magic_number: Option, - contact: &Web3, - contract_addr: Address, - our_private_key: PrivateKey, - wait_timeout: Option, ) -> ExitSignupReturn { info!( "Handling phone registration for {}", client.global.wg_public_key ); + error!( + "Handling phone registration for {}", + client.global.wg_public_key + ); // Get magic phone number let magic_phone_number = magic_number; @@ -171,25 +198,19 @@ pub async fn handle_sms_registration( (Some(number), Some(code), true) => { let is_magic = magic_phone_number.is_some() && magic_phone_number.unwrap() == number.clone(); - let check_text = match check_text(number.clone(), code, api_key).await { - Ok(a) => a, - Err(e) => return return_api_error(e), + let result = is_magic || { + match check_text(number.clone(), code, api_key).await { + Ok(a) => a, + Err(e) => return return_api_error(e), + } }; - let result = is_magic || check_text; if result { info!( "Phone registration complete for {}", client.global.wg_public_key ); - let _ = add_client_to_registered_list( - contact, - client.global, - contract_addr, - our_private_key, - wait_timeout, - vec![], - ) - .await; + + add_client_to_reg_batch(client.global); reset_texts_sent(client.global.wg_public_key); ExitSignupReturn::RegistrationOk } else { @@ -210,27 +231,20 @@ pub async fn handle_sms_registration( (Some(number), Some(code), false) => { let is_magic = magic_phone_number.is_some() && magic_phone_number.unwrap() == number.clone(); - let check_text = match check_text(number.clone(), code, api_key).await { - Ok(a) => a, - Err(e) => return return_api_error(e), - }; - let result = is_magic | check_text; + let result = is_magic || { + match check_text(number.clone(), code, api_key).await { + Ok(a) => a, + Err(e) => return return_api_error(e), + } + }; trace!("Check text returned {}", result); if result { info!( "Phone registration complete for {}", client.global.wg_public_key ); - let _ = add_client_to_registered_list( - contact, - client.global, - contract_addr, - our_private_key, - wait_timeout, - vec![], - ) - .await; + add_client_to_reg_batch(client.global); reset_texts_sent(client.global.wg_public_key); ExitSignupReturn::RegistrationOk } else { @@ -319,3 +333,146 @@ async fn send_text(number: String, api_key: String) -> Result<(), TextApiError> } } } + +pub fn register_client_batch_loop( + web3_url: String, + contract_addr: Address, + our_private_key: PrivateKey, +) { + let mut last_restart = Instant::now(); + thread::spawn(move || { + // this will always be an error, so it's really just a loop statement + // with some fancy destructuring + while let Err(e) = { + let web3 = web3_url.clone(); + thread::spawn(move || { + // Our Exit state variabl + let runner = System::new(); + + runner.block_on(async move { + loop { + let start = Instant::now(); + info!("Registration Loop tick"); + + let reg_clients = get_reg_batch(); + let contact = Web3::new(&web3, WEB3_TIMEOUT); + let mut nonce_retries = 0; + let mut nonce; + loop { + match contact + .eth_get_transaction_count(our_private_key.to_address()) + .await + { + Ok(a) => { + nonce = a; + break; + } + Err(e) => { + error!("Unable to get nonce to register routers: {}", e); + if nonce_retries > 10 { + error!("Cant register routers, panicing!"); + let sys = System::current(); + sys.stop(); + panic!( + "{}", + format!( + "Unable to get nonce to register routers: {}", + e + ) + ); + } + nonce_retries += 1; + thread::sleep(Duration::from_secs(5)); + continue; + } + } + } + + let mut batch = vec![]; + for id in reg_clients { + error!("This batch has {}", id.mesh_ip); + match contact + .send_transaction( + contract_addr, + match encode_call( + "add_registered_user((string,string,address))", + &[AbiToken::Struct(vec![ + AbiToken::String(id.mesh_ip.to_string()), + AbiToken::String(id.wg_public_key.to_string()), + AbiToken::Address(id.eth_address), + ])], + ) { + Ok(a) => a, + Err(e) => { + error!( + "REGISTRATION ERROR: Why cant we encode this call? {}", + e + ); + continue; + } + }, + 0u32.into(), + our_private_key, + vec![SendTxOption::Nonce(nonce)], + ) + .await + { + Ok(tx_id) => { + //increment nonce for next tx + nonce += 1u64.into(); + remove_client_from_reg_batch(id); + batch.push(tx_id); + } + Err(e) => { + error!( + "Failed registration for {} with {}", + id.wg_public_key, e + ); + } + } + } + + // Join on txs + let res = wait_for_txids(batch, &contact).await; + trace!("Received Transactions: {:?}", res); + + info!("Registration loop elapsed in = {:?}", start.elapsed()); + if start.elapsed() < REGISTRATION_LOOP_SPEED { + info!( + "Registration Loop sleeping for {:?}", + REGISTRATION_LOOP_SPEED - start.elapsed() + ); + thread::sleep(REGISTRATION_LOOP_SPEED - start.elapsed()); + } + info!("Registration loop sleeping Done!"); + } + }); + }) + .join() + } { + error!( + "Rita client Exit Manager loop thread paniced! Respawning {:?}", + e + ); + if Instant::now() - last_restart < Duration::from_secs(60) { + error!("Restarting too quickly, leaving it to auto rescue!"); + let sys = System::current(); + sys.stop_with_code(121); + } + last_restart = Instant::now(); + } + }); +} + +/// utility function that waits for a large number of txids to enter a block +async fn wait_for_txids( + txids: Vec, + web3: &Web3, +) -> Vec> { + let mut wait_for_txid = Vec::new(); + for txid in txids { + let wait = web3.wait_for_transaction(txid, TX_TIMEOUT, None); + wait_for_txid.push(wait); + } + join_all(wait_for_txid).await +} diff --git a/rita_exit/src/database/mod.rs b/rita_exit/src/database/mod.rs index 52da1b2d0..ae06e6502 100644 --- a/rita_exit/src/database/mod.rs +++ b/rita_exit/src/database/mod.rs @@ -118,9 +118,9 @@ pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> E let url: &str; let reg_url = get_rita_exit().client_registration_url; if cfg!(feature = "dev_env") { - url = "http://0.0.0.0:8080/register_router"; + url = "http://7.7.7.1:40400/register_router"; } else if cfg!(feature = "operator_debug") { - url = "http://192.168.10.2:8080/register_router"; + url = "http://192.168.10.2:40400/register_router"; } else { url = ®_url; } @@ -129,6 +129,10 @@ pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> E "About to request client {} registration with {}", exit_client.global, url ); + error!( + "About to request client {} registration with {}", + exit_client.global, url + ); let client = awc::Client::default(); let response = client diff --git a/rita_exit/src/rita_loop/mod.rs b/rita_exit/src/rita_loop/mod.rs index ec5030f1b..5ee1a9284 100644 --- a/rita_exit/src/rita_loop/mod.rs +++ b/rita_exit/src/rita_loop/mod.rs @@ -40,6 +40,8 @@ use rita_common::KI; pub const EXIT_LOOP_SPEED: u64 = 5; pub const EXIT_LOOP_SPEED_DURATION: Duration = Duration::from_secs(EXIT_LOOP_SPEED); pub const EXIT_LOOP_TIMEOUT: Duration = Duration::from_secs(4); +/// Retry getting all clients for 5 mins before crashing +pub const GET_CLIENT_RETRY: Duration = Duration::from_secs(300); /// Name of the legacy exit interface pub const LEGACY_INTERFACE: &str = "wg_exit"; @@ -126,23 +128,37 @@ async fn rita_exit_loop(rita_exit_cache: RitaExitCache, usage_history: ExitLock) let contract_addr = rita_exit.exit_network.registered_users_contract_addr; let get_clients_benchmark = Instant::now(); - let reg_clients_list = match get_all_regsitered_clients(&contact, our_addr, contract_addr).await - { - Ok(a) => a, - Err(e) => { - // Getting all clients is core functionality, we panic if fails - let message = format!( - "Failed to get all registered users with {}. Web3 url: {}, contract_addr: {}", + + // We retry getting users for 5 mins before we crash. Getting registered users is core functionality + let reg_clients_list; + let retry_start = Instant::now(); + loop { + match get_all_regsitered_clients(&contact, our_addr, contract_addr).await { + Ok(a) => { + reg_clients_list = a; + break; + } + Err(e) => { + // Getting all clients is core functionality, we panic if fails + let message = format!( + "Failed to get all registered users with {}. Web3 url: {}, contract_addr: {}. This is required for exit to funciton correctly", e, get_web3_server(), contract_addr ); - error!("{}", message); - let sys = AsyncSystem::current(); - sys.stop(); - panic!("{}", message); - } - }; + error!("{}", message); + thread::sleep(Duration::from_secs(10)); + if Instant::now() - retry_start > GET_CLIENT_RETRY { + let sys = AsyncSystem::current(); + sys.stop(); + panic!("{}", message); + } + } + }; + } + + error!("received reg clients: {:?}", reg_clients_list); + info!( "Finished Rita get clients, got {:?} clients in {}ms", reg_clients_list.len(), diff --git a/test_runner/src/main.rs b/test_runner/src/main.rs index 1f87d7a33..e52ba9606 100644 --- a/test_runner/src/main.rs +++ b/test_runner/src/main.rs @@ -1,36 +1,27 @@ -use althea_types::Identity; -use deep_space::Contact; use integration_tests::config::{ generate_exit_config_file, generate_rita_config_file, CONFIG_FILE_PATH, EXIT_CONFIG_PATH, }; +use integration_tests::contract_test::run_altheadb_contract_test; use integration_tests::debts::run_debts_test; /// Binary crate for actually running the integration tests use integration_tests::five_nodes::run_five_node_test_scenario; use integration_tests::mutli_exit::run_multi_exit_test; -use integration_tests::payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}; -use integration_tests::utils::{get_althea_grpc, get_eth_node, TOTAL_TIMEOUT}; +use integration_tests::registration_server::start_registration_server; +use integration_tests::utils::deploy_contracts; use integration_tests::{ payments_althea::run_althea_payments_test_scenario, payments_eth::run_eth_payments_test_scenario, utils::set_sigterm, }; use log::info; -use rita_client_registration::client_db::{ - add_client_to_registered_list, get_all_regsitered_clients, get_registered_client_using_ethkey, - get_registered_client_using_meship, get_registered_client_using_wgkey, -}; -use std::process::Command; -use std::str::from_utf8; + +use std::env; + use std::time::Duration; -use std::{env, thread}; -use web30::client::Web3; extern crate log; pub const ALTHEA_CHAIN_PREFIX: &str = "althea"; pub const ALTHEA_CONTACT_TIMEOUT: Duration = Duration::from_secs(30); -pub const ETH_NODE: &str = "http://localhost:8545"; -pub const MINER_PRIVATE_KEY: &str = - "0x34d97aaf58b1a81d3ed3068a870d8093c6341cf5d1ef7e6efa03fe7f7fc2c3a8"; #[actix_rt::main] async fn main() { @@ -54,8 +45,8 @@ async fn main() { println!("Waiting to deploy contracts"); deploy_contracts().await; - // Try adding a dummy entry and validating that we can retrive them - validate_contract_functionality().await; + println!("Starting registration server"); + start_registration_server(); let test_type = env::var("TEST_TYPE"); info!("Starting tests with {:?}", test_type); @@ -70,6 +61,8 @@ async fn main() { run_althea_payments_test_scenario().await } else if test_type == "MULTI_EXIT" { run_multi_exit_test().await + } else if test_type == "CONTRACT_TEST" { + run_altheadb_contract_test().await } else { panic!("Error unknown test type {}!", test_type); } @@ -77,223 +70,3 @@ async fn main() { panic!("Error test type not set!"); } } - -pub async fn deploy_contracts() { - let contact = Contact::new( - &get_althea_grpc(), - ALTHEA_CONTACT_TIMEOUT, - ALTHEA_CHAIN_PREFIX, - ) - .unwrap(); - // prevents the node deployer from failing (rarely) when the chain has not - // yet produced the next block after submitting each eth address - contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); - - let res = Command::new("npx") - .args([ - "ts-node", - "/althea_rs/solidity/contract-deployer.ts", - &format!("--eth-privkey={}", MINER_PRIVATE_KEY), - &format!("--eth-node={}", ETH_NODE), - ]) - .output() - .expect("Failed to deploy contracts!"); - - println!("Contract deploy returned {:?}", from_utf8(&res.stdout)); -} - -pub async fn validate_contract_functionality() { - let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); - - // Define the users - let user = Identity { - mesh_ip: "fd00::1337".parse().unwrap(), - eth_address: "0x02ad6b480DFeD806C63a0839C6f1f3136c5fD515" - .parse() - .unwrap(), - wg_public_key: "sPtNGQbyPpCsqSKD6PbnflB1lIUCd259Vhd0mJfJeGo=" - .parse() - .unwrap(), - nickname: None, - }; - - let user_2 = Identity { - mesh_ip: "fd00::1447:1447".parse().unwrap(), - eth_address: "0x1994A73F79F9648d4a8064D9C0F221fB1007Fd2F" - .parse() - .unwrap(), - wg_public_key: "Yhyj+CKZbyEKea/9hdIFje98yc5Cukt1Pbq0qWB4Aqw=" - .parse() - .unwrap(), - nickname: None, - }; - - let user_3 = Identity { - mesh_ip: "fd00::3000:1117".parse().unwrap(), - eth_address: "0x9c33D0dFdc9E3f7cC73bE3A575C31cfe3059C76a" - .parse() - .unwrap(), - wg_public_key: "fzOUfEqYzRE0MwfR5o7XV+MKZKj/qEfELRzQTRTKAB8=" - .parse() - .unwrap(), - nickname: None, - }; - - // Try requests when there are no users present - let res = get_all_regsitered_clients( - &contact, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ) - .await - .unwrap(); - - assert!(res.is_empty()); - - let res = get_registered_client_using_wgkey( - user.wg_public_key, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await; - - assert!(res.is_err()); - - // Add the first user - let _res = add_client_to_registered_list( - &contact, - user, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(5)); - - // Try requesting some info that doesnt exist - let res = get_registered_client_using_ethkey( - "0x3d261902a988d94599d7f0Bd4c2e4514D73BB329" - .parse() - .unwrap(), - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await; - - assert!(res.is_err()); - - // Add the second user - let _res = add_client_to_registered_list( - &contact, - user_2, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(5)); - - // Add the third user - let _res = add_client_to_registered_list( - &contact, - user_3, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(10)); - - let res = get_all_regsitered_clients( - &contact, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ) - .await; - - println!("All users are : {:?}", res); - - thread::sleep(Duration::from_secs(5)); - - let res = get_registered_client_using_wgkey( - user.wg_public_key, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user); - - let res = get_registered_client_using_ethkey( - user_2.eth_address, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user_2); - - let res = get_registered_client_using_meship( - user_3.mesh_ip, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user_3); -}