diff --git a/althea_types/src/interop.rs b/althea_types/src/interop.rs index 00961d91e..fc8a92d50 100644 --- a/althea_types/src/interop.rs +++ b/althea_types/src/interop.rs @@ -1,5 +1,5 @@ use crate::{contact_info::ContactType, wg_key::WgKey, BillingDetails, InstallationDetails}; -use crate::{ClientExtender, UsageTracker, WifiDevice}; +use crate::{ClientExtender, UsageTrackerFlat, UsageTrackerTransfer, WifiDevice}; use arrayvec::ArrayString; use babel_monitor::structs::Neighbor; use babel_monitor::structs::Route; @@ -335,7 +335,7 @@ pub struct LightClientLocalIdentity { /// This represents a generic payment that may be to or from us /// it contains a txid from a published transaction /// that should be validated against the blockchain -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)] pub struct PaymentTx { pub to: Identity, pub from: Identity, @@ -353,7 +353,7 @@ impl Hash for PaymentTx { /// This represents a generic payment that may be to or from us, it does not contain a txid meaning it is /// unpublished -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)] pub struct UnpublishedPaymentTx { pub to: Identity, pub from: Identity, @@ -622,7 +622,7 @@ pub struct OperatorUpdateMessage { pub operator_action: Option, /// String that holds the download link to the latest firmware release /// When a user hits 'update router', it updates to this version - /// to be removed once all routesr are updated to >= beta 19 rc9 + /// to be removed once all routers are updated to >= beta 19 rc9 pub local_update_instruction: Option, /// String that holds the download link to the latest firmware release /// When a user hits 'update router', it updates to this version @@ -747,11 +747,19 @@ pub struct OperatorCheckinMessage { /// This is a user set bandwidth limit value, it will cap the users download /// and 
upload to the provided value of their choosing. Denoted in mbps pub user_bandwidth_limit: Option, + /// Legacy bandwidth usage from pre beta 20 routers, one of the two will be None + pub user_bandwidth_usage: Option, /// Details of both the Client and Relay bandwidth usage over a given period determined /// by the ops_last_seen_usage_hour in OperatorUpdateMessage. When the device's last /// saved usage hour is the same as the ops last seen, we send no data here as we are up /// to date. Data sent through here gets added to a database entry for each device. - pub user_bandwidth_usage: Option, + pub user_bandwidth_usage_v2: Option, + /// Current client data usage in mbps computed as the last input to the usage tracker + /// so an average of around 5-10 seconds + pub client_mbps: Option, + /// Current relay data usage in mbps, computed as the last input to the usage tracker + /// so an average of around 5-10 seconds + pub relay_mbps: Option, /// This is to keep track of the rita client uptime for debugging purposes /// In the event something whacko happens, serde will magically derive def- /// fault value. diff --git a/althea_types/src/user_info.rs b/althea_types/src/user_info.rs index 19dc8cb85..557ae19c2 100644 --- a/althea_types/src/user_info.rs +++ b/althea_types/src/user_info.rs @@ -1,5 +1,4 @@ -use std::collections::VecDeque; -use std::hash::{Hash, Hasher}; +use std::collections::{HashMap, VecDeque}; use std::net::Ipv4Addr; use std::time::SystemTime; @@ -64,33 +63,85 @@ pub struct InstallationDetails { pub install_date: Option, } -/// The main actor that holds the usage state for the duration of operations -/// to be sent up to ops tools. 
+/// The old storage method for usage tracker data that stores flat data +/// in arrays and does not index the data via hashmap this format was abandoned +/// as error prone but is still used so legacy routers can send data #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct UsageTracker { +pub struct UsageTrackerFlat { pub last_save_hour: u64, - pub client_bandwidth: VecDeque, - pub relay_bandwidth: VecDeque, + pub client_bandwidth: VecDeque, + pub relay_bandwidth: VecDeque, } /// A struct for tracking each hour of usage, indexed by time in hours since /// the unix epoch. -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub struct UsageHour { +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct IndexedUsageHour { pub index: u64, pub up: u64, pub down: u64, pub price: u32, } -impl Hash for UsageHour { - fn hash(&self, state: &mut H) { - self.index.hash(state); +/// A struct used to store data usage over an arbitrary period the length of time +/// is implied by the code that is handling this struct. Do not transfer without considering +/// that you may be changing units +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct Usage { + pub up: u64, + pub down: u64, + pub price: u32, +} + +/// The main actor that holds the usage state for the duration of operations +/// to be sent up to ops tools. 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct UsageTrackerTransfer { + /// client bandwidth usage per hour indexed by unix timestamp in hours + pub client_bandwidth: HashMap, + /// relay bandwidth usage per hour indexed by unix timestamp in hours + pub relay_bandwidth: HashMap, + /// exit bandwidth usage per hour indexed by unix timestamp in hours + pub exit_bandwidth: HashMap, +} + +/// Used to convert between usage tracker storage formats +pub fn convert_flat_to_map_usage_data(input: VecDeque) -> HashMap { + let mut out = HashMap::new(); + for hour in input { + match out.get_mut(&hour.index) { + // we have a duplicate entry which we must correct, pick the higher data usage and keep that + Some(to_edit) => { + let duplicate_usage: Usage = *to_edit; + to_edit.up = std::cmp::max(duplicate_usage.up, hour.up); + to_edit.down = std::cmp::max(duplicate_usage.down, hour.down); + to_edit.price = std::cmp::max(duplicate_usage.price, hour.price); + } + None => { + out.insert( + hour.index, + Usage { + up: hour.up, + down: hour.down, + price: hour.price, + }, + ); + } + } } + out } -impl PartialEq for UsageHour { - fn eq(&self, other: &Self) -> bool { - self.index == other.index + +/// Used to convert between usage tracker storage formats +pub fn convert_map_to_flat_usage_data(input: HashMap) -> VecDeque { + let mut out = VecDeque::new(); + for (hour, usage) in input { + out.push_back(IndexedUsageHour { + index: hour, + up: usage.up, + down: usage.down, + price: usage.price, + }) } + out } -impl Eq for UsageHour {} diff --git a/integration_tests/src/setup_utils/database.rs b/integration_tests/src/setup_utils/database.rs index e2068e1a2..5acf7851c 100644 --- a/integration_tests/src/setup_utils/database.rs +++ b/integration_tests/src/setup_utils/database.rs @@ -13,8 +13,8 @@ use std::{ /// Starts the exit postgres instance in the native system namespace, TODO insert plumbing so that exits can reach it pub fn start_postgres() { const POSTGRES_USER: 
&str = "postgres"; - const POSTGRES_BIN: &str = "/usr/lib/postgresql/15/bin/postgres"; - const INITDB_BIN: &str = "/usr/lib/postgresql/15/bin/initdb"; + const POSTGRES_BIN: &str = "/usr/lib/postgresql/16/bin/postgres"; + const INITDB_BIN: &str = "/usr/lib/postgresql/16/bin/initdb"; // for this test script const DB_URL_LOCAL: &str = "postgres://postgres@127.0.0.1/test"; // for the rita exit instances diff --git a/legacy_integration_tests/container/Dockerfile b/legacy_integration_tests/container/Dockerfile index 8190f9b93..c8b844a10 100644 --- a/legacy_integration_tests/container/Dockerfile +++ b/legacy_integration_tests/container/Dockerfile @@ -5,8 +5,8 @@ RUN apt-get install -y python3-termcolor python3-toml python3-networkx python3-m RUN curl https://sh.rustup.rs -sSf | sh -s -- -y RUN PATH=$PATH:$HOME/.cargo/bin cargo install diesel_cli --force ENV POSTGRES_USER=postgres -ENV POSTGRES_BIN=/usr/lib/postgresql/15/bin/postgres -ENV INITDB_BIN=/usr/lib/postgresql/15/bin/initdb +ENV POSTGRES_BIN=/usr/lib/postgresql/16/bin/postgres +ENV INITDB_BIN=/usr/lib/postgresql/16/bin/initdb ARG NODES ENV SPEEDTEST_THROUGHPUT=$SPEEDTEST_THROUGHPUT ENV SPEEDTEST_DURATION=$SPEEDTEST_DURATION diff --git a/rita_client/src/dashboard/usage.rs b/rita_client/src/dashboard/usage.rs index fca0f53c6..90217ca3b 100644 --- a/rita_client/src/dashboard/usage.rs +++ b/rita_client/src/dashboard/usage.rs @@ -1,6 +1,6 @@ use actix_web_async::{HttpRequest, HttpResponse}; use rita_common::usage_tracker::get_usage_data; -use rita_common::usage_tracker::UsageType; +use rita_common::usage_tracker::structs::UsageType; pub async fn get_client_usage(_req: HttpRequest) -> HttpResponse { trace!("/usage/client hit"); diff --git a/rita_client/src/exit_manager/exit_switcher.rs b/rita_client/src/exit_manager/exit_switcher.rs index e2132e757..5869f73ea 100644 --- a/rita_client/src/exit_manager/exit_switcher.rs +++ b/rita_client/src/exit_manager/exit_switcher.rs @@ -832,7 +832,7 @@ mod tests { u16::MAX, 
tracking_exit, u16::MAX, - best_exit, + best_exit, 400 ), &mut vec, diff --git a/rita_client/src/operator_update/mod.rs b/rita_client/src/operator_update/mod.rs index 749206d4a..d27197338 100644 --- a/rita_client/src/operator_update/mod.rs +++ b/rita_client/src/operator_update/mod.rs @@ -8,21 +8,20 @@ use crate::exit_manager::{get_client_pub_ipv6, get_selected_exit_ip}; use crate::rita_loop::is_gateway_client; use crate::{ extend_hardware_info, reset_wifi_pass, set_router_update_instruction, set_wifi_multi_internal, + RitaClientError, }; use althea_kernel_interface::hardware_info::get_hardware_info; -use althea_types::get_sequence_num; +use althea_types::{get_sequence_num, UsageTrackerTransfer}; use althea_types::{ AuthorizedKeys, BillingDetails, ContactStorage, ContactType, CurExitInfo, ExitConnection, - HardwareInfo, OperatorAction, OperatorCheckinMessage, OperatorUpdateMessage, UsageHour, - UsageTracker, + HardwareInfo, OperatorAction, OperatorCheckinMessage, OperatorUpdateMessage, }; use num256::Uint256; use rita_common::rita_loop::is_gateway; use rita_common::tunnel_manager::neighbor_status::get_neighbor_status; use rita_common::tunnel_manager::shaping::flag_reset_shaper; -use rita_common::usage_tracker::UsageHour as RCUsageHour; -use rita_common::usage_tracker::UsageType::{Client, Relay}; -use rita_common::usage_tracker::{get_current_hour, get_usage_data}; +use rita_common::usage_tracker::structs::UsageType::{self, Client, Relay}; +use rita_common::usage_tracker::{get_current_hour, get_current_throughput, get_usage_data_map}; use rita_common::utils::option_convert; use rita_common::DROPBEAR_AUTHORIZED_KEYS; use rita_common::KI; @@ -31,7 +30,7 @@ use serde_json::Value; use settings::client::RitaClientSettings; use settings::network::NetworkSettings; use settings::payment::PaymentSettings; -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use std::fs::{remove_file, rename, File}; use std::io::{BufRead, BufReader, Write}; use 
std::os::unix::fs::PermissionsExt; @@ -64,7 +63,7 @@ const UPDATE_FREQUENCY_CAP: Duration = Duration::from_secs(3600); pub async fn operator_update( ops_last_seen_usage_hour: Option, timeout: Duration, -) -> Result { +) -> Result { let url: &str; if cfg!(feature = "dev_env") { url = "http://7.7.7.7:8080/checkin"; @@ -139,30 +138,6 @@ pub async fn operator_update( }, }); - let current_hour = match get_current_hour() { - Ok(hour) => hour, - Err(e) => { - error!("System time is set earlier than unix epoch {:?}", e); - return Err(()); - } - }; - let last_seen_hour = ops_last_seen_usage_hour.unwrap_or(0); - // We check that the difference is >1 because we leave a 1 hour buffer to prevent from sending over an incomplete current hour - let send_hours = current_hour - last_seen_hour > 1; - let mut usage_tracker_data: Option = None; - // if ops_last_seen_usage_hour is a None the thread has restarted and we are waiting for ops to tell us how much data we need to send, - // which will be populated with the next checkin cycle. we only send 1 month at a time if ops is requesting the full usage history. 
- if send_hours && ops_last_seen_usage_hour.is_some() { - let usage_data_client = get_usage_data(Client); - let usage_data_relay = get_usage_data(Relay); - usage_tracker_data = process_usage_data( - usage_data_client, - usage_data_relay, - last_seen_hour, - current_hour, - ) - } - let exit_con = Some(ExitConnection { cur_exit, client_pub_ipv6: get_client_pub_ipv6(), @@ -184,7 +159,10 @@ pub async fn operator_update( hardware_info, user_bandwidth_limit, rita_uptime: RITA_UPTIME.elapsed(), - user_bandwidth_usage: usage_tracker_data, + user_bandwidth_usage: None, + user_bandwidth_usage_v2: prepare_usage_data_for_upload(ops_last_seen_usage_hour)?, + client_mbps: get_current_throughput(UsageType::Client), + relay_mbps: get_current_throughput(UsageType::Relay), }) .await; @@ -196,7 +174,7 @@ pub async fn operator_update( } Err(e) => { error!("Failed to perform operator checkin with {:?}", e); - return Err(()); + return Err(e.into()); } }; @@ -204,7 +182,7 @@ pub async fn operator_update( Ok(a) => a, Err(e) => { error!("Failed to perform operator checkin with {:?}", e); - return Err(()); + return Err(e.into()); } }; @@ -260,13 +238,7 @@ pub async fn operator_update( }; set_router_update_instruction(update_instructions); perform_operator_update(new_settings.clone(), rita_client, network); - // update our count of ops last seen if we have confirmation ops is up to date on usage data - if new_settings.ops_last_seen_usage_hour == current_hour { - info!("Confirmed ops has taken usage update for hour {current_hour}"); - Ok(current_hour) - } else { - Ok(new_settings.ops_last_seen_usage_hour) - } + Ok(new_settings.ops_last_seen_usage_hour) } /// logs some hardware info that may help debug this router @@ -626,65 +598,39 @@ fn contains_forbidden_key(map: Map, forbidden_values: &[&str]) -> false } -/// Given a vecdeque of usage hours, add up to a month's worth of hours to a returned vecdeque -pub fn iterate_month_usage_data(mut data: VecDeque) -> VecDeque { - // one month in hours - 
let max_hour_iterations: u32 = 730; - let mut client_iter = 0; - let mut res = VecDeque::new(); - while let Some(hour) = data.pop_front() { - // either we hit max iterations or we are on the second to last entry. - res.push_back(UsageHour { - up: hour.up, - down: hour.down, - price: hour.price, - index: hour.index, - }); - if client_iter >= max_hour_iterations || data.len() == 1 { - break; +/// This function handles preparing usage data for upload to operator tools +fn prepare_usage_data_for_upload( + ops_last_seen_hour: Option, +) -> Result, RitaClientError> { + // if this is none we will not send usage data to ops + // as it indicates we don't yet know what data ops has + match ops_last_seen_hour { + Some(ops_last_seen_hour) => { + let current_hour = get_current_hour()?; + let usage_data_client = get_usage_data_map(Client); + let usage_data_relay = get_usage_data_map(Relay); + // We check that the difference is >1 because we leave a 1 hour buffer to prevent from sending over an incomplete current hour + let min_send_hour = ops_last_seen_hour; + let mut client_bandwidth = HashMap::new(); + let mut relay_bandwidth = HashMap::new(); + for (index, usage) in usage_data_client { + if index > min_send_hour && index < current_hour { + client_bandwidth.insert(index, usage); + } + } + for (index, usage) in usage_data_relay { + if index > min_send_hour && index < current_hour { + relay_bandwidth.insert(index, usage); + } + } + Ok(Some(UsageTrackerTransfer { + client_bandwidth, + relay_bandwidth, + // this function processes client usage data so this is always true + // in the future this value will be set by exits uploading data + exit_bandwidth: HashMap::new(), + })) } - client_iter += 1; + None => Ok(None), } - res -} - -/// Given our saved usage data and our last seen value, process the vecdeques so that we only -/// send to ops new data since last seen. 
-pub fn process_usage_data( - mut usage_data_client: VecDeque, - mut usage_data_relay: VecDeque, - last_seen_hour: u64, - current_hour: u64, -) -> Option { - // sort client and relay data in case they have come out of order somehow. This sorts by index increasing so newest data at back - usage_data_relay - .make_contiguous() - .sort_by(|a, b| a.index.cmp(&b.index)); - usage_data_client - .make_contiguous() - .sort_by(|a, b| a.index.cmp(&b.index)); - - // so this spits out the index for where last seen is, or the index of the next highest hour(returned in an error). - // we take the result -1 just in case, limit 0, since it's possible we might get back an index out of bounds at the back. - let client_last_seen_index = - match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) { - Ok(p) => p, - Err(p) => p.saturating_sub(1), - }; - let relay_last_seen_index = - match usage_data_relay.binary_search_by(|x| x.index.cmp(&last_seen_hour)) { - Ok(p) => p, - Err(p) => p.saturating_sub(1), - }; - // remove all data before the last seen index - usage_data_client.drain(0..client_last_seen_index); - usage_data_relay.drain(0..relay_last_seen_index); - let new_client_data = iterate_month_usage_data(usage_data_client); - let new_relay_data = iterate_month_usage_data(usage_data_relay); - - Some(UsageTracker { - last_save_hour: current_hour, - client_bandwidth: new_client_data, - relay_bandwidth: new_relay_data, - }) } diff --git a/rita_client/src/operator_update/tests.rs b/rita_client/src/operator_update/tests.rs index b53723b86..617f19d35 100644 --- a/rita_client/src/operator_update/tests.rs +++ b/rita_client/src/operator_update/tests.rs @@ -1,18 +1,14 @@ #[cfg(test)] mod test { - use rand::seq::SliceRandom; - use rita_common::usage_tracker::get_current_hour; - use rita_common::usage_tracker::tests::test::generate_dummy_usage_tracker; + use crate::operator_update::contains_forbidden_key; + use crate::operator_update::prepare_usage_data_for_upload; + use 
crate::operator_update::update_authorized_keys; use serde_json::json; use serde_json::Value; use std::fs::File; use std::io::{BufRead, BufReader, Write}; use std::{fs, io::Error, path::Path}; - use crate::operator_update::contains_forbidden_key; - use crate::operator_update::process_usage_data; - use crate::operator_update::update_authorized_keys; - const FORBIDDEN_MERGE_VALUES: [&str; 2] = ["test_key", "other_test_key"]; #[test] @@ -136,90 +132,7 @@ mod test { assert!(Path::new(key_file).exists()); } #[test] - fn test_usage_data_processing() { - // this tests the flow used in rita client's operator update loop used to process usage data sent up to ops - let dummy_usage_tracker = generate_dummy_usage_tracker(); - let mut usage_data_client = dummy_usage_tracker.client_bandwidth.clone(); - let mut usage_data_relay = dummy_usage_tracker.relay_bandwidth; - let mut unshuffled_client = usage_data_client.clone(); - let mut unshuffled_relay = usage_data_relay.clone(); - - // Test the sort function first: - // shuffle the data because it's currently ordered - usage_data_client - .make_contiguous() - .shuffle(&mut rand::thread_rng()); - println!( - "Sample of current shuffle is {} {} {}", - usage_data_client.get(0).unwrap().index, - usage_data_client.get(1).unwrap().index, - usage_data_client.get(2).unwrap().index - ); - usage_data_relay - .make_contiguous() - .shuffle(&mut rand::thread_rng()); - // The processing function sorts these lowest index to highest. Note that usage hours are stored to disk as - // the opposite order where newest are added to the front, so this is inefficient. 
- // Options here to optimize are either a/write my own binary sort again which will compare for the existing structure - // where the saved vecdeque is highest index to lowest index, b/rework usage tracker so that we save data lowest index - // to highest index, or c/the current solution(inefficient, as we will be fully reversing the whole vecdeque of each - // client and relay at least once per hour on rollover): sort the entire list in reverse order to use with the builtin - // bin search from vecdeque - - // for this purpose our last seen will be start hour - 10. - let current_hour = get_current_hour().unwrap(); - let last_seen_hour = current_hour - 10; - let res_usage = process_usage_data( - usage_data_client.clone(), - usage_data_relay.clone(), - last_seen_hour, - current_hour, - ) - .unwrap(); - let res_usage_client = res_usage.client_bandwidth; - let res_usage_relay = res_usage.relay_bandwidth; - - // check that the sorting in process_usage_data is correct after shuffling and sending it through - assert!( - res_usage_relay.get(0).unwrap().index < res_usage_relay.get(1).unwrap().index - && res_usage_relay.get(1).unwrap().index < res_usage_relay.get(2).unwrap().index - ); - assert!( - res_usage_client.get(0).unwrap().index < res_usage_client.get(1).unwrap().index - && res_usage_client.get(1).unwrap().index < res_usage_client.get(2).unwrap().index - ); - // check that the binary searching is correct: we did not remove any entries from usage client; so we should have started exactly - // from the last seen hour. we removed the last seen from usage relay, so we should expect to see our earliest hour as one fewer - assert!(res_usage_client.get(0).unwrap().index == last_seen_hour); - assert!(res_usage_relay.get(0).unwrap().index == last_seen_hour); - - // now check that same thing, but in case we have a gap in the data. 
we'll remove the entry for the last_seen_hour from usage_data_relay - // to make sure we are successfully returning the next earliest hour (in our case, the last seen -1). we use res_usage client and relay - // because to remove the correct entry it needs to be presorted (if we've gotten here it's guaranteed.) - // we successfully search for the entry or return the next one down. - unshuffled_client.remove(unshuffled_client.len() - 11); - unshuffled_relay.remove(unshuffled_relay.len() - 11); - // so the index of our last seen hour if we say last seen is current - 10... will be at len - 11. - let res_usage = process_usage_data( - unshuffled_client, - unshuffled_relay, - last_seen_hour, - current_hour, - ) - .unwrap(); - let res_usage_client = res_usage.client_bandwidth; - let res_usage_relay = res_usage.relay_bandwidth; - // after processing we should start at last seen - 1. - println!( - "{:?} last seen {:?}", - res_usage_relay.get(0).unwrap().index, - last_seen_hour - ); - assert!(res_usage_relay.get(0).unwrap().index == last_seen_hour - 1); - assert!(res_usage_client.get(0).unwrap().index == last_seen_hour - 1); - - // check that our iteration function does indeed stop at a month of data: - assert!(res_usage_client.len() <= 730); - assert!(res_usage_relay.len() <= 730); + fn test_prepare_usage_data_for_upload() { + assert_eq!(prepare_usage_data_for_upload(None).unwrap(), None); } } diff --git a/rita_client/src/operator_update/update_loop.rs b/rita_client/src/operator_update/update_loop.rs index 599e14672..eab96cca4 100644 --- a/rita_client/src/operator_update/update_loop.rs +++ b/rita_client/src/operator_update/update_loop.rs @@ -43,7 +43,8 @@ pub fn start_operator_update_loop() { wait_unti_next_update = max(wait_unti_next_update / 2, TARGET_UPDATE_FREQUENCY); } - Err(()) => { + Err(e) => { + error!("Ops checkin failed with {:?}!", e); // failed checkin, backoff with a random multiplier the goal of random backoff // is to prevent collisions wait_unti_next_update 
= min( diff --git a/rita_client/src/traffic_watcher/mod.rs b/rita_client/src/traffic_watcher/mod.rs index c2d86702a..cd3021c32 100644 --- a/rita_client/src/traffic_watcher/mod.rs +++ b/rita_client/src/traffic_watcher/mod.rs @@ -32,9 +32,9 @@ use num_traits::identities::Zero; use rita_common::debt_keeper::{ traffic_replace, traffic_update, wgkey_insensitive_traffic_update, Traffic, }; +use rita_common::usage_tracker::structs::UsageType; use rita_common::usage_tracker::update_usage_data; use rita_common::usage_tracker::UpdateUsage; -use rita_common::usage_tracker::UsageType; use rita_common::KI; use std::collections::HashMap; use std::net::IpAddr; diff --git a/rita_common/src/network_endpoints/mod.rs b/rita_common/src/network_endpoints/mod.rs index 6aa2410ce..cf60a40db 100644 --- a/rita_common/src/network_endpoints/mod.rs +++ b/rita_common/src/network_endpoints/mod.rs @@ -37,7 +37,7 @@ pub async fn make_payments_v2(item: Json>) -> HttpResponse { let mut build_err = String::new(); for pmt in pmt_list { let ts = ToValidate { - payment: pmt.clone(), + payment: pmt, received: Instant::now(), checked: false, }; diff --git a/rita_common/src/payment_controller/mod.rs b/rita_common/src/payment_controller/mod.rs index 0cd7cfa14..2c1e6de34 100644 --- a/rita_common/src/payment_controller/mod.rs +++ b/rita_common/src/payment_controller/mod.rs @@ -311,7 +311,7 @@ async fn make_althea_payment( // setup tx hash let pmt = pmt.publish(Uint256::from_str_radix(&transaction.txhash, 16).unwrap()); - send_make_payment_endpoints(pmt.clone(), network_settings, None, Some(cosmos_node_grpc)).await; + send_make_payment_endpoints(pmt, network_settings, None, Some(cosmos_node_grpc)).await; // place this payment in the validation queue to handle later. 
let ts = ToValidate { @@ -390,7 +390,7 @@ async fn make_xdai_payment( // add published txid to submission let pmt = pmt.publish(tx_id); - send_make_payment_endpoints(pmt.clone(), network_settings, Some(full_node), None).await; + send_make_payment_endpoints(pmt, network_settings, Some(full_node), None).await; // place this payment in the validation queue to handle later. let ts = ToValidate { @@ -470,7 +470,7 @@ async fn send_make_payment_endpoints( // Get all txids to this client. Temporary add new payment to a copy of a list to send up to endpoint // this pmt is actually recorded in memory after validator confirms it let mut txid_history = get_payment_txids(pmt.to); - txid_history.insert(pmt.clone()); + txid_history.insert(pmt); let actix_client = awc::Client::new(); let neigh_ack = actix_client @@ -485,7 +485,7 @@ async fn send_make_payment_endpoints( let resend_info = ResendInfo { neigh_url: neighbor_url.clone(), - pmt: pmt.clone(), + pmt, attempt: 0u8, }; diff --git a/rita_common/src/payment_validator/mod.rs b/rita_common/src/payment_validator/mod.rs index 6171fbd69..93a0dce4c 100644 --- a/rita_common/src/payment_validator/mod.rs +++ b/rita_common/src/payment_validator/mod.rs @@ -290,7 +290,7 @@ fn remove(msg: Remove) { // store successful transactions so that they can't be played back to us, at least // during this session if msg.success { - add_successful_tx(msg.tx.payment.clone()); + add_successful_tx(msg.tx.payment); } if was_present { info!("Transaction {} was removed", msg.tx); @@ -615,7 +615,7 @@ fn handle_tx_messaging_xdai( ) { let from_address = ts.payment.from.eth_address; let amount = ts.payment.amount; - let pmt = ts.payment.clone(); + let pmt = ts.payment; let our_address = settings::get_rita_common() .payment .eth_address @@ -709,7 +709,7 @@ fn handle_tx_messaging_xdai( ); // update the usage tracker with the details of this payment - update_payments(pmt.clone()); + update_payments(pmt); // Store this payment as a receipt to send in the future if 
this receiver doesnt see the payment store_payment(pmt); @@ -737,7 +737,7 @@ fn handle_tx_messaging_xdai( /// Handles the tx response from the full node and it's various cases /// pulled out of validate_transaction purely for cosmetic reasons fn handle_tx_messaging_althea(transaction: TransactionDetails, ts: ToValidate) { - let pmt = ts.payment.clone(); + let pmt = ts.payment; // txid is for eth chain and txhash is for althea chain, only one of these should be // Some(..). This was verified before @@ -838,7 +838,7 @@ fn handle_tx_messaging_althea(transaction: TransactionDetails, ts: ToValidate) { denom.expect("How did this happen when we already verified existence"), ); // update the usage tracker with the details of this payment - update_payments(pmt.clone()); + update_payments(pmt); // Store this payment as a receipt to send in the future if this receiver doesnt see the payment store_payment(pmt); @@ -970,8 +970,8 @@ mod tests { txid: 1u8.into(), }; - store_payment(pmt1.clone()); - sent_hashset.insert(pmt1.clone()); + store_payment(pmt1); + sent_hashset.insert(pmt1); assert_eq!(get_payment_txids(pmt1.to), sent_hashset); let pmt2 = PaymentTx { @@ -980,9 +980,9 @@ mod tests { amount: 100u8.into(), txid: 2u8.into(), }; - store_payment(pmt2.clone()); + store_payment(pmt2); - sent_hashset.insert(pmt2.clone()); + sent_hashset.insert(pmt2); assert_eq!(get_payment_txids(pmt2.to), sent_hashset); let pmt3 = PaymentTx { @@ -992,7 +992,7 @@ mod tests { txid: 2u8.into(), }; - store_payment(pmt3.clone()); + store_payment(pmt3); assert_eq!(get_payment_txids(pmt3.to), sent_hashset); } diff --git a/rita_common/src/traffic_watcher/mod.rs b/rita_common/src/traffic_watcher/mod.rs index 925f3ba37..775b8a3fb 100644 --- a/rita_common/src/traffic_watcher/mod.rs +++ b/rita_common/src/traffic_watcher/mod.rs @@ -5,9 +5,9 @@ use crate::debt_keeper::traffic_update; use crate::debt_keeper::Traffic; use crate::tunnel_manager::Neighbor; +use crate::usage_tracker::structs::UsageType; use 
crate::usage_tracker::update_usage_data; use crate::usage_tracker::UpdateUsage; -use crate::usage_tracker::UsageType; use crate::RitaCommonError; use crate::KI; use althea_kernel_interface::open_tunnel::is_link_local; diff --git a/rita_common/src/usage_tracker/mod.rs b/rita_common/src/usage_tracker/mod.rs index d442c3dc4..3ba8226fb 100644 --- a/rita_common/src/usage_tracker/mod.rs +++ b/rita_common/src/usage_tracker/mod.rs @@ -6,32 +6,33 @@ use crate::rita_loop::write_to_disk::is_router_storage_small; use crate::RitaCommonError; -use althea_types::Identity; +use althea_types::convert_flat_to_map_usage_data; +use althea_types::convert_map_to_flat_usage_data; +use althea_types::user_info::Usage; +use althea_types::IndexedUsageHour; use althea_types::PaymentTx; use bincode::Error as BincodeError; use flate2::read::ZlibDecoder; use flate2::write::ZlibEncoder; use flate2::Compression; -use num256::Uint256; -use serde::{Deserialize, Serialize}; -use serde_json::Error as JsonError; -use settings::set_rita_common; +use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use std::fs::File; -use std::hash::Hash; use std::io::Error as IOError; use std::io::ErrorKind; use std::io::Read; use std::io::Write; use std::path::Path; -use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; +use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; use std::usize; +use structs::*; +pub mod structs; pub mod tests; /// one year worth of usage storage @@ -48,184 +49,99 @@ const MAX_TX_ENTRIES: usize = 5_000; pub const MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE: usize = 5; pub const MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE: usize = 75; -lazy_static! 
{ - static ref USAGE_TRACKER: Arc> = - Arc::new(RwLock::new(UsageTracker::load_from_disk())); -} +/// The maximum amount of usage data that may be unsaved before we save out to the disk +pub const MAX_UNSAVED_USAGE: u64 = 10 * 1000u64.pow(3); -/// In an effort to converge this module between the three possible bw tracking -/// use cases this enum is used to identify which sort of usage we are tracking -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -#[allow(dead_code)] -pub enum UsageType { - Client, - Relay, - Exit, +/// Just a storage wrapper for the usage tracker lazy static, to wrap the persisted data +/// (usage tracker) and the non persisted data (throughput tracker) without using raw tuple +pub struct UsageTrackerWrapper { + usage_tracker: UsageTrackerStorage, + throughtput_tracker: ThroughputTracker, } -/// A struct for tracking each hour of usage, indexed by time in hours since -/// the unix epoch -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct UsageHour { - pub index: u64, - pub up: u64, - pub down: u64, - pub price: u32, -} - -/// A version of payment tx with a string txid so that the formatting is correct -/// for display to users. 
-#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] -pub struct FormattedPaymentTx { - pub to: Identity, - pub from: Identity, - pub amount: Uint256, - // should always be populated in this case - pub txid: String, - // TODO add "payment_type" here which will allow the frontend - // to easily tell what this payment is for and prevent the need - // for hacky classification +impl Default for UsageTrackerWrapper { + fn default() -> Self { + Self::new() + } } -fn to_formatted_payment_tx(input: PaymentTx) -> FormattedPaymentTx { - let txid = input.txid; - FormattedPaymentTx { - to: input.to, - from: input.from, - amount: input.amount, - txid: format!("{txid:#066x}"), +impl UsageTrackerWrapper { + pub fn new() -> UsageTrackerWrapper { + UsageTrackerWrapper { + usage_tracker: UsageTrackerStorage::load_from_disk(), + throughtput_tracker: ThroughputTracker::default(), + } } } -/// A struct for tracking each hours of payments indexed in hours since unix epoch -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct PaymentHour { - index: u64, - payments: Vec, +lazy_static! { + static ref USAGE_TRACKER_STORAGE: Arc> = + Arc::new(RwLock::new(UsageTrackerWrapper::new())); } -/// The main actor that holds the usage state for the duration of operations -/// at some point loading and saving will be defined in service started - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct UsageTracker { - pub last_save_hour: u64, - // at least one of these will be left unused - pub client_bandwidth: VecDeque, - pub relay_bandwidth: VecDeque, - pub exit_bandwidth: VecDeque, - /// A history of payments - pub payments: VecDeque, +/// Utility function that grabs usage tracker from it's lock and +/// saves it out. 
Should be called when we want to save anywhere outside this file +pub fn save_usage_to_disk() { + match USAGE_TRACKER_STORAGE.write().unwrap().usage_tracker.save() { + Ok(_val) => info!("Saved usage tracker successfully"), + Err(e) => warn!("Unable to save usage tracker {:}", e), + }; } -impl UsageTracker { - /// This function is run on startup and removes duplicate entries from usage history - /// This function is run on startup and removes duplicate entries from usage history - pub fn remove_duplicate_and_invalid_payment_entires(mut self) -> UsageTracker { - let mut duplicate_list = HashSet::new(); - let mut payments = self.payments.clone(); - for hour in payments.iter_mut() { - let mut payments = Vec::new(); - for p in hour.payments.iter() { - let txid: Uint256 = match p.txid.parse() { - Ok(tx) => tx, - Err(_) => { - warn!("Removed invalid payment! {}", p.txid); - // found an error, removing - continue; - } - }; - if duplicate_list.contains(&txid) { - // found a duplicate, removing - } else { - duplicate_list.insert(txid); - payments.push(p.clone()) - } - } - // insert filtered payments list - hour.payments = payments; - } - self.payments = payments; - self - } - - pub fn get_txids(&self) -> HashSet { - let mut set = HashSet::new(); - for hour in &self.payments { - for p in &hour.payments { - let txid: Uint256 = match p.txid.parse() { - Ok(t) => t, - Err(_) => { - error!("Invalid tx id in usage tracker {}", p.txid); - continue; - } - }; - set.insert(txid); - } - } - set +/// Helps determine how often we write out to the disk on different devices by setting a device specific minimum +/// number of transactions before saving +pub fn get_minimum_number_of_transactions_to_store() -> usize { + let settings = settings::get_rita_common(); + if is_router_storage_small( + &settings + .network + .device + .unwrap_or_else(|| "x86_64".to_string()), + ) { + MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE + } else { + MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE } } -/// This 
function checks to see how many bytes were used -/// and if the amount used is not greater than 10gb than -/// it will return false. Essentially, it's checking to make -/// sure that there is usage within the router. -fn check_usage_hour(input: &VecDeque, last_save_hour: u64) -> bool { - let mut input = input.clone(); - let number_of_bytes = 10000; - let mut total_used_bytes = 0; - while !input.is_empty() { - match input.pop_front() { - Some(val) => { - total_used_bytes += val.up; - if val.index < last_save_hour { - return total_used_bytes < number_of_bytes; +impl UsageTrackerStorage { + /// This function checks to see how many bytes were used + /// and if the amount used is not greater than 10gb than + /// it will return false. Essentially, it's checking to make + /// sure that there is enough usage to be worth saving + pub fn check_unsaved_usage(&self) -> bool { + let mut total_unsaved_bytes = 0; + let v = vec![ + &self.client_bandwidth, + &self.relay_bandwidth, + &self.exit_bandwidth, + ]; + for i in v { + for (index, usage) in i { + if *index > self.last_save_hour { + total_unsaved_bytes += usage.up; + total_unsaved_bytes += usage.down; } } - None => break, } + total_unsaved_bytes > MAX_UNSAVED_USAGE } - false -} -fn at_least_two(a: bool, b: bool, c: bool) -> bool { - if a { - b || c - } else { - b && c + /// Returns true if the numberof unsaved payments is greater than the mininum number of transactions to store + pub fn check_unsaved_payments(&self) -> bool { + let mut total_num_unsaved_payments = 0; + for p in self.payments.iter() { + if p.index > self.last_save_hour { + total_num_unsaved_payments += 1; + } + } + total_num_unsaved_payments > get_minimum_number_of_transactions_to_store() } -} -/// Utility function that grabs usage tracker from it's lock and -/// saves it out. 
Should be called when we want to save anywhere outside this file -pub fn save_usage_to_disk() { - match USAGE_TRACKER.write().unwrap().save() { - Ok(_val) => info!("Saved usage tracker successfully"), - Err(e) => warn!("Unable to save usage tracker {:}", e), - }; -} - -impl UsageTracker { pub fn save(&mut self) -> Result<(), RitaCommonError> { let settings = settings::get_rita_common(); - let minimum_number_of_transactions = if is_router_storage_small( - &settings - .network - .device - .unwrap_or_else(|| "x86_64".to_string()), - ) { - MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE - } else { - MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE - }; - if self.payments.len() < minimum_number_of_transactions - || at_least_two( - check_usage_hour(&self.client_bandwidth, self.last_save_hour), - check_usage_hour(&self.exit_bandwidth, self.last_save_hour), - check_usage_hour(&self.relay_bandwidth, self.last_save_hour), - ) - { + + if self.check_unsaved_payments() || self.check_unsaved_usage() { return Err(RitaCommonError::StdError(IOError::new( ErrorKind::Other, "Too little data for writing", @@ -257,7 +173,9 @@ impl UsageTracker { // 500 tx min. Payment data is trimmed if out of space as it is larger than usage data if newsize >= 1000 { newsize /= 2; - trim_payments(newsize, &mut self.payments); + while self.payments.len() > newsize { + self.remove_oldest_payment_history_entry() + } let serialized = bincode::serialize(self)?; compressed_bytes = match compress_serialized(serialized) { Ok(bytes) => bytes, @@ -277,14 +195,14 @@ impl UsageTracker { /// struct will be returned so data can be successfully collected from the present moment forward. 
/// /// TODO remove in beta 21 migration code migrates json serialized data to bincode - fn load_from_disk() -> UsageTracker { + fn load_from_disk() -> UsageTrackerStorage { // if the loading process goes wrong for any reason, we just start again - let blank_usage_tracker = UsageTracker { + let blank_usage_tracker = UsageTrackerStorage { + client_bandwidth: HashMap::new(), + relay_bandwidth: HashMap::new(), + exit_bandwidth: HashMap::new(), + payments: HashSet::new(), last_save_hour: 0, - client_bandwidth: VecDeque::new(), - relay_bandwidth: VecDeque::new(), - exit_bandwidth: VecDeque::new(), - payments: VecDeque::new(), }; let file_path = settings::get_rita_common().network.usage_tracker_file; @@ -304,49 +222,43 @@ impl UsageTracker { Err(_e) => return blank_usage_tracker, }; - let res = match ( + match ( file_exists, - try_bincode(&unzipped_bytes), - try_json(&unzipped_bytes), + try_bincode_new(&unzipped_bytes), + try_bincode_old(&unzipped_bytes), ) { - // file exists and bincode deserialization was successful, in the case that somehow json deserialization of the same - // data was also successful just ignore it and use bincode + // file exists and bincode deserialization was successful, ignore all other possibilities (true, Ok(bincode_tracker), _) => bincode_tracker, - //file exists, but bincode deserialization failed -> load using serde (old), update settings and save file - (true, Err(_e), Ok(mut json_tracker)) => { - let mut settings = settings::get_rita_common(); - // save with bincode regardless of result of serde deserialization in order to end reliance on json - let old_path = PathBuf::from(settings.network.usage_tracker_file); - - let mut new_path = old_path.clone(); - new_path.set_extension("bincode"); - - settings.network.usage_tracker_file = - new_path.clone().into_os_string().into_string().unwrap(); - set_rita_common(settings); - - match json_tracker.save() { - Ok(()) => { - // delete the old file after successfully migrating, this may cause problems 
on routers with - // low available storage space since we want to take up space for both the new and old file - if !(old_path.eq(&new_path)) { - // check that we would not be deleting the file just saved to - let _r = std::fs::remove_file(old_path); - } else { - error!( - "We are trying to save over {:?} with {:?}, how are they same?", - old_path, new_path - ) + // file exists, up to date encoding failed, beta 20 encoding succeeded + (true, Err(_), Ok(bincode_tracker)) => UsageTrackerStorage { + last_save_hour: bincode_tracker.last_save_hour, + client_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.client_bandwidth), + relay_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.relay_bandwidth), + exit_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.exit_bandwidth), + payments: { + let mut out = HashSet::new(); + for ph in bincode_tracker.payments { + for p in ph.payments { + match p.txid.parse() { + Ok(txid) => { + out.insert(UsageTrackerPayment { + to: p.to, + from: p.from, + amount: p.amount, + txid, + index: ph.index, + }); + } + Err(e) => error!( + "Failed to convert payment with txid {:?} discarding!", + e + ), + } } - json_tracker } - Err(e) => { - error!("Failed to save UsageTracker to bincode {:?}", e); - json_tracker - } - } - } - + out + }, + }, // file does not exist; no data to load, this is probably a new router // and we'll just generate a new file (false, _, _) => blank_usage_tracker, @@ -360,8 +272,7 @@ impl UsageTracker { ); blank_usage_tracker } - }; - res.remove_duplicate_and_invalid_payment_entires() + } } } @@ -385,14 +296,14 @@ fn decompressed(mut file: File) -> Result, RitaCommonError> { } /// Attempts to deserialize the provided array of bytes as a bincode encoded UsageTracker struct -fn try_bincode(bytes: &[u8]) -> Result { - let deserialized: Result = bincode::deserialize(bytes); +fn try_bincode_new(bytes: &[u8]) -> Result { + let deserialized: Result = bincode::deserialize(bytes); deserialized } -/// Attempts to 
deserialize the provided array of bytes as a json encoded UsageTracker struct -fn try_json(bytes: &[u8]) -> Result { - let deserialized: Result = serde_json::from_slice(bytes); +/// Attempts to deserialize the provided array of bytes as a bincode encoded UsageTracker struct +fn try_bincode_old(bytes: &[u8]) -> Result { + let deserialized: Result = bincode::deserialize(bytes); deserialized } @@ -430,53 +341,76 @@ pub fn update_usage_data(msg: UpdateUsage) { } }; - process_usage_update(curr_hour, msg, &mut (USAGE_TRACKER.write().unwrap())); + let mut usage_tracker = USAGE_TRACKER_STORAGE.write().unwrap(); + + usage_tracker + .usage_tracker + .process_usage_update(curr_hour, msg); + usage_tracker.throughtput_tracker.process_usage_update(msg); } -fn process_usage_update(current_hour: u64, msg: UpdateUsage, data: &mut UsageTracker) { - // history contains a reference to whatever the correct storage array is - let history = match msg.kind { - UsageType::Client => &mut data.client_bandwidth, - UsageType::Relay => &mut data.relay_bandwidth, - UsageType::Exit => &mut data.exit_bandwidth, - }; - // we grab the front entry from the VecDeque, if there is an entry one we check if it's - // up to date, if it is we add to it, if it's not or there is no entry we create one. - // note that price is only sampled once per hour. 
- match history.front_mut() { - None => history.push_front(UsageHour { - index: current_hour, - up: msg.up, - down: msg.down, - price: msg.price, - }), - Some(entry) => { - if entry.index == current_hour { - entry.up += msg.up; - entry.down += msg.down; - } else { - history.push_front(UsageHour { - index: current_hour, - up: msg.up, - down: msg.down, - price: msg.price, - }) +impl ThroughputTracker { + fn process_usage_update(&mut self, msg: UpdateUsage) { + let data = match msg.kind { + UsageType::Client => &mut self.client, + UsageType::Relay => &mut self.relay, + UsageType::Exit => &mut self.exit, + }; + if let Some(last_sample_time) = data.last_sample_time { + // make sure we don't panic if the clock goes backwards + if Instant::now() > last_sample_time { + data.duration_of_this_sample = Instant::now() - last_sample_time } } - } - while history.len() > MAX_USAGE_ENTRIES { - let _discarded_entry = history.pop_back(); + data.last_sample_time = Some(Instant::now()); + // for clients all traffic is measured for relays the up and down values + // will always be double the actual throughput since we're pulling in and forwarding + // the same data at once. + match msg.kind { + UsageType::Client => data.bytes_used = msg.up + msg.down, + UsageType::Relay | UsageType::Exit => data.bytes_used = msg.up, + } } } -fn trim_payments(size: usize, history: &mut VecDeque) { - while history.len() > size { - let _discarded_entry = history.pop_back(); +impl UsageTrackerStorage { + fn process_usage_update(&mut self, current_hour: u64, msg: UpdateUsage) { + // history contains a reference to whatever the correct storage array is + let history = match msg.kind { + UsageType::Client => &mut self.client_bandwidth, + UsageType::Relay => &mut self.relay_bandwidth, + UsageType::Exit => &mut self.exit_bandwidth, + }; + // we grab the front entry from the VecDeque, if there is an entry one we check if it's + // up to date, if it is we add to it, if it's not or there is no entry we create one. 
+ // note that price is only sampled once per hour. + match history.get_mut(¤t_hour) { + None => { + history.insert( + current_hour, + Usage { + up: msg.up, + down: msg.down, + price: msg.price, + }, + ); + } + Some(entry) => { + entry.up += msg.up; + entry.down += msg.down; + } + } + while history.len() > MAX_USAGE_ENTRIES { + let smallest_key = history.keys().min_by(|a, b| a.cmp(b)).cloned(); + if let Some(smallest_key) = smallest_key { + history.remove(&smallest_key); + } + } } } pub fn update_payments(payment: PaymentTx) { - let history = &mut (USAGE_TRACKER.write().unwrap()); + let mut history = USAGE_TRACKER_STORAGE.write().unwrap(); // This handles the following edge case: // Router A is paying router B. Router B reboots and loses all data in @@ -484,66 +418,94 @@ pub fn update_payments(payment: PaymentTx) { // already been accounted for get counted twice. // This checks the usage history to see if this tx exists // thereby preventing the above case. - if history.get_txids().contains(&payment.txid) { + if history.usage_tracker.get_txids().contains(&payment.txid) { error!("Tried to insert duplicate txid into usage tracker!"); return; } - handle_payments(history, &payment); + history.usage_tracker.handle_payments(&payment); } -/// Internal handler function that deals with adding a payment to the list -/// and saving if required -fn handle_payments(history: &mut UsageTracker, payment: &PaymentTx) { - let current_hour = match get_current_hour() { - Ok(hour) => hour, - Err(e) => { - error!("System time is set earlier than unix epoch! 
{:?}", e); - return; - } - }; - let formatted_payment = to_formatted_payment_tx(payment.clone()); - match history.payments.front_mut() { - None => history.payments.push_front(PaymentHour { - index: current_hour, - payments: vec![formatted_payment], - }), - Some(entry) => { - if entry.index == current_hour { - entry.payments.push(formatted_payment); - } else { - history.payments.push_front(PaymentHour { - index: current_hour, - payments: vec![formatted_payment], - }) +impl UsageTrackerStorage { + /// Internal handler function that deals with adding a payment to the list + /// and saving if required + fn handle_payments(&mut self, payment: &PaymentTx) { + let current_hour = match get_current_hour() { + Ok(hour) => hour, + Err(e) => { + error!("System time is set earlier than unix epoch! {:?}", e); + return; } + }; + let formatted_payment = UsageTrackerPayment::from_payment_tx(*payment, current_hour); + self.payments.insert(formatted_payment); + + while self.payments.len() > MAX_TX_ENTRIES { + self.remove_oldest_payment_history_entry() } } - while history.payments.len() > MAX_TX_ENTRIES { - let _discarded_entry = history.payments.pop_back(); + + /// Removes a single tx from the payment history entry, oldest first + fn remove_oldest_payment_history_entry(&mut self) { + let oldest = self + .payments + .iter() + .min_by(|a, b| a.index.cmp(&b.index)) + .cloned(); + if let Some(oldest) = oldest { + self.payments.remove(&oldest); + } + } +} + +/// Returns current throughput in bytes per second, or none if it is not yet available +pub fn get_current_throughput(kind: UsageType) -> Option { + let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap(); + let data = match kind { + UsageType::Client => usage_tracker_var.throughtput_tracker.client, + UsageType::Relay => usage_tracker_var.throughtput_tracker.relay, + UsageType::Exit => usage_tracker_var.throughtput_tracker.exit, + }; + if data.last_sample_time.is_some() && data.duration_of_this_sample.as_secs() > 0 { + 
Some(data.bytes_used / data.duration_of_this_sample.as_secs()) + } else { + // no data yet + None } } /// Gets usage data for this router, stored on the local disk at periodic intervals -pub fn get_usage_data(kind: UsageType) -> VecDeque { - let usage_tracker_var = &*(USAGE_TRACKER.write().unwrap()); +pub fn get_usage_data_map(kind: UsageType) -> HashMap { + let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap(); + match kind { - UsageType::Client => usage_tracker_var.client_bandwidth.clone(), - UsageType::Relay => usage_tracker_var.relay_bandwidth.clone(), - UsageType::Exit => usage_tracker_var.exit_bandwidth.clone(), + UsageType::Client => usage_tracker_var.usage_tracker.client_bandwidth.clone(), + UsageType::Relay => usage_tracker_var.usage_tracker.relay_bandwidth.clone(), + UsageType::Exit => usage_tracker_var.usage_tracker.exit_bandwidth.clone(), } } +/// Gets usage data for this router, stored on the local disk at periodic intervals +pub fn get_usage_data(kind: UsageType) -> VecDeque { + let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap(); + let data = match kind { + UsageType::Client => usage_tracker_var.usage_tracker.client_bandwidth.clone(), + UsageType::Relay => usage_tracker_var.usage_tracker.relay_bandwidth.clone(), + UsageType::Exit => usage_tracker_var.usage_tracker.exit_bandwidth.clone(), + }; + convert_map_to_flat_usage_data(data) +} + /// Gets the last saved usage hour from the existing usage tracker pub fn get_last_saved_usage_hour() -> u64 { - let usage_tracker = &*(USAGE_TRACKER.read().unwrap()); - usage_tracker.last_save_hour + let usage_tracker = USAGE_TRACKER_STORAGE.read().unwrap(); + usage_tracker.usage_tracker.last_save_hour } /// Gets payment data for this router, stored on the local disk at periodic intervals pub fn get_payments_data() -> VecDeque { - let usage_tracker_var = &*(USAGE_TRACKER.read().unwrap()); - usage_tracker_var.payments.clone() + let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap(); + 
convert_payment_set_to_payment_hour(usage_tracker_var.usage_tracker.payments.clone()) } /// On an interupt (SIGTERM), saving USAGE_TRACKER before exiting, this is essentially diff --git a/rita_common/src/usage_tracker/structs.rs b/rita_common/src/usage_tracker/structs.rs new file mode 100644 index 000000000..36d8f3ebe --- /dev/null +++ b/rita_common/src/usage_tracker/structs.rs @@ -0,0 +1,182 @@ +use althea_types::{Identity, IndexedUsageHour, PaymentTx, Usage}; +use num256::Uint256; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::hash::Hash; +use std::time::{Duration, Instant}; + +/// This struct is used to estimate the current throughput of a given router, it is not saved out to the disk +#[derive(Clone, Copy, Default)] +pub struct ThroughputTracker { + pub client: ThroughputData, + pub relay: ThroughputData, + pub exit: ThroughputData, +} + +#[derive(Clone, Copy, Default)] +pub struct ThroughputData { + /// When the current data in this struct was last updated + pub last_sample_time: Option, + /// the duration between the previous and current value of last_sample_time + /// used to convert the bytes used above into a rate per second + pub duration_of_this_sample: Duration, + pub bytes_used: u64, +} + +/// A struct for tracking each hours of payments indexed in hours since unix epoch +/// used to send to the frontend +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PaymentHour { + pub index: u64, + pub payments: Vec, +} + +/// Old usage tracker struct used by versions up to Beta 20 rc29 and Beta 21 rc2 +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct UsageTrackerStorageOld { + pub last_save_hour: u64, + // at least one of these will be left unused + pub client_bandwidth: VecDeque, + pub relay_bandwidth: VecDeque, + pub exit_bandwidth: VecDeque, + /// A history of payments + pub payments: VecDeque, +} + +/// Usage tracker data storage, stores information about the data usage of this +/// Rita process 
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct UsageTrackerStorage { + /// The last time this struct was saved to the disk + pub last_save_hour: u64, + // at least one of these will be left unused + /// client bandwidth usage per hour indexd by unix timestamp in hours + pub client_bandwidth: HashMap, + /// relay bandwidth usage per hour indexd by unix timestamp in hours + pub relay_bandwidth: HashMap, + /// exit bandwidth usage per hour indexd by unix timestamp in hours + pub exit_bandwidth: HashMap, + /// A history of payments + pub payments: HashSet, +} + +impl UsageTrackerStorage { + pub fn get_txids(&self) -> HashSet { + let mut set = HashSet::new(); + for p in &self.payments { + set.insert(p.txid); + } + set + } +} + +/// In an effort to converge this module between the three possible bw tracking +/// use cases this enum is used to identify which sort of usage we are tracking +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[allow(dead_code)] +pub enum UsageType { + Client, + Relay, + Exit, +} + +/// A version of payment tx with a string txid so that the formatting is correct +/// for display to users. +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +pub struct FormattedPaymentTxOld { + pub to: Identity, + pub from: Identity, + pub amount: Uint256, + pub txid: String, +} + +impl From for FormattedPaymentTxOld { + fn from(input: PaymentTx) -> Self { + let txid = input.txid; + FormattedPaymentTxOld { + to: input.to, + from: input.from, + amount: input.amount, + txid: format!("{txid:#066x}"), + } + } +} + +impl From for FormattedPaymentTxOld { + fn from(value: UsageTrackerPayment) -> Self { + let txid = value.txid; + FormattedPaymentTxOld { + to: value.to, + from: value.from, + amount: value.amount, + txid: format!("{txid:#066x}"), + } + } +} + +/// A version of payment tx with a string txid so that the formatting is correct +/// for display to users. 
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct UsageTrackerPayment { + pub to: Identity, + pub from: Identity, + pub amount: Uint256, + pub txid: Uint256, + /// the unix timestamp in hours of when this payment occured. + pub index: u64, +} + +impl Ord for UsageTrackerPayment { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.index.cmp(&other.index) + } +} + +impl PartialOrd for UsageTrackerPayment { + fn partial_cmp(&self, other: &Self) -> Option { + self.index.partial_cmp(&other.index) + } +} + +impl Hash for UsageTrackerPayment { + fn hash(&self, state: &mut H) { + // hash all values except the timestamp + self.to.hash(state); + self.from.hash(state); + self.amount.hash(state); + self.txid.hash(state); + } +} + +impl UsageTrackerPayment { + pub fn from_payment_tx(input: PaymentTx, index: u64) -> UsageTrackerPayment { + UsageTrackerPayment { + to: input.to, + from: input.from, + amount: input.amount, + txid: input.txid, + index, + } + } +} + +pub fn convert_payment_set_to_payment_hour( + input: HashSet, +) -> VecDeque { + let mut intermediate = HashMap::new(); + for ph in input { + match intermediate.get_mut(&ph.index) { + None => { + intermediate.insert(ph.index, vec![ph]); + } + Some(val) => val.push(ph), + } + } + let mut out = VecDeque::new(); + for (h, phs) in intermediate { + out.push_back(PaymentHour { + index: h, + payments: phs.into_iter().map(|v| v.into()).collect(), + }) + } + out +} diff --git a/rita_common/src/usage_tracker/tests.rs b/rita_common/src/usage_tracker/tests.rs index 8b911a18c..51066788a 100644 --- a/rita_common/src/usage_tracker/tests.rs +++ b/rita_common/src/usage_tracker/tests.rs @@ -2,31 +2,42 @@ #[allow(unused)] pub mod test { + use crate::usage_tracker::structs::{ + convert_payment_set_to_payment_hour, UsageTrackerPayment, UsageTrackerStorageOld, + }; use crate::usage_tracker::{ - get_current_hour, FormattedPaymentTx, IOError, PaymentHour, UsageHour, UsageTracker, - MAX_USAGE_ENTRIES, 
MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE, + get_current_hour, IOError, PaymentHour, Usage, UsageTrackerStorage, MAX_USAGE_ENTRIES, + MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE, }; - use althea_types::Identity; + use crate::RitaCommonError; + use althea_types::{convert_map_to_flat_usage_data, Identity, UnpublishedPaymentTx}; use flate2::write::ZlibEncoder; use flate2::Compression; use rand::{thread_rng, Rng}; use settings::client::RitaClientSettings; use settings::{get_rita_common, set_rita_client, set_rita_common}; - use std::collections::VecDeque; + use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::TryInto; use std::fs::File; use std::io::Write; #[cfg(test)] - impl UsageTracker { - // previous implementation of save which uses serde_json to serialize - fn save2(&self) -> Result<(), IOError> { - let serialized = serde_json::to_vec(self)?; + impl UsageTrackerStorage { + // previous implementation of save which uses the old struct to serialize + fn save2(&self) -> Result<(), RitaCommonError> { + let old_struct = UsageTrackerStorageOld { + last_save_hour: self.last_save_hour, + client_bandwidth: convert_map_to_flat_usage_data(self.client_bandwidth.clone()), + relay_bandwidth: convert_map_to_flat_usage_data(self.relay_bandwidth.clone()), + exit_bandwidth: convert_map_to_flat_usage_data(self.exit_bandwidth.clone()), + payments: convert_payment_set_to_payment_hour(self.payments.clone()), + }; + let serialized = bincode::serialize(&old_struct)?; let mut file = File::create(settings::get_rita_common().network.usage_tracker_file)?; let buffer: Vec = Vec::new(); let mut encoder = ZlibEncoder::new(buffer, Compression::default()); encoder.write_all(&serialized)?; let compressed_bytes = encoder.finish()?; - file.write_all(&compressed_bytes) + Ok(file.write_all(&compressed_bytes)?) 
} } @@ -42,7 +53,7 @@ pub mod test { let res = dummy_usage_tracker.save(); // saving to bincode with the new method info!("Saving test data: {:?}", res); - let res2 = UsageTracker::load_from_disk(); + let res2 = UsageTrackerStorage::load_from_disk(); info!("Loading test data: {:?}", res2); assert_eq!(dummy_usage_tracker, res2); @@ -59,30 +70,22 @@ pub mod test { let rset = RitaClientSettings::new("../settings/test.toml").unwrap(); set_rita_client(rset); let mut newrc = get_rita_common(); - newrc.network.usage_tracker_file = "/tmp/usage_tracker.json".to_string(); + newrc.network.usage_tracker_file = "/tmp/usage_tracker.bincode".to_string(); set_rita_common(newrc); info!("Generating large usage tracker history"); let dummy_usage_tracker = generate_dummy_usage_tracker(); - info!("Saving test data as json"); + info!("Saving test data in old format"); dummy_usage_tracker.save2().unwrap(); - // using load_from_disk() with usage_tracker_file set to a .json writes bincode - // serialized data to a .json extended file, but because load_from_disk() deletes - // the .json file, this test ends with no file left. - info!("Loading test data from json"); - let mut res2 = UsageTracker::load_from_disk(); - - // setting the usage_tracker_file to .bincode, which is what this upgrade expects - let mut newrc2 = get_rita_common(); - newrc2.network.usage_tracker_file = "/tmp/usage_tracker.bincode".to_string(); - set_rita_common(newrc2); + info!("Loading test data from oldformat"); + let mut res2 = UsageTrackerStorage::load_from_disk(); // Saving res2 with the new save() and updated usage_tracker_file in order to end with - // a .bincode file from the loaded json data saved to res2. + // a .bincode file from the old format bincode file. 
res2.save().unwrap(); info!("Saving test data as bincode"); - let res4 = UsageTracker::load_from_disk(); + let res4 = UsageTrackerStorage::load_from_disk(); info!("Loading test data from bincode"); // use == to avoid printing out the compared data @@ -91,9 +94,9 @@ pub mod test { assert!(res2 == res4); } // generates a nontrivial usage tracker struct for testing - pub fn generate_dummy_usage_tracker() -> UsageTracker { + pub fn generate_dummy_usage_tracker() -> UsageTrackerStorage { let current_hour = get_current_hour().unwrap(); - UsageTracker { + UsageTrackerStorage { last_save_hour: current_hour, client_bandwidth: generate_bandwidth(current_hour), relay_bandwidth: generate_bandwidth(current_hour), @@ -102,34 +105,35 @@ pub mod test { } } // generates dummy usage hour data randomly - fn generate_bandwidth(starting_hour: u64) -> VecDeque { + fn generate_bandwidth(starting_hour: u64) -> HashMap { let num_to_generate: u16 = thread_rng() .gen_range(50..MAX_USAGE_ENTRIES) .try_into() .unwrap(); - let mut output = VecDeque::new(); + let mut output = HashMap::new(); for i in 0..num_to_generate { - output.push_front(UsageHour { - index: starting_hour - i as u64, - up: rand::random(), - down: rand::random(), - price: rand::random(), - }); + output.insert( + i as u64, + Usage { + up: rand::random(), + down: rand::random(), + price: rand::random(), + }, + ); } output } // generates dummy payment data randomly - fn generate_payments(starting_hour: u64) -> VecDeque { + fn generate_payments(starting_hour: u64) -> HashSet { let mut num_to_generate: u8 = rand::random(); while (num_to_generate as usize) < MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE { num_to_generate = rand::random(); } let our_id = random_identity(); let neighbor_ids = get_neighbor_ids(); - let mut output = VecDeque::new(); + let mut output = HashSet::new(); for i in 0..num_to_generate { let num_payments_generate: u8 = rand::random(); - let mut payments = Vec::new(); for _ in 0..num_payments_generate { let 
neighbor_idx: u8 = rand::random(); let amount: u128 = rand::random(); @@ -140,17 +144,14 @@ pub mod test { (neighbor_ids[neighbor_idx as usize], our_id) }; let txid: u128 = rand::random(); - payments.push(FormattedPaymentTx { + output.insert(UsageTrackerPayment { to, from, amount: amount.into(), - txid: txid.to_string(), - }) + txid: txid.into(), + index: starting_hour - i as u64, + }); } - output.push_front(PaymentHour { - index: starting_hour - i as u64, - payments, - }); } output } diff --git a/rita_exit/src/traffic_watcher/mod.rs b/rita_exit/src/traffic_watcher/mod.rs index 3eeb832a4..4c98ae7a7 100644 --- a/rita_exit/src/traffic_watcher/mod.rs +++ b/rita_exit/src/traffic_watcher/mod.rs @@ -21,9 +21,9 @@ use babel_monitor::structs::Route; use ipnetwork::IpNetwork; use rita_common::debt_keeper::traffic_update; use rita_common::debt_keeper::Traffic; +use rita_common::usage_tracker::structs::UsageType; use rita_common::usage_tracker::update_usage_data; use rita_common::usage_tracker::UpdateUsage; -use rita_common::usage_tracker::UsageType; use std::collections::HashMap; use std::net::IpAddr;