From 2d1d3c5b5f3483e3033203a5e00253fca7334c5a Mon Sep 17 00:00:00 2001 From: Pranay Tulugu Date: Wed, 13 Sep 2023 10:15:51 -0700 Subject: [PATCH] Add state based registration to registration server Since the register endpoint can be called simultaneously by multiple router, we need to queue up these txs and batch them together and setup the correct nonce --- Cargo.lock | 4 + integration_tests/Cargo.toml | 4 + integration_tests/src/config.rs | 13 + integration_tests/src/contract_test.rs | 196 +++++++++++++++ integration_tests/src/five_nodes.rs | 25 +- integration_tests/src/lib.rs | 2 + integration_tests/src/registration_server.rs | 83 +++++++ integration_tests/src/utils.rs | 47 +++- rita_client/src/exit_manager/mod.rs | 3 + rita_client_registration/Cargo.toml | 2 + rita_client_registration/src/lib.rs | 235 +++++++++++++++--- rita_exit/src/database/mod.rs | 8 +- rita_exit/src/rita_loop/mod.rs | 42 +++- test_runner/src/main.rs | 247 +------------------ 14 files changed, 612 insertions(+), 299 deletions(-) create mode 100644 integration_tests/src/contract_test.rs create mode 100644 integration_tests/src/registration_server.rs diff --git a/Cargo.lock b/Cargo.lock index f27a67d84..0bf160cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1608,6 +1608,7 @@ version = "0.1.0" dependencies = [ "actix", "actix-rt", + "actix-web", "althea_kernel_interface", "althea_proto", "althea_types", @@ -1630,6 +1631,7 @@ dependencies = [ "num256", "petgraph", "rita_client", + "rita_client_registration", "rita_common", "rita_exit", "settings", @@ -2672,9 +2674,11 @@ dependencies = [ name = "rita_client_registration" version = "0.1.0" dependencies = [ + "actix", "althea_types", "awc", "clarity", + "futures 0.3.28", "lazy_static", "log", "phonenumber", diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 0b9487f0a..4fb51fb48 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -21,6 +21,7 @@ settings = { path = "../settings" } rita_client = { path = "../rita_client", features = ["dev_env"] } rita_common = { path = "../rita_common", features = ["integration_test"] } rita_exit = { path = "../rita_exit", features = ["dev_env"] } +rita_client_registration = { path = "../rita_client_registration" } ctrlc = { version = "3.2.1", features = ["termination"] } diesel = { version = "1.4", features = ["postgres", "r2d2"] } diesel_migrations = { version = "1.4", features = ["postgres"] } @@ -34,3 +35,6 @@ num256 = "0.5" num-traits = "0.2" web30 = "1.0" lazy_static = "1.4" +actix-web = { version = "4.3", default_features = false, features = [ + "openssl", +] } diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 2ddbd6a65..5284a2943 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -51,6 +51,17 @@ pub fn generate_rita_config_file(path: String) -> Result<(), KernelInterfaceErro .to_string(); lines.push(exit); + let contact_info = " + [exit_client.contact_info.number.national]\n + value = 7040000000\n + zeros = 0\n + [exit_client.contact_info.number.code]\n + source = \"plus\"\n + value = 1\n + " + .to_string(); + lines.push(contact_info); + let log = " [log]\n enabled = true\n" @@ -64,6 +75,7 @@ pub fn generate_exit_config_file(path: String) -> Result<(), KernelInterfaceErro let mut lines: Vec = Vec::new(); let desc = " db_uri = \"postgres://postgres@localhost/test\"\n + client_registration_url = \"https://7.7.7.1:40400/register_router\"\n workers = 1\n description = \"just a normal althea exit\"\n" .to_string(); @@ -115,6 +127,7 @@ pub fn generate_exit_config_file(path: String) -> Result<(), KernelInterfaceErro wg_public_key = \"H/ABwzXk834OwGYU8CZGfFxNZOd+BAJEaVDHiEiWWhU=\"\n wg_private_key = \"ALxcZm2r58gY0sB4vIfnjShc86qBoVK3f32H9VrwqWU=\"\n wg_private_key_path = \"/tmp/exit-priv\"\n + registered_users_contract_addr = \"0xb9b674D720F96995ca033ec347df080d500c2230\"\n pass = \"Some pass here\"\n" .to_string(); lines.push(exit_network); diff --git a/integration_tests/src/contract_test.rs b/integration_tests/src/contract_test.rs new file mode 100644 index 000000000..8f57d3c0c --- /dev/null +++ b/integration_tests/src/contract_test.rs @@ -0,0 +1,196 @@ +use std::{thread, time::Duration}; + +use althea_types::Identity; +use rita_client_registration::client_db::{ + add_client_to_registered_list, get_all_regsitered_clients, get_registered_client_using_ethkey, + get_registered_client_using_meship, get_registered_client_using_wgkey, +}; +use web30::client::Web3; + +use crate::{ + payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}, + utils::{get_altheadb_contract_addr, get_eth_node}, +}; + +pub async fn run_altheadb_contract_test() { + // Try adding a dummy entry and validating that we can retrive them + validate_contract_functionality().await; + + thread::sleep(Duration::from_secs(1000)); +} + +pub async fn validate_contract_functionality() { + let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); + + // Define the users + let user = Identity { + mesh_ip: "fd00::1337".parse().unwrap(), + eth_address: "0x02ad6b480DFeD806C63a0839C6f1f3136c5fD515" + .parse() + .unwrap(), + wg_public_key: "sPtNGQbyPpCsqSKD6PbnflB1lIUCd259Vhd0mJfJeGo=" + .parse() + .unwrap(), + nickname: None, + }; + + let user_2 = Identity { + mesh_ip: "fd00::1447:1447".parse().unwrap(), + eth_address: "0x1994A73F79F9648d4a8064D9C0F221fB1007Fd2F" + .parse() + .unwrap(), + wg_public_key: "Yhyj+CKZbyEKea/9hdIFje98yc5Cukt1Pbq0qWB4Aqw=" + .parse() + .unwrap(), + nickname: None, + }; + + let user_3 = Identity { + mesh_ip: "fd00::3000:1117".parse().unwrap(), + eth_address: "0x9c33D0dFdc9E3f7cC73bE3A575C31cfe3059C76a" + .parse() + .unwrap(), + wg_public_key: "fzOUfEqYzRE0MwfR5o7XV+MKZKj/qEfELRzQTRTKAB8=" + .parse() + .unwrap(), + nickname: None, + }; + + // Try requests when there are no users present + let res = get_all_regsitered_clients( + &contact, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + ) + .await + .unwrap(); + + assert!(res.is_empty()); + + let res = get_registered_client_using_wgkey( + user.wg_public_key, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await; + + assert!(res.is_err()); + + // Add the first user + let _res = add_client_to_registered_list( + &contact, + user, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(5)); + + // Try requesting some info that doesnt exist + let res = get_registered_client_using_ethkey( + "0x3d261902a988d94599d7f0Bd4c2e4514D73BB329" + .parse() + .unwrap(), + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await; + + assert!(res.is_err()); + + // Add the second user + let _res = add_client_to_registered_list( + &contact, + user_2, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(5)); + + // Add the third user + let _res = add_client_to_registered_list( + &contact, + user_3, + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + None, + vec![], + ) + .await + .unwrap(); + + thread::sleep(Duration::from_secs(10)); + + let res = get_all_regsitered_clients( + &contact, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + ) + .await; + + println!("All users are : {:?}", res); + + thread::sleep(Duration::from_secs(5)); + + let res = get_registered_client_using_wgkey( + user.wg_public_key, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user); + + let res = get_registered_client_using_ethkey( + user_2.eth_address, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user_2); + + let res = get_registered_client_using_meship( + user_3.mesh_ip, + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + get_altheadb_contract_addr(), + &contact, + ) + .await + .unwrap(); + assert_eq!(res, user_3); +} diff --git a/integration_tests/src/five_nodes.rs b/integration_tests/src/five_nodes.rs index bd03702c5..bcd3b1a63 100644 --- a/integration_tests/src/five_nodes.rs +++ b/integration_tests/src/five_nodes.rs @@ -1,14 +1,16 @@ +use crate::payments_eth::{ONE_ETH, WEB3_TIMEOUT}; use crate::setup_utils::database::start_postgres; use crate::setup_utils::namespaces::*; use crate::setup_utils::rita::thread_spawner; use crate::utils::{ - get_default_settings, register_all_namespaces_to_exit, test_all_internet_connectivity, - test_reach_all, test_routes, + get_default_settings, register_all_namespaces_to_exit, send_eth_bulk, + test_all_internet_connectivity, test_reach_all, test_routes, }; use log::info; use std::collections::HashMap; use std::thread; use std::time::Duration; +use web30::client::Web3; /// Runs a five node fixed network map test scenario, this does basic network setup and tests reachability to /// all destinations @@ -27,7 +29,7 @@ pub async fn run_five_node_test_scenario() { let res = setup_ns(namespaces.clone()); info!("Namespaces setup: {res:?}"); - let _ = thread_spawner(namespaces.clone(), client_settings, exit_settings) + let rita_identities = thread_spawner(namespaces.clone(), client_settings, exit_settings) .expect("Could not spawn Rita threads"); info!("Thread Spawner: {res:?}"); @@ -38,11 +40,26 @@ pub async fn run_five_node_test_scenario() { test_routes(namespaces.clone(), expected_routes); + // Exits need to have funds to request a registered client list, which is needed for proper setup + info!("Topup exits with funds"); + let web3 = Web3::new("http://localhost:8545", WEB3_TIMEOUT); + let mut to_top_up = Vec::new(); + for c in rita_identities.client_identities { + to_top_up.push(c.eth_address); + } + for e in rita_identities.exit_identities { + to_top_up.push(e.eth_address) + } + + info!("Sending 50 eth to all routers"); + send_eth_bulk((ONE_ETH * 50).into(), &to_top_up, &web3).await; + info!("Registering routers to the exit"); register_all_namespaces_to_exit(namespaces.clone()).await; + thread::sleep(Duration::from_secs(10)); + info!("Checking for wg_exit tunnel setup"); - thread::sleep(Duration::from_secs(5)); test_all_internet_connectivity(namespaces.clone()); } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index b6653f52b..b0e86e794 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -4,11 +4,13 @@ extern crate lazy_static; use std::time::Duration; pub mod config; +pub mod contract_test; pub mod debts; pub mod five_nodes; pub mod mutli_exit; pub mod payments_althea; pub mod payments_eth; +pub mod registration_server; pub mod setup_utils; pub mod utils; diff --git a/integration_tests/src/registration_server.rs b/integration_tests/src/registration_server.rs new file mode 100644 index 000000000..da184a948 --- /dev/null +++ b/integration_tests/src/registration_server.rs @@ -0,0 +1,83 @@ +use std::thread; + +use actix_rt::System; +use actix_web::{ + web::{self, Json}, + App, HttpResponse, HttpServer, +}; +use althea_types::ExitClientIdentity; +use rita_client_registration::{ + client_conflict, handle_sms_registration, register_client_batch_loop, +}; +use web30::client::Web3; + +use crate::{ + payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}, + utils::{get_altheadb_contract_addr, get_eth_node, get_test_runner_magic_phone}, +}; +use log::{error, info}; + +pub const REGISTRATION_PORT_SERVER: u16 = 40400; + +pub fn start_registration_server() { + // Start the register loop + register_client_batch_loop( + get_eth_node(), + get_altheadb_contract_addr(), + ETH_MINER_KEY.parse().unwrap(), + ); + + // Start endpoint listener + thread::spawn(move || { + let runner = System::new(); + runner.block_on(async move { + // Exit stuff, huge threadpool to offset Pgsql blocking + let _res = HttpServer::new(|| { + App::new() + .route("/register_router", web::post().to(register_router)) + .route("/test", web::get().to(test_endpoint)) + }) + .bind(format!("7.7.7.1:{}", REGISTRATION_PORT_SERVER)) + .unwrap() + .shutdown_timeout(0) + .run() + .await; + }); + }); +} + +async fn register_router(client: Json) -> HttpResponse { + let client = client.into_inner(); + info!("Attempting to register client: {}", client.global.mesh_ip); + let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); + + // Check for an existing client + let client = client; + if client_conflict( + &client, + &contact, + get_altheadb_contract_addr(), + ETH_MINER_KEY + .parse::() + .unwrap() + .to_address(), + ) + .await + { + error!("Found a client conflict! {}", client.global.mesh_ip); + return HttpResponse::Unauthorized().finish(); + } + + HttpResponse::Ok().json( + handle_sms_registration( + client, + "dummy key".to_string(), + Some(get_test_runner_magic_phone()), + ) + .await, + ) +} + +async fn test_endpoint() -> HttpResponse { + HttpResponse::Ok().finish() +} diff --git a/integration_tests/src/utils.rs b/integration_tests/src/utils.rs index 98258189a..1563cd118 100644 --- a/integration_tests/src/utils.rs +++ b/integration_tests/src/utils.rs @@ -17,7 +17,7 @@ use althea_proto::{ use althea_types::{ContactType, Denom, Identity, SystemChain, WgKey}; use awc::http::StatusCode; use babel_monitor::{open_babel_stream, parse_routes, structs::Route}; -use clarity::{Transaction, Uint256}; +use clarity::{Address, Transaction, Uint256}; use deep_space::{Address as AltheaAddress, Coin, Contact, CosmosPrivateKey, PrivateKey}; use futures::future::join_all; use ipnetwork::IpNetwork; @@ -36,6 +36,7 @@ use settings::{client::RitaClientSettings, exit::RitaExitSettingsStruct}; use std::{ collections::{HashMap, HashSet}, net::Ipv6Addr, + process::Command, str::from_utf8, sync::{Arc, RwLock}, thread, @@ -63,6 +64,9 @@ pub const STAKING_TOKEN: &str = "aalthea"; pub const MIN_GLOBAL_FEE_AMOUNT: u128 = 10; pub const TOTAL_TIMEOUT: Duration = Duration::from_secs(300); pub const DEBT_ACCURACY_THRES: u8 = 15; +pub const ETH_NODE: &str = "http://localhost:8545"; +pub const MINER_PRIVATE_KEY: &str = + "0x34d97aaf58b1a81d3ed3068a870d8093c6341cf5d1ef7e6efa03fe7f7fc2c3a8"; lazy_static! { pub static ref TEST_EXIT_DETAILS: HashMap = { @@ -131,6 +135,40 @@ pub fn get_eth_node() -> String { format!("http://{}:8545", NODE_IP) } +pub fn get_altheadb_contract_addr() -> Address { + "0xb9b674D720F96995ca033ec347df080d500c2230" + .parse() + .unwrap() +} + +pub fn get_test_runner_magic_phone() -> String { + "+17040000000".to_string() +} + +pub async fn deploy_contracts() { + let contact = Contact::new( + &get_althea_grpc(), + ALTHEA_CONTACT_TIMEOUT, + ALTHEA_CHAIN_PREFIX, + ) + .unwrap(); + // prevents the node deployer from failing (rarely) when the chain has not + // yet produced the next block after submitting each eth address + contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); + + let res = Command::new("npx") + .args([ + "ts-node", + "/althea_rs/solidity/contract-deployer.ts", + &format!("--eth-privkey={}", MINER_PRIVATE_KEY), + &format!("--eth-node={}", ETH_NODE), + ]) + .output() + .expect("Failed to deploy contracts!"); + + info!("Contract deploy returned {:?}", from_utf8(&res.stdout)); +} + /// Test pingability waiting and failing if it is not successful pub fn test_reach_all(nsinfo: NamespaceInfo) { let start = Instant::now(); @@ -349,9 +387,10 @@ pub fn get_default_settings( let mut exit = exit.clone(); let mut client = client.clone(); exit.network.mesh_ip = Some(cluster.root_ip); + exit.exit_network.cluster_exits = cluster_exits; client.exit_client.contact_info = Some( ContactType::Both { - number: "+11111111".parse().unwrap(), + number: get_test_runner_magic_phone().parse().unwrap(), email: "fake@fake.com".parse().unwrap(), sequence_number: Some(0), } @@ -422,7 +461,7 @@ pub async fn register_to_exit(namespace_name: String, exit_name: String) -> Stat let client = awc::Client::default(); let req = client .post(format!( - "http://localhost:4877/exits/{}/register", + "http://localhost:4877/exits/{}/verify/1111", exit_network.exit_name )) .send() @@ -1025,7 +1064,7 @@ pub async fn register_all_namespaces_to_exit(namespaces: NamespaceInfo) { break; } if Instant::now() - start > register_timeout { - panic!("Failed to register {} to exit", r.get_name()); + panic!("Failed to register {} to exit with {}", r.get_name(), res); } warn!("Failed {} registration to exit, trying again", r.get_name()); thread::sleep(Duration::from_secs(1)); diff --git a/rita_client/src/exit_manager/mod.rs b/rita_client/src/exit_manager/mod.rs index 9b65032a0..e584b5831 100644 --- a/rita_client/src/exit_manager/mod.rs +++ b/rita_client/src/exit_manager/mod.rs @@ -363,6 +363,8 @@ async fn send_exit_setup_request( ident: ExitClientIdentity, ) -> Result { let endpoint = format!("http://[{}]:{}/secure_setup", to.ip(), to.port()); + error!("Trying to hit endpoint: {:?}", endpoint); + let ident = encrypt_exit_client_id(&exit_pubkey.into(), ident); let client = awc::Client::default(); @@ -556,6 +558,7 @@ pub async fn exit_setup_request(exit: String, code: Option) -> Result<() ); let exit_response = send_exit_setup_request(exit_pubkey, endpoint, ident).await?; + let mut rita_client = settings::get_rita_client(); let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { diff --git a/rita_client_registration/Cargo.toml b/rita_client_registration/Cargo.toml index 9146eab91..29a652c50 100644 --- a/rita_client_registration/Cargo.toml +++ b/rita_client_registration/Cargo.toml @@ -15,3 +15,5 @@ phonenumber = "0.3" awc = "3.1" web30 = "1.0" tokio = { version = "1.21", features = ["macros", "time"] } +actix = "0.13" +futures = { version = "0.3", features = ["compat"] } diff --git a/rita_client_registration/src/lib.rs b/rita_client_registration/src/lib.rs index 3dd09bbf4..d80af01cd 100644 --- a/rita_client_registration/src/lib.rs +++ b/rita_client_registration/src/lib.rs @@ -1,20 +1,30 @@ #![deny(unused_crate_dependencies)] use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::{Arc, RwLock}, - time::Duration, + thread, + time::{Duration, Instant}, }; -use althea_types::{ExitClientIdentity, WgKey}; -use clarity::{Address, PrivateKey}; +use actix::System; +use althea_types::{ExitClientIdentity, Identity, WgKey}; +use clarity::{ + abi::{encode_call, AbiToken}, + Address, PrivateKey, Uint256, +}; +use futures::future::join_all; use phonenumber::PhoneNumber; use serde::{Deserialize, Serialize}; use tokio::join; -use web30::client::Web3; +use web30::{ + client::Web3, + jsonrpc::error::Web3Error, + types::{SendTxOption, TransactionResponse}, +}; use crate::client_db::{ - add_client_to_registered_list, get_registered_client_using_ethkey, - get_registered_client_using_meship, get_registered_client_using_wgkey, + get_registered_client_using_ethkey, get_registered_client_using_meship, + get_registered_client_using_wgkey, }; #[macro_use] @@ -27,8 +37,13 @@ pub mod client_db; lazy_static! { /// A map that stores number of texts sent to a client during registration static ref TEXTS_SENT: Arc>> = Arc::new(RwLock::new(HashMap::new())); + static ref TX_BATCH: Arc>> = Arc::new(RwLock::new(HashSet::new())); } +const REGISTRATION_LOOP_SPEED: Duration = Duration::from_secs(10); +const WEB3_TIMEOUT: Duration = Duration::from_secs(15); +pub const TX_TIMEOUT: Duration = Duration::from_secs(60); + /// Return struct from check_text and Send Text. Verified indicates status from api http req, /// bad phone number is an error parsing clients phone number /// Internal server error is an error while querying api endpoint @@ -64,6 +79,18 @@ fn get_texts_sent(key: WgKey) -> u8 { *TEXTS_SENT.read().unwrap().get(&key).unwrap_or(&0u8) } +fn add_client_to_reg_batch(id: Identity) { + TX_BATCH.write().unwrap().insert(id); +} + +fn remove_client_from_reg_batch(id: Identity) { + TX_BATCH.write().unwrap().remove(&id); +} + +fn get_reg_batch() -> Vec { + TX_BATCH.read().unwrap().clone().into_iter().collect() +} + #[derive(Serialize)] pub struct SmsCheck { api_key: String, @@ -132,7 +159,7 @@ async fn client_exists( match c_id { Ok(a) => client.global == a, Err(e) => { - error!( + warn!( "Error retrieving an identity with wg key {} with {}", client.global.wg_public_key, e ); @@ -146,15 +173,15 @@ pub async fn handle_sms_registration( client: ExitClientIdentity, api_key: String, magic_number: Option, - contact: &Web3, - contract_addr: Address, - our_private_key: PrivateKey, - wait_timeout: Option, ) -> ExitSignupReturn { info!( "Handling phone registration for {}", client.global.wg_public_key ); + error!( + "Handling phone registration for {}", + client.global.wg_public_key + ); // Get magic phone number let magic_phone_number = magic_number; @@ -171,25 +198,19 @@ pub async fn handle_sms_registration( (Some(number), Some(code), true) => { let is_magic = magic_phone_number.is_some() && magic_phone_number.unwrap() == number.clone(); - let check_text = match check_text(number.clone(), code, api_key).await { - Ok(a) => a, - Err(e) => return return_api_error(e), + let result = is_magic || { + match check_text(number.clone(), code, api_key).await { + Ok(a) => a, + Err(e) => return return_api_error(e), + } }; - let result = is_magic || check_text; if result { info!( "Phone registration complete for {}", client.global.wg_public_key ); - let _ = add_client_to_registered_list( - contact, - client.global, - contract_addr, - our_private_key, - wait_timeout, - vec![], - ) - .await; + + add_client_to_reg_batch(client.global); reset_texts_sent(client.global.wg_public_key); ExitSignupReturn::RegistrationOk } else { @@ -210,27 +231,20 @@ pub async fn handle_sms_registration( (Some(number), Some(code), false) => { let is_magic = magic_phone_number.is_some() && magic_phone_number.unwrap() == number.clone(); - let check_text = match check_text(number.clone(), code, api_key).await { - Ok(a) => a, - Err(e) => return return_api_error(e), - }; - let result = is_magic | check_text; + let result = is_magic || { + match check_text(number.clone(), code, api_key).await { + Ok(a) => a, + Err(e) => return return_api_error(e), + } + }; trace!("Check text returned {}", result); if result { info!( "Phone registration complete for {}", client.global.wg_public_key ); - let _ = add_client_to_registered_list( - contact, - client.global, - contract_addr, - our_private_key, - wait_timeout, - vec![], - ) - .await; + add_client_to_reg_batch(client.global); reset_texts_sent(client.global.wg_public_key); ExitSignupReturn::RegistrationOk } else { @@ -319,3 +333,146 @@ async fn send_text(number: String, api_key: String) -> Result<(), TextApiError> } } } + +pub fn register_client_batch_loop( + web3_url: String, + contract_addr: Address, + our_private_key: PrivateKey, +) { + let mut last_restart = Instant::now(); + thread::spawn(move || { + // this will always be an error, so it's really just a loop statement + // with some fancy destructuring + while let Err(e) = { + let web3 = web3_url.clone(); + thread::spawn(move || { + // Our Exit state variabl + let runner = System::new(); + + runner.block_on(async move { + loop { + let start = Instant::now(); + info!("Registration Loop tick"); + + let reg_clients = get_reg_batch(); + let contact = Web3::new(&web3, WEB3_TIMEOUT); + let mut nonce_retries = 0; + let mut nonce; + loop { + match contact + .eth_get_transaction_count(our_private_key.to_address()) + .await + { + Ok(a) => { + nonce = a; + break; + } + Err(e) => { + error!("Unable to get nonce to register routers: {}", e); + if nonce_retries > 10 { + error!("Cant register routers, panicing!"); + let sys = System::current(); + sys.stop(); + panic!( + "{}", + format!( + "Unable to get nonce to register routers: {}", + e + ) + ); + } + nonce_retries += 1; + thread::sleep(Duration::from_secs(5)); + continue; + } + } + } + + let mut batch = vec![]; + for id in reg_clients { + error!("This batch has {}", id.mesh_ip); + match contact + .send_transaction( + contract_addr, + match encode_call( + "add_registered_user((string,string,address))", + &[AbiToken::Struct(vec![ + AbiToken::String(id.mesh_ip.to_string()), + AbiToken::String(id.wg_public_key.to_string()), + AbiToken::Address(id.eth_address), + ])], + ) { + Ok(a) => a, + Err(e) => { + error!( + "REGISTRATION ERROR: Why cant we encode this call? {}", + e + ); + continue; + } + }, + 0u32.into(), + our_private_key, + vec![SendTxOption::Nonce(nonce)], + ) + .await + { + Ok(tx_id) => { + //increment nonce for next tx + nonce += 1u64.into(); + remove_client_from_reg_batch(id); + batch.push(tx_id); + } + Err(e) => { + error!( + "Failed registration for {} with {}", + id.wg_public_key, e + ); + } + } + } + + // Join on txs + let res = wait_for_txids(batch, &contact).await; + trace!("Received Transactions: {:?}", res); + + info!("Registration loop elapsed in = {:?}", start.elapsed()); + if start.elapsed() < REGISTRATION_LOOP_SPEED { + info!( + "Registration Loop sleeping for {:?}", + REGISTRATION_LOOP_SPEED - start.elapsed() + ); + thread::sleep(REGISTRATION_LOOP_SPEED - start.elapsed()); + } + info!("Registration loop sleeping Done!"); + } + }); + }) + .join() + } { + error!( + "Rita client Exit Manager loop thread paniced! Respawning {:?}", + e + ); + if Instant::now() - last_restart < Duration::from_secs(60) { + error!("Restarting too quickly, leaving it to auto rescue!"); + let sys = System::current(); + sys.stop_with_code(121); + } + last_restart = Instant::now(); + } + }); +} + +/// utility function that waits for a large number of txids to enter a block +async fn wait_for_txids( + txids: Vec, + web3: &Web3, +) -> Vec> { + let mut wait_for_txid = Vec::new(); + for txid in txids { + let wait = web3.wait_for_transaction(txid, TX_TIMEOUT, None); + wait_for_txid.push(wait); + } + join_all(wait_for_txid).await +} diff --git a/rita_exit/src/database/mod.rs b/rita_exit/src/database/mod.rs index 52da1b2d0..ae06e6502 100644 --- a/rita_exit/src/database/mod.rs +++ b/rita_exit/src/database/mod.rs @@ -118,9 +118,9 @@ pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> E let url: &str; let reg_url = get_rita_exit().client_registration_url; if cfg!(feature = "dev_env") { - url = "http://0.0.0.0:8080/register_router"; + url = "http://7.7.7.1:40400/register_router"; } else if cfg!(feature = "operator_debug") { - url = "http://192.168.10.2:8080/register_router"; + url = "http://192.168.10.2:40400/register_router"; } else { url = ®_url; } @@ -129,6 +129,10 @@ pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> E "About to request client {} registration with {}", exit_client.global, url ); + error!( + "About to request client {} registration with {}", + exit_client.global, url + ); let client = awc::Client::default(); let response = client diff --git a/rita_exit/src/rita_loop/mod.rs b/rita_exit/src/rita_loop/mod.rs index ec5030f1b..5ee1a9284 100644 --- a/rita_exit/src/rita_loop/mod.rs +++ b/rita_exit/src/rita_loop/mod.rs @@ -40,6 +40,8 @@ use rita_common::KI; pub const EXIT_LOOP_SPEED: u64 = 5; pub const EXIT_LOOP_SPEED_DURATION: Duration = Duration::from_secs(EXIT_LOOP_SPEED); pub const EXIT_LOOP_TIMEOUT: Duration = Duration::from_secs(4); +/// Retry getting all clients for 5 mins before crashing +pub const GET_CLIENT_RETRY: Duration = Duration::from_secs(300); /// Name of the legacy exit interface pub const LEGACY_INTERFACE: &str = "wg_exit"; @@ -126,23 +128,37 @@ async fn rita_exit_loop(rita_exit_cache: RitaExitCache, usage_history: ExitLock) let contract_addr = rita_exit.exit_network.registered_users_contract_addr; let get_clients_benchmark = Instant::now(); - let reg_clients_list = match get_all_regsitered_clients(&contact, our_addr, contract_addr).await - { - Ok(a) => a, - Err(e) => { - // Getting all clients is core functionality, we panic if fails - let message = format!( - "Failed to get all registered users with {}. Web3 url: {}, contract_addr: {}", + + // We retry getting users for 5 mins before we crash. Getting registered users is core functionality + let reg_clients_list; + let retry_start = Instant::now(); + loop { + match get_all_regsitered_clients(&contact, our_addr, contract_addr).await { + Ok(a) => { + reg_clients_list = a; + break; + } + Err(e) => { + // Getting all clients is core functionality, we panic if fails + let message = format!( + "Failed to get all registered users with {}. Web3 url: {}, contract_addr: {}. This is required for exit to funciton correctly", e, get_web3_server(), contract_addr ); - error!("{}", message); - let sys = AsyncSystem::current(); - sys.stop(); - panic!("{}", message); - } - }; + error!("{}", message); + thread::sleep(Duration::from_secs(10)); + if Instant::now() - retry_start > GET_CLIENT_RETRY { + let sys = AsyncSystem::current(); + sys.stop(); + panic!("{}", message); + } + } + }; + } + + error!("received reg clients: {:?}", reg_clients_list); + info!( "Finished Rita get clients, got {:?} clients in {}ms", reg_clients_list.len(), diff --git a/test_runner/src/main.rs b/test_runner/src/main.rs index 1f87d7a33..e52ba9606 100644 --- a/test_runner/src/main.rs +++ b/test_runner/src/main.rs @@ -1,36 +1,27 @@ -use althea_types::Identity; -use deep_space::Contact; use integration_tests::config::{ generate_exit_config_file, generate_rita_config_file, CONFIG_FILE_PATH, EXIT_CONFIG_PATH, }; +use integration_tests::contract_test::run_altheadb_contract_test; use integration_tests::debts::run_debts_test; /// Binary crate for actually running the integration tests use integration_tests::five_nodes::run_five_node_test_scenario; use integration_tests::mutli_exit::run_multi_exit_test; -use integration_tests::payments_eth::{ETH_MINER_KEY, WEB3_TIMEOUT}; -use integration_tests::utils::{get_althea_grpc, get_eth_node, TOTAL_TIMEOUT}; +use integration_tests::registration_server::start_registration_server; +use integration_tests::utils::deploy_contracts; use integration_tests::{ payments_althea::run_althea_payments_test_scenario, payments_eth::run_eth_payments_test_scenario, utils::set_sigterm, }; use log::info; -use rita_client_registration::client_db::{ - add_client_to_registered_list, get_all_regsitered_clients, get_registered_client_using_ethkey, - get_registered_client_using_meship, get_registered_client_using_wgkey, -}; -use std::process::Command; -use std::str::from_utf8; + +use std::env; + use std::time::Duration; -use std::{env, thread}; -use web30::client::Web3; extern crate log; pub const ALTHEA_CHAIN_PREFIX: &str = "althea"; pub const ALTHEA_CONTACT_TIMEOUT: Duration = Duration::from_secs(30); -pub const ETH_NODE: &str = "http://localhost:8545"; -pub const MINER_PRIVATE_KEY: &str = - "0x34d97aaf58b1a81d3ed3068a870d8093c6341cf5d1ef7e6efa03fe7f7fc2c3a8"; #[actix_rt::main] async fn main() { @@ -54,8 +45,8 @@ async fn main() { println!("Waiting to deploy contracts"); deploy_contracts().await; - // Try adding a dummy entry and validating that we can retrive them - validate_contract_functionality().await; + println!("Starting registration server"); + start_registration_server(); let test_type = env::var("TEST_TYPE"); info!("Starting tests with {:?}", test_type); @@ -70,6 +61,8 @@ async fn main() { run_althea_payments_test_scenario().await } else if test_type == "MULTI_EXIT" { run_multi_exit_test().await + } else if test_type == "CONTRACT_TEST" { + run_altheadb_contract_test().await } else { panic!("Error unknown test type {}!", test_type); } @@ -77,223 +70,3 @@ async fn main() { panic!("Error test type not set!"); } } - -pub async fn deploy_contracts() { - let contact = Contact::new( - &get_althea_grpc(), - ALTHEA_CONTACT_TIMEOUT, - ALTHEA_CHAIN_PREFIX, - ) - .unwrap(); - // prevents the node deployer from failing (rarely) when the chain has not - // yet produced the next block after submitting each eth address - contact.wait_for_next_block(TOTAL_TIMEOUT).await.unwrap(); - - let res = Command::new("npx") - .args([ - "ts-node", - "/althea_rs/solidity/contract-deployer.ts", - &format!("--eth-privkey={}", MINER_PRIVATE_KEY), - &format!("--eth-node={}", ETH_NODE), - ]) - .output() - .expect("Failed to deploy contracts!"); - - println!("Contract deploy returned {:?}", from_utf8(&res.stdout)); -} - -pub async fn validate_contract_functionality() { - let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); - - // Define the users - let user = Identity { - mesh_ip: "fd00::1337".parse().unwrap(), - eth_address: "0x02ad6b480DFeD806C63a0839C6f1f3136c5fD515" - .parse() - .unwrap(), - wg_public_key: "sPtNGQbyPpCsqSKD6PbnflB1lIUCd259Vhd0mJfJeGo=" - .parse() - .unwrap(), - nickname: None, - }; - - let user_2 = Identity { - mesh_ip: "fd00::1447:1447".parse().unwrap(), - eth_address: "0x1994A73F79F9648d4a8064D9C0F221fB1007Fd2F" - .parse() - .unwrap(), - wg_public_key: "Yhyj+CKZbyEKea/9hdIFje98yc5Cukt1Pbq0qWB4Aqw=" - .parse() - .unwrap(), - nickname: None, - }; - - let user_3 = Identity { - mesh_ip: "fd00::3000:1117".parse().unwrap(), - eth_address: "0x9c33D0dFdc9E3f7cC73bE3A575C31cfe3059C76a" - .parse() - .unwrap(), - wg_public_key: "fzOUfEqYzRE0MwfR5o7XV+MKZKj/qEfELRzQTRTKAB8=" - .parse() - .unwrap(), - nickname: None, - }; - - // Try requests when there are no users present - let res = get_all_regsitered_clients( - &contact, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ) - .await - .unwrap(); - - assert!(res.is_empty()); - - let res = get_registered_client_using_wgkey( - user.wg_public_key, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await; - - assert!(res.is_err()); - - // Add the first user - let _res = add_client_to_registered_list( - &contact, - user, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(5)); - - // Try requesting some info that doesnt exist - let res = get_registered_client_using_ethkey( - "0x3d261902a988d94599d7f0Bd4c2e4514D73BB329" - .parse() - .unwrap(), - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await; - - assert!(res.is_err()); - - // Add the second user - let _res = add_client_to_registered_list( - &contact, - user_2, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(5)); - - // Add the third user - let _res = add_client_to_registered_list( - &contact, - user_3, - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ETH_MINER_KEY.parse().unwrap(), - None, - vec![], - ) - .await - .unwrap(); - - thread::sleep(Duration::from_secs(10)); - - let res = get_all_regsitered_clients( - &contact, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - ) - .await; - - println!("All users are : {:?}", res); - - thread::sleep(Duration::from_secs(5)); - - let res = get_registered_client_using_wgkey( - user.wg_public_key, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user); - - let res = get_registered_client_using_ethkey( - user_2.eth_address, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user_2); - - let res = get_registered_client_using_meship( - user_3.mesh_ip, - ETH_MINER_KEY - .parse::() - .unwrap() - .to_address(), - "0xb9b674D720F96995ca033ec347df080d500c2230" - .parse() - .unwrap(), - &contact, - ) - .await - .unwrap(); - assert_eq!(res, user_3); -}