From 05ce08c84a23f94c0336f4860533107f7d7e0e83 Mon Sep 17 00:00:00 2001
From: Justin Kilpatrick
Date: Fri, 15 Sep 2023 14:16:40 -0400
Subject: [PATCH 1/3] Refactor UsageTracker

The simple double-ended-iterator design of UsageTracker has served us
pretty well for a long time: we would push a new usage hour onto the
front and pop old ones off the back. Sadly, this format has proven not
to be very robust against errors. If the usage or payments list ever
became out of order, invalid duplicate data could exist, leading to
confusing questions about which entry to select.

We saw this most recently with payments, where the make_payments_v2
change caused duplicate payments and incorrect income data to pollute
the databases of many routers. That issue was patched with a duplicate
check, but in this new structure duplicate payments in the payment list
and duplicate hours in the data usage list are simply not possible.

By storing the usage data as a HashMap indexed by the unix timestamp in
hours, we will always retrieve the usage data for a given hour when we
request it, and only one version of any hour can exist. Likewise for
payments we have moved to a flat HashSet where the Hash value for the
payment struct is the txid, so duplicate payments with the same txid
cannot coexist in the set.

This new, more robust format will also be consumed by operator tools,
making it easier to manage usage data for many routers in the database
there.
---
 althea_types/src/interop.rs                   |  10 +-
 althea_types/src/user_info.rs                 |  83 +++-
 rita_client/src/dashboard/usage.rs            |   2 +-
 rita_client/src/exit_manager/exit_switcher.rs |   2 +-
 rita_client/src/operator_update/mod.rs        | 146 ++----
 rita_client/src/operator_update/tests.rs      |  97 +---
 .../src/operator_update/update_loop.rs        |   3 +-
 rita_client/src/traffic_watcher/mod.rs        |   2 +-
 rita_common/src/network_endpoints/mod.rs      |   2 +-
 rita_common/src/payment_controller/mod.rs     |   8 +-
 rita_common/src/payment_validator/mod.rs      |  20 +-
 rita_common/src/traffic_watcher/mod.rs        |   2 +-
 rita_common/src/usage_tracker/mod.rs          | 470 +++++++-----------
 rita_common/src/usage_tracker/structs.rs      | 163 ++++++
 rita_common/src/usage_tracker/tests.rs        |  89 ++--
 rita_exit/src/traffic_watcher/mod.rs          |   2 +-
 16 files changed, 536 insertions(+), 565 deletions(-)
 create mode 100644 rita_common/src/usage_tracker/structs.rs

diff --git a/althea_types/src/interop.rs b/althea_types/src/interop.rs
index 00961d91e..deeceac8f 100644
--- a/althea_types/src/interop.rs
+++ b/althea_types/src/interop.rs
@@ -1,5 +1,5 @@
 use crate::{contact_info::ContactType, wg_key::WgKey, BillingDetails, InstallationDetails};
-use crate::{ClientExtender, UsageTracker, WifiDevice};
+use crate::{ClientExtender, UsageTrackerFlat, UsageTrackerTransfer, WifiDevice};
 use arrayvec::ArrayString;
 use babel_monitor::structs::Neighbor;
 use babel_monitor::structs::Route;
@@ -335,7 +335,7 @@ pub struct LightClientLocalIdentity {
 /// This represents a generic payment that may be to or from us
 /// it contains a txid from a published transaction
 /// that should be validated against the blockchain
-#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
 pub struct PaymentTx {
     pub to: Identity,
     pub from: Identity,
@@ -353,7 +353,7 @@ impl Hash for PaymentTx {
 /// This represents a generic payment that may be to or from us, it does not contain a txid meaning it is
 /// unpublished
-#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)]
 pub struct UnpublishedPaymentTx {
     pub to: Identity,
     pub from: Identity,
@@ -747,11 +747,13 @@ pub struct OperatorCheckinMessage {
     /// This is a user set bandwidth limit value, it will cap the users download
     /// and upload to the provided value of their choosing. Denoted in mbps
     pub user_bandwidth_limit: Option,
+    /// Legacy bandwidth usage from pre-beta-20 routers, one of the two will be None
+    pub user_bandwidth_usage: Option<UsageTrackerFlat>,
     /// Details of both the Client and Relay bandwidth usage over a given period determined
     /// by the ops_last_seen_usage_hour in OperatorUpdateMessage. When the device's last
     /// saved usage hour is the same as the ops last seen, we send no data here as we are up
     /// to date. Data sent through here gets added to a database entry for each device.
-    pub user_bandwidth_usage: Option<UsageTracker>,
+    pub user_bandwidth_usage_v2: Option<UsageTrackerTransfer>,
     /// This is to keep track of the rita client uptime for debugging purposes
     /// In the event something whacko happens, serde will magically derive def-
     /// fault value.
diff --git a/althea_types/src/user_info.rs b/althea_types/src/user_info.rs
index 19dc8cb85..557ae19c2 100644
--- a/althea_types/src/user_info.rs
+++ b/althea_types/src/user_info.rs
@@ -1,5 +1,4 @@
-use std::collections::VecDeque;
-use std::hash::{Hash, Hasher};
+use std::collections::{HashMap, VecDeque};
 use std::net::Ipv4Addr;
 use std::time::SystemTime;
@@ -64,33 +63,85 @@ pub struct InstallationDetails {
     pub install_date: Option,
 }

-/// The main actor that holds the usage state for the duration of operations
-/// to be sent up to ops tools.
+/// The old storage method for usage tracker data that stores flat data
+/// in arrays and does not index the data via hashmap. This format was abandoned
+/// as error prone but is still used so legacy routers can send data
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
-pub struct UsageTracker {
+pub struct UsageTrackerFlat {
     pub last_save_hour: u64,
-    pub client_bandwidth: VecDeque<UsageHour>,
-    pub relay_bandwidth: VecDeque<UsageHour>,
+    pub client_bandwidth: VecDeque<IndexedUsageHour>,
+    pub relay_bandwidth: VecDeque<IndexedUsageHour>,
 }

 /// A struct for tracking each hour of usage, indexed by time in hours since
 /// the unix epoch.
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct UsageHour {
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct IndexedUsageHour {
     pub index: u64,
     pub up: u64,
     pub down: u64,
     pub price: u32,
 }
-impl Hash for UsageHour {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.index.hash(state);
+/// A struct used to store data usage over an arbitrary period; the length of time
+/// is implied by the code that is handling this struct. Do not transfer without considering
+/// that you may be changing units
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct Usage {
+    pub up: u64,
+    pub down: u64,
+    pub price: u32,
+}
+
+/// The main actor that holds the usage state for the duration of operations
+/// to be sent up to ops tools.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct UsageTrackerTransfer {
+    /// client bandwidth usage per hour indexed by unix timestamp in hours
+    pub client_bandwidth: HashMap<u64, Usage>,
+    /// relay bandwidth usage per hour indexed by unix timestamp in hours
+    pub relay_bandwidth: HashMap<u64, Usage>,
+    /// exit bandwidth usage per hour indexed by unix timestamp in hours
+    pub exit_bandwidth: HashMap<u64, Usage>,
+}
+
+/// Used to convert between usage tracker storage formats
+pub fn convert_flat_to_map_usage_data(input: VecDeque<IndexedUsageHour>) -> HashMap<u64, Usage> {
+    let mut out = HashMap::new();
+    for hour in input {
+        match out.get_mut(&hour.index) {
+            // we have a duplicate entry which we must correct, pick the higher data usage and keep that
+            Some(to_edit) => {
+                let duplicate_usage: Usage = *to_edit;
+                to_edit.up = std::cmp::max(duplicate_usage.up, hour.up);
+                to_edit.down = std::cmp::max(duplicate_usage.down, hour.down);
+                to_edit.price = std::cmp::max(duplicate_usage.price, hour.price);
+            }
+            None => {
+                out.insert(
+                    hour.index,
+                    Usage {
+                        up: hour.up,
+                        down: hour.down,
+                        price: hour.price,
+                    },
+                );
+            }
+        }
     }
+    out
 }
-impl PartialEq for UsageHour {
-    fn eq(&self, other: &Self) -> bool {
-        self.index == other.index
+
+/// Used to convert between usage tracker storage formats
+pub fn convert_map_to_flat_usage_data(input: HashMap<u64, Usage>) -> VecDeque<IndexedUsageHour> {
+    let mut out = VecDeque::new();
+    for (hour, usage) in input {
+        out.push_back(IndexedUsageHour {
+            index: hour,
+            up: usage.up,
+            down: usage.down,
+            price: usage.price,
+        })
     }
+    out
 }
-impl Eq for UsageHour {}
diff --git a/rita_client/src/dashboard/usage.rs b/rita_client/src/dashboard/usage.rs
index fca0f53c6..90217ca3b 100644
--- a/rita_client/src/dashboard/usage.rs
+++ b/rita_client/src/dashboard/usage.rs
@@ -1,6 +1,6 @@
 use actix_web_async::{HttpRequest, HttpResponse};
 use rita_common::usage_tracker::get_usage_data;
-use rita_common::usage_tracker::UsageType;
+use rita_common::usage_tracker::structs::UsageType;

 pub async fn get_client_usage(_req: HttpRequest) -> HttpResponse {
     trace!("/usage/client hit");
diff --git a/rita_client/src/exit_manager/exit_switcher.rs b/rita_client/src/exit_manager/exit_switcher.rs
index e2132e757..5869f73ea 100644
--- a/rita_client/src/exit_manager/exit_switcher.rs
+++ b/rita_client/src/exit_manager/exit_switcher.rs
@@ -832,7 +832,7 @@ mod tests {
             u16::MAX,
             tracking_exit,
             u16::MAX,
-            best_exit,
+            best_exit, 400
         ),
         &mut vec,
diff --git a/rita_client/src/operator_update/mod.rs b/rita_client/src/operator_update/mod.rs
index 749206d4a..74cd59bd9 100644
--- a/rita_client/src/operator_update/mod.rs
+++ b/rita_client/src/operator_update/mod.rs
@@ -8,21 +8,20 @@ use crate::exit_manager::{get_client_pub_ipv6, get_selected_exit_ip};
 use crate::rita_loop::is_gateway_client;
 use crate::{
     extend_hardware_info, reset_wifi_pass, set_router_update_instruction, set_wifi_multi_internal,
+    RitaClientError,
 };
 use althea_kernel_interface::hardware_info::get_hardware_info;
-use althea_types::get_sequence_num;
+use althea_types::{get_sequence_num, UsageTrackerTransfer};
 use althea_types::{
     AuthorizedKeys, BillingDetails, ContactStorage, ContactType, CurExitInfo, ExitConnection,
-    HardwareInfo, OperatorAction, OperatorCheckinMessage, OperatorUpdateMessage, UsageHour,
-    UsageTracker,
+    HardwareInfo, OperatorAction, OperatorCheckinMessage, OperatorUpdateMessage,
 };
 use num256::Uint256;
 use rita_common::rita_loop::is_gateway;
 use rita_common::tunnel_manager::neighbor_status::get_neighbor_status;
 use rita_common::tunnel_manager::shaping::flag_reset_shaper;
-use
rita_common::usage_tracker::UsageHour as RCUsageHour; -use rita_common::usage_tracker::UsageType::{Client, Relay}; -use rita_common::usage_tracker::{get_current_hour, get_usage_data}; +use rita_common::usage_tracker::structs::UsageType::{Client, Relay}; +use rita_common::usage_tracker::{get_current_hour, get_usage_data_map}; use rita_common::utils::option_convert; use rita_common::DROPBEAR_AUTHORIZED_KEYS; use rita_common::KI; @@ -31,7 +30,7 @@ use serde_json::Value; use settings::client::RitaClientSettings; use settings::network::NetworkSettings; use settings::payment::PaymentSettings; -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use std::fs::{remove_file, rename, File}; use std::io::{BufRead, BufReader, Write}; use std::os::unix::fs::PermissionsExt; @@ -64,7 +63,7 @@ const UPDATE_FREQUENCY_CAP: Duration = Duration::from_secs(3600); pub async fn operator_update( ops_last_seen_usage_hour: Option, timeout: Duration, -) -> Result { +) -> Result { let url: &str; if cfg!(feature = "dev_env") { url = "http://7.7.7.7:8080/checkin"; @@ -139,30 +138,6 @@ pub async fn operator_update( }, }); - let current_hour = match get_current_hour() { - Ok(hour) => hour, - Err(e) => { - error!("System time is set earlier than unix epoch {:?}", e); - return Err(()); - } - }; - let last_seen_hour = ops_last_seen_usage_hour.unwrap_or(0); - // We check that the difference is >1 because we leave a 1 hour buffer to prevent from sending over an incomplete current hour - let send_hours = current_hour - last_seen_hour > 1; - let mut usage_tracker_data: Option = None; - // if ops_last_seen_usage_hour is a None the thread has restarted and we are waiting for ops to tell us how much data we need to send, - // which will be populated with the next checkin cycle. we only send 1 month at a time if ops is requesting the full usage history. 
- if send_hours && ops_last_seen_usage_hour.is_some() { - let usage_data_client = get_usage_data(Client); - let usage_data_relay = get_usage_data(Relay); - usage_tracker_data = process_usage_data( - usage_data_client, - usage_data_relay, - last_seen_hour, - current_hour, - ) - } - let exit_con = Some(ExitConnection { cur_exit, client_pub_ipv6: get_client_pub_ipv6(), @@ -184,7 +159,8 @@ pub async fn operator_update( hardware_info, user_bandwidth_limit, rita_uptime: RITA_UPTIME.elapsed(), - user_bandwidth_usage: usage_tracker_data, + user_bandwidth_usage: None, + user_bandwidth_usage_v2: prepare_usage_data_for_upload(ops_last_seen_usage_hour)?, }) .await; @@ -196,7 +172,7 @@ pub async fn operator_update( } Err(e) => { error!("Failed to perform operator checkin with {:?}", e); - return Err(()); + return Err(e.into()); } }; @@ -204,7 +180,7 @@ pub async fn operator_update( Ok(a) => a, Err(e) => { error!("Failed to perform operator checkin with {:?}", e); - return Err(()); + return Err(e.into()); } }; @@ -260,13 +236,7 @@ pub async fn operator_update( }; set_router_update_instruction(update_instructions); perform_operator_update(new_settings.clone(), rita_client, network); - // update our count of ops last seen if we have confirmation ops is up to date on usage data - if new_settings.ops_last_seen_usage_hour == current_hour { - info!("Confirmed ops has taken usage update for hour {current_hour}"); - Ok(current_hour) - } else { - Ok(new_settings.ops_last_seen_usage_hour) - } + Ok(new_settings.ops_last_seen_usage_hour) } /// logs some hardware info that may help debug this router @@ -626,65 +596,39 @@ fn contains_forbidden_key(map: Map, forbidden_values: &[&str]) -> false } -/// Given a vecdeque of usage hours, add up to a month's worth of hours to a returned vecdeque -pub fn iterate_month_usage_data(mut data: VecDeque) -> VecDeque { - // one month in hours - let max_hour_iterations: u32 = 730; - let mut client_iter = 0; - let mut res = VecDeque::new(); - while let Some(hour) = data.pop_front() { - // either we hit max iterations or we are on the second to last entry. 
- res.push_back(UsageHour { - up: hour.up, - down: hour.down, - price: hour.price, - index: hour.index, - }); - if client_iter >= max_hour_iterations || data.len() == 1 { - break; +/// This function handles preparing usage data for upload to operator tools +fn prepare_usage_data_for_upload( + ops_last_seen_hour: Option, +) -> Result, RitaClientError> { + // if this is none we will not send usage data to ops + // as it indicates we don't yet know what data ops has + match ops_last_seen_hour { + Some(ops_last_seen_hour) => { + let current_hour = get_current_hour()?; + let usage_data_client = get_usage_data_map(Client); + let usage_data_relay = get_usage_data_map(Relay); + // We check that the difference is >1 because we leave a 1 hour buffer to prevent from sending over an incomplete current hour + let min_send_hour = ops_last_seen_hour; + let mut client_bandwidth = HashMap::new(); + let mut relay_bandwidth = HashMap::new(); + for (index, usage) in usage_data_client { + if index > min_send_hour && index < current_hour { + client_bandwidth.insert(index, usage); + } + } + for (index, usage) in usage_data_relay { + if index > min_send_hour && index < current_hour { + relay_bandwidth.insert(index, usage); + } + } + Ok(Some(UsageTrackerTransfer { + client_bandwidth, + relay_bandwidth, + // this function processes client usage data so this is always true + // in the future this value will be set by exits uploading data + exit_bandwidth: HashMap::new(), + })) } - client_iter += 1; + None => Ok(None), } - res -} - -/// Given our saved usage data and our last seen value, process the vecdeques so that we only -/// send to ops new data since last seen. -pub fn process_usage_data( - mut usage_data_client: VecDeque, - mut usage_data_relay: VecDeque, - last_seen_hour: u64, - current_hour: u64, -) -> Option { - // sort client and relay data in case they have come out of order somehow. This sorts by index increasing so newest data at back - usage_data_relay - .make_contiguous() - .sort_by(|a, b| a.index.cmp(&b.index)); - usage_data_client - .make_contiguous() - .sort_by(|a, b| a.index.cmp(&b.index)); - - // so this spits out the index for where last seen is, or the index of the next highest hour(returned in an error). - // we take the result -1 just in case, limit 0, since it's possible we might get back an index out of bounds at the back. 
- let client_last_seen_index = - match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) { - Ok(p) => p, - Err(p) => p.saturating_sub(1), - }; - let relay_last_seen_index = - match usage_data_relay.binary_search_by(|x| x.index.cmp(&last_seen_hour)) { - Ok(p) => p, - Err(p) => p.saturating_sub(1), - }; - // remove all data before the last seen index - usage_data_client.drain(0..client_last_seen_index); - usage_data_relay.drain(0..relay_last_seen_index); - let new_client_data = iterate_month_usage_data(usage_data_client); - let new_relay_data = iterate_month_usage_data(usage_data_relay); - - Some(UsageTracker { - last_save_hour: current_hour, - client_bandwidth: new_client_data, - relay_bandwidth: new_relay_data, - }) } diff --git a/rita_client/src/operator_update/tests.rs b/rita_client/src/operator_update/tests.rs index b53723b86..617f19d35 100644 --- a/rita_client/src/operator_update/tests.rs +++ b/rita_client/src/operator_update/tests.rs @@ -1,18 +1,14 @@ #[cfg(test)] mod test { - use rand::seq::SliceRandom; - use rita_common::usage_tracker::get_current_hour; - use rita_common::usage_tracker::tests::test::generate_dummy_usage_tracker; + use crate::operator_update::contains_forbidden_key; + use crate::operator_update::prepare_usage_data_for_upload; + use crate::operator_update::update_authorized_keys; use serde_json::json; use serde_json::Value; use std::fs::File; use std::io::{BufRead, BufReader, Write}; use std::{fs, io::Error, path::Path}; - use crate::operator_update::contains_forbidden_key; - use crate::operator_update::process_usage_data; - use crate::operator_update::update_authorized_keys; - const FORBIDDEN_MERGE_VALUES: [&str; 2] = ["test_key", "other_test_key"]; #[test] @@ -136,90 +132,7 @@ mod test { assert!(Path::new(key_file).exists()); } #[test] - fn test_usage_data_processing() { - // this tests the flow used in rita client's operator update loop used to process usage data sent up to ops - let dummy_usage_tracker = generate_dummy_usage_tracker(); - let mut usage_data_client = dummy_usage_tracker.client_bandwidth.clone(); - let mut usage_data_relay = dummy_usage_tracker.relay_bandwidth; - let mut unshuffled_client = usage_data_client.clone(); - let mut unshuffled_relay = usage_data_relay.clone(); - - // Test the sort function first: - // shuffle the data because it's currently ordered - usage_data_client - .make_contiguous() - .shuffle(&mut rand::thread_rng()); - println!( - "Sample of current shuffle is {} {} {}", - usage_data_client.get(0).unwrap().index, - usage_data_client.get(1).unwrap().index, - usage_data_client.get(2).unwrap().index - ); - usage_data_relay - .make_contiguous() - .shuffle(&mut rand::thread_rng()); - // The processing function sorts these lowest index to highest. Note that usage hours are stored to disk as - // the opposite order where newest are added to the front, so this is inefficient. - // Options here to optimize are either a/write my own binary sort again which will compare for the existing structure - // where the saved vecdeque is highest index to lowest index, b/rework usage tracker so that we save data lowest index - // to highest index, or c/the current solution(inefficient, as we will be fully reversing the whole vecdeque of each - // client and relay at least once per hour on rollover): sort the entire list in reverse order to use with the builtin - // bin search from vecdeque - - // for this purpose our last seen will be start hour - 10. 
- let current_hour = get_current_hour().unwrap(); - let last_seen_hour = current_hour - 10; - let res_usage = process_usage_data( - usage_data_client.clone(), - usage_data_relay.clone(), - last_seen_hour, - current_hour, - ) - .unwrap(); - let res_usage_client = res_usage.client_bandwidth; - let res_usage_relay = res_usage.relay_bandwidth; - - // check that the sorting in process_usage_data is correct after shuffling and sending it through - assert!( - res_usage_relay.get(0).unwrap().index < res_usage_relay.get(1).unwrap().index - && res_usage_relay.get(1).unwrap().index < res_usage_relay.get(2).unwrap().index - ); - assert!( - res_usage_client.get(0).unwrap().index < res_usage_client.get(1).unwrap().index - && res_usage_client.get(1).unwrap().index < res_usage_client.get(2).unwrap().index - ); - // check that the binary searching is correct: we did not remove any entries from usage client; so we should have started exactly - // from the last seen hour. we removed the last seen from usage relay, so we should expect to see our earliest hour as one fewer - assert!(res_usage_client.get(0).unwrap().index == last_seen_hour); - assert!(res_usage_relay.get(0).unwrap().index == last_seen_hour); - - // now check that same thing, but in case we have a gap in the data. we'll remove the entry for the last_seen_hour from usage_data_relay - // to make sure we are successfully returning the next earliest hour (in our case, the last seen -1). we use res_usage client and relay - // because to remove the correct entry it needs to be presorted (if we've gotten here it's guaranteed.) - // we successfully search for the entry or return the next one down. - unshuffled_client.remove(unshuffled_client.len() - 11); - unshuffled_relay.remove(unshuffled_relay.len() - 11); - // so the index of our last seen hour if we say last seen is current - 10... will be at len - 11. - let res_usage = process_usage_data( - unshuffled_client, - unshuffled_relay, - last_seen_hour, - current_hour, - ) - .unwrap(); - let res_usage_client = res_usage.client_bandwidth; - let res_usage_relay = res_usage.relay_bandwidth; - // after processing we should start at last seen - 1. 
- println!( - "{:?} last seen {:?}", - res_usage_relay.get(0).unwrap().index, - last_seen_hour - ); - assert!(res_usage_relay.get(0).unwrap().index == last_seen_hour - 1); - assert!(res_usage_client.get(0).unwrap().index == last_seen_hour - 1); - - // check that our iteration function does indeed stop at a month of data: - assert!(res_usage_client.len() <= 730); - assert!(res_usage_relay.len() <= 730); + fn test_prepare_usage_data_for_upload() { + assert_eq!(prepare_usage_data_for_upload(None).unwrap(), None); } } diff --git a/rita_client/src/operator_update/update_loop.rs b/rita_client/src/operator_update/update_loop.rs index 599e14672..eab96cca4 100644 --- a/rita_client/src/operator_update/update_loop.rs +++ b/rita_client/src/operator_update/update_loop.rs @@ -43,7 +43,8 @@ pub fn start_operator_update_loop() { wait_unti_next_update = max(wait_unti_next_update / 2, TARGET_UPDATE_FREQUENCY); } - Err(()) => { + Err(e) => { + error!("Ops checkin failed with {:?}!", e); // failed checkin, backoff with a random multiplier the goal of random backoff // is to prevent collisions wait_unti_next_update = min( diff --git a/rita_client/src/traffic_watcher/mod.rs b/rita_client/src/traffic_watcher/mod.rs index c2d86702a..cd3021c32 100644 --- a/rita_client/src/traffic_watcher/mod.rs +++ b/rita_client/src/traffic_watcher/mod.rs @@ -32,9 +32,9 @@ use num_traits::identities::Zero; use rita_common::debt_keeper::{ traffic_replace, traffic_update, wgkey_insensitive_traffic_update, Traffic, }; +use rita_common::usage_tracker::structs::UsageType; use rita_common::usage_tracker::update_usage_data; use rita_common::usage_tracker::UpdateUsage; -use rita_common::usage_tracker::UsageType; use rita_common::KI; use std::collections::HashMap; use std::net::IpAddr; diff --git a/rita_common/src/network_endpoints/mod.rs b/rita_common/src/network_endpoints/mod.rs index 6aa2410ce..cf60a40db 100644 --- a/rita_common/src/network_endpoints/mod.rs +++ b/rita_common/src/network_endpoints/mod.rs @@ -37,7 +37,7 @@ pub async fn make_payments_v2(item: Json>) -> HttpResponse { let mut build_err = String::new(); for pmt in pmt_list { let ts = ToValidate { - payment: pmt.clone(), + payment: pmt, received: Instant::now(), checked: false, }; diff --git a/rita_common/src/payment_controller/mod.rs b/rita_common/src/payment_controller/mod.rs index 0cd7cfa14..2c1e6de34 100644 --- a/rita_common/src/payment_controller/mod.rs +++ b/rita_common/src/payment_controller/mod.rs @@ -311,7 +311,7 @@ async fn make_althea_payment( // setup tx hash let pmt = pmt.publish(Uint256::from_str_radix(&transaction.txhash, 16).unwrap()); - send_make_payment_endpoints(pmt.clone(), network_settings, None, Some(cosmos_node_grpc)).await; + send_make_payment_endpoints(pmt, network_settings, None, Some(cosmos_node_grpc)).await; // place this payment in the validation queue to handle later. let ts = ToValidate { @@ -390,7 +390,7 @@ async fn make_xdai_payment( // add published txid to submission let pmt = pmt.publish(tx_id); - send_make_payment_endpoints(pmt.clone(), network_settings, Some(full_node), None).await; + send_make_payment_endpoints(pmt, network_settings, Some(full_node), None).await; // place this payment in the validation queue to handle later. let ts = ToValidate { @@ -470,7 +470,7 @@ async fn send_make_payment_endpoints( // Get all txids to this client. 
Temporary add new payment to a copy of a list to send up to endpoint // this pmt is actually recorded in memory after validator confirms it let mut txid_history = get_payment_txids(pmt.to); - txid_history.insert(pmt.clone()); + txid_history.insert(pmt); let actix_client = awc::Client::new(); let neigh_ack = actix_client @@ -485,7 +485,7 @@ async fn send_make_payment_endpoints( let resend_info = ResendInfo { neigh_url: neighbor_url.clone(), - pmt: pmt.clone(), + pmt, attempt: 0u8, }; diff --git a/rita_common/src/payment_validator/mod.rs b/rita_common/src/payment_validator/mod.rs index 6171fbd69..93a0dce4c 100644 --- a/rita_common/src/payment_validator/mod.rs +++ b/rita_common/src/payment_validator/mod.rs @@ -290,7 +290,7 @@ fn remove(msg: Remove) { // store successful transactions so that they can't be played back to us, at least // during this session if msg.success { - add_successful_tx(msg.tx.payment.clone()); + add_successful_tx(msg.tx.payment); } if was_present { info!("Transaction {} was removed", msg.tx); @@ -615,7 +615,7 @@ fn handle_tx_messaging_xdai( ) { let from_address = ts.payment.from.eth_address; let amount = ts.payment.amount; - let pmt = ts.payment.clone(); + let pmt = ts.payment; let our_address = settings::get_rita_common() .payment .eth_address @@ -709,7 +709,7 @@ fn handle_tx_messaging_xdai( ); // update the usage tracker with the details of this payment - update_payments(pmt.clone()); + update_payments(pmt); // Store this payment as a receipt to send in the future if this receiver doesnt see the payment store_payment(pmt); @@ -737,7 +737,7 @@ fn handle_tx_messaging_xdai( /// Handles the tx response from the full node and it's various cases /// pulled out of validate_transaction purely for cosmetic reasons fn handle_tx_messaging_althea(transaction: TransactionDetails, ts: ToValidate) { - let pmt = ts.payment.clone(); + let pmt = ts.payment; // txid is for eth chain and txhash is for althea chain, only one of these should be // Some(..). 
This was verified before @@ -838,7 +838,7 @@ fn handle_tx_messaging_althea(transaction: TransactionDetails, ts: ToValidate) { denom.expect("How did this happen when we already verified existence"), ); // update the usage tracker with the details of this payment - update_payments(pmt.clone()); + update_payments(pmt); // Store this payment as a receipt to send in the future if this receiver doesnt see the payment store_payment(pmt); @@ -970,8 +970,8 @@ mod tests { txid: 1u8.into(), }; - store_payment(pmt1.clone()); - sent_hashset.insert(pmt1.clone()); + store_payment(pmt1); + sent_hashset.insert(pmt1); assert_eq!(get_payment_txids(pmt1.to), sent_hashset); let pmt2 = PaymentTx { @@ -980,9 +980,9 @@ mod tests { amount: 100u8.into(), txid: 2u8.into(), }; - store_payment(pmt2.clone()); + store_payment(pmt2); - sent_hashset.insert(pmt2.clone()); + sent_hashset.insert(pmt2); assert_eq!(get_payment_txids(pmt2.to), sent_hashset); let pmt3 = PaymentTx { @@ -992,7 +992,7 @@ mod tests { txid: 2u8.into(), }; - store_payment(pmt3.clone()); + store_payment(pmt3); assert_eq!(get_payment_txids(pmt3.to), sent_hashset); } diff --git a/rita_common/src/traffic_watcher/mod.rs b/rita_common/src/traffic_watcher/mod.rs index 925f3ba37..775b8a3fb 100644 --- a/rita_common/src/traffic_watcher/mod.rs +++ b/rita_common/src/traffic_watcher/mod.rs @@ -5,9 +5,9 @@ use crate::debt_keeper::traffic_update; use crate::debt_keeper::Traffic; use crate::tunnel_manager::Neighbor; +use crate::usage_tracker::structs::UsageType; use crate::usage_tracker::update_usage_data; use crate::usage_tracker::UpdateUsage; -use crate::usage_tracker::UsageType; use crate::RitaCommonError; use crate::KI; use althea_kernel_interface::open_tunnel::is_link_local; diff --git a/rita_common/src/usage_tracker/mod.rs b/rita_common/src/usage_tracker/mod.rs index d442c3dc4..9fbf3bd98 100644 --- a/rita_common/src/usage_tracker/mod.rs +++ b/rita_common/src/usage_tracker/mod.rs @@ -6,32 +6,32 @@ use crate::rita_loop::write_to_disk::is_router_storage_small; use crate::RitaCommonError; -use althea_types::Identity; +use althea_types::convert_flat_to_map_usage_data; +use althea_types::convert_map_to_flat_usage_data; +use althea_types::user_info::Usage; +use althea_types::IndexedUsageHour; use althea_types::PaymentTx; use bincode::Error as BincodeError; use flate2::read::ZlibDecoder; use flate2::write::ZlibEncoder; use flate2::Compression; -use num256::Uint256; -use serde::{Deserialize, Serialize}; -use serde_json::Error as JsonError; -use settings::set_rita_common; +use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use std::fs::File; -use std::hash::Hash; use std::io::Error as IOError; use std::io::ErrorKind; use std::io::Read; use std::io::Write; use std::path::Path; -use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; use std::time::SystemTime; use std::time::UNIX_EPOCH; use std::usize; +use structs::*; +pub mod structs; pub mod tests; /// one year worth of usage storage @@ -48,184 +48,77 @@ const MAX_TX_ENTRIES: usize = 5_000; pub const MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE: usize = 5; pub const MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE: usize = 75; -lazy_static! 
{ - static ref USAGE_TRACKER: Arc> = - Arc::new(RwLock::new(UsageTracker::load_from_disk())); -} +/// The maximum amount of usage data that may be unsaved before we save out to the disk +pub const MAX_UNSAVED_USAGE: u64 = 10 * 1000u64.pow(3); -/// In an effort to converge this module between the three possible bw tracking -/// use cases this enum is used to identify which sort of usage we are tracking -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -#[allow(dead_code)] -pub enum UsageType { - Client, - Relay, - Exit, -} - -/// A struct for tracking each hour of usage, indexed by time in hours since -/// the unix epoch -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct UsageHour { - pub index: u64, - pub up: u64, - pub down: u64, - pub price: u32, +lazy_static! { + static ref USAGE_TRACKER_STORAGE: Arc> = + Arc::new(RwLock::new(UsageTrackerStorage::load_from_disk())); } -/// A version of payment tx with a string txid so that the formatting is correct -/// for display to users. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] -pub struct FormattedPaymentTx { - pub to: Identity, - pub from: Identity, - pub amount: Uint256, - // should always be populated in this case - pub txid: String, - // TODO add "payment_type" here which will allow the frontend - // to easily tell what this payment is for and prevent the need - // for hacky classification +/// Utility function that grabs usage tracker from it's lock and +/// saves it out. Should be called when we want to save anywhere outside this file +pub fn save_usage_to_disk() { + match USAGE_TRACKER_STORAGE.write().unwrap().save() { + Ok(_val) => info!("Saved usage tracker successfully"), + Err(e) => warn!("Unable to save usage tracker {:}", e), + }; } -fn to_formatted_payment_tx(input: PaymentTx) -> FormattedPaymentTx { - let txid = input.txid; - FormattedPaymentTx { - to: input.to, - from: input.from, - amount: input.amount, - txid: format!("{txid:#066x}"), +/// Helps determine how often we write out to the disk on different devices by setting a device specific mininum +/// number of transactions before saving +pub fn get_minimum_number_of_transactions_to_store() -> usize { + let settings = settings::get_rita_common(); + if is_router_storage_small( + &settings + .network + .device + .unwrap_or_else(|| "x86_64".to_string()), + ) { + MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE + } else { + MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE } } -/// A struct for tracking each hours of payments indexed in hours since unix epoch -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct PaymentHour { - index: u64, - payments: Vec, -} - -/// The main actor that holds the usage state for the duration of operations -/// at some point loading and saving will be defined in service started - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct UsageTracker { - pub last_save_hour: u64, - // at least one of these will be left unused - pub client_bandwidth: VecDeque, - pub relay_bandwidth: VecDeque, - pub exit_bandwidth: VecDeque, - /// A history of payments - pub payments: VecDeque, -} - -impl UsageTracker { - /// This function is run on startup and removes duplicate entries from usage history - /// This function is run on startup and removes duplicate entries from usage history - pub fn remove_duplicate_and_invalid_payment_entires(mut self) -> UsageTracker { - let mut duplicate_list = HashSet::new(); - let mut payments = self.payments.clone(); - for hour in 
payments.iter_mut() { - let mut payments = Vec::new(); - for p in hour.payments.iter() { - let txid: Uint256 = match p.txid.parse() { - Ok(tx) => tx, - Err(_) => { - warn!("Removed invalid payment! {}", p.txid); - // found an error, removing - continue; - } - }; - if duplicate_list.contains(&txid) { - // found a duplicate, removing - } else { - duplicate_list.insert(txid); - payments.push(p.clone()) +impl UsageTrackerStorage { + /// This function checks to see how many bytes were used + /// and if the amount used is not greater than 10gb than + /// it will return false. Essentially, it's checking to make + /// sure that there is enough usage to be worth saving + pub fn check_unsaved_usage(&self) -> bool { + let mut total_unsaved_bytes = 0; + let v = vec![ + &self.client_bandwidth, + &self.relay_bandwidth, + &self.exit_bandwidth, + ]; + for i in v { + for (index, usage) in i { + if *index > self.last_save_hour { + total_unsaved_bytes += usage.up; + total_unsaved_bytes += usage.down; } } - // insert filtered payments list - hour.payments = payments; } - self.payments = payments; - self + total_unsaved_bytes > MAX_UNSAVED_USAGE } - pub fn get_txids(&self) -> HashSet { - let mut set = HashSet::new(); - for hour in &self.payments { - for p in &hour.payments { - let txid: Uint256 = match p.txid.parse() { - Ok(t) => t, - Err(_) => { - error!("Invalid tx id in usage tracker {}", p.txid); - continue; - } - }; - set.insert(txid); - } - } - set - } -} - -/// This function checks to see how many bytes were used -/// and if the amount used is not greater than 10gb than -/// it will return false. Essentially, it's checking to make -/// sure that there is usage within the router. -fn check_usage_hour(input: &VecDeque, last_save_hour: u64) -> bool { - let mut input = input.clone(); - let number_of_bytes = 10000; - let mut total_used_bytes = 0; - while !input.is_empty() { - match input.pop_front() { - Some(val) => { - total_used_bytes += val.up; - if val.index < last_save_hour { - return total_used_bytes < number_of_bytes; - } + /// Returns true if the numberof unsaved payments is greater than the mininum number of transactions to store + pub fn check_unsaved_payments(&self) -> bool { + let mut total_num_unsaved_payments = 0; + for p in self.payments.iter() { + if p.index > self.last_save_hour { + total_num_unsaved_payments += 1; } - None => break, } + total_num_unsaved_payments > get_minimum_number_of_transactions_to_store() } - false -} - -fn at_least_two(a: bool, b: bool, c: bool) -> bool { - if a { - b || c - } else { - b && c - } -} - -/// Utility function that grabs usage tracker from it's lock and -/// saves it out. 
Should be called when we want to save anywhere outside this file -pub fn save_usage_to_disk() { - match USAGE_TRACKER.write().unwrap().save() { - Ok(_val) => info!("Saved usage tracker successfully"), - Err(e) => warn!("Unable to save usage tracker {:}", e), - }; -} -impl UsageTracker { pub fn save(&mut self) -> Result<(), RitaCommonError> { let settings = settings::get_rita_common(); - let minimum_number_of_transactions = if is_router_storage_small( - &settings - .network - .device - .unwrap_or_else(|| "x86_64".to_string()), - ) { - MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE - } else { - MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE - }; - if self.payments.len() < minimum_number_of_transactions - || at_least_two( - check_usage_hour(&self.client_bandwidth, self.last_save_hour), - check_usage_hour(&self.exit_bandwidth, self.last_save_hour), - check_usage_hour(&self.relay_bandwidth, self.last_save_hour), - ) - { + + if self.check_unsaved_payments() || self.check_unsaved_usage() { return Err(RitaCommonError::StdError(IOError::new( ErrorKind::Other, "Too little data for writing", @@ -257,7 +150,9 @@ impl UsageTracker { // 500 tx min. Payment data is trimmed if out of space as it is larger than usage data if newsize >= 1000 { newsize /= 2; - trim_payments(newsize, &mut self.payments); + while self.payments.len() > newsize { + self.remove_oldest_payment_history_entry() + } let serialized = bincode::serialize(self)?; compressed_bytes = match compress_serialized(serialized) { Ok(bytes) => bytes, @@ -277,14 +172,14 @@ impl UsageTracker { /// struct will be returned so data can be successfully collected from the present moment forward. /// /// TODO remove in beta 21 migration code migrates json serialized data to bincode - fn load_from_disk() -> UsageTracker { + fn load_from_disk() -> UsageTrackerStorage { // if the loading process goes wrong for any reason, we just start again - let blank_usage_tracker = UsageTracker { + let blank_usage_tracker = UsageTrackerStorage { + client_bandwidth: HashMap::new(), + relay_bandwidth: HashMap::new(), + exit_bandwidth: HashMap::new(), + payments: HashSet::new(), last_save_hour: 0, - client_bandwidth: VecDeque::new(), - relay_bandwidth: VecDeque::new(), - exit_bandwidth: VecDeque::new(), - payments: VecDeque::new(), }; let file_path = settings::get_rita_common().network.usage_tracker_file; @@ -304,49 +199,43 @@ impl UsageTracker { Err(_e) => return blank_usage_tracker, }; - let res = match ( + match ( file_exists, - try_bincode(&unzipped_bytes), - try_json(&unzipped_bytes), + try_bincode_new(&unzipped_bytes), + try_bincode_old(&unzipped_bytes), ) { - // file exists and bincode deserialization was successful, in the case that somehow json deserialization of the same - // data was also successful just ignore it and use bincode + // file exists and bincode deserialization was successful, ignore all other possibilities (true, Ok(bincode_tracker), _) => bincode_tracker, - //file exists, but bincode deserialization failed -> load using serde (old), update settings and save file - (true, Err(_e), Ok(mut json_tracker)) => { - let mut settings = settings::get_rita_common(); - // save with bincode regardless of result of serde deserialization in order to end reliance on json - let old_path = PathBuf::from(settings.network.usage_tracker_file); - - let mut new_path = old_path.clone(); - new_path.set_extension("bincode"); - - settings.network.usage_tracker_file = - new_path.clone().into_os_string().into_string().unwrap(); - set_rita_common(settings); - - match 
json_tracker.save() { - Ok(()) => { - // delete the old file after successfully migrating, this may cause problems on routers with - // low available storage space since we want to take up space for both the new and old file - if !(old_path.eq(&new_path)) { - // check that we would not be deleting the file just saved to - let _r = std::fs::remove_file(old_path); - } else { - error!( - "We are trying to save over {:?} with {:?}, how are they same?", - old_path, new_path - ) + // file exists, up to date encoding failed, beta 20 encoding succeeded + (true, Err(_), Ok(bincode_tracker)) => UsageTrackerStorage { + last_save_hour: bincode_tracker.last_save_hour, + client_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.client_bandwidth), + relay_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.relay_bandwidth), + exit_bandwidth: convert_flat_to_map_usage_data(bincode_tracker.exit_bandwidth), + payments: { + let mut out = HashSet::new(); + for ph in bincode_tracker.payments { + for p in ph.payments { + match p.txid.parse() { + Ok(txid) => { + out.insert(UsageTrackerPayment { + to: p.to, + from: p.from, + amount: p.amount, + txid, + index: ph.index, + }); + } + Err(e) => error!( + "Failed to convert payment with txid {:?} discarding!", + e + ), + } } - json_tracker - } - Err(e) => { - error!("Failed to save UsageTracker to bincode {:?}", e); - json_tracker } - } - } - + out + }, + }, // file does not exist; no data to load, this is probably a new router // and we'll just generate a new file (false, _, _) => blank_usage_tracker, @@ -360,8 +249,7 @@ impl UsageTracker { ); blank_usage_tracker } - }; - res.remove_duplicate_and_invalid_payment_entires() + } } } @@ -385,14 +273,14 @@ fn decompressed(mut file: File) -> Result, RitaCommonError> { } /// Attempts to deserialize the provided array of bytes as a bincode encoded UsageTracker struct -fn try_bincode(bytes: &[u8]) -> Result { - let deserialized: Result = bincode::deserialize(bytes); +fn try_bincode_new(bytes: &[u8]) -> Result { + let deserialized: Result = bincode::deserialize(bytes); deserialized } -/// Attempts to deserialize the provided array of bytes as a json encoded UsageTracker struct -fn try_json(bytes: &[u8]) -> Result { - let deserialized: Result = serde_json::from_slice(bytes); +/// Attempts to deserialize the provided array of bytes as a bincode encoded UsageTracker struct +fn try_bincode_old(bytes: &[u8]) -> Result { + let deserialized: Result = bincode::deserialize(bytes); deserialized } @@ -430,53 +318,49 @@ pub fn update_usage_data(msg: UpdateUsage) { } }; - process_usage_update(curr_hour, msg, &mut (USAGE_TRACKER.write().unwrap())); + let mut usage_tracker = USAGE_TRACKER_STORAGE.write().unwrap(); + + usage_tracker.process_usage_update(curr_hour, msg); } -fn process_usage_update(current_hour: u64, msg: UpdateUsage, data: &mut UsageTracker) { - // history contains a reference to whatever the correct storage array is - let history = match msg.kind { - UsageType::Client => &mut data.client_bandwidth, - UsageType::Relay => &mut data.relay_bandwidth, - UsageType::Exit => &mut data.exit_bandwidth, - }; - // we grab the front entry from the VecDeque, if there is an entry one we check if it's - // up to date, if it is we add to it, if it's not or there is no entry we create one. - // note that price is only sampled once per hour. 
- match history.front_mut() { - None => history.push_front(UsageHour { - index: current_hour, - up: msg.up, - down: msg.down, - price: msg.price, - }), - Some(entry) => { - if entry.index == current_hour { +impl UsageTrackerStorage { + fn process_usage_update(&mut self, current_hour: u64, msg: UpdateUsage) { + // history contains a reference to whatever the correct storage array is + let history = match msg.kind { + UsageType::Client => &mut self.client_bandwidth, + UsageType::Relay => &mut self.relay_bandwidth, + UsageType::Exit => &mut self.exit_bandwidth, + }; + // we grab the front entry from the VecDeque, if there is an entry one we check if it's + // up to date, if it is we add to it, if it's not or there is no entry we create one. + // note that price is only sampled once per hour. + match history.get_mut(¤t_hour) { + None => { + history.insert( + current_hour, + Usage { + up: msg.up, + down: msg.down, + price: msg.price, + }, + ); + } + Some(entry) => { entry.up += msg.up; entry.down += msg.down; - } else { - history.push_front(UsageHour { - index: current_hour, - up: msg.up, - down: msg.down, - price: msg.price, - }) } } - } - while history.len() > MAX_USAGE_ENTRIES { - let _discarded_entry = history.pop_back(); - } -} - -fn trim_payments(size: usize, history: &mut VecDeque) { - while history.len() > size { - let _discarded_entry = history.pop_back(); + while history.len() > MAX_USAGE_ENTRIES { + let smallest_key = history.keys().min_by(|a, b| a.cmp(b)).cloned(); + if let Some(smallest_key) = smallest_key { + history.remove(&smallest_key); + } + } } } pub fn update_payments(payment: PaymentTx) { - let history = &mut (USAGE_TRACKER.write().unwrap()); + let history = &mut (USAGE_TRACKER_STORAGE.write().unwrap()); // This handles the following edge case: // Router A is paying router B. Router B reboots and loses all data in @@ -489,44 +373,45 @@ pub fn update_payments(payment: PaymentTx) { return; } - handle_payments(history, &payment); + history.handle_payments(&payment); } -/// Internal handler function that deals with adding a payment to the list -/// and saving if required -fn handle_payments(history: &mut UsageTracker, payment: &PaymentTx) { - let current_hour = match get_current_hour() { - Ok(hour) => hour, - Err(e) => { - error!("System time is set earlier than unix epoch! {:?}", e); - return; - } - }; - let formatted_payment = to_formatted_payment_tx(payment.clone()); - match history.payments.front_mut() { - None => history.payments.push_front(PaymentHour { - index: current_hour, - payments: vec![formatted_payment], - }), - Some(entry) => { - if entry.index == current_hour { - entry.payments.push(formatted_payment); - } else { - history.payments.push_front(PaymentHour { - index: current_hour, - payments: vec![formatted_payment], - }) +impl UsageTrackerStorage { + /// Internal handler function that deals with adding a payment to the list + /// and saving if required + fn handle_payments(&mut self, payment: &PaymentTx) { + let current_hour = match get_current_hour() { + Ok(hour) => hour, + Err(e) => { + error!("System time is set earlier than unix epoch! 
{:?}", e); + return; } + }; + let formatted_payment = UsageTrackerPayment::from_payment_tx(*payment, current_hour); + self.payments.insert(formatted_payment); + + while self.payments.len() > MAX_TX_ENTRIES { + self.remove_oldest_payment_history_entry() } } - while history.payments.len() > MAX_TX_ENTRIES { - let _discarded_entry = history.payments.pop_back(); + + /// Removes a single tx from the payment history entry, oldest first + fn remove_oldest_payment_history_entry(&mut self) { + let oldest = self + .payments + .iter() + .min_by(|a, b| a.index.cmp(&b.index)) + .cloned(); + if let Some(oldest) = oldest { + self.payments.remove(&oldest); + } } } /// Gets usage data for this router, stored on the local disk at periodic intervals -pub fn get_usage_data(kind: UsageType) -> VecDeque { - let usage_tracker_var = &*(USAGE_TRACKER.write().unwrap()); +pub fn get_usage_data_map(kind: UsageType) -> HashMap { + let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.write().unwrap()); + match kind { UsageType::Client => usage_tracker_var.client_bandwidth.clone(), UsageType::Relay => usage_tracker_var.relay_bandwidth.clone(), @@ -534,16 +419,27 @@ pub fn get_usage_data(kind: UsageType) -> VecDeque { } } +/// Gets usage data for this router, stored on the local disk at periodic intervals +pub fn get_usage_data(kind: UsageType) -> VecDeque { + let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.write().unwrap()); + let data = match kind { + UsageType::Client => usage_tracker_var.client_bandwidth.clone(), + UsageType::Relay => usage_tracker_var.relay_bandwidth.clone(), + UsageType::Exit => usage_tracker_var.exit_bandwidth.clone(), + }; + convert_map_to_flat_usage_data(data) +} + /// Gets the last saved usage hour from the existing usage tracker pub fn get_last_saved_usage_hour() -> u64 { - let usage_tracker = &*(USAGE_TRACKER.read().unwrap()); + let usage_tracker = &*(USAGE_TRACKER_STORAGE.read().unwrap()); usage_tracker.last_save_hour } /// Gets payment data for this router, stored on the local disk at periodic intervals pub fn get_payments_data() -> VecDeque { - let usage_tracker_var = &*(USAGE_TRACKER.read().unwrap()); - usage_tracker_var.payments.clone() + let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.read().unwrap()); + convert_payment_set_to_payment_hour(usage_tracker_var.payments.clone()) } /// On an interupt (SIGTERM), saving USAGE_TRACKER before exiting, this is essentially diff --git a/rita_common/src/usage_tracker/structs.rs b/rita_common/src/usage_tracker/structs.rs new file mode 100644 index 000000000..bef1862f4 --- /dev/null +++ b/rita_common/src/usage_tracker/structs.rs @@ -0,0 +1,163 @@ +use althea_types::{Identity, IndexedUsageHour, PaymentTx, Usage}; +use num256::Uint256; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::hash::Hash; + +/// A struct for tracking each hours of payments indexed in hours since unix epoch +/// used to send to the frontend +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PaymentHour { + pub index: u64, + pub payments: Vec, +} + +/// Old usage tracker struct used by versions up to Beta 20 rc29 and Beta 21 rc2 +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct UsageTrackerStorageOld { + pub last_save_hour: u64, + // at least one of these will be left unused + pub client_bandwidth: VecDeque, + pub relay_bandwidth: VecDeque, + pub exit_bandwidth: VecDeque, + /// A history of payments + pub payments: VecDeque, +} + +/// Usage tracker data storage, stores information about the data usage of this +/// Rita process 
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct UsageTrackerStorage {
+    /// The last time this struct was saved to the disk
+    pub last_save_hour: u64,
+    // at least one of these will be left unused
+    /// client bandwidth usage per hour indexed by unix timestamp in hours
+    pub client_bandwidth: HashMap<u64, Usage>,
+    /// relay bandwidth usage per hour indexed by unix timestamp in hours
+    pub relay_bandwidth: HashMap<u64, Usage>,
+    /// exit bandwidth usage per hour indexed by unix timestamp in hours
+    pub exit_bandwidth: HashMap<u64, Usage>,
+    /// A history of payments
+    pub payments: HashSet<UsageTrackerPayment>,
+}
+
+impl UsageTrackerStorage {
+    pub fn get_txids(&self) -> HashSet<Uint256> {
+        let mut set = HashSet::new();
+        for p in &self.payments {
+            set.insert(p.txid);
+        }
+        set
+    }
+}
+
+/// In an effort to converge this module between the three possible bw tracking
+/// use cases this enum is used to identify which sort of usage we are tracking
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[allow(dead_code)]
+pub enum UsageType {
+    Client,
+    Relay,
+    Exit,
+}
+
+/// A version of payment tx with a string txid so that the formatting is correct
+/// for display to users.
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
+pub struct FormattedPaymentTxOld {
+    pub to: Identity,
+    pub from: Identity,
+    pub amount: Uint256,
+    pub txid: String,
+}
+
+impl From<PaymentTx> for FormattedPaymentTxOld {
+    fn from(input: PaymentTx) -> Self {
+        let txid = input.txid;
+        FormattedPaymentTxOld {
+            to: input.to,
+            from: input.from,
+            amount: input.amount,
+            txid: format!("{txid:#066x}"),
+        }
+    }
+}
+
+impl From<UsageTrackerPayment> for FormattedPaymentTxOld {
+    fn from(value: UsageTrackerPayment) -> Self {
+        let txid = value.txid;
+        FormattedPaymentTxOld {
+            to: value.to,
+            from: value.from,
+            amount: value.amount,
+            txid: format!("{txid:#066x}"),
+        }
+    }
+}
+
+/// A version of payment tx with a parsed Uint256 txid and the hour in which
+/// the payment occurred, used to store payment history
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
+pub struct UsageTrackerPayment {
+    pub to: Identity,
+    pub from: Identity,
+    pub amount: Uint256,
+    pub txid: Uint256,
+    /// the unix timestamp in hours of when this payment occurred.
+ pub index: u64, +} + +impl Ord for UsageTrackerPayment { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.index.cmp(&other.index) + } +} + +impl PartialOrd for UsageTrackerPayment { + fn partial_cmp(&self, other: &Self) -> Option { + self.index.partial_cmp(&other.index) + } +} + +impl Hash for UsageTrackerPayment { + fn hash(&self, state: &mut H) { + // hash all values except the timestamp + self.to.hash(state); + self.from.hash(state); + self.amount.hash(state); + self.txid.hash(state); + } +} + +impl UsageTrackerPayment { + pub fn from_payment_tx(input: PaymentTx, index: u64) -> UsageTrackerPayment { + UsageTrackerPayment { + to: input.to, + from: input.from, + amount: input.amount, + txid: input.txid, + index, + } + } +} + +pub fn convert_payment_set_to_payment_hour( + input: HashSet, +) -> VecDeque { + let mut intermediate = HashMap::new(); + for ph in input { + match intermediate.get_mut(&ph.index) { + None => { + intermediate.insert(ph.index, vec![ph]); + } + Some(val) => val.push(ph), + } + } + let mut out = VecDeque::new(); + for (h, phs) in intermediate { + out.push_back(PaymentHour { + index: h, + payments: phs.into_iter().map(|v| v.into()).collect(), + }) + } + out +} diff --git a/rita_common/src/usage_tracker/tests.rs b/rita_common/src/usage_tracker/tests.rs index 8b911a18c..51066788a 100644 --- a/rita_common/src/usage_tracker/tests.rs +++ b/rita_common/src/usage_tracker/tests.rs @@ -2,31 +2,42 @@ #[allow(unused)] pub mod test { + use crate::usage_tracker::structs::{ + convert_payment_set_to_payment_hour, UsageTrackerPayment, UsageTrackerStorageOld, + }; use crate::usage_tracker::{ - get_current_hour, FormattedPaymentTx, IOError, PaymentHour, UsageHour, UsageTracker, - MAX_USAGE_ENTRIES, MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE, + get_current_hour, IOError, PaymentHour, Usage, UsageTrackerStorage, MAX_USAGE_ENTRIES, + MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE, }; - use althea_types::Identity; + use crate::RitaCommonError; + use althea_types::{convert_map_to_flat_usage_data, Identity, UnpublishedPaymentTx}; use flate2::write::ZlibEncoder; use flate2::Compression; use rand::{thread_rng, Rng}; use settings::client::RitaClientSettings; use settings::{get_rita_common, set_rita_client, set_rita_common}; - use std::collections::VecDeque; + use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::TryInto; use std::fs::File; use std::io::Write; #[cfg(test)] - impl UsageTracker { - // previous implementation of save which uses serde_json to serialize - fn save2(&self) -> Result<(), IOError> { - let serialized = serde_json::to_vec(self)?; + impl UsageTrackerStorage { + // previous implementation of save which uses the old struct to serialize + fn save2(&self) -> Result<(), RitaCommonError> { + let old_struct = UsageTrackerStorageOld { + last_save_hour: self.last_save_hour, + client_bandwidth: convert_map_to_flat_usage_data(self.client_bandwidth.clone()), + relay_bandwidth: convert_map_to_flat_usage_data(self.relay_bandwidth.clone()), + exit_bandwidth: convert_map_to_flat_usage_data(self.exit_bandwidth.clone()), + payments: convert_payment_set_to_payment_hour(self.payments.clone()), + }; + let serialized = bincode::serialize(&old_struct)?; let mut file = File::create(settings::get_rita_common().network.usage_tracker_file)?; let buffer: Vec = Vec::new(); let mut encoder = ZlibEncoder::new(buffer, Compression::default()); encoder.write_all(&serialized)?; let compressed_bytes = encoder.finish()?; - file.write_all(&compressed_bytes) + 
Ok(file.write_all(&compressed_bytes)?) } } @@ -42,7 +53,7 @@ pub mod test { let res = dummy_usage_tracker.save(); // saving to bincode with the new method info!("Saving test data: {:?}", res); - let res2 = UsageTracker::load_from_disk(); + let res2 = UsageTrackerStorage::load_from_disk(); info!("Loading test data: {:?}", res2); assert_eq!(dummy_usage_tracker, res2); @@ -59,30 +70,22 @@ pub mod test { let rset = RitaClientSettings::new("../settings/test.toml").unwrap(); set_rita_client(rset); let mut newrc = get_rita_common(); - newrc.network.usage_tracker_file = "/tmp/usage_tracker.json".to_string(); + newrc.network.usage_tracker_file = "/tmp/usage_tracker.bincode".to_string(); set_rita_common(newrc); info!("Generating large usage tracker history"); let dummy_usage_tracker = generate_dummy_usage_tracker(); - info!("Saving test data as json"); + info!("Saving test data in old format"); dummy_usage_tracker.save2().unwrap(); - // using load_from_disk() with usage_tracker_file set to a .json writes bincode - // serialized data to a .json extended file, but because load_from_disk() deletes - // the .json file, this test ends with no file left. - info!("Loading test data from json"); - let mut res2 = UsageTracker::load_from_disk(); - - // setting the usage_tracker_file to .bincode, which is what this upgrade expects - let mut newrc2 = get_rita_common(); - newrc2.network.usage_tracker_file = "/tmp/usage_tracker.bincode".to_string(); - set_rita_common(newrc2); + info!("Loading test data from oldformat"); + let mut res2 = UsageTrackerStorage::load_from_disk(); // Saving res2 with the new save() and updated usage_tracker_file in order to end with - // a .bincode file from the loaded json data saved to res2. + // a .bincode file from the old format bincode file. 
         res2.save().unwrap();
         info!("Saving test data as bincode");
-        let res4 = UsageTracker::load_from_disk();
+        let res4 = UsageTrackerStorage::load_from_disk();
         info!("Loading test data from bincode");

         // use == to avoid printing out the compared data
@@ -91,9 +94,9 @@ pub mod test {
         assert!(res2 == res4);
     }
     // generates a nontrivial usage tracker struct for testing
-    pub fn generate_dummy_usage_tracker() -> UsageTracker {
+    pub fn generate_dummy_usage_tracker() -> UsageTrackerStorage {
         let current_hour = get_current_hour().unwrap();
-        UsageTracker {
+        UsageTrackerStorage {
             last_save_hour: current_hour,
             client_bandwidth: generate_bandwidth(current_hour),
             relay_bandwidth: generate_bandwidth(current_hour),
@@ -102,34 +105,35 @@ pub mod test {
         }
     }
     // generates dummy usage hour data randomly
-    fn generate_bandwidth(starting_hour: u64) -> VecDeque<UsageHour> {
+    fn generate_bandwidth(starting_hour: u64) -> HashMap<u64, Usage> {
         let num_to_generate: u16 = thread_rng()
             .gen_range(50..MAX_USAGE_ENTRIES)
             .try_into()
             .unwrap();
-        let mut output = VecDeque::new();
+        let mut output = HashMap::new();
         for i in 0..num_to_generate {
-            output.push_front(UsageHour {
-                index: starting_hour - i as u64,
-                up: rand::random(),
-                down: rand::random(),
-                price: rand::random(),
-            });
+            output.insert(
+                i as u64,
+                Usage {
+                    up: rand::random(),
+                    down: rand::random(),
+                    price: rand::random(),
+                },
+            );
         }
         output
     }
     // generates dummy payment data randomly
-    fn generate_payments(starting_hour: u64) -> VecDeque<PaymentHour> {
+    fn generate_payments(starting_hour: u64) -> HashSet<UsageTrackerPayment> {
         let mut num_to_generate: u8 = rand::random();
         while (num_to_generate as usize) < MINIMUM_NUMBER_OF_TRANSACTIONS_LARGE_STORAGE {
             num_to_generate = rand::random();
         }
         let our_id = random_identity();
         let neighbor_ids = get_neighbor_ids();
-        let mut output = VecDeque::new();
+        let mut output = HashSet::new();
         for i in 0..num_to_generate {
             let num_payments_generate: u8 = rand::random();
-            let mut payments = Vec::new();
             for _ in 0..num_payments_generate {
                 let neighbor_idx: u8 = rand::random();
                 let amount: u128 = rand::random();
@@ -140,17 +144,14 @@ pub mod test {
                 (neighbor_ids[neighbor_idx as usize], our_id)
             };
             let txid: u128 = rand::random();
-            payments.push(FormattedPaymentTx {
+            output.insert(UsageTrackerPayment {
                 to,
                 from,
                 amount: amount.into(),
-                txid: txid.to_string(),
-            })
+                txid: txid.into(),
+                index: starting_hour - i as u64,
+            });
             }
-            output.push_front(PaymentHour {
-                index: starting_hour - i as u64,
-                payments,
-            });
         }
         output
     }
diff --git a/rita_exit/src/traffic_watcher/mod.rs b/rita_exit/src/traffic_watcher/mod.rs
index 3eeb832a4..4c98ae7a7 100644
--- a/rita_exit/src/traffic_watcher/mod.rs
+++ b/rita_exit/src/traffic_watcher/mod.rs
@@ -21,9 +21,9 @@ use babel_monitor::structs::Route;
 use ipnetwork::IpNetwork;
 use rita_common::debt_keeper::traffic_update;
 use rita_common::debt_keeper::Traffic;
+use rita_common::usage_tracker::structs::UsageType;
 use rita_common::usage_tracker::update_usage_data;
 use rita_common::usage_tracker::UpdateUsage;
-use rita_common::usage_tracker::UsageType;
 use std::collections::HashMap;
 use std::net::IpAddr;

From b620a1675e809ff277f2fb1aaf93d63aed3aa77a Mon Sep 17 00:00:00 2001
From: Justin Kilpatrick
Date: Mon, 18 Sep 2023 14:25:39 -0400
Subject: [PATCH 2/3] Add instant throughput data to usage tracker

This will be helpful in allowing us to show the current live usage of a
router.
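
For reference, the rate computation being added boils down to dividing
the byte counter carried by the newest usage message by the time
elapsed since the previous sample. A minimal standalone sketch of that
idea (hypothetical names, not the exact code in this patch):

    use std::time::{Duration, Instant};

    // One sampler per traffic type (client/relay/exit) in the real code
    struct ThroughputSample {
        last_sample_time: Option<Instant>,
        duration_of_this_sample: Duration,
        bytes_used: u64,
    }

    impl ThroughputSample {
        // record the byte count carried by the newest usage message
        fn update(&mut self, bytes: u64) {
            let now = Instant::now();
            if let Some(last) = self.last_sample_time {
                self.duration_of_this_sample = now - last;
            }
            self.last_sample_time = Some(now);
            self.bytes_used = bytes;
        }

        // bytes per second; None until at least two samples have arrived
        fn throughput(&self) -> Option<u64> {
            let secs = self.duration_of_this_sample.as_secs();
            if self.last_sample_time.is_some() && secs > 0 {
                Some(self.bytes_used / secs)
            } else {
                None
            }
        }
    }

    fn main() {
        let mut s = ThroughputSample {
            last_sample_time: None,
            duration_of_this_sample: Duration::ZERO,
            bytes_used: 0,
        };
        s.update(1_000_000);
        std::thread::sleep(Duration::from_secs(1));
        s.update(2_000_000);
        // prints roughly Some(2000000), i.e. bytes per second
        println!("{:?}", s.throughput());
    }

Because only the latest byte counter is kept, the estimate is an
average over a single update interval (around 5-10 seconds).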
---
 althea_types/src/interop.rs              |   8 +-
 rita_client/src/operator_update/mod.rs   |   6 +-
 rita_common/src/usage_tracker/mod.rs     | 104 ++++++++++++++++++-----
 rita_common/src/usage_tracker/structs.rs |  19 +++++
 4 files changed, 115 insertions(+), 22 deletions(-)

diff --git a/althea_types/src/interop.rs b/althea_types/src/interop.rs
index deeceac8f..fc8a92d50 100644
--- a/althea_types/src/interop.rs
+++ b/althea_types/src/interop.rs
@@ -622,7 +622,7 @@ pub struct OperatorUpdateMessage {
     pub operator_action: Option<OperatorAction>,
     /// String that holds the download link to the latest firmware release
     /// When a user hits 'update router', it updates to this version
-    /// to be removed once all routesr are updated to >= beta 19 rc9
+    /// to be removed once all routers are updated to >= beta 19 rc9
     pub local_update_instruction: Option<UpdateTypeLegacy>,
     /// String that holds the download link to the latest firmware release
     /// When a user hits 'update router', it updates to this version
@@ -754,6 +754,12 @@ pub struct OperatorCheckinMessage {
     /// saved usage hour is the same as the ops last seen, we send no data here as we are up
     /// to date. Data sent through here gets added to a database entry for each device.
     pub user_bandwidth_usage_v2: Option<UsageTrackerTransfer>,
+    /// Current client data usage in mbps computed as the last input to the usage tracker
+    /// so an average of around 5-10 seconds
+    pub client_mbps: Option<u64>,
+    /// Current relay data usage in mbps, computed as the last input to the usage tracker
+    /// so an average of around 5-10 seconds
+    pub relay_mbps: Option<u64>,
     /// This is to keep track of the rita client uptime for debugging purposes
     /// In the event something whacko happens, serde will magically derive def-
     /// fault value.
diff --git a/rita_client/src/operator_update/mod.rs b/rita_client/src/operator_update/mod.rs
index 74cd59bd9..d27197338 100644
--- a/rita_client/src/operator_update/mod.rs
+++ b/rita_client/src/operator_update/mod.rs
@@ -20,8 +20,8 @@ use num256::Uint256;
 use rita_common::rita_loop::is_gateway;
 use rita_common::tunnel_manager::neighbor_status::get_neighbor_status;
 use rita_common::tunnel_manager::shaping::flag_reset_shaper;
-use rita_common::usage_tracker::structs::UsageType::{Client, Relay};
-use rita_common::usage_tracker::{get_current_hour, get_usage_data_map};
+use rita_common::usage_tracker::structs::UsageType::{self, Client, Relay};
+use rita_common::usage_tracker::{get_current_hour, get_current_throughput, get_usage_data_map};
 use rita_common::utils::option_convert;
 use rita_common::DROPBEAR_AUTHORIZED_KEYS;
 use rita_common::KI;
@@ -161,6 +161,8 @@ pub async fn operator_update(
         rita_uptime: RITA_UPTIME.elapsed(),
         user_bandwidth_usage: None,
         user_bandwidth_usage_v2: prepare_usage_data_for_upload(ops_last_seen_usage_hour)?,
+        client_mbps: get_current_throughput(UsageType::Client),
+        relay_mbps: get_current_throughput(UsageType::Relay),
     })
     .await;
diff --git a/rita_common/src/usage_tracker/mod.rs b/rita_common/src/usage_tracker/mod.rs
index 9fbf3bd98..3ba8226fb 100644
--- a/rita_common/src/usage_tracker/mod.rs
+++ b/rita_common/src/usage_tracker/mod.rs
@@ -26,6 +26,7 @@ use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
 use std::sync::RwLock;
+use std::time::Instant;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 use std::usize;
@@ -51,15 +52,37 @@ pub const MINIMUM_NUMBER_OF_TRANSACTIONS_SMALL_STORAGE: usize = 75;
 /// The maximum amount of usage data that may be unsaved before we save out to the disk
 pub const MAX_UNSAVED_USAGE: u64 = 10 * 1000u64.pow(3);

+/// Just a storage wrapper for the usage tracker lazy static, to wrap the persisted data
+/// (usage tracker) and the non-persisted data (throughput tracker) without using a raw tuple
+pub struct UsageTrackerWrapper {
+    usage_tracker: UsageTrackerStorage,
+    throughtput_tracker: ThroughputTracker,
+}
+
+impl Default for UsageTrackerWrapper {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl UsageTrackerWrapper {
+    pub fn new() -> UsageTrackerWrapper {
+        UsageTrackerWrapper {
+            usage_tracker: UsageTrackerStorage::load_from_disk(),
+            throughtput_tracker: ThroughputTracker::default(),
+        }
+    }
+}
+
 lazy_static! {
-    static ref USAGE_TRACKER_STORAGE: Arc<RwLock<UsageTrackerStorage>> =
-        Arc::new(RwLock::new(UsageTrackerStorage::load_from_disk()));
+    static ref USAGE_TRACKER_STORAGE: Arc<RwLock<UsageTrackerWrapper>> =
+        Arc::new(RwLock::new(UsageTrackerWrapper::new()));
 }

 /// Utility function that grabs usage tracker from its lock and
 /// saves it out. Should be called when we want to save anywhere outside this file
 pub fn save_usage_to_disk() {
-    match USAGE_TRACKER_STORAGE.write().unwrap().save() {
+    match USAGE_TRACKER_STORAGE.write().unwrap().usage_tracker.save() {
         Ok(_val) => info!("Saved usage tracker successfully"),
         Err(e) => warn!("Unable to save usage tracker {:}", e),
     };
@@ -320,7 +343,34 @@ pub fn update_usage_data(msg: UpdateUsage) {

     let mut usage_tracker = USAGE_TRACKER_STORAGE.write().unwrap();

-    usage_tracker.process_usage_update(curr_hour, msg);
+    usage_tracker
+        .usage_tracker
+        .process_usage_update(curr_hour, msg);
+    usage_tracker.throughtput_tracker.process_usage_update(msg);
+}
+
+impl ThroughputTracker {
+    fn process_usage_update(&mut self, msg: UpdateUsage) {
+        let data = match msg.kind {
+            UsageType::Client => &mut self.client,
+            UsageType::Relay => &mut self.relay,
+            UsageType::Exit => &mut self.exit,
+        };
+        if let Some(last_sample_time) = data.last_sample_time {
+            // make sure we don't panic if the clock goes backwards
+            if Instant::now() > last_sample_time {
+                data.duration_of_this_sample = Instant::now() - last_sample_time
+            }
+        }
+        data.last_sample_time = Some(Instant::now());
+        // for clients all traffic is measured; for relays the up and down values
+        // will always be double the actual throughput since we're pulling in and forwarding
+        // the same data at once.
+        match msg.kind {
+            UsageType::Client => data.bytes_used = msg.up + msg.down,
+            UsageType::Relay | UsageType::Exit => data.bytes_used = msg.up,
+        }
+    }
 }

 impl UsageTrackerStorage {
@@ -360,7 +410,7 @@ impl UsageTrackerStorage {
 }

 pub fn update_payments(payment: PaymentTx) {
-    let history = &mut (USAGE_TRACKER_STORAGE.write().unwrap());
+    let mut history = USAGE_TRACKER_STORAGE.write().unwrap();

     // This handles the following edge case:
     // Router A is paying router B. Router B reboots and loses all data in
     // the usage history, so payments that have
     // already been accounted for get counted twice.
     // This checks the usage history to see if this tx exists
     // thereby preventing the above case.
-    if history.get_txids().contains(&payment.txid) {
+    if history.usage_tracker.get_txids().contains(&payment.txid) {
         error!("Tried to insert duplicate txid into usage tracker!");
         return;
     }

-    history.handle_payments(&payment);
+    history.usage_tracker.handle_payments(&payment);
 }

 impl UsageTrackerStorage {
@@ -408,38 +458,54 @@ impl UsageTrackerStorage {
     }
 }

+/// Returns current throughput in bytes per second, or None if it is not yet available
+pub fn get_current_throughput(kind: UsageType) -> Option<u64> {
+    let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap();
+    let data = match kind {
+        UsageType::Client => usage_tracker_var.throughtput_tracker.client,
+        UsageType::Relay => usage_tracker_var.throughtput_tracker.relay,
+        UsageType::Exit => usage_tracker_var.throughtput_tracker.exit,
+    };
+    if data.last_sample_time.is_some() && data.duration_of_this_sample.as_secs() > 0 {
+        Some(data.bytes_used / data.duration_of_this_sample.as_secs())
+    } else {
+        // no data yet
+        None
+    }
+}
+
 /// Gets usage data for this router, stored on the local disk at periodic intervals
 pub fn get_usage_data_map(kind: UsageType) -> HashMap<u64, Usage> {
-    let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.write().unwrap());
+    let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap();
     match kind {
-        UsageType::Client => usage_tracker_var.client_bandwidth.clone(),
-        UsageType::Relay => usage_tracker_var.relay_bandwidth.clone(),
-        UsageType::Exit => usage_tracker_var.exit_bandwidth.clone(),
+        UsageType::Client => usage_tracker_var.usage_tracker.client_bandwidth.clone(),
+        UsageType::Relay => usage_tracker_var.usage_tracker.relay_bandwidth.clone(),
+        UsageType::Exit => usage_tracker_var.usage_tracker.exit_bandwidth.clone(),
     }
 }

 /// Gets usage data for this router, stored on the local disk at periodic intervals
 pub fn get_usage_data(kind: UsageType) -> VecDeque<IndexedUsageHour> {
-    let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.write().unwrap());
+    let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap();
     let data = match kind {
-        UsageType::Client => usage_tracker_var.client_bandwidth.clone(),
-        UsageType::Relay => usage_tracker_var.relay_bandwidth.clone(),
-        UsageType::Exit => usage_tracker_var.exit_bandwidth.clone(),
+        UsageType::Client => usage_tracker_var.usage_tracker.client_bandwidth.clone(),
+        UsageType::Relay => usage_tracker_var.usage_tracker.relay_bandwidth.clone(),
+        UsageType::Exit => usage_tracker_var.usage_tracker.exit_bandwidth.clone(),
     };
     convert_map_to_flat_usage_data(data)
 }

 /// Gets the last saved usage hour from the existing usage tracker
 pub fn get_last_saved_usage_hour() -> u64 {
-    let usage_tracker = &*(USAGE_TRACKER_STORAGE.read().unwrap());
-    usage_tracker.last_save_hour
+    let usage_tracker = USAGE_TRACKER_STORAGE.read().unwrap();
+    usage_tracker.usage_tracker.last_save_hour
 }

 /// Gets payment data for this router, stored on the local disk at periodic intervals
 pub fn get_payments_data() -> VecDeque<PaymentHour> {
-    let usage_tracker_var = &*(USAGE_TRACKER_STORAGE.read().unwrap());
-    convert_payment_set_to_payment_hour(usage_tracker_var.payments.clone())
+    let usage_tracker_var = USAGE_TRACKER_STORAGE.read().unwrap();
+    convert_payment_set_to_payment_hour(usage_tracker_var.usage_tracker.payments.clone())
 }

 /// On an interrupt (SIGTERM), saving USAGE_TRACKER before exiting, this is essentially
diff --git a/rita_common/src/usage_tracker/structs.rs b/rita_common/src/usage_tracker/structs.rs
index bef1862f4..36d8f3ebe 100644
--- a/rita_common/src/usage_tracker/structs.rs
+++ b/rita_common/src/usage_tracker/structs.rs
@@ -2,6 +2,25 @@
 use althea_types::{Identity, IndexedUsageHour, PaymentTx, Usage};
 use num256::Uint256;
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::hash::Hash;
+use std::time::{Duration, Instant};
+
+/// This struct is used to estimate the current throughput of a given router, it is not saved out to the disk
+#[derive(Clone, Copy, Default)]
+pub struct ThroughputTracker {
+    pub client: ThroughputData,
+    pub relay: ThroughputData,
+    pub exit: ThroughputData,
+}
+
+#[derive(Clone, Copy, Default)]
+pub struct ThroughputData {
+    /// When the current data in this struct was last updated
+    pub last_sample_time: Option<Instant>,
+    /// the duration between the previous and current value of last_sample_time,
+    /// used to convert the bytes used above into a rate per second
+    pub duration_of_this_sample: Duration,
+    pub bytes_used: u64,
+}

 /// A struct for tracking each hour of payments indexed in hours since unix epoch
 /// used to send to the frontend

From db534d9d621c0b4511cfbfd452478241b2986f15 Mon Sep 17 00:00:00 2001
From: Justin Kilpatrick
Date: Mon, 18 Sep 2023 15:24:12 -0400
Subject: [PATCH 3/3] Bump for PostgreSQL 16

---
 integration_tests/src/setup_utils/database.rs | 4 ++--
 legacy_integration_tests/container/Dockerfile | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/integration_tests/src/setup_utils/database.rs b/integration_tests/src/setup_utils/database.rs
index e2068e1a2..5acf7851c 100644
--- a/integration_tests/src/setup_utils/database.rs
+++ b/integration_tests/src/setup_utils/database.rs
@@ -13,8 +13,8 @@ use std::{
 /// Starts the exit postgres instance in the native system namespace, TODO insert plumbing so that exits can reach it
 pub fn start_postgres() {
     const POSTGRES_USER: &str = "postgres";
-    const POSTGRES_BIN: &str = "/usr/lib/postgresql/15/bin/postgres";
-    const INITDB_BIN: &str = "/usr/lib/postgresql/15/bin/initdb";
+    const POSTGRES_BIN: &str = "/usr/lib/postgresql/16/bin/postgres";
+    const INITDB_BIN: &str = "/usr/lib/postgresql/16/bin/initdb";
     // for this test script
     const DB_URL_LOCAL: &str = "postgres://postgres@127.0.0.1/test";
     // for the rita exit instances
diff --git a/legacy_integration_tests/container/Dockerfile b/legacy_integration_tests/container/Dockerfile
index 8190f9b93..c8b844a10 100644
--- a/legacy_integration_tests/container/Dockerfile
+++ b/legacy_integration_tests/container/Dockerfile
@@ -5,8 +5,8 @@ RUN apt-get install -y python3-termcolor python3-toml python3-networkx python3-m
 RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
 RUN PATH=$PATH:$HOME/.cargo/bin cargo install diesel_cli --force
 ENV POSTGRES_USER=postgres
-ENV POSTGRES_BIN=/usr/lib/postgresql/15/bin/postgres
-ENV INITDB_BIN=/usr/lib/postgresql/15/bin/initdb
+ENV POSTGRES_BIN=/usr/lib/postgresql/16/bin/postgres
+ENV INITDB_BIN=/usr/lib/postgresql/16/bin/initdb
 ARG NODES
 ENV SPEEDTEST_THROUGHPUT=$SPEEDTEST_THROUGHPUT
 ENV SPEEDTEST_DURATION=$SPEEDTEST_DURATION