Skip to content

Commit

Permalink
WIP: Remove exit lazy static
Browse files Browse the repository at this point in the history
This patch simplifies the dataflow for th rita exit module by removing
the lazy static exit database and instead holding that data in the main
rita exit thread.
  • Loading branch information
jkilpatr committed Nov 7, 2024
1 parent 6ce7f66 commit 063e765
Show file tree
Hide file tree
Showing 12 changed files with 631 additions and 574 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions exit_trust_root/src/client_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clarity::{
abi::{encode_call, AbiToken},
Address, PrivateKey, Uint256,
};
use std::{net::IpAddr, time::Duration, vec};
use std::{collections::HashSet, net::IpAddr, time::Duration, vec};
use tokio::time::timeout as future_timeout;
use web30::{
client::Web3,
Expand All @@ -24,7 +24,7 @@ pub async fn get_all_registered_clients(
web30: &Web3,
requester_address: Address,
contract: Address,
) -> Result<Vec<Identity>, Web3Error> {
) -> Result<HashSet<Identity>, Web3Error> {
let payload = encode_call("getAllRegisteredUsers()", &[])?;
let res = web30
.simulate_transaction(
Expand All @@ -33,7 +33,8 @@ pub async fn get_all_registered_clients(
)
.await?;

convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))
let val = convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))?;
Ok(val.into_iter().collect())
}

pub async fn get_registered_client_using_wgkey(
Expand Down
10 changes: 0 additions & 10 deletions exit_trust_root/src/register_client_batch_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ pub struct RegistrationRequest {

pub const MAX_BATCH_SIZE: usize = 75;

/// Utility function used to easily perform O(1) lookups against the identities list
pub fn get_clients_hashset(input: Vec<Identity>) -> HashSet<Identity> {
let mut output = HashSet::new();
for i in input {
output.insert(i);
}
output
}

/// This function monitors the registration queue lock free queue. It will dequeue any new entries and attempt to register them
/// in a batch sent every REGISTRATION_LOOP_SPEED seconds. This function will also check if the user is already registered before attempting to register them
pub async fn register_client_batch_loop(
Expand Down Expand Up @@ -106,7 +97,6 @@ pub async fn register_client_batch_loop(
continue;
}
};
let all_clients = get_clients_hashset(all_clients);

let mut clients_to_register = Vec::new();
for client in list.iter() {
Expand Down
2 changes: 1 addition & 1 deletion rita_bin/src/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async fn main() {
)));

// this call blocks, transforming this startup thread into the main exit watchdog thread
start_rita_exit_loop(clients);
start_rita_exit_loop(clients).await;
}

/// This function performs startup integrity checks on the config and system. It checks that we can reach the internet
Expand Down
1 change: 0 additions & 1 deletion rita_exit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ babel_monitor = { path = "../babel_monitor" }
actix = {workspace = true}
awc = {workspace = true}
handlebars = "5.1"
lazy_static = "1.5"
ipnetwork = "0.20"
clarity = {workspace = true}
serde = "1.0"
Expand Down
138 changes: 83 additions & 55 deletions rita_exit/src/database/geoip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use actix::System;
use crate::RitaExitError;
use althea_kernel_interface::interface_tools::get_wg_remote_ip;
use althea_types::regions::Regions;
use babel_monitor::open_babel_stream;
Expand All @@ -9,9 +9,6 @@ use std::collections::HashMap;
use std::net::IpAddr;
use std::time::Duration;

use crate::database::RITA_EXIT_STATE;
use crate::RitaExitError;

/// gets the gateway ip for a given mesh IP
pub fn get_gateway_ip_single(mesh_ip: IpAddr) -> Result<IpAddr, Box<RitaExitError>> {
let babel_port = settings::get_rita_exit().network.babel_port;
Expand Down Expand Up @@ -131,7 +128,10 @@ struct CountryDetails {
}

/// get ISO country code from ip, consults a in memory cache
pub fn get_country(ip: IpAddr) -> Result<Regions, Box<RitaExitError>> {
pub async fn get_country(
geoip_cache: &mut HashMap<IpAddr, Regions>,
ip: IpAddr,
) -> Result<Regions, Box<RitaExitError>> {
trace!("get GeoIP country for {}", ip.to_string());

// if allowed countries is not configured we don't care and will use
Expand Down Expand Up @@ -170,12 +170,7 @@ pub fn get_country(ip: IpAddr) -> Result<Regions, Box<RitaExitError>> {

// we have to turn this option into a string in order to avoid
// the borrow checker trying to keep this lock open for a long period
let cache_result = RITA_EXIT_STATE
.read()
.unwrap()
.geoip_cache
.get(&ip)
.copied();
let cache_result = geoip_cache.get(&ip).copied();

match cache_result {
Some(code) => Ok(code),
Expand All @@ -187,55 +182,51 @@ pub fn get_country(ip: IpAddr) -> Result<Regions, Box<RitaExitError>> {
ip.to_string()
);
// run in async closure and return the result
let runner = System::new();
runner.block_on(async move {
let client = awc::Client::new();
if let Ok(mut res) = client
.get(&geo_ip_url)
.basic_auth(api_user, api_key)
.timeout(Duration::from_secs(1))
.send()
.await
{
trace!("Got geoip result {:?}", res);
if let Ok(res) = res.json().await {
let value: GeoIpRet = res;
let code = match value.country.iso_code.parse() {
Ok(r) => r,
Err(_) => {
error!(
"Failed to parse geoip response {:?}",
value.country.iso_code
);
Regions::UnkownRegion
}
};
trace!("Adding GeoIP value {:?} to cache", code);
RITA_EXIT_STATE
.write()
.unwrap()
.geoip_cache
.insert(ip, code);
trace!("Added to cache, returning");
Ok(code)
} else {
Err(Box::new(RitaExitError::MiscStringError(
"Failed to deserialize geoip response".to_string(),
)))
}
let client = awc::Client::new();
if let Ok(mut res) = client
.get(&geo_ip_url)
.basic_auth(api_user, api_key)
.timeout(Duration::from_secs(1))
.send()
.await
{
trace!("Got geoip result {:?}", res);
if let Ok(res) = res.json().await {
let value: GeoIpRet = res;
let code = match value.country.iso_code.parse() {
Ok(r) => r,
Err(_) => {
error!(
"Failed to parse geoip response {:?}",
value.country.iso_code
);
Regions::UnkownRegion
}
};
trace!("Adding GeoIP value {:?} to cache", code);
geoip_cache.insert(ip, code);
trace!("Added to cache, returning");
Ok(code)
} else {
Err(Box::new(RitaExitError::MiscStringError(
"Request failed".to_string(),
"Failed to deserialize geoip response".to_string(),
)))
}
})
} else {
Err(Box::new(RitaExitError::MiscStringError(
"Request failed".to_string(),
)))
}
}
}
}

/// Returns true or false if an ip is confirmed to be inside or outside the region and error
/// if an api error is encountered trying to figure that out.
pub fn verify_ip(request_ip: IpAddr) -> Result<bool, Box<RitaExitError>> {
pub async fn verify_ip(
geoip_cache: &mut HashMap<IpAddr, Regions>,
request_ip: IpAddr,
) -> Result<bool, Box<RitaExitError>> {
// in this case we have a gateway directly attached to the exit, so our
// peer address for them will be an fe80 linklocal ip address. When we
// detect this we know that they are in the allowed countries list because
Expand All @@ -249,7 +240,7 @@ pub fn verify_ip(request_ip: IpAddr) -> Result<bool, Box<RitaExitError>> {
if settings::get_rita_exit().allowed_countries.is_empty() {
Ok(true)
} else {
let country = get_country(request_ip)?;
let country = get_country(geoip_cache, request_ip).await?;
if !settings::get_rita_exit().allowed_countries.is_empty()
&& !settings::get_rita_exit()
.allowed_countries
Expand All @@ -262,8 +253,45 @@ pub fn verify_ip(request_ip: IpAddr) -> Result<bool, Box<RitaExitError>> {
}
}

#[test]
#[ignore]
fn test_get_country() {
get_country("8.8.8.8".parse().unwrap()).unwrap();
#[cfg(test)]
mod tests {
use std::str::FromStr;
use super::*;

#[actix_web::test]
#[ignore]
async fn test_get_country() {
let mut geoip_cache = HashMap::new();
let ip = IpAddr::from_str("8.8.8.8").unwrap();
let result = get_country(&mut geoip_cache, ip).await;
assert!(result.is_ok());
}

#[actix_web::test]
#[ignore]
async fn test_get_gateway_ip_single() {
let ip = IpAddr::from_str("2001:4860:4860::8888").unwrap();
let result = get_gateway_ip_single(ip);
assert!(result.is_ok());
}

#[actix_web::test]
#[ignore]
async fn test_get_gateway_ip_bulk() {
let ips = vec![
IpAddr::from_str("2001:4860:4860::8888").unwrap(),
IpAddr::from_str("2001:4860:4860::8844").unwrap(),
];
let result = get_gateway_ip_bulk(ips, Duration::from_secs(5));
assert!(result.is_ok());
}

#[actix_web::test]
#[ignore]
async fn test_verify_ip() {
let mut geoip_cache = HashMap::new();
let ip = IpAddr::from_str("8.8.8.8").unwrap();
let result = verify_ip(&mut geoip_cache, ip).await;
assert!(result.is_ok());
}
}
Loading

0 comments on commit 063e765

Please sign in to comment.