Skip to content

Commit

Permalink
Merge branch 'master' into pranay/refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ptulugu authored Sep 20, 2023
2 parents 97baa7c + d165efc commit e4a7ce6
Show file tree
Hide file tree
Showing 18 changed files with 695 additions and 576 deletions.
18 changes: 13 additions & 5 deletions althea_types/src/interop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{contact_info::ContactType, wg_key::WgKey, BillingDetails, InstallationDetails};
use crate::{ClientExtender, UsageTracker, WifiDevice};
use crate::{ClientExtender, UsageTrackerFlat, UsageTrackerTransfer, WifiDevice};
use arrayvec::ArrayString;
use babel_monitor::structs::Neighbor;
use babel_monitor::structs::Route;
Expand Down Expand Up @@ -335,7 +335,7 @@ pub struct LightClientLocalIdentity {
/// This represents a generic payment that may be to or from us
/// it contains a txid from a published transaction
/// that should be validated against the blockchain
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
pub struct PaymentTx {
pub to: Identity,
pub from: Identity,
Expand All @@ -353,7 +353,7 @@ impl Hash for PaymentTx {

/// This represents a generic payment that may be to or from us, it does not contain a txid meaning it is
/// unpublished
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct UnpublishedPaymentTx {
pub to: Identity,
pub from: Identity,
Expand Down Expand Up @@ -622,7 +622,7 @@ pub struct OperatorUpdateMessage {
pub operator_action: Option<OperatorAction>,
/// String that holds the download link to the latest firmware release
/// When a user hits 'update router', it updates to this version
/// to be removed once all routesr are updated to >= beta 19 rc9
/// to be removed once all routers are updated to >= beta 19 rc9
pub local_update_instruction: Option<UpdateTypeLegacy>,
/// String that holds the download link to the latest firmware release
/// When a user hits 'update router', it updates to this version
Expand Down Expand Up @@ -747,11 +747,19 @@ pub struct OperatorCheckinMessage {
/// This is a user set bandwidth limit value, it will cap the users download
/// and upload to the provided value of their choosing. Denoted in mbps
pub user_bandwidth_limit: Option<usize>,
/// Legacy bandwidth usage from pre beta 20 routers, one of the two will be None
pub user_bandwidth_usage: Option<UsageTrackerFlat>,
/// Details of both the Client and Relay bandwidth usage over a given period determined
/// by the ops_last_seen_usage_hour in OperatorUpdateMessage. When the device's last
/// saved usage hour is the same as the ops last seen, we send no data here as we are up
/// to date. Data sent through here gets added to a database entry for each device.
pub user_bandwidth_usage: Option<UsageTracker>,
pub user_bandwidth_usage_v2: Option<UsageTrackerTransfer>,
/// Current client data usage in mbps computed as the last input to the usage tracker
/// so an average of around 5-10 seconds
pub client_mbps: Option<u64>,
/// Current relay data usage in mbps, computed as the last input to the usage tracker
/// so an average of around 5-10 seconds
pub relay_mbps: Option<u64>,
/// This is to keep track of the rita client uptime for debugging purposes
/// In the event something whacko happens, serde will magically derive a
/// default value.
Expand Down
114 changes: 98 additions & 16 deletions althea_types/src/user_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::collections::{HashMap, VecDeque};
use std::net::Ipv4Addr;
use std::time::SystemTime;

Expand Down Expand Up @@ -64,33 +63,116 @@ pub struct InstallationDetails {
pub install_date: Option<SystemTime>,
}

/// The main actor that holds the usage state for the duration of operations
/// to be sent up to ops tools.
/// The old storage method for usage tracker data that stores flat data
/// in arrays and does not index the data via hashmap. This format was abandoned
/// as error prone, but is still used so legacy routers can send data.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageTracker {
pub struct UsageTrackerFlat {
pub last_save_hour: u64,
pub client_bandwidth: VecDeque<UsageHour>,
pub relay_bandwidth: VecDeque<UsageHour>,
pub client_bandwidth: VecDeque<IndexedUsageHour>,
pub relay_bandwidth: VecDeque<IndexedUsageHour>,
}

/// A struct for tracking each hour of usage, indexed by time in hours since
/// the unix epoch.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct UsageHour {
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexedUsageHour {
pub index: u64,
pub up: u64,
pub down: u64,
pub price: u32,
}

impl Hash for UsageHour {
fn hash<H: Hasher>(&self, state: &mut H) {
self.index.hash(state);
/// A struct used to store data usage over an arbitrary period. The length of time
/// is implied by the code that is handling this struct. Do not transfer without
/// considering that you may be changing units.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Usage {
    // upload volume over the period — units defined by the handling code
    pub up: u64,
    // download volume over the period — units defined by the handling code
    pub down: u64,
    // price associated with this usage; units determined by the handling code
    pub price: u32,
}

/// The main actor that holds the usage state for the duration of operations
/// to be sent up to ops tools.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageTrackerTransfer {
    /// client bandwidth usage per hour indexed by unix timestamp in hours
    pub client_bandwidth: HashMap<u64, Usage>,
    /// relay bandwidth usage per hour indexed by unix timestamp in hours
    pub relay_bandwidth: HashMap<u64, Usage>,
    /// exit bandwidth usage per hour indexed by unix timestamp in hours
    pub exit_bandwidth: HashMap<u64, Usage>,
}

impl UsageTrackerTransfer {
    /// Returns the greatest hour index stored anywhere in this struct, which
    /// represents the most recently saved usage hour across the client, relay
    /// and exit maps. Returns 0 when no data is stored at all.
    pub fn last_save_hour(&self) -> u64 {
        self.client_bandwidth
            .keys()
            .chain(self.relay_bandwidth.keys())
            .chain(self.exit_bandwidth.keys())
            .copied()
            .max()
            // an empty tracker has no saved hours; report hour zero
            .unwrap_or(0)
    }
}

/// Converts the legacy flat storage format into the hashmap-indexed transfer
/// format via `convert_flat_to_map_usage_data`.
impl From<UsageTrackerFlat> for UsageTrackerTransfer {
    fn from(value: UsageTrackerFlat) -> Self {
        UsageTrackerTransfer {
            client_bandwidth: convert_flat_to_map_usage_data(value.client_bandwidth),
            relay_bandwidth: convert_flat_to_map_usage_data(value.relay_bandwidth),
            // the flat format never tracked exit bandwidth, so start empty
            exit_bandwidth: HashMap::new(),
        }
    }
}
impl PartialEq for UsageHour {
fn eq(&self, other: &Self) -> bool {
self.index == other.index

/// Used to convert between usage tracker storage formats, turning the legacy
/// flat `VecDeque` representation into a map keyed by the hour index.
///
/// Duplicate entries for the same hour index are reconciled by keeping the
/// maximum of each field (up, down, price) across the duplicates.
pub fn convert_flat_to_map_usage_data(input: VecDeque<IndexedUsageHour>) -> HashMap<u64, Usage> {
    let mut out: HashMap<u64, Usage> = HashMap::new();
    for hour in input {
        // the entry API performs a single lookup per hour, where the previous
        // get_mut + insert pattern hashed the key twice on the miss path
        out.entry(hour.index)
            .and_modify(|existing| {
                // duplicate entry which we must correct: pick the higher
                // data usage for each field and keep that
                existing.up = existing.up.max(hour.up);
                existing.down = existing.down.max(hour.down);
                existing.price = existing.price.max(hour.price);
            })
            .or_insert(Usage {
                up: hour.up,
                down: hour.down,
                price: hour.price,
            });
    }
    out
}

/// Used to convert between usage tracker storage formats, flattening the
/// hashmap-indexed representation back into the legacy `VecDeque` form,
/// sorted from the greatest hour index to the least.
pub fn convert_map_to_flat_usage_data(input: HashMap<u64, Usage>) -> VecDeque<IndexedUsageHour> {
    let mut flattened: VecDeque<IndexedUsageHour> = input
        .into_iter()
        .map(|(index, usage)| IndexedUsageHour {
            index,
            up: usage.up,
            down: usage.down,
            price: usage.price,
        })
        .collect();
    // we want this sorted from greatest to least so we do the cmp in reverse order
    flattened.make_contiguous().sort_by(|a, b| b.index.cmp(&a.index));
    flattened
}
impl Eq for UsageHour {}
4 changes: 2 additions & 2 deletions integration_tests/src/setup_utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::{
/// Starts the exit postgres instance in the native system namespace, TODO insert plumbing so that exits can reach it
pub fn start_postgres() {
const POSTGRES_USER: &str = "postgres";
const POSTGRES_BIN: &str = "/usr/lib/postgresql/15/bin/postgres";
const INITDB_BIN: &str = "/usr/lib/postgresql/15/bin/initdb";
const POSTGRES_BIN: &str = "/usr/lib/postgresql/16/bin/postgres";
const INITDB_BIN: &str = "/usr/lib/postgresql/16/bin/initdb";
// for this test script
const DB_URL_LOCAL: &str = "postgres://[email protected]/test";
// for the rita exit instances
Expand Down
4 changes: 2 additions & 2 deletions legacy_integration_tests/container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ RUN apt-get install -y python3-termcolor python3-toml python3-networkx python3-m
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
RUN PATH=$PATH:$HOME/.cargo/bin cargo install diesel_cli --force
ENV POSTGRES_USER=postgres
ENV POSTGRES_BIN=/usr/lib/postgresql/15/bin/postgres
ENV INITDB_BIN=/usr/lib/postgresql/15/bin/initdb
ENV POSTGRES_BIN=/usr/lib/postgresql/16/bin/postgres
ENV INITDB_BIN=/usr/lib/postgresql/16/bin/initdb
ARG NODES
ENV SPEEDTEST_THROUGHPUT=$SPEEDTEST_THROUGHPUT
ENV SPEEDTEST_DURATION=$SPEEDTEST_DURATION
Expand Down
2 changes: 1 addition & 1 deletion rita_client/src/dashboard/usage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use actix_web_async::{HttpRequest, HttpResponse};
use rita_common::usage_tracker::get_usage_data;
use rita_common::usage_tracker::UsageType;
use rita_common::usage_tracker::structs::UsageType;

pub async fn get_client_usage(_req: HttpRequest) -> HttpResponse {
trace!("/usage/client hit");
Expand Down
22 changes: 14 additions & 8 deletions rita_client/src/exit_manager/exit_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,22 @@ pub fn start_exit_manager_loop() {
}
};

// Run this ping test every PING_TEST_SPEED seconds
if Instant::now() - em_state.last_connection_time > PING_TEST_SPEED {
if run_ping_test() {
em_state.last_connection_time = Instant::now();
} else {
// If this router has been in a bad state for >10 mins, reboot
if (Instant::now() - em_state.last_connection_time) > REBOOT_TIMEOUT {
let _res = KI.run_command("reboot", &[]);
// Run ping test only when we are registered to prevent constant reboots
if let ExitState::Registered { .. } = exit.info {
// Run this ping test every PING_TEST_SPEED seconds
if Instant::now() - em_state.last_connection_time > PING_TEST_SPEED {
if run_ping_test() {
em_state.last_connection_time = Instant::now();
} else {
// If this router has been in a bad state for >10 mins, reboot
if (Instant::now() - em_state.last_connection_time) > REBOOT_TIMEOUT {
let _res = KI.run_command("reboot", &[]);
}
}
}
} else {
// reset our reboot timer every tick if not registered
em_state.last_connection_time = Instant::now();
}

// Get cluster exit list. This is saved locally and updated every tick depending on what exit we connect to.
Expand Down
Loading

0 comments on commit e4a7ce6

Please sign in to comment.