From 063e765d69d1a24bb65e8890d96e5a6a47b374b5 Mon Sep 17 00:00:00 2001 From: Justin Kilpatrick Date: Mon, 4 Nov 2024 16:48:49 -0500 Subject: [PATCH] WIP: Remove exit lazy static This patch simplifies the dataflow for th rita exit module by removing the lazy static exit database and instead holding that data in the main rita exit thread. --- Cargo.lock | 1 - exit_trust_root/src/client_db.rs | 7 +- .../src/register_client_batch_loop.rs | 10 - rita_bin/src/exit.rs | 2 +- rita_exit/Cargo.toml | 1 - rita_exit/src/database/geoip.rs | 138 +++-- rita_exit/src/database/in_memory_database.rs | 373 +++--------- rita_exit/src/database/mod.rs | 89 +-- rita_exit/src/lib.rs | 3 - rita_exit/src/network_endpoints/mod.rs | 3 +- rita_exit/src/rita_loop/mod.rs | 569 +++++++++++++----- rita_exit/src/traffic_watcher/mod.rs | 9 +- 12 files changed, 631 insertions(+), 574 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee720c40c..3b138be39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3092,7 +3092,6 @@ dependencies = [ "exit_trust_root", "handlebars", "ipnetwork", - "lazy_static", "lettre", "log", "num256", diff --git a/exit_trust_root/src/client_db.rs b/exit_trust_root/src/client_db.rs index 438aaecf0..5fa6eae2e 100644 --- a/exit_trust_root/src/client_db.rs +++ b/exit_trust_root/src/client_db.rs @@ -9,7 +9,7 @@ use clarity::{ abi::{encode_call, AbiToken}, Address, PrivateKey, Uint256, }; -use std::{net::IpAddr, time::Duration, vec}; +use std::{collections::HashSet, net::IpAddr, time::Duration, vec}; use tokio::time::timeout as future_timeout; use web30::{ client::Web3, @@ -24,7 +24,7 @@ pub async fn get_all_registered_clients( web30: &Web3, requester_address: Address, contract: Address, -) -> Result, Web3Error> { +) -> Result, Web3Error> { let payload = encode_call("getAllRegisteredUsers()", &[])?; let res = web30 .simulate_transaction( @@ -33,7 +33,8 @@ pub async fn get_all_registered_clients( ) .await?; - convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res)) + let val = convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))?; + Ok(val.into_iter().collect()) } pub async fn get_registered_client_using_wgkey( diff --git a/exit_trust_root/src/register_client_batch_loop.rs b/exit_trust_root/src/register_client_batch_loop.rs index ba3639e64..8fcd0e1b2 100644 --- a/exit_trust_root/src/register_client_batch_loop.rs +++ b/exit_trust_root/src/register_client_batch_loop.rs @@ -21,15 +21,6 @@ pub struct RegistrationRequest { pub const MAX_BATCH_SIZE: usize = 75; -/// Utility function used to easily perform O(1) lookups against the identities list -pub fn get_clients_hashset(input: Vec) -> HashSet { - let mut output = HashSet::new(); - for i in input { - output.insert(i); - } - output -} - /// This function monitors the registration queue lock free queue. It will dequeue any new entries and attempt to register them /// in a batch sent every REGISTRATION_LOOP_SPEED seconds. This function will also check if the user is already registered before attempting to register them pub async fn register_client_batch_loop( @@ -106,7 +97,6 @@ pub async fn register_client_batch_loop( continue; } }; - let all_clients = get_clients_hashset(all_clients); let mut clients_to_register = Vec::new(); for client in list.iter() { diff --git a/rita_bin/src/exit.rs b/rita_bin/src/exit.rs index d165e237d..7535616e2 100644 --- a/rita_bin/src/exit.rs +++ b/rita_bin/src/exit.rs @@ -169,7 +169,7 @@ async fn main() { ))); // this call blocks, transforming this startup thread into the main exit watchdog thread - start_rita_exit_loop(clients); + start_rita_exit_loop(clients).await; } /// This function performs startup integrity checks on the config and system. It checks that we can reach the internet diff --git a/rita_exit/Cargo.toml b/rita_exit/Cargo.toml index e81c5f2df..7eac228d5 100644 --- a/rita_exit/Cargo.toml +++ b/rita_exit/Cargo.toml @@ -16,7 +16,6 @@ babel_monitor = { path = "../babel_monitor" } actix = {workspace = true} awc = {workspace = true} handlebars = "5.1" -lazy_static = "1.5" ipnetwork = "0.20" clarity = {workspace = true} serde = "1.0" diff --git a/rita_exit/src/database/geoip.rs b/rita_exit/src/database/geoip.rs index e1b71dc39..f88d88e12 100644 --- a/rita_exit/src/database/geoip.rs +++ b/rita_exit/src/database/geoip.rs @@ -1,4 +1,4 @@ -use actix::System; +use crate::RitaExitError; use althea_kernel_interface::interface_tools::get_wg_remote_ip; use althea_types::regions::Regions; use babel_monitor::open_babel_stream; @@ -9,9 +9,6 @@ use std::collections::HashMap; use std::net::IpAddr; use std::time::Duration; -use crate::database::RITA_EXIT_STATE; -use crate::RitaExitError; - /// gets the gateway ip for a given mesh IP pub fn get_gateway_ip_single(mesh_ip: IpAddr) -> Result> { let babel_port = settings::get_rita_exit().network.babel_port; @@ -131,7 +128,10 @@ struct CountryDetails { } /// get ISO country code from ip, consults a in memory cache -pub fn get_country(ip: IpAddr) -> Result> { +pub async fn get_country( + geoip_cache: &mut HashMap, + ip: IpAddr, +) -> Result> { trace!("get GeoIP country for {}", ip.to_string()); // if allowed countries is not configured we don't care and will use @@ -170,12 +170,7 @@ pub fn get_country(ip: IpAddr) -> Result> { // we have to turn this option into a string in order to avoid // the borrow checker trying to keep this lock open for a long period - let cache_result = RITA_EXIT_STATE - .read() - .unwrap() - .geoip_cache - .get(&ip) - .copied(); + let cache_result = geoip_cache.get(&ip).copied(); match cache_result { Some(code) => Ok(code), @@ -187,55 +182,51 @@ pub fn get_country(ip: IpAddr) -> Result> { ip.to_string() ); // run in async closure and return the result - let runner = System::new(); - runner.block_on(async move { - let client = awc::Client::new(); - if let Ok(mut res) = client - .get(&geo_ip_url) - .basic_auth(api_user, api_key) - .timeout(Duration::from_secs(1)) - .send() - .await - { - trace!("Got geoip result {:?}", res); - if let Ok(res) = res.json().await { - let value: GeoIpRet = res; - let code = match value.country.iso_code.parse() { - Ok(r) => r, - Err(_) => { - error!( - "Failed to parse geoip response {:?}", - value.country.iso_code - ); - Regions::UnkownRegion - } - }; - trace!("Adding GeoIP value {:?} to cache", code); - RITA_EXIT_STATE - .write() - .unwrap() - .geoip_cache - .insert(ip, code); - trace!("Added to cache, returning"); - Ok(code) - } else { - Err(Box::new(RitaExitError::MiscStringError( - "Failed to deserialize geoip response".to_string(), - ))) - } + let client = awc::Client::new(); + if let Ok(mut res) = client + .get(&geo_ip_url) + .basic_auth(api_user, api_key) + .timeout(Duration::from_secs(1)) + .send() + .await + { + trace!("Got geoip result {:?}", res); + if let Ok(res) = res.json().await { + let value: GeoIpRet = res; + let code = match value.country.iso_code.parse() { + Ok(r) => r, + Err(_) => { + error!( + "Failed to parse geoip response {:?}", + value.country.iso_code + ); + Regions::UnkownRegion + } + }; + trace!("Adding GeoIP value {:?} to cache", code); + geoip_cache.insert(ip, code); + trace!("Added to cache, returning"); + Ok(code) } else { Err(Box::new(RitaExitError::MiscStringError( - "Request failed".to_string(), + "Failed to deserialize geoip response".to_string(), ))) } - }) + } else { + Err(Box::new(RitaExitError::MiscStringError( + "Request failed".to_string(), + ))) + } } } } /// Returns true or false if an ip is confirmed to be inside or outside the region and error /// if an api error is encountered trying to figure that out. -pub fn verify_ip(request_ip: IpAddr) -> Result> { +pub async fn verify_ip( + geoip_cache: &mut HashMap, + request_ip: IpAddr, +) -> Result> { // in this case we have a gateway directly attached to the exit, so our // peer address for them will be an fe80 linklocal ip address. When we // detect this we know that they are in the allowed countries list because @@ -249,7 +240,7 @@ pub fn verify_ip(request_ip: IpAddr) -> Result> { if settings::get_rita_exit().allowed_countries.is_empty() { Ok(true) } else { - let country = get_country(request_ip)?; + let country = get_country(geoip_cache, request_ip).await?; if !settings::get_rita_exit().allowed_countries.is_empty() && !settings::get_rita_exit() .allowed_countries @@ -262,8 +253,45 @@ pub fn verify_ip(request_ip: IpAddr) -> Result> { } } -#[test] -#[ignore] -fn test_get_country() { - get_country("8.8.8.8".parse().unwrap()).unwrap(); +#[cfg(test)] +mod tests { + use std::str::FromStr; + use super::*; + + #[actix_web::test] + #[ignore] + async fn test_get_country() { + let mut geoip_cache = HashMap::new(); + let ip = IpAddr::from_str("8.8.8.8").unwrap(); + let result = get_country(&mut geoip_cache, ip).await; + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_get_gateway_ip_single() { + let ip = IpAddr::from_str("2001:4860:4860::8888").unwrap(); + let result = get_gateway_ip_single(ip); + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_get_gateway_ip_bulk() { + let ips = vec![ + IpAddr::from_str("2001:4860:4860::8888").unwrap(), + IpAddr::from_str("2001:4860:4860::8844").unwrap(), + ]; + let result = get_gateway_ip_bulk(ips, Duration::from_secs(5)); + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_verify_ip() { + let mut geoip_cache = HashMap::new(); + let ip = IpAddr::from_str("8.8.8.8").unwrap(); + let result = verify_ip(&mut geoip_cache, ip).await; + assert!(result.is_ok()); + } } diff --git a/rita_exit/src/database/in_memory_database.rs b/rita_exit/src/database/in_memory_database.rs index bc204e16c..302c89cd1 100644 --- a/rita_exit/src/database/in_memory_database.rs +++ b/rita_exit/src/database/in_memory_database.rs @@ -1,17 +1,13 @@ -use althea_kernel_interface::ExitClient; -use althea_types::{Identity, WgKey}; -use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network}; +use althea_types::WgKey; +use ipnetwork::{IpNetwork, Ipv6Network}; use std::collections::hash_map::DefaultHasher; use std::collections::{HashMap, HashSet}; -use std::convert::TryInto; use std::fmt::Write; use std::hash::{Hash, Hasher}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use crate::RitaExitError; -use super::RITA_EXIT_STATE; - /// Wg exit port on client side pub const CLIENT_WG_PORT: u16 = 59999; @@ -23,45 +19,17 @@ pub const DEFAULT_CLIENT_SUBNET_SIZE: u8 = 56; #[derive(Clone, Debug, Default)] pub struct IpAssignmentMap { - pub ipv6_assignments: HashMap, - pub internal_ip_assignments: HashMap, -} - -// Lazy static setters/getters -pub fn get_ipv6_assignments() -> HashMap { - RITA_EXIT_STATE - .read() - .unwrap() - .ip_assignment_map - .ipv6_assignments - .clone() -} - -pub fn get_internal_ip_assignments() -> HashMap { - RITA_EXIT_STATE - .read() - .unwrap() - .ip_assignment_map - .internal_ip_assignments - .clone() + pub ipv6_assignments: HashMap, + pub internal_ip_assignments: HashMap, } -pub fn add_new_ipv6_assignment(addr: IpAddr, key: WgKey) { - RITA_EXIT_STATE - .write() - .unwrap() - .ip_assignment_map - .ipv6_assignments - .insert(addr, key); -} - -pub fn add_new_internal_ip_assignement(addr: IpAddr, key: WgKey) { - RITA_EXIT_STATE - .write() - .unwrap() - .ip_assignment_map - .internal_ip_assignments - .insert(addr, key); +impl IpAssignmentMap { + pub fn new() -> Self { + IpAssignmentMap { + ipv6_assignments: HashMap::new(), + internal_ip_assignments: HashMap::new(), + } + } } /// Take an index i, a larger subnet and a smaller subnet length and generate the ith smaller subnet in the larger subnet @@ -70,7 +38,7 @@ pub fn generate_iterative_client_subnet( exit_sub: IpNetwork, ind: u64, subprefix: u8, -) -> Result> { +) -> Result> { let net; // Covert the subnet's ip address into a u128 integer to allow for easy iterative @@ -100,14 +68,14 @@ pub fn generate_iterative_client_subnet( if ind < (1 << (subprefix - exit_sub.prefix())) { let ret = net_as_int + (ind as u128 * net.size()); let v6addr = Ipv6Addr::from(ret); - let ret = IpNetwork::from(match Ipv6Network::new(v6addr, subprefix) { + let ret = match Ipv6Network::new(v6addr, subprefix) { Ok(a) => a, Err(e) => { return Err(Box::new(RitaExitError::MiscStringError(format!( "Unable to parse a valid client subnet: {e:?}" )))) } - }); + }; Ok(ret) } else { @@ -120,190 +88,6 @@ pub fn generate_iterative_client_subnet( } } -/// Given a client identity, get the clients ipv6 addr using the wgkey as a generative seed -pub fn get_client_ipv6( - their_record: Identity, - exit_sub: Option, - client_subnet_size: u8, -) -> Result, Box> { - if let Some(exit_sub) = exit_sub { - let wg_hash = hash_wgkey(their_record.wg_public_key); - - // This bitshifting is the total number of client subnets available. We are checking that our iterative index - // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be - // 2^(124 - 120) => 2^4 => 16 - let total_subnets = 1 << (client_subnet_size - exit_sub.prefix()); - let mut generative_index = wg_hash % total_subnets; - - // Loop to try to generate a valid address - let mut retries = 0; - loop { - // Return an error if we retry too many times - if retries > MAX_IP_RETRIES { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to get internet ipv6 using network {} and index {}", - exit_sub, generative_index - )))); - } - - let client_subnet = - generate_iterative_client_subnet(exit_sub, generative_index, client_subnet_size)?; - - if validate_internet_ipv6(client_subnet, their_record.wg_public_key) { - add_new_ipv6_assignment(client_subnet.ip(), their_record.wg_public_key); - return Ok(Some(client_subnet)); - } else { - retries += 1; - generative_index = (generative_index + 1) % total_subnets; - continue; - } - } - } else { - // This exit doesnt support ipv6 - Ok(None) - } -} - -/// Given a client identity, get the clients internal ip addr using the wgkey as a generative seed -pub fn get_client_internal_ip( - their_record: Identity, - netmask: u8, - gateway_ip: Ipv4Addr, -) -> Result> { - let wg_hash = hash_wgkey(their_record.wg_public_key); - // total number of available addresses - let total_addresses: u64 = 2_u64.pow((32 - netmask).into()); - let mut generative_index = wg_hash % total_addresses; - let network = match Ipv4Network::new(gateway_ip, netmask) { - Ok(a) => a, - Err(e) => { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to setup and ipnetwork to generate internal ip {}", - e - )))) - } - }; - - // Keep trying to generate an address till we get a valid one - let mut retries = 0; - loop { - // Return an error if we retry too many times - if retries > MAX_IP_RETRIES { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to get internal ip using network {} and index {}", - network, generative_index - )))); - } - - let internal_ip = network.nth(match generative_index.try_into() { - Ok(a) => a, - Err(e) => { - warn!("Internal Ip failure: {}", e); - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - }); - - let internal_ip = match internal_ip { - Some(a) => a, - None => { - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - }; - - // Validate that this ip is valid and return it - if validate_internal_ip(network, internal_ip, gateway_ip, their_record.wg_public_key) { - add_new_internal_ip_assignement(IpAddr::V4(internal_ip), their_record.wg_public_key); - return Ok(IpAddr::V4(internal_ip)); - } else { - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - } -} - -/// Check that this ip can be assigned, make sure there isnt a collision with previously assigned ips -pub fn validate_internet_ipv6(client_subnet: IpNetwork, our_wgkey: WgKey) -> bool { - let assigned_ips = get_ipv6_assignments(); - let assignment = assigned_ips.get(&client_subnet.ip()); - match assignment { - Some(a) => { - // There is an entry, verify if its our entry else false - *a == our_wgkey - } - // There is no assigned ip here, ip is valid - None => true, - } -} - -/// Check that this ip can be assigned, make sure it isnt our ip, network ip, broadcast ip, etc -pub fn validate_internal_ip( - network: Ipv4Network, - assigned_ip: Ipv4Addr, - our_ip: Ipv4Addr, - our_wgkey: WgKey, -) -> bool { - let broadcast = network.broadcast(); - let network_ip = network.network(); - - // Collision with our ip - if assigned_ip == our_ip { - return false; - } - // collision with the network ip - if assigned_ip == network_ip { - return false; - } - // collision with broadcast address - if assigned_ip == broadcast { - return false; - } - - let assignments = get_internal_ip_assignments(); - let assignment = assignments.get(&IpAddr::V4(assigned_ip)); - match assignment { - Some(a) => { - // check if this existing ip is ours - *a == our_wgkey - } - // No assignment, we can use this address - None => true, - } -} - -pub fn to_exit_client(client: Identity) -> Result> { - let internet_ipv6 = get_client_ipv6( - client, - settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), - settings::get_rita_exit() - .get_client_subnet_size() - .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), - )?; - let internal_ip = get_client_internal_ip( - client, - settings::get_rita_exit() - .exit_network - .internal_ipv4 - .prefix(), - settings::get_rita_exit() - .exit_network - .internal_ipv4 - .internal_ip(), - )?; - - Ok(ExitClient { - mesh_ip: client.mesh_ip, - internal_ip, - port: CLIENT_WG_PORT, - public_key: client.wg_public_key, - internet_ipv6, - }) -} - pub fn hash_wgkey(key: WgKey) -> u64 { let mut hasher = DefaultHasher::new(); key.to_string().hash(&mut hasher); @@ -321,18 +105,29 @@ pub fn display_hashset(input: &HashSet) -> String { #[cfg(test)] mod tests { + use super::hash_wgkey; + use crate::{ + database::in_memory_database::generate_iterative_client_subnet, rita_loop::RitaExitData, + IpAssignmentMap, + }; use althea_types::Identity; - use ipnetwork::IpNetwork; - - use crate::database::in_memory_database::{ - generate_iterative_client_subnet, get_client_internal_ip, get_internal_ip_assignments, - get_ipv6_assignments, + use ipnetwork::{IpNetwork, Ipv6Network}; + use std::{ + collections::HashSet, + sync::{Arc, RwLock}, }; - use super::{get_client_ipv6, hash_wgkey}; + pub fn get_test_data() -> RitaExitData { + let data = RitaExitData::new( + Arc::new(RwLock::new(IpAssignmentMap::new())), + HashSet::new(), + ); + data + } #[test] fn test_internet_ipv6_assignment() { + let mut data = get_test_data(); let exit_sub = Some("2602:FBAD:10::/126".parse().unwrap()); let dummy_client = Identity { mesh_ip: "fd00::1337".parse().unwrap(), @@ -346,27 +141,29 @@ mod tests { }; // Generate a client subnet - let ip = get_client_ipv6(dummy_client, exit_sub, 128) + let ip = data + .get_or_add_client_ipv6(dummy_client, exit_sub, 128) .unwrap() .unwrap(); // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 1); + assert!(data.get_ipv6_assignments().len() == 1); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client.wg_public_key ); // Try retrieving the same client - let ip_2 = get_client_ipv6(dummy_client, exit_sub, 128) + let ip_2 = data + .get_or_add_client_ipv6(dummy_client, exit_sub, 128) .unwrap() .unwrap(); assert_eq!(ip, ip_2); // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 1); + assert!(data.get_ipv6_assignments().len() == 1); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client.wg_public_key ); @@ -385,26 +182,28 @@ mod tests { }; // Generate a client subnet - let ip = get_client_ipv6(dummy_client_2, exit_sub, 128) + let ip = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) .unwrap() .unwrap(); // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 2); + assert!(data.get_ipv6_assignments().len() == 2); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client_2.wg_public_key ); - let ip_2 = get_client_ipv6(dummy_client_2, exit_sub, 128) + let ip_2 = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) .unwrap() .unwrap(); assert_eq!(ip, ip_2); // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 2); + assert!(data.get_ipv6_assignments().len() == 2); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client_2.wg_public_key ); @@ -423,29 +222,32 @@ mod tests { }; // Generate a client subnet - let ip = get_client_ipv6(dummy_client_3, exit_sub, 128) + let ip = data + .get_or_add_client_ipv6(dummy_client_3, exit_sub, 128) .unwrap() .unwrap(); // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 3); + assert!(data.get_ipv6_assignments().len() == 3); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client_3.wg_public_key ); - let _ = get_client_ipv6(dummy_client_2, exit_sub, 128) + let _ = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) .unwrap() .unwrap(); - let ip_2 = get_client_ipv6(dummy_client_3, exit_sub, 128) + let ip_2 = data + .get_or_add_client_ipv6(dummy_client_3, exit_sub, 128) .unwrap() .unwrap(); assert_eq!(ip, ip_2); // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 3); + assert!(data.get_ipv6_assignments().len() == 3); assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), + *data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client_3.wg_public_key ); @@ -476,6 +278,7 @@ mod tests { #[test] fn test_internal_ip_assignment() { + let mut data = get_test_data(); let dummy_client = Identity { mesh_ip: "fd00::1337".parse().unwrap(), eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" @@ -486,25 +289,27 @@ mod tests { .unwrap(), nickname: None, }; - let ip = - get_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()).unwrap(); + let ip = data + .get_or_add_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); // Verify assignement db is correctly populated - assert!(get_internal_ip_assignments().len() == 1); + assert!(data.get_internal_ip_assignments().len() == 1); assert_eq!( - *get_internal_ip_assignments().get(&ip).unwrap(), + *data.get_internal_ip_assignments().get(&ip).unwrap(), dummy_client.wg_public_key ); // requesting the same client shouldnt change any state - let ip2 = - get_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()).unwrap(); + let ip2 = data + .get_or_add_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); assert_eq!(ip, ip2); - assert!(get_internal_ip_assignments().len() == 1); + assert!(data.get_internal_ip_assignments().len() == 1); assert_eq!( - *get_internal_ip_assignments().get(&ip2).unwrap(), + *data.get_internal_ip_assignments().get(&ip2).unwrap(), dummy_client.wg_public_key ); @@ -522,25 +327,27 @@ mod tests { nickname: None, }; - let ip = - get_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()).unwrap(); + let ip = data + .get_or_add_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); // Verify assignement db is correctly populated - assert!(get_internal_ip_assignments().len() == 2); + assert!(data.get_internal_ip_assignments().len() == 2); assert_eq!( - *get_internal_ip_assignments().get(&ip).unwrap(), + *data.get_internal_ip_assignments().get(&ip).unwrap(), dummy_client_2.wg_public_key ); // requesting the same client shouldnt change any state - let ip2 = - get_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()).unwrap(); + let ip2 = data + .get_or_add_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); assert_eq!(ip, ip2); - assert!(get_internal_ip_assignments().len() == 2); + assert!(data.get_internal_ip_assignments().len() == 2); assert_eq!( - *get_internal_ip_assignments().get(&ip2).unwrap(), + *data.get_internal_ip_assignments().get(&ip2).unwrap(), dummy_client_2.wg_public_key ); @@ -553,19 +360,22 @@ mod tests { // Complex subnet example let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 0, 64); - assert_eq!("2602:FBAD::/64".parse::().unwrap(), ret.unwrap()); + assert_eq!( + "2602:FBAD::/64".parse::().unwrap(), + ret.unwrap() + ); let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 1, 64); assert_eq!( - "2602:FBAD:0:1::/64".parse::().unwrap(), + "2602:FBAD:0:1::/64".parse::().unwrap(), ret.unwrap() ); let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 50, 64); assert_eq!( - "2602:FBAD:0:32::/64".parse::().unwrap(), + "2602:FBAD:0:32::/64".parse::().unwrap(), ret.unwrap() ); @@ -580,15 +390,24 @@ mod tests { // Simple subnet example let net: IpNetwork = "fd00::1337/120".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 0, 124); - assert_eq!("fd00::1300/124".parse::().unwrap(), ret.unwrap()); + assert_eq!( + "fd00::1300/124".parse::().unwrap(), + ret.unwrap() + ); let net: IpNetwork = "fd00::1337/120".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 2, 124); - assert_eq!("fd00::1320/124".parse::().unwrap(), ret.unwrap()); + assert_eq!( + "fd00::1320/124".parse::().unwrap(), + ret.unwrap() + ); let net: IpNetwork = "fd00::1337/120".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 15, 124); - assert_eq!("fd00::13f0/124".parse::().unwrap(), ret.unwrap()); + assert_eq!( + "fd00::13f0/124".parse::().unwrap(), + ret.unwrap() + ); let net: IpNetwork = "fd00::1337/120".parse().unwrap(); let ret = generate_iterative_client_subnet(net, 16, 124); assert!(ret.is_err()); diff --git a/rita_exit/src/database/mod.rs b/rita_exit/src/database/mod.rs index 308564e43..da945b387 100644 --- a/rita_exit/src/database/mod.rs +++ b/rita_exit/src/database/mod.rs @@ -5,14 +5,10 @@ use crate::database::geoip::get_gateway_ip_bulk; use crate::database::geoip::get_gateway_ip_single; use crate::database::geoip::verify_ip; use crate::database::in_memory_database::display_hashset; -use crate::database::in_memory_database::get_client_internal_ip; -use crate::database::in_memory_database::get_client_ipv6; -use crate::database::in_memory_database::to_exit_client; use crate::database::in_memory_database::DEFAULT_CLIENT_SUBNET_SIZE; use crate::rita_loop::EXIT_INTERFACE; use crate::rita_loop::EXIT_LOOP_TIMEOUT; use crate::rita_loop::LEGACY_INTERFACE; -use crate::IpAssignmentMap; use crate::RitaExitError; use althea_kernel_interface::exit_server_tunnel::set_exit_wg_config; use althea_kernel_interface::exit_server_tunnel::setup_individual_client_routes; @@ -29,8 +25,6 @@ use althea_types::regions::Regions; use althea_types::Identity; use althea_types::WgKey; use althea_types::{ExitClientDetails, ExitClientIdentity, ExitDetails, ExitState, ExitVerifMode}; -use clarity::Address; -use exit_trust_root::client_db::get_registered_client_using_wgkey; use exit_trust_root::endpoints::RegisterRequest; use exit_trust_root::endpoints::SubmitCodeRequest; use phonenumber::PhoneNumber; @@ -41,29 +35,13 @@ use settings::get_rita_exit; use std::collections::HashMap; use std::collections::HashSet; use std::net::IpAddr; -use std::sync::Arc; -use std::sync::RwLock; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; -use web30::client::Web3; pub mod geoip; pub mod in_memory_database; -#[derive(Clone, Debug, Default)] -pub struct RitaExitState { - ip_assignment_map: IpAssignmentMap, - geoip_cache: HashMap, -} - -lazy_static! { - /// Keep track of geoip information as well as ip addrs assigned to clients and ensure collisions dont happen. In worst case - /// the exit restarts and loses all this data in which case those client they had collision may get new - /// ip addrs and would need to setup wg exit tunnel again - static ref RITA_EXIT_STATE: Arc> = Arc::new(RwLock::new(RitaExitState::default())); -} - /// one day in seconds pub const ONE_DAY: i64 = 86400; @@ -96,7 +74,11 @@ pub async fn signup_client(client: ExitClientIdentity) -> Result Result> { - trace!("Checking if record exists for {:?}", client.global.mesh_ip); - let exit = get_rita_exit(); - let exit_network = exit.exit_network.clone(); - let own_internal_ip = exit_network.internal_ipv4.internal_ip(); - let internal_netmask = exit_network.internal_ipv4.prefix(); - - match get_registered_client_using_wgkey( - client.global.wg_public_key, - our_address, - contract_addr, - contact, - ) - .await - { - Ok(their_record) => { - trace!("record exists, updating"); - - let current_ip: IpAddr = - get_client_internal_ip(their_record, internal_netmask, own_internal_ip)?; - let current_internet_ipv6 = get_client_ipv6( - their_record, - exit_network.get_ipv6_subnet_alt(), - exit.get_client_subnet_size() - .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), - )?; - - Ok(ExitState::Registered { - our_details: ExitClientDetails { - client_internal_ip: current_ip, - internet_ipv6_subnet: current_internet_ipv6, - }, - general_details: get_exit_info(), - message: "Registration OK".to_string(), - identity: Box::new(exit.get_exit_identity()), - }) - } - Err(e) => { - trace!("Failed to retrieve a client: {}", e); - Err(Box::new(RitaExitError::NoClientError)) - } - } -} - /// Every 5 seconds we validate all online clients to make sure that they are in the right region /// we also do this in the client status requests but we want to handle the edge case of a modified /// client that doesn't make status requests -pub fn validate_clients_region( +pub async fn validate_clients_region( + geoip_cache: &mut HashMap, clients_list: Vec, ) -> Result, Box> { info!("Starting exit region validation"); @@ -300,7 +233,7 @@ pub fn validate_clients_region( } let list = get_gateway_ip_bulk(ip_vec, EXIT_LOOP_TIMEOUT)?; for item in list.iter() { - let res = verify_ip(item.gateway_ip); + let res = verify_ip(geoip_cache, item.gateway_ip).await; match res { Ok(true) => trace!("{:?} is from an allowed ip", item), Ok(false) => { @@ -518,7 +451,7 @@ pub fn setup_clients( for c_key in changed_clients_return.new_v1 { if let Some(c) = key_to_client_map.get(&c_key) { setup_individual_client_routes( - match get_client_internal_ip(*c, internal_netmask, internal_ip_v4) { + match get_or_add_client_internal_ip(*c, internal_netmask, internal_ip_v4) { Ok(a) => a, Err(e) => { error!( @@ -536,7 +469,7 @@ pub fn setup_clients( for c_key in changed_clients_return.new_v2 { if let Some(c) = key_to_client_map.get(&c_key) { teardown_individual_client_routes( - match get_client_internal_ip(*c, internal_netmask, internal_ip_v4) { + match get_or_add_client_internal_ip(*c, internal_netmask, internal_ip_v4) { Ok(a) => a, Err(e) => { error!( @@ -713,7 +646,7 @@ pub fn enforce_exit_clients( error!("Failed to setup flow for wg_exit_v2 {:?}", e); } // gets the client ipv6 flow for this exit specifically - let client_ipv6 = get_client_ipv6( + let client_ipv6 = get_or_add_client_ipv6( debt_entry.identity, settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), settings::get_rita_exit() diff --git a/rita_exit/src/lib.rs b/rita_exit/src/lib.rs index fc27a3dff..6b8318e74 100644 --- a/rita_exit/src/lib.rs +++ b/rita_exit/src/lib.rs @@ -1,9 +1,6 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate lazy_static; - #[macro_use] extern crate serde_derive; diff --git a/rita_exit/src/network_endpoints/mod.rs b/rita_exit/src/network_endpoints/mod.rs index 43e20ede2..080f56bc1 100644 --- a/rita_exit/src/network_endpoints/mod.rs +++ b/rita_exit/src/network_endpoints/mod.rs @@ -1,8 +1,7 @@ //! Network endpoints for rita-exit that are not dashboard or local infromational endpoints //! these are called by rita instances to operate the mesh -use crate::database::{client_status, signup_client}; - +use crate::database::signup_client; use crate::RitaExitError; use actix_web::web; use actix_web::{http::StatusCode, web::Json, HttpRequest, HttpResponse, Result}; diff --git a/rita_exit/src/rita_loop/mod.rs b/rita_exit/src/rita_loop/mod.rs index 4984736d8..5705efdbc 100644 --- a/rita_exit/src/rita_loop/mod.rs +++ b/rita_exit/src/rita_loop/mod.rs @@ -11,24 +11,34 @@ //! wakes up to restart the inner thread if anything goes wrong. use crate::database::{ - enforce_exit_clients, setup_clients, validate_clients_region, ExitClientSetupStates, + enforce_exit_clients, get_exit_info, setup_clients, validate_clients_region, + ExitClientSetupStates, }; -use crate::network_endpoints::*; use crate::traffic_watcher::watch_exit_traffic; +use crate::{ + generate_iterative_client_subnet, hash_wgkey, network_endpoints::*, IpAssignmentMap, + RitaExitError, CLIENT_WG_PORT, DEFAULT_CLIENT_SUBNET_SIZE, MAX_IP_RETRIES, +}; use actix::System as AsyncSystem; use actix_web::{web, App, HttpServer}; use althea_kernel_interface::exit_server_tunnel::{one_time_exit_setup, setup_nat}; use althea_kernel_interface::setup_wg_if::create_blank_wg_interface; use althea_kernel_interface::wg_iface_counter::WgUsage; use althea_kernel_interface::ExitClient; -use althea_types::{Identity, SignedExitServerList, WgKey}; +use althea_types::regions::Regions; +use althea_types::{ + ExitClientDetails, ExitClientIdentity, ExitState, Identity, SignedExitServerList, WgKey, +}; use babel_monitor::{open_babel_stream, parse_routes}; use clarity::Address; use exit_trust_root::client_db::get_all_registered_clients; +use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network}; use rita_common::debt_keeper::DebtAction; use rita_common::rita_loop::get_web3_server; use settings::exit::EXIT_LIST_PORT; +use settings::get_rita_exit; use std::collections::{HashMap, HashSet}; +use std::net::{IpAddr, Ipv4Addr}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -45,70 +55,421 @@ pub const LEGACY_INTERFACE: &str = "wg_exit"; pub const EXIT_INTERFACE: &str = "wg_exit_v2"; /// Cache of rita exit state to track across ticks -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct RitaExitCache { - // a cache of what tunnels we had setup last round, used to prevent extra setup ops +#[derive(Clone, Debug)] +pub struct RitaExitData { + /// a cache of what tunnels we had setup last round, used to prevent extra setup ops wg_clients: HashSet, - // a list of client debts from the last round, to prevent extra enforcement ops + /// a list of client debts from the last round, to prevent extra enforcement ops debt_actions: HashSet<(Identity, DebtAction)>, - // if we have successfully setup the wg exit tunnel in the past, if false we have never - // setup exit clients and should crash if we fail to do so, otherwise we are preventing - // proper failover + /// if we have successfully setup the wg exit tunnel in the past, if false we have never + /// setup exit clients and should crash if we fail to do so, otherwise we are preventing + /// proper failover successful_setup: bool, - // cache of b19 routers we have successful rules and routes for + /// cache of b19 routers we have successful rules and routes for wg_exit_clients: HashSet, - // cache of b20 routers we have successful rules and routes for + /// cache of b20 routers we have successful rules and routes for wg_exit_v2_clients: HashSet, - // A blacklist of clients that we fail geoip verification for. We tear down these routes + /// A blacklist of clients that we fail geoip verification for. We tear down these routes geoip_blacklist: Vec, + /// A list of geoip info that we have already requested since startup, to reduce api usage + geoip_cache: HashMap, + // ip assignments for clients, represented as a locked map so that we can tell clients what ip's they where + // assigned in the actix worker threads which also has a copy of this lock + ip_assignments: Arc>, + /// A cache of the last usage of the wg tunnels, this must be maintained from when the tunnel for a specific + /// client is created to when it is destroyed/recreated otherwise overbilling will occur + usage_history: HashMap, + /// A list of currently registered clients from the exit database smart contract, updated every tick + registered_clients: HashSet, } -pub type ExitLock = Arc>>; +impl RitaExitData { + pub fn new( + ip_assignments: Arc>, + reg_clients_list: HashSet, + ) -> Self { + RitaExitData { + wg_clients: HashSet::new(), + debt_actions: HashSet::new(), + successful_setup: false, + wg_exit_clients: HashSet::new(), + wg_exit_v2_clients: HashSet::new(), + geoip_blacklist: Vec::new(), + geoip_cache: HashMap::new(), + ip_assignments, + usage_history: HashMap::new(), + registered_clients: reg_clients_list, + } + } -/// Starts the rita exit billing thread, this thread deals with blocking db -/// calls and performs various tasks required for billing. The tasks interacting -/// with actix are the most troublesome because the actix system may restart -/// and crash this thread. To prevent that and other crashes we have a watchdog -/// thread which simply restarts the internal thread. -pub fn start_rita_exit_loop(reg_clients_list: Vec) { - setup_exit_wg_tunnel(); + pub fn is_client_registered(&self, client: Identity) -> bool { + self.registered_clients.contains(&client) + } - // the last usage of the wg tunnels, if an innner thread restarts this must be preserved to prevent - // overbilling users - let usage_history = Arc::new(RwLock::new(HashMap::new())); - - // this will always be an error, so it's really just a loop statement - // with some fancy destructuring, blocking the caller thread as a watchdog - while let Err(e) = { - let reg_clients_list = reg_clients_list.clone(); - // ARC will simply clone the same reference - let usage_history = usage_history.clone(); - thread::spawn(move || { - // Internal exit cache that store state across multiple ticks - let mut rita_exit_cache = RitaExitCache::default(); - let mut reg_clients_list = reg_clients_list.clone(); - let runner = AsyncSystem::new(); - runner.block_on(async move { - loop { - reg_clients_list = update_client_list(reg_clients_list).await; - - rita_exit_cache = rita_exit_loop( - reg_clients_list.clone(), - rita_exit_cache, - usage_history.clone(), - ) - .await; - } + pub fn get_ipv6_assignments(&self) -> HashMap { + self.ip_assignments.read().unwrap().ipv6_assignments.clone() + } + + pub fn get_internal_ip_assignments(&self) -> HashMap { + self.ip_assignments + .read() + .unwrap() + .internal_ip_assignments + .clone() + } + + /// Gets the status of a client, this may include assigning them an IP if there's no existing assignment + pub fn get_client_status( + &mut self, + client: ExitClientIdentity, + ) -> Result> { + trace!("Checking if record exists for {:?}", client.global.mesh_ip); + let exit = get_rita_exit(); + let exit_network = exit.exit_network.clone(); + let own_internal_ip = exit_network.internal_ipv4.internal_ip(); + let internal_netmask = exit_network.internal_ipv4.prefix(); + if self.is_client_registered(client.global) { + trace!("record exists, updating"); + + let current_ip: Ipv4Addr = self.get_or_add_client_internal_ip( + client.global, + internal_netmask, + own_internal_ip, + )?; + let current_internet_ipv6 = self.get_or_add_client_ipv6( + client.global, + exit_network.get_ipv6_subnet_alt(), + exit.get_client_subnet_size() + .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), + )?; + + let current_internet_ipv6: Option = current_internet_ipv6.map(|a| a.into()); + + Ok(ExitState::Registered { + our_details: ExitClientDetails { + client_internal_ip: IpAddr::V4(current_ip), + internet_ipv6_subnet: current_internet_ipv6, + }, + general_details: get_exit_info(), + message: "Registration OK".to_string(), + identity: Box::new(exit.get_exit_identity()), }) + } else { + Err(Box::new(RitaExitError::NoClientError)) + } + } + + /// Returns true if the provided ipv4 address is valid for use between the client and the exit + /// as the internal ip + pub fn ip_is_valid_to_assign( + &self, + network: Ipv4Network, + assigned_ip: Ipv4Addr, + our_ip: Ipv4Addr, + ) -> bool { + let broadcast = network.broadcast(); + let network_ip = network.network(); + + // Collision with our ip + if assigned_ip == our_ip { + return false; + } + // collision with the network ip + if assigned_ip == network_ip { + return false; + } + // collision with broadcast address + if assigned_ip == broadcast { + return false; + } + + self.ip_assignments + .read() + .unwrap() + .internal_ip_assignments + .get(&assigned_ip) + .is_none() + } + + /// Given a client identity, get the clients internal ipv4 addr using the wgkey as a generative seed + /// this is the ip used for the wg_exit tunnel for the client. Not the clients public ip visible to the internet + /// which is determined by the NAT settings on the exit + pub fn get_or_add_client_internal_ip( + &mut self, + their_record: Identity, + netmask: u8, + gateway_ip: Ipv4Addr, + ) -> Result> { + let wg_hash = hash_wgkey(their_record.wg_public_key); + // total number of available addresses + let total_addresses: u64 = 2_u64.pow((32 - netmask).into()); + let mut generative_index = wg_hash % total_addresses; + let network = match Ipv4Network::new(gateway_ip, netmask) { + Ok(a) => a, + Err(e) => { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to setup and ipnetwork to generate internal ip {}", + e + )))) + } + }; + + // check if we already have an ip for this client, TODO optimize this datastructure, it's optimized for generating + // new ip, not for lookup, the generation process can be streamlined to avoid that. + for (ip, wg_key) in self + .ip_assignments + .write() + .unwrap() + .internal_ip_assignments + .iter() + { + if *wg_key == their_record.wg_public_key { + return Ok(*ip); + } + } + + // Keep trying to generate an address till we get a valid one + let mut retries = 0; + loop { + // Return an error if we retry too many times + if retries > MAX_IP_RETRIES { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to get internal ip using network {} and index {}", + network, generative_index + )))); + } + + let internal_ip = network.nth(match generative_index.try_into() { + Ok(a) => a, + Err(e) => { + warn!("Internal Ip failure: {}", e); + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + }); + + let internal_ip = match internal_ip { + Some(a) => a, + None => { + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + }; + + // Validate that this ip is valid and return it + if self.ip_is_valid_to_assign(network, internal_ip, gateway_ip) { + let mut ips = self.ip_assignments.write().unwrap(); + ips.internal_ip_assignments + .insert(internal_ip, their_record.wg_public_key.clone()); + return Ok(internal_ip); + } else { + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + } + } + + /// Validates if an IPv6 subnet is valid to assign to a client + pub fn is_ipv6_subnet_valid_to_assign(&self, client_subnet: Ipv6Network) -> bool { + self.ip_assignments + .read() + .unwrap() + .ipv6_assignments + .get(&client_subnet) + .is_none() + } + + pub fn get_or_add_client_ipv6( + &mut self, + their_record: Identity, + exit_sub: Option, + client_subnet_size: u8, + ) -> Result, Box> { + if let Some(exit_sub) = exit_sub { + let wg_hash = hash_wgkey(their_record.wg_public_key); + + // This bitshifting is the total number of client subnets available. We are checking that our iterative index + // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be + // 2^(124 - 120) => 2^4 => 16 + let total_subnets = 1 << (client_subnet_size - exit_sub.prefix()); + let mut generative_index = wg_hash % total_subnets; + + // Loop to try to generate a valid address + let mut retries = 0; + loop { + // Return an error if we retry too many times + if retries > MAX_IP_RETRIES { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to get internet ipv6 using network {} and index {}", + exit_sub, generative_index + )))); + } + + let client_subnet = generate_iterative_client_subnet( + exit_sub, + generative_index, + client_subnet_size, + )?; + + if self.is_ipv6_subnet_valid_to_assign(client_subnet) { + let mut ips = self.ip_assignments.write().unwrap(); + ips.ipv6_assignments + .insert(client_subnet, their_record.wg_public_key.clone()); + return Ok(Some(client_subnet)); + } else { + retries += 1; + generative_index = (generative_index + 1) % total_subnets; + continue; + } + } + } else { + // This exit doesnt support ipv6 + Ok(None) + } + } + + /// Convert an identity into a rita exit client, this is used to setup the exit tunnel for a client + /// if the client has not already been assigned an ip, it will be assigned one as the exit client + /// is created + pub fn id_to_exit_client( + &mut self, + client: Identity, + ) -> Result> { + let internet_ipv6 = self.get_or_add_client_ipv6( + client, + settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), + settings::get_rita_exit() + .get_client_subnet_size() + .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), + )?; + let internal_ip = self.get_or_add_client_internal_ip( + client, + settings::get_rita_exit() + .exit_network + .internal_ipv4 + .prefix(), + settings::get_rita_exit() + .exit_network + .internal_ipv4 + .internal_ip(), + )?; + let internet_ipv6 = internet_ipv6.map(|a| a.into()); + + Ok(ExitClient { + mesh_ip: client.mesh_ip, + internal_ip: IpAddr::V4(internal_ip), + port: CLIENT_WG_PORT, + public_key: client.wg_public_key, + internet_ipv6, }) - .join() - } { - error!("Exit loop thread panicked! Respawning {:?}", e); + } +} + +/// Starts the rita exit billing thread, this thread deals with blocking db +/// calls and performs various tasks required for billing. If this thread crashes +/// due to consistenty requirements the whole application should be restarted +/// this will cause the wg tunnels to get torn down and rebuilt, putting things back into +/// a consistent state +pub async fn start_rita_exit_loop( + reg_clients_list: HashSet, + ip_assignments: Arc>, +) { + setup_exit_wg_tunnel(); + + let mut rita_exit_cache = RitaExitData::new(ip_assignments, reg_clients_list); + loop { + let start = Instant::now(); + + // Internal exit cache that store state across multiple ticks + rita_exit_cache.registered_clients = + update_client_list(rita_exit_cache.registered_clients.clone()).await; + + let rita_exit = settings::get_rita_exit(); + let babel_port = rita_exit.network.babel_port; + + let start_bill_benchmark = Instant::now(); + // watch and bill for traffic + bill( + babel_port, + start, + rita_exit_cache.registered_clients.clone(), + rita_exit_cache.usage_history.clone(), + ); + info!( + "Finished Rita billing in {}ms", + start_bill_benchmark.elapsed().as_millis() + ); + + info!("About to setup clients"); + let start_setup_benchmark = Instant::now(); + // Create and update client tunnels + match setup_clients( + rita_exit_cache.registered_clients.iter().cloned().collect(), + rita_exit_cache.geoip_blacklist.clone(), + ExitClientSetupStates { + old_clients: rita_exit_cache.wg_clients.clone(), + wg_exit_clients: rita_exit_cache.wg_exit_clients.clone(), + wg_exit_v2_clients: rita_exit_cache.wg_exit_v2_clients.clone(), + }, + ) { + Ok(client_states) => { + rita_exit_cache.successful_setup = true; + rita_exit_cache.wg_clients = client_states.old_clients; + rita_exit_cache.wg_exit_clients = client_states.wg_exit_clients; + rita_exit_cache.wg_exit_v2_clients = client_states.wg_exit_v2_clients; + } + Err(e) => error!("Setup clients failed with {:?}", e), + } + info!( + "Finished Rita setting up clients in {}ms", + start_setup_benchmark.elapsed().as_millis() + ); + + // Make sure no one we are setting up is geoip unauthorized + let start_region_benchmark = Instant::now(); + info!("about to check regions"); + if let Some(list) = check_regions( + &mut rita_exit_cache.geoip_cache, + start, + rita_exit_cache.registered_clients.iter().cloned().collect(), + ) + .await + { + rita_exit_cache.geoip_blacklist = list; + } + info!( + "Finished Rita checking region in {}ms", + start_region_benchmark.elapsed().as_millis() + ); + info!("About to enforce exit clients"); + // handle enforcement on client tunnels by querying debt keeper + // this consumes client list + let start_enforce_benchmark = Instant::now(); + match enforce_exit_clients( + rita_exit_cache.registered_clients.iter().cloned().collect(), + &rita_exit_cache.debt_actions.clone(), + ) { + Ok(new_debt_actions) => rita_exit_cache.debt_actions = new_debt_actions, + Err(e) => warn!("Failed to enforce exit clients with {:?}", e,), + } + info!( + "Finished Rita enforcement in {}ms ", + start_enforce_benchmark.elapsed().as_millis() + ); + info!( + "Finished Rita exit loop in {}ms, all vars should be dropped", + start.elapsed().as_millis(), + ); + + thread::sleep(EXIT_LOOP_SPEED_DURATION); } } /// Updates the client list, if this is not successful the old client list is used -async fn update_client_list(reg_clients_list: Vec) -> Vec { +async fn update_client_list(reg_clients_list: HashSet) -> HashSet { let payment_settings = settings::get_rita_common().payment; let contract_address = settings::get_rita_exit() .exit_network @@ -138,90 +499,21 @@ async fn update_client_list(reg_clients_list: Vec) -> Vec { } } -async fn rita_exit_loop( - reg_clients_list: Vec, - rita_exit_cache: RitaExitCache, - usage_history: ExitLock, -) -> RitaExitCache { - let mut rita_exit_cache = rita_exit_cache; - let start = Instant::now(); - - let rita_exit = settings::get_rita_exit(); - let babel_port = rita_exit.network.babel_port; - - let ids = reg_clients_list.clone(); - let start_bill_benchmark = Instant::now(); - // watch and bill for traffic - bill(babel_port, start, ids, usage_history); - info!( - "Finished Rita billing in {}ms", - start_bill_benchmark.elapsed().as_millis() - ); - - info!("About to setup clients"); - let start_setup_benchmark = Instant::now(); - // Create and update client tunnels - match setup_clients( - reg_clients_list.clone(), - rita_exit_cache.geoip_blacklist.clone(), - ExitClientSetupStates { - old_clients: rita_exit_cache.wg_clients.clone(), - wg_exit_clients: rita_exit_cache.wg_exit_clients.clone(), - wg_exit_v2_clients: rita_exit_cache.wg_exit_v2_clients.clone(), - }, - ) { - Ok(client_states) => { - rita_exit_cache.successful_setup = true; - rita_exit_cache.wg_clients = client_states.old_clients; - rita_exit_cache.wg_exit_clients = client_states.wg_exit_clients; - rita_exit_cache.wg_exit_v2_clients = client_states.wg_exit_v2_clients; - } - Err(e) => error!("Setup clients failed with {:?}", e), - } - info!( - "Finished Rita setting up clients in {}ms", - start_setup_benchmark.elapsed().as_millis() - ); - - // Make sure no one we are setting up is geoip unauthorized - let start_region_benchmark = Instant::now(); - info!("about to check regions"); - if let Some(list) = check_regions(start, reg_clients_list.clone()) { - rita_exit_cache.geoip_blacklist = list; - } - info!( - "Finished Rita checking region in {}ms", - start_region_benchmark.elapsed().as_millis() - ); - info!("About to enforce exit clients"); - // handle enforcement on client tunnels by querying debt keeper - // this consumes client list - let start_enforce_benchmark = Instant::now(); - match enforce_exit_clients(reg_clients_list, &rita_exit_cache.debt_actions.clone()) { - Ok(new_debt_actions) => rita_exit_cache.debt_actions = new_debt_actions, - Err(e) => warn!("Failed to enforce exit clients with {:?}", e,), - } - info!( - "Finished Rita enforcement in {}ms ", - start_enforce_benchmark.elapsed().as_millis() - ); - info!( - "Finished Rita exit loop in {}ms, all vars should be dropped", - start.elapsed().as_millis(), - ); - - thread::sleep(EXIT_LOOP_SPEED_DURATION); - rita_exit_cache -} - -fn bill(babel_port: u16, start: Instant, ids: Vec, usage_history: ExitLock) { +fn bill( + babel_port: u16, + start: Instant, + ids: HashSet, + usage_history: HashMap, +) { trace!("about to try opening babel stream"); match open_babel_stream(babel_port, EXIT_LOOP_TIMEOUT) { Ok(mut stream) => match parse_routes(&mut stream) { Ok(routes) => { trace!("Sending traffic watcher message?"); - if let Err(e) = watch_exit_traffic(usage_history, &routes, &ids) { + if let Err(e) = + watch_exit_traffic(usage_history, &routes, ids.iter().cloned().collect()) + { error!( "Watch exit traffic failed with {}, in {} millis", e, @@ -254,10 +546,14 @@ fn bill(babel_port: u16, start: Instant, ids: Vec, usage_history: Exit /// Run a region validation and return a list of blacklisted clients. This list is later used /// in setup clients to teardown blacklisted client tunnels -fn check_regions(start: Instant, clients_list: Vec) -> Option> { +async fn check_regions( + geoip_cache: &mut HashMap, + start: Instant, + clients_list: Vec, +) -> Option> { let val = settings::get_rita_exit().allowed_countries.is_empty(); if !val { - let res = validate_clients_region(clients_list); + let res = validate_clients_region(geoip_cache, clients_list).await; match res { Err(e) => { warn!( @@ -331,19 +627,19 @@ fn setup_exit_wg_tunnel() { .unwrap(); } -pub fn start_rita_exit_endpoints(workers: usize) { +pub fn start_rita_exit_endpoints(ip_assignments: Arc>) { + let web_data = web::Data::new(ip_assignments); thread::spawn(move || { let runner = AsyncSystem::new(); runner.block_on(async move { - // Exit stuff, huge threadpool to offset Pgsql blocking - let _res = HttpServer::new(|| { + let _res = HttpServer::new(move || { App::new() .route("/secure_setup", web::post().to(secure_setup_request)) .route("/secure_status", web::post().to(secure_status_request)) .route("/client_debt", web::post().to(get_client_debt)) .route("/time", web::get().to(get_exit_timestamp_http)) + .app_data(web_data.clone()) }) - .workers(workers) .bind(format!( "[::0]:{}", settings::get_rita_exit().exit_network.exit_hello_port @@ -360,7 +656,7 @@ pub fn start_rita_exit_endpoints(workers: usize) { /// instance of this IP due to the way babel handles multihoming. Due to race conditions we don't explicitly /// bind to the IP for this listener, we instead bind to all available IPs. As we make tunnels kernel interface /// will add the ip to each wg tunnel and then babel will handle the rest. -pub fn start_rita_exit_list_endpoint(workers: usize) { +pub fn start_rita_exit_list_endpoint() { let exit_contract_data_cache: Arc>> = Arc::new(RwLock::new(HashMap::new())); let web_data = web::Data::new(exit_contract_data_cache.clone()); @@ -372,7 +668,6 @@ pub fn start_rita_exit_list_endpoint(workers: usize) { .route("/exit_list", web::post().to(get_exit_list)) .app_data(web_data.clone()) }) - .workers(workers) .bind(format!("[::0]:{}", EXIT_LIST_PORT,)) .unwrap() .shutdown_timeout(0) diff --git a/rita_exit/src/traffic_watcher/mod.rs b/rita_exit/src/traffic_watcher/mod.rs index 8260c1902..ddb3c8f61 100644 --- a/rita_exit/src/traffic_watcher/mod.rs +++ b/rita_exit/src/traffic_watcher/mod.rs @@ -8,7 +8,6 @@ //! //! Also handles enforcement of nonpayment, since there's no need for a complicated TunnelManager for exits -use crate::rita_loop::ExitLock; use crate::rita_loop::EXIT_INTERFACE; use crate::rita_loop::LEGACY_INTERFACE; use crate::RitaExitError; @@ -151,12 +150,10 @@ fn debts_logging(debts: &HashMap) { /// This traffic watcher watches how much traffic each we send and receive from each client. pub fn watch_exit_traffic( - usage_history: ExitLock, + mut usage_history: HashMap, routes: &[Route], - clients: &[Identity], + clients: Vec, ) -> Result<(), Box> { - let mut usage_history = usage_history.write().unwrap(); - // Since Althea is a pay per forward network we must add a surcharge for transaction fees // to our own price. In the case Exit -> A -> B -> C the exit pays A a lump sum for it's own // fees as well as B's fees. This means the exit pays the transaction fee (a percentage) for @@ -176,7 +173,7 @@ pub fn watch_exit_traffic( } }; - let ret = generate_helper_maps(&our_id, clients); + let ret = generate_helper_maps(&our_id, &clients); let identities = ret.wg_to_id; let id_from_ip = ret.ip_to_id; let destinations = get_babel_info(routes, our_id, id_from_ip);