From 2b00bef9e745884a6ab9ae508c8b1b69e86c6a0c Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:50:44 -0600 Subject: [PATCH 01/20] Add sub-module for reading `tc` stats --- below/tc/Cargo.toml | 13 +++ below/tc/src/errors.rs | 13 +++ below/tc/src/lib.rs | 111 +++++++++++++++++++++++++ below/tc/src/test.rs | 122 +++++++++++++++++++++++++++ below/tc/src/types.rs | 182 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 441 insertions(+) create mode 100644 below/tc/Cargo.toml create mode 100644 below/tc/src/errors.rs create mode 100644 below/tc/src/lib.rs create mode 100644 below/tc/src/test.rs create mode 100644 below/tc/src/types.rs diff --git a/below/tc/Cargo.toml b/below/tc/Cargo.toml new file mode 100644 index 00000000..5287437a --- /dev/null +++ b/below/tc/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "below-tc" +version = "0.0.1" +edition = "2021" + +[dependencies] +netlink-packet-core = "0.7.0" +netlink-packet-route = "0.19.0" +netlink-packet-utils = "0.5.2" +netlink-sys = "0.8.5" +nix = { version = "0.27.1", features = ["net"] } +thiserror = "1.0" +serde = { version = "1.0", features = ["derive", "rc"] } diff --git a/below/tc/src/errors.rs b/below/tc/src/errors.rs new file mode 100644 index 00000000..1d4a0f74 --- /dev/null +++ b/below/tc/src/errors.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum TcError { + #[error("Netlink error: {0}")] + Netlink(String), + + #[error("Read interfaces error: {0}")] + ReadInterfaces(String), + + #[error("Failed to read tc stats: {0}")] + Read(String), +} diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs new file mode 100644 index 00000000..beae7a31 --- /dev/null +++ b/below/tc/src/lib.rs @@ -0,0 +1,111 @@ +mod errors; +mod types; + +#[cfg(test)] +mod test; + +use std::collections::BTreeMap; + +use netlink_packet_core::{NetlinkHeader, NetlinkMessage, NetlinkPayload, NLM_F_DUMP, NLM_F_REQUEST}; +use netlink_packet_route::RouteNetlinkMessage; +use netlink_packet_route::tc::TcMessage; +use netlink_sys::constants::NETLINK_ROUTE; +use netlink_sys::{Socket, SocketAddr}; +use nix::net::if_; + +pub use errors::*; +pub use types::*; + +pub type TcStats = Vec; +pub type Result = std::result::Result; + +/// Get list of all `tc` qdiscs. +pub fn tc_stats() -> Result { + let ifaces = get_interfaces()?; + read_tc_stats(ifaces, &get_netlink_qdiscs) +} + +fn read_tc_stats(interfaces: BTreeMap, netlink_retriever: &dyn Fn() -> Result>) -> Result { + let messages = netlink_retriever()?; + let tc_stats = messages + .into_iter() + .map(|msg| Tc::new(&interfaces, &msg)) + .collect(); + + Ok(tc_stats) +} + +/// Open a netlink socket to retrieve `tc` qdiscs. +/// The underlying library sends a message of type `RTM_GETQDISC` to the kernel. +/// The kernel responds with a message of type `RTM_NEWQDISC` for each qdisc. +fn get_netlink_qdiscs() -> Result> { + // open a socket + let socket = Socket::new(NETLINK_ROUTE).map_err(|e| TcError::Netlink(e.to_string()))?; + socket.connect(&SocketAddr::new(0, 0)).map_err(|e| TcError::Netlink(e.to_string()))?; + + // create a netlink request + let mut nl_hdr = NetlinkHeader::default(); + nl_hdr.flags = NLM_F_REQUEST | NLM_F_DUMP; + let msg = RouteNetlinkMessage::GetQueueDiscipline(TcMessage::default()); + let mut packet = NetlinkMessage::new(nl_hdr, NetlinkPayload::from(msg)); + packet.finalize(); + let mut buf = vec![0; packet.header.length as usize]; + packet.serialize(&mut buf[..]); + + // send the request + socket.send(&buf[..], 0).map_err(|e| TcError::Netlink(e.to_string()))?; + + // receive the response + let mut recv_buf = vec![0; 4096]; + let mut offset = 0; + let mut response = Vec::new(); + 'out: while let Ok(size) = socket.recv(&mut &mut recv_buf[offset..], 0) { + loop { + let bytes = &recv_buf[offset..]; + let rx_packet = >::deserialize(bytes) + .map_err(|e| TcError::Netlink(e.to_string()))?; + response.push(rx_packet.clone()); + let payload = rx_packet.payload; + if let NetlinkPayload::Error(err) = payload { + return Err(TcError::Netlink(err.to_string())); + } + if let NetlinkPayload::Done(_) = payload { + break 'out; + } + + offset += rx_packet.header.length as usize; + if offset == size || rx_packet.header.length == 0 { + offset = 0; + break; + } + } + } + + let mut tc_msgs = Vec::new(); + for msg in response { + if let NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewQueueDiscipline(tc)) = msg.payload { + tc_msgs.push(tc); + } + } + + return Ok(tc_msgs); +} + +/// Get a map of interface index to interface name. +fn get_interfaces() -> Result> { + let ifaces = if_::if_nameindex().map_err(|e| TcError::ReadInterfaces(e.to_string()))?; + let if_map = ifaces + .iter() + .map(|iface| { + let index = iface.index(); + let name = if let Ok(name) = iface.name().to_str() { + name.to_string() + } else { + String::new() + }; + (index, name) + }) + .collect::>(); + + Ok(if_map) +} diff --git a/below/tc/src/test.rs b/below/tc/src/test.rs new file mode 100644 index 00000000..d5e0aa83 --- /dev/null +++ b/below/tc/src/test.rs @@ -0,0 +1,122 @@ +use netlink_packet_route::tc::{TcAttribute, TcFqCodelQdStats, TcFqCodelXstats, TcHandle, TcHeader, TcMessage, TcOption, TcQdiscFqCodelOption, TcStats, TcStats2, TcStatsBasic, TcStatsQueue, TcXstats}; + +use super::*; + +fn fake_netlink_qdiscs() -> Result> { + let mut tc_msgs = Vec::new(); + + let msg1 = TcMessage::from_parts( + TcHeader { + index: 2, + handle: TcHandle::from(0), + parent: TcHandle::from(2), + ..Default::default() + }, + vec![ + TcAttribute::Kind("fq_codel".to_string()), + TcAttribute::Options(vec![ + TcOption::FqCodel(TcQdiscFqCodelOption::Target(4999u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::Limit(10240u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::Interval(99999u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::Ecn(1u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::Flows(1024u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::Quantum(1514u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::CeThreshold(0u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::DropBatchSize(64u32)), + TcOption::FqCodel(TcQdiscFqCodelOption::MemoryLimit(33554432u32)), + ]), + TcAttribute::Stats({ + let mut stats = TcStats::default(); + stats.bytes = 39902796u64; + stats.packets = 165687u32; + stats.drops = 100u32; + stats.overlimits = 200u32; + stats.bps = 300u32; + stats.pps = 400u32; + stats.qlen = 500u32; + stats.backlog = 10u32; + stats + }), + TcAttribute::Stats2(vec![ + TcStats2::Basic({ + let mut basic = TcStatsBasic::default(); + basic.bytes = 39902796u64; + basic.packets = 165687u32; + basic + }), + TcStats2::Queue({ + let mut queue = TcStatsQueue::default(); + queue.qlen = 500u32; + queue.drops = 100u32; + queue.requeues = 50u32; + queue.overlimits = 20u32; + queue + }), + ]), + TcAttribute::Xstats( + TcXstats::FqCodel( + TcFqCodelXstats::Qdisc({ + let mut fq_codel = TcFqCodelQdStats::default(); + fq_codel.maxpacket = 258; + fq_codel.drop_overlimit = 0; + fq_codel.ecn_mark = 0; + fq_codel.new_flow_count = 91; + fq_codel.new_flows_len = 0; + fq_codel.old_flows_len = 0; + fq_codel.ce_mark = 0; + fq_codel.memory_usage = 0; + fq_codel.drop_overmemory = 0; + fq_codel + } + ) + ) + ),], + ); + tc_msgs.push(msg1); + + Ok(tc_msgs) +} + +#[test] +fn test_tc_stats() { + let ifaces = BTreeMap::from_iter(vec![(2, "eth0".to_string())]); + let tc_map = read_tc_stats(ifaces, &fake_netlink_qdiscs).unwrap(); + + let tc = tc_map.get(0).unwrap(); + assert_eq!(tc.if_index, 2); + assert_eq!(tc.handle, 0); + assert_eq!(tc.parent, 2); + + assert_eq!(tc.kind, "fq_codel"); + assert_eq!(tc.stats.bytes, Some(39902796)); + assert_eq!(tc.stats.packets, Some(165687)); + assert_eq!(tc.stats.qlen, Some(500)); + assert_eq!(tc.stats.bps, Some(300)); + assert_eq!(tc.stats.pps, Some(400)); + + // qdisc + assert_eq!(tc.qdisc, Some(QDisc::FqCodel(FqCodelQDisc { + target: 4999, + limit: 10240, + interval: 99999, + ecn: 1, + flows: 1024, + quantum: 1514, + ce_threshold: 0, + drop_batch_size: 64, + memory_limit: 33554432, + }))); + + // xstats + assert_eq!(tc.stats.xstats, Some(XStats::FqCodel(FqCodelXStats { + maxpacket: 258, + drop_overlimit: 0, + ecn_mark: 0, + new_flow_count: 91, + new_flows_len: 0, + old_flows_len: 0, + ce_mark: 0, + memory_usage: 0, + drop_overmemory: 0, + }))); +} diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs new file mode 100644 index 00000000..4e0461b1 --- /dev/null +++ b/below/tc/src/types.rs @@ -0,0 +1,182 @@ +use std::collections::BTreeMap; + +use netlink_packet_route::tc; +use netlink_packet_route::tc::{TcAttribute, TcFqCodelXstats, TcMessage, TcOption, TcQdiscFqCodelOption}; +use serde::{Deserialize, Serialize}; + +const FQ_CODEL: &str = "fq_codel"; + +/// `Tc` represents a traffic control qdisc. +#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct Tc { + pub if_index: u32, + pub if_name: String, + pub handle: u32, + pub parent: u32, + pub kind: String, + pub stats: Stats, + pub qdisc: Option, +} + +impl Tc { + pub fn new(interfaces: &BTreeMap, tc_msg: &TcMessage) -> Self { + let if_index = tc_msg.header.index as u32; + let if_name = interfaces.get(&if_index).map_or_else(String::new, |iface| iface.to_string()); + let mut tc = Self { + if_index, + if_name, + handle: tc_msg.header.handle.into(), + parent: tc_msg.header.parent.into(), + ..Default::default() + }; + let mut opts = Vec::new(); + + for attr in &tc_msg.attributes { + match attr { + TcAttribute::Kind(name) => tc.kind = name.clone(), + TcAttribute::Options(tc_opts) => opts = tc_opts.to_vec(), + TcAttribute::Stats(tc_stats) => { + tc.stats.bps = Some(tc_stats.bps); + tc.stats.pps = Some(tc_stats.pps); + } + TcAttribute::Stats2(tc_stats) => { + for stat in tc_stats { + match stat { + tc::TcStats2::Basic(basic) => { + tc.stats.bytes = Some(basic.bytes); + tc.stats.packets = Some(basic.packets); + } + tc::TcStats2::Queue(queue) => { + tc.stats.qlen = Some(queue.qlen); + tc.stats.backlog = Some(queue.backlog); + tc.stats.drops = Some(queue.drops); + tc.stats.requeues = Some(queue.requeues); + tc.stats.overlimits = Some(queue.overlimits); + } + _ => {} + } + } + } + TcAttribute::Xstats(tc_xstats) => { + match tc_xstats { + tc::TcXstats::FqCodel(fq_codel_xstats) => { + tc.stats.xstats = Some(XStats::FqCodel(FqCodelXStats::new(fq_codel_xstats))) + } + _ => {} + } + } + _ => {} + } + } + + + tc.qdisc = QDisc::new(&tc.kind, opts); + + tc + } +} + +/// `Stats` represents the statistics of a qdisc. +#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct Stats { + // Stats2::StatsBasic + pub bytes: Option, + pub packets: Option, + + // Stats2::StatsQueue + pub qlen: Option, + pub backlog: Option, + pub drops: Option, + pub requeues: Option, + pub overlimits: Option, + + // XStats + pub xstats: Option, + + pub bps: Option, + pub pps: Option, +} + +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub enum QDisc { + FqCodel(FqCodelQDisc), +} + +impl QDisc { + fn new(kind: &str, opts: Vec) -> Option { + if kind == FQ_CODEL { + let mut fq_codel = FqCodelQDisc::default(); + for opt in opts { + match opt { + TcOption::FqCodel(fq_codel_opt) => { + match fq_codel_opt { + TcQdiscFqCodelOption::Target(target) => fq_codel.target = target, + TcQdiscFqCodelOption::Limit(limit) => fq_codel.limit = limit, + TcQdiscFqCodelOption::Interval(interval) => fq_codel.interval = interval, + TcQdiscFqCodelOption::Ecn(ecn) => fq_codel.ecn = ecn, + TcQdiscFqCodelOption::Flows(flows) => fq_codel.flows = flows, + TcQdiscFqCodelOption::Quantum(quantum) => fq_codel.quantum = quantum, + TcQdiscFqCodelOption::CeThreshold(ce_threshold) => fq_codel.ce_threshold = ce_threshold, + TcQdiscFqCodelOption::DropBatchSize(drop_batch_size) => fq_codel.drop_batch_size = drop_batch_size, + TcQdiscFqCodelOption::MemoryLimit(memory_limit) => fq_codel.memory_limit = memory_limit, + _ => {} + } + } + _ => {} + } + } + return Some(Self::FqCodel(fq_codel)); + } + None + } +} + +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub enum XStats { + FqCodel(FqCodelXStats), +} + +#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct FqCodelQDisc { + pub target: u32, + pub limit: u32, + pub interval: u32, + pub ecn: u32, + pub flows: u32, + pub quantum: u32, + pub ce_threshold: u32, + pub drop_batch_size: u32, + pub memory_limit: u32, +} + +#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct FqCodelXStats { + pub maxpacket: u32, + pub drop_overlimit: u32, + pub ecn_mark: u32, + pub new_flow_count: u32, + pub new_flows_len: u32, + pub old_flows_len: u32, + pub ce_mark: u32, + pub memory_usage: u32, + pub drop_overmemory: u32, +} + +impl FqCodelXStats { + pub fn new(xstats: &TcFqCodelXstats) -> Self { + match xstats { + TcFqCodelXstats::Qdisc(qdisc) => Self { + maxpacket: qdisc.maxpacket, + drop_overlimit: qdisc.drop_overlimit, + ecn_mark: qdisc.ecn_mark, + new_flow_count: qdisc.new_flow_count, + new_flows_len: qdisc.new_flows_len, + old_flows_len: qdisc.old_flows_len, + ce_mark: qdisc.ce_mark, + memory_usage: qdisc.memory_usage, + drop_overmemory: qdisc.drop_overmemory, + }, + _ => Self::default(), + } + } +} From 5f826b20e770bd4d44a676e883f3cb8eeee257a8 Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:51:06 -0600 Subject: [PATCH 02/20] Add model to represent `Tc` object --- Cargo.lock | 75 +++++++- below/model/Cargo.toml | 1 + below/model/src/collector.rs | 20 +++ below/model/src/common_field_ids.rs | 31 +++- below/model/src/lib.rs | 14 ++ below/model/src/sample.rs | 1 + below/model/src/sample_model.rs | 43 +++++ below/model/src/tc_model.rs | 268 ++++++++++++++++++++++++++++ 8 files changed, 447 insertions(+), 6 deletions(-) create mode 100644 below/model/src/tc_model.rs diff --git a/Cargo.lock b/Cargo.lock index 45d2d0c8..cad5c717 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,7 @@ dependencies = [ "below-common", "below-ethtool", "below-gpu-stats", + "below-tc", "below_derive", "cgroupfs", "enum-iterator", @@ -324,6 +325,19 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "below-tc" +version = "0.0.1" +dependencies = [ + "netlink-packet-core", + "netlink-packet-route", + "netlink-packet-utils", + "netlink-sys", + "nix 0.27.1", + "serde", + "thiserror", +] + [[package]] name = "below-view" version = "0.8.0" @@ -378,6 +392,12 @@ version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.2.1" @@ -1295,12 +1315,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "maplit" @@ -1368,6 +1385,54 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "netlink-packet-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4" +dependencies = [ + "anyhow", + "byteorder", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-route" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74c171cd77b4ee8c7708da746ce392440cb7bcf618d122ec9ecc607b12938bf4" +dependencies = [ + "anyhow", + "byteorder", + "libc", + "log", + "netlink-packet-core", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-utils" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34" +dependencies = [ + "anyhow", + "byteorder", + "paste", + "thiserror", +] + +[[package]] +name = "netlink-sys" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411" +dependencies = [ + "bytes", + "libc", + "log", +] + [[package]] name = "nix" version = "0.25.0" diff --git a/below/model/Cargo.toml b/below/model/Cargo.toml index 746d839f..de2257df 100644 --- a/below/model/Cargo.toml +++ b/below/model/Cargo.toml @@ -27,6 +27,7 @@ resctrlfs = { version = "0.8.0", path = "../resctrlfs" } serde = { version = "1.0.185", features = ["derive", "rc"] } serde_json = { version = "1.0.100", features = ["float_roundtrip", "unbounded_depth"] } slog = { version = "2.7", features = ["max_level_trace", "nested-values"] } +tc = { package = "below-tc", version = "0.0.1", path = "../tc" } [dev-dependencies] futures = { version = "0.3.30", features = ["async-await", "compat"] } diff --git a/below/model/src/collector.rs b/below/model/src/collector.rs index 57601723..d71dadfc 100644 --- a/below/model/src/collector.rs +++ b/below/model/src/collector.rs @@ -31,6 +31,7 @@ pub struct CollectorOptions { pub enable_btrfs_stats: bool, pub enable_ethtool_stats: bool, pub enable_resctrl_stats: bool, + pub enable_tc_stats: bool, pub btrfs_samples: u64, pub btrfs_min_pct: f64, pub cgroup_re: Option, @@ -48,6 +49,7 @@ impl Default for CollectorOptions { enable_btrfs_stats: false, enable_ethtool_stats: false, enable_resctrl_stats: false, + enable_tc_stats: false, btrfs_samples: btrfs::DEFAULT_SAMPLES, btrfs_min_pct: btrfs::DEFAULT_MIN_PCT, cgroup_re: None, @@ -306,6 +308,17 @@ fn collect_sample( } } }, + tc: if !options.enable_tc_stats { + None + } else { + match tc::tc_stats() { + Ok(tc_stats) => Some(tc_stats), + Err(e) => { + error!(logger, "{:#}", e); + Default::default() + } + } + } }) } @@ -467,6 +480,13 @@ macro_rules! count_per_sec { } ret }}; + ($a:ident, $b:ident, $delta:expr, $target_type:ty) => {{ + let mut ret = None; + if $a <= $b { + ret = Some((($b - $a) as f64 / $delta.as_secs_f64()).ceil() as $target_type); + } + ret + }}; ($a_opt:expr, $b_opt:expr, $delta:expr, $target_type:ty) => {{ let mut ret = None; if let (Some(a), Some(b)) = ($a_opt, $b_opt) { diff --git a/below/model/src/common_field_ids.rs b/below/model/src/common_field_ids.rs index 28b53314..2e29a3f2 100644 --- a/below/model/src/common_field_ids.rs +++ b/below/model/src/common_field_ids.rs @@ -23,7 +23,7 @@ /// /// This list also servers as documentation for available field ids that could /// be used in other below crates. A test ensures that this list is up-to-date. -pub const COMMON_MODEL_FIELD_IDS: [&str; 420] = [ +pub const COMMON_MODEL_FIELD_IDS: [&str; 445] = [ "system.hostname", "system.kernel_version", "system.os_release", @@ -444,4 +444,33 @@ pub const COMMON_MODEL_FIELD_IDS: [&str; 420] = [ "network.udp6.sndbuf_errors", "network.udp6.in_csum_errors", "network.udp6.ignored_multi", + "tc.tc..backlog_per_sec", + "tc.tc..bps", + "tc.tc..bytes_per_sec", + "tc.tc..interface", + "tc.tc..drops_per_sec", + "tc.tc..kind", + "tc.tc..overlimits_per_sec", + "tc.tc..packets_per_sec", + "tc.tc..pps", + "tc.tc..qdisc.fq_codel.ce_threshold", + "tc.tc..qdisc.fq_codel.drop_batch_size", + "tc.tc..qdisc.fq_codel.ecn", + "tc.tc..qdisc.fq_codel.flows_per_sec", + "tc.tc..qdisc.fq_codel.interval", + "tc.tc..qdisc.fq_codel.limit", + "tc.tc..qdisc.fq_codel.memory_limit", + "tc.tc..qdisc.fq_codel.quantum", + "tc.tc..qdisc.fq_codel.target", + "tc.tc..qlen", + "tc.tc..requeues_per_sec", + "tc.tc..xstats.fq_codel.ce_mark", + "tc.tc..xstats.fq_codel.drop_overlimit_per_sec", + "tc.tc..xstats.fq_codel.drop_overmemory_per_sec", + "tc.tc..xstats.fq_codel.ecn_mark", + "tc.tc..xstats.fq_codel.maxpacket", + "tc.tc..xstats.fq_codel.memory_usage_per_sec", + "tc.tc..xstats.fq_codel.new_flow_count_per_sec", + "tc.tc..xstats.fq_codel.new_flows_len", + "tc.tc..xstats.fq_codel.old_flows_len", ]; diff --git a/below/model/src/lib.rs b/below/model/src/lib.rs index 001fe49c..bcdb5505 100644 --- a/below/model/src/lib.rs +++ b/below/model/src/lib.rs @@ -40,6 +40,7 @@ pub mod resctrl; pub mod sample; mod sample_model; pub mod system; +pub mod tc_model; open_source_shim!(pub); @@ -50,6 +51,7 @@ pub use process::*; pub use resctrl::*; pub use sample::*; pub use system::*; +pub use tc_model::*; /// A wrapper for different field types used in Models. By this way we can query /// different fields in a single function without using Box. @@ -535,6 +537,8 @@ pub struct Model { pub gpu: Option, #[queriable(subquery)] pub resctrl: Option, + #[queriable(subquery)] + pub tc: Option, } impl Model { @@ -593,6 +597,16 @@ impl Model { }, ) }), + tc: sample.tc.as_ref().map(|tc| { + TcModel::new( + tc, + if let Some((s, d)) = last { + s.tc.as_ref().map(|tc| (tc, d)) + } else { + None + }, + ) + }), } } } diff --git a/below/model/src/sample.rs b/below/model/src/sample.rs index ae5f80ad..61c33dde 100644 --- a/below/model/src/sample.rs +++ b/below/model/src/sample.rs @@ -23,6 +23,7 @@ pub struct Sample { pub gpus: Option, pub ethtool: Option, pub resctrl: Option, + pub tc: Option, } #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] diff --git a/below/model/src/sample_model.rs b/below/model/src/sample_model.rs index 3e827d7f..3c026c31 100644 --- a/below/model/src/sample_model.rs +++ b/below/model/src/sample_model.rs @@ -926,6 +926,49 @@ pub const SAMPLE_MODEL_JSON: &str = r#" "in_csum_errors": 0, "ignored_multi": 0 } + }, + "tc": { + "tc": [ + { + "interface": "eth0", + "kind": "fq_codel", + "qlen": 42, + "bps": 420, + "pps": 1337, + "bytes_per_sec": 299792458, + "packets_per_sec": 314, + "backlog_per_sec": 2718281828, + "drops_per_sec": 8675309, + "requeues_per_sec": 12345, + "overlimits_per_sec": 314159, + "qdisc": { + "fq_codel": { + "target": 2701, + "limit": 7, + "interval": 3, + "ecn": 6, + "quantum": 42, + "ce_threshold": 101, + "drop_batch_size": 9000, + "memory_limit": 123456, + "flows_per_sec": 31415 + } + }, + "xstats": { + "fq_codel": { + "maxpacket": 8675309, + "ecn_mark": 299792458, + "new_flows_len": 314, + "old_flows_len": 1729, + "ce_mark": 42, + "drop_overlimit_per_sec": 420, + "new_flow_count_per_sec": 1337, + "memory_usage_per_sec": 2718281828, + "drop_overmemory_per_sec": 27182 + } + } + } + ] } } "#; diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs new file mode 100644 index 00000000..397c557d --- /dev/null +++ b/below/model/src/tc_model.rs @@ -0,0 +1,268 @@ +use super::*; +use tc::{QDisc, Tc, XStats}; + +/// rate! macro calculates the rate of a field for given sample and last objects. +/// It basically calls count_per_sec! macro after extracting the field from the objects. +macro_rules! rate { + ($field:ident, $sample:ident, $last:ident, $target_type:ty) => {{ + $last.and_then(|(last, d)| { + let s = $sample.$field; + let l = last.$field; + count_per_sec!(l, s, d, $target_type) + }) + }}; +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct TcModel { + #[queriable(subquery)] + pub tc: Vec, +} + +impl TcModel { + pub fn new( + sample: &Vec, + last: Option<(&Vec, Duration)>, + ) -> Self { + // Assumption: sample and last are always ordered + let tc = match last { + Some((last_tcs, d)) if last_tcs.len() == sample.len() => { + sample.iter() + .zip(last_tcs.iter()) + .map(|(sample, last)| SingleTcModel::new(sample, Some((last, d)))) + .collect::>() + } + _ => Vec::new(), + }; + + Self { tc } + } +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct SingleTcModel { + /// Name of the interface + pub interface: String, + /// Name of the qdisc + pub kind: String, + + pub qlen: Option, + pub bps: Option, + pub pps: Option, + + pub bytes_per_sec: Option, + pub packets_per_sec: Option, + pub backlog_per_sec: Option, + pub drops_per_sec: Option, + pub requeues_per_sec: Option, + pub overlimits_per_sec: Option, + + #[queriable(subquery)] + pub qdisc: Option, + + #[queriable(subquery)] + pub xstats: Option, +} + +impl Nameable for SingleTcModel { + fn name() -> &'static str { + "tc" + } +} + +impl SingleTcModel { + pub fn new(sample: &Tc, last: Option<(&Tc, Duration)>) -> Self { + let mut tc_model = SingleTcModel { + interface: sample.if_name.clone(), + kind: sample.kind.clone(), + ..Default::default() + }; + + let stats = &sample.stats; + tc_model.qlen = stats.qlen; + tc_model.bps = stats.bps; + tc_model.pps = stats.pps; + + last.map(|(l, d)| { + let last = &l.stats; + tc_model.bytes_per_sec = count_per_sec!(last.bytes, stats.bytes, d, u64); + tc_model.packets_per_sec = count_per_sec!(last.packets, stats.packets, d, u32); + tc_model.backlog_per_sec = count_per_sec!(last.backlog, stats.backlog, d, u32); + tc_model.drops_per_sec = count_per_sec!(last.drops, stats.drops, d, u32); + tc_model.requeues_per_sec = count_per_sec!(last.requeues, stats.requeues, d, u32); + tc_model.overlimits_per_sec = count_per_sec!(last.overlimits, stats.overlimits, d, u32); + }); + + if let Some(sample) = stats.xstats.as_ref() { + let last = + last.and_then(|(last, d)| last.stats.xstats.as_ref().map(|l| (l, d))); + + tc_model.xstats = Some(XStatsModel::new(sample, last)); + } + + if let Some(sample) = sample.qdisc.as_ref() { + let last = last.and_then(|(last, d)| last.qdisc.as_ref().map(|l| (l, d))); + + tc_model.qdisc = Some(QDiscModel::new(sample, last)); + } + + tc_model + } +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct QDiscModel { + #[queriable(subquery)] + pub fq_codel: Option, +} + +impl QDiscModel { + fn new(sample: &QDisc, last: Option<(&QDisc, Duration)>) -> Self { + match sample { + QDisc::FqCodel(sample) => Self { + fq_codel: { + last.map(|(l, d)| match l { + QDisc::FqCodel(last) => { + let last = Some((last, d)); + FqCodelQDiscModel::new(sample, last) + } + }) + }, + }, + } + } +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct FqCodelQDiscModel { + pub target: u32, + pub limit: u32, + pub interval: u32, + pub ecn: u32, + pub quantum: u32, + pub ce_threshold: u32, + pub drop_batch_size: u32, + pub memory_limit: u32, + pub flows_per_sec: Option, +} + +impl FqCodelQDiscModel { + fn new(sample: &tc::FqCodelQDisc, last: Option<(&tc::FqCodelQDisc, Duration)>) -> Self { + Self { + target: sample.target, + limit: sample.limit, + interval: sample.interval, + ecn: sample.ecn, + quantum: sample.quantum, + ce_threshold: sample.ce_threshold, + drop_batch_size: sample.drop_batch_size, + memory_limit: sample.memory_limit, + flows_per_sec: { + last.and_then(|l| { + let last = Some(l); + rate!(flows, sample, last, u32) + }) + }, + } + } +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct XStatsModel { + #[queriable(subquery)] + pub fq_codel: Option, +} + +impl XStatsModel { + fn new(sample: &XStats, last: Option<(&XStats, Duration)>) -> Self { + match sample { + XStats::FqCodel(sample) => Self { + fq_codel: { + last.map(|(l, d)| match l { + XStats::FqCodel(last) => { + let last = Some((last, d)); + FqCodelXStatsModel::new(sample, last) + } + }) + }, + }, + } + } +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Serialize, + Deserialize, + below_derive::Queriable +)] +pub struct FqCodelXStatsModel { + pub maxpacket: u32, + pub ecn_mark: u32, + pub new_flows_len: u32, + pub old_flows_len: u32, + pub ce_mark: u32, + pub drop_overlimit_per_sec: Option, + pub new_flow_count_per_sec: Option, + pub memory_usage_per_sec: Option, + pub drop_overmemory_per_sec: Option, +} + +impl FqCodelXStatsModel { + fn new(sample: &tc::FqCodelXStats, last: Option<(&tc::FqCodelXStats, Duration)>) -> Self { + Self { + maxpacket: sample.maxpacket, + ecn_mark: sample.ecn_mark, + new_flows_len: sample.new_flows_len, + old_flows_len: sample.old_flows_len, + ce_mark: sample.ce_mark, + drop_overlimit_per_sec: rate!(drop_overlimit, sample, last, u32), + new_flow_count_per_sec: rate!(drop_overlimit, sample, last, u32), + memory_usage_per_sec: rate!(drop_overlimit, sample, last, u32), + drop_overmemory_per_sec: rate!(drop_overlimit, sample, last, u32), + } + } +} From 2282fcdd3155772a5dfee257a3bb986dda9c2e1c Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:51:17 -0600 Subject: [PATCH 03/20] Add dump command for `tc` --- below/dump/src/command.rs | 87 ++++++++++++++++ below/dump/src/lib.rs | 38 +++++++ below/dump/src/tc.rs | 104 +++++++++++++++++++ below/dump/src/test.rs | 212 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 441 insertions(+) create mode 100644 below/dump/src/tc.rs diff --git a/below/dump/src/command.rs b/below/dump/src/command.rs index 300a61af..6a0aa0f7 100644 --- a/below/dump/src/command.rs +++ b/below/dump/src/command.rs @@ -26,6 +26,7 @@ use model::SingleDiskModelFieldId; use model::SingleNetModelFieldId; use model::SingleProcessModelFieldId; use model::SingleQueueModelFieldId; +use model::SingleTcModelFieldId; use model::SystemModelFieldId; use once_cell::sync::Lazy; use regex::Regex; @@ -1036,6 +1037,81 @@ $ below dump ethtool-queue -b "08:30:00" -e "08:30:30" -O json ) }); +/// Represents the fields of the tc model. +#[derive( + Clone, + Debug, + PartialEq, + below_derive::EnumFromStr, + below_derive::EnumToString +)] +pub enum TcAggField { + Stats, + XStats, + QDisc, +} + +impl AggField for TcAggField { + fn expand(&self, _detail: bool) -> Vec { + use model::SingleTcModelFieldId as FieldId; + + match self { + Self::Stats => vec![ + FieldId::Interface, + FieldId::Kind, + FieldId::Qlen, + FieldId::Bps, + FieldId::Pps, + FieldId::BytesPerSec, + FieldId::PacketsPerSec, + FieldId::BacklogPerSec, + FieldId::DropsPerSec, + FieldId::RequeuesPerSec, + FieldId::OverlimitsPerSec, + ], + Self::XStats => enum_iterator::all::() + .map(FieldId::Xstats) + .collect::>(), + Self::QDisc => enum_iterator::all::() + .map(FieldId::Qdisc) + .collect::>(), + } + } +} + +pub type TcOptionField = DumpOptionField; + +pub static DEFAULT_TC_FIELDS: &[TcOptionField] = &[ + DumpOptionField::Unit(DumpField::Common(CommonField::Datetime)), + DumpOptionField::Agg(TcAggField::Stats), + DumpOptionField::Agg(TcAggField::XStats), + DumpOptionField::Agg(TcAggField::QDisc), + DumpOptionField::Unit(DumpField::Common(CommonField::Timestamp)), +]; + +const TC_ABOUT: &str = "Dump the tc related stats with qdiscs"; + +/// Generated about message for tc (traffic control) dump so supported fields are up-to-date. +static TC_LONG_ABOUT: Lazy = Lazy::new(|| { + format!( + r#"{about} +********************** Available fields ********************** +{common_fields}, {tc_fields}. +********************** Aggregated fields ********************** +* --detail: no effect. +* --default: includes [{default_fields}]. +* --everything: includes everything (equivalent to --default --detail). +********************** Example Commands ********************** +Example: +$ below dump tc -b "08:30:00" -e "08:30:30" -O json +"#, + about = TC_ABOUT, + common_fields = join(enum_iterator::all::()), + tc_fields = join(enum_iterator::all::()), + default_fields = join(DEFAULT_TC_FIELDS.to_owned()), + ) +}); + make_option! (OutputFormat { "raw": Raw, "csv": Csv, @@ -1219,4 +1295,15 @@ pub enum DumpCommand { #[clap(long, short, conflicts_with("fields"))] pattern: Option, }, + #[clap(about = TC_ABOUT, long_about = TC_LONG_ABOUT.as_str())] + Tc { + /// Select which fields to display and in what order. + #[clap(short, long)] + fields: Option>, + #[clap(flatten)] + opts: GeneralOpt, + /// Saved pattern in the dumprc file under [tc] section. + #[clap(long, short, conflicts_with("fields"))] + pattern: Option, + }, } diff --git a/below/dump/src/lib.rs b/below/dump/src/lib.rs index 86f3ac01..15688af1 100644 --- a/below/dump/src/lib.rs +++ b/below/dump/src/lib.rs @@ -53,6 +53,7 @@ pub mod print; pub mod process; pub mod system; pub mod tmain; +pub mod tc; pub mod transport; #[cfg(test)] @@ -117,6 +118,7 @@ pub type IfaceField = DumpField; // Essentially the same as NetworkField pub type TransportField = DumpField; pub type EthtoolQueueField = DumpField; +pub type TcField = DumpField; fn get_advance( logger: slog::Logger, @@ -559,5 +561,41 @@ pub fn run( errs, ) } + DumpCommand::Tc { + fields, + opts, + pattern, + } => { + let (time_begin, time_end, advance) = + get_advance(logger, dir, host, port, snapshot, &opts)?; + let detail = opts.everything || opts.detail; + let fields = if let Some(pattern_key) = pattern { + parse_pattern(filename, pattern_key, "tc") + } else { + fields + }; + let fields = expand_fields( + match fields.as_ref() { + Some(fields) => fields, + _ => command::DEFAULT_TC_FIELDS, + }, + detail, + ); + let tc = tc::Tc::new(&opts, fields); + let mut output: Box = match opts.output.as_ref() { + Some(file_path) => Box::new(File::create(file_path)?), + None => Box::new(io::stdout()), + }; + dump_timeseries( + advance, + time_begin, + time_end, + &tc, + output.as_mut(), + opts.output_format, + opts.br, + errs, + ) + } } } diff --git a/below/dump/src/tc.rs b/below/dump/src/tc.rs new file mode 100644 index 00000000..482e7a0e --- /dev/null +++ b/below/dump/src/tc.rs @@ -0,0 +1,104 @@ +use super::*; +use model::SingleTcModel; + +pub struct Tc { + opts: GeneralOpt, + fields: Vec, +} + +impl Tc { + pub fn new(opts: &GeneralOpt, fields: Vec) -> Self { + Self { + opts: opts.to_owned(), + fields, + } + } +} + +impl Dumper for Tc { + fn dump_model( + &self, + ctx: &CommonFieldContext, + model: &model::Model, + output: &mut dyn Write, + round: &mut usize, + comma_flag: bool, + ) -> Result { + let tcs: Vec<&SingleTcModel> = match &model.tc { + Some(tc_model) => tc_model.tc.iter().collect(), + None => Vec::new(), + }; + if tcs.is_empty() { + return Ok(IterExecResult::Skip); + } + + let mut json_output = json!([]); + + tcs.into_iter() + .map(|tc| { + match self.opts.output_format { + Some(OutputFormat::Raw) | None => write!( + output, + "{}", + print::dump_raw( + &self.fields, + ctx, + tc, + *round, + self.opts.repeat_title, + self.opts.disable_title, + self.opts.raw + ) + )?, + Some(OutputFormat::Csv) => write!( + output, + "{}", + print::dump_csv( + &self.fields, + ctx, + tc, + *round, + self.opts.disable_title, + self.opts.raw + ) + )?, + Some(OutputFormat::Tsv) => write!( + output, + "{}", + print::dump_tsv( + &self.fields, + ctx, + tc, + *round, + self.opts.disable_title, + self.opts.raw + ) + )?, + Some(OutputFormat::KeyVal) => write!( + output, + "{}", + print::dump_kv(&self.fields, ctx, tc, self.opts.raw) + )?, + Some(OutputFormat::Json) => { + let par = print::dump_json(&self.fields, ctx, tc, self.opts.raw); + json_output.as_array_mut().unwrap().push(par); + } + Some(OutputFormat::OpenMetrics) => { + write!(output, "{}", print::dump_openmetrics(&self.fields, ctx, tc))? + } + } + *round += 1; + Ok(()) + }) + .collect::>>()?; + + match (self.opts.output_format, comma_flag) { + (Some(OutputFormat::Json), true) => write!(output, ",{}", json_output)?, + (Some(OutputFormat::Json), false) => write!(output, "{}", json_output)?, + (Some(OutputFormat::OpenMetrics), _) => (), + _ => writeln!(output)?, + }; + + Ok(IterExecResult::Success) + } +} diff --git a/below/dump/src/test.rs b/below/dump/src/test.rs index 21126d82..3df76e78 100644 --- a/below/dump/src/test.rs +++ b/below/dump/src/test.rs @@ -987,6 +987,7 @@ fn test_dump_queue_content() { network, gpu: None, resctrl: None, + tc: None, }; let mut opts: GeneralOpt = Default::default(); @@ -1224,3 +1225,214 @@ proc = ["datetime", "mem.anon"] )) ); } + +#[test] +fn test_tc_titles() { + let titles = expand_fields(command::DEFAULT_TC_FIELDS, true) + .iter() + .filter_map(|dump_field| match dump_field { + DumpField::Common(_) => None, + DumpField::FieldId(field_id) => Some(field_id.to_string()), + }) + .collect::>(); + + let expected_titles = vec![ + "interface", + "kind", + "qlen", + "bps", + "pps", + "bytes_per_sec", + "packets_per_sec", + "backlog_per_sec", + "drops_per_sec", + "requeues_per_sec", + "overlimits_per_sec", + "xstats.fq_codel.maxpacket", + "xstats.fq_codel.ecn_mark", + "xstats.fq_codel.new_flows_len", + "xstats.fq_codel.old_flows_len", + "xstats.fq_codel.ce_mark", + "xstats.fq_codel.drop_overlimit_per_sec", + "xstats.fq_codel.new_flow_count_per_sec", + "xstats.fq_codel.memory_usage_per_sec", + "xstats.fq_codel.drop_overmemory_per_sec", + "qdisc.fq_codel.target", + "qdisc.fq_codel.limit", + "qdisc.fq_codel.interval", + "qdisc.fq_codel.ecn", + "qdisc.fq_codel.quantum", + "qdisc.fq_codel.ce_threshold", + "qdisc.fq_codel.drop_batch_size", + "qdisc.fq_codel.memory_limit", + "qdisc.fq_codel.flows_per_sec", + ]; + assert_eq!(titles, expected_titles); +} + +#[test] +fn test_dump_tc_content() { + let tc_models = vec![ + model::SingleTcModel { + interface: "eth0".to_string(), + kind: "mq".to_string(), + qlen: Some(42), + bps: Some(420), + pps: Some(1337), + bytes_per_sec: Some(299792458), + packets_per_sec: Some(314), + backlog_per_sec: Some(271828182), + drops_per_sec: Some(8675309), + requeues_per_sec: Some(12345), + overlimits_per_sec: Some(314159), + qdisc: None, + xstats: None, + }, + model::SingleTcModel { + interface: "eth0".to_string(), + kind: "fq_codel".to_string(), + qlen: Some(42), + bps: Some(420), + pps: Some(1337), + bytes_per_sec: Some(299792458), + packets_per_sec: Some(314), + backlog_per_sec: Some(271828182), + drops_per_sec: Some(8675309), + requeues_per_sec: Some(12345), + overlimits_per_sec: Some(314159), + qdisc: Some(model::QDiscModel { + fq_codel: Some(model::FqCodelQDiscModel { + target: 2701, + limit: 7, + interval: 3, + ecn: 6, + quantum: 42, + ce_threshold: 101, + drop_batch_size: 9000, + memory_limit: 123456, + flows_per_sec: Some(31415), + }), + }), + xstats: Some(model::XStatsModel { + fq_codel: Some(model::FqCodelXStatsModel { + maxpacket: 8675309, + ecn_mark: 299792458, + new_flows_len: 314, + old_flows_len: 1729, + ce_mark: 42, + drop_overlimit_per_sec: Some(420), + new_flow_count_per_sec: Some(1337), + memory_usage_per_sec: Some(271828182), + drop_overmemory_per_sec: Some(27182), + }), + }), + }, + ]; + + let model = model::Model { + time_elapsed: Duration::from_secs(60 * 10), + timestamp: SystemTime::now(), + system: model::SystemModel::default(), + cgroup: model::CgroupModel::default(), + process: model::ProcessModel::default(), + network: model::NetworkModel::default(), + gpu: None, + resctrl: None, + tc: Some(model::TcModel { + tc: tc_models, + }), + }; + + let mut opts: GeneralOpt = Default::default(); + let fields = command::expand_fields(command::DEFAULT_TC_FIELDS, true); + + opts.output_format = Some(OutputFormat::Json); + let queue_dumper = tc::Tc::new(&opts, fields.clone()); + + let mut queue_content: Vec = Vec::new(); + let mut round = 0; + let ctx = CommonFieldContext { + timestamp: 0, + hostname: "h".to_string(), + }; + + let result = queue_dumper + .dump_model(&ctx, &model, &mut queue_content, &mut round, false) + .expect("Failed to dump queue model"); + assert!(result == tmain::IterExecResult::Success); + + // verify json correctness + assert!(!queue_content.is_empty()); + let jval: Value = + serde_json::from_slice(&queue_content).expect("Fail parse json of queue dump"); + + let expected_json = json!([ + { + "Datetime": "1970-01-01 00:00:00", + "Device": "1", + "Kind": "mq", + "Queue Length": "42", + "Bps": "420 B/s", + "Pps": "1337/s", + "Bytes": "285.9 MB/s", + "Packets": "314/s", + "Backlog": "271828182/s", + "Drops": "8675309/s", + "Requeues": "12345/s", + "Overlimits": "314159/s", + "Target": "?", + "Limit": "?", + "Interval": "?", + "Ecn": "?", + "Quantum": "?", + "CeThreshold": "?", + "DropBatchSize": "?", + "MemoryLimit": "?", + "Flows": "?", + "MaxPacket": "?", + "EcnMark": "?", + "NewFlowsLen": "?", + "OldFlowsLen": "?", + "CeMark": "?", + "DropOverlimit": "?", + "NewFlowCount": "?", + "MemoryUsage": "?", + "DropOvermemory": "?", + "Timestamp": "0" + }, + { + "Datetime": "1970-01-01 00:00:00", + "Device": "1", + "Kind": "fq_codel", + "Queue Length": "42", + "Bps": "420 B/s", + "Pps": "1337/s", + "Bytes": "285.9 MB/s", + "Packets": "314/s", + "Backlog": "271828182/s", + "Drops": "8675309/s", + "Requeues": "12345/s", + "Overlimits": "314159/s", + "Target": "2701", + "Limit": "7", + "Interval": "3", + "Ecn": "6", + "Quantum": "42", + "CeThreshold": "101", + "DropBatchSize": "9000", + "MemoryLimit": "123456", + "Flows": "31415/s", + "MaxPacket": "8675309", + "EcnMark": "299792458", + "NewFlowsLen": "314", + "OldFlowsLen": "1729", + "CeMark": "42", + "DropOverlimit": "420/s", + "NewFlowCount": "1337/s", + "MemoryUsage": "271828182/s", + "DropOvermemory": "27182/s", + "Timestamp": "0" + } + ]); + assert_eq!(jval, expected_json); +} From 8ac8604c220252e88d848e9efa89ee273ccb6459 Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:51:32 -0600 Subject: [PATCH 04/20] Add render configs for `tc` --- below/render/src/default_configs.rs | 180 ++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/below/render/src/default_configs.rs b/below/render/src/default_configs.rs index 1942e0d1..75befbb5 100644 --- a/below/render/src/default_configs.rs +++ b/below/render/src/default_configs.rs @@ -1552,3 +1552,183 @@ impl HasRenderConfig for model::CgroupProperties { } } } + +impl HasRenderConfig for model::SingleTcModel { + fn get_render_config_builder(field_id: &Self::FieldId) -> RenderConfigBuilder { + use model::SingleTcModelFieldId::*; + let rc = RenderConfigBuilder::new(); + match field_id { + Interface => rc.title("Interface"), + Kind => rc.title("Kind"), + Qlen => rc.title("Queue Length"), + Bps => rc.title("Bps").format(ReadableSize).suffix("/s"), + Pps => rc.title("Pps").suffix("/s"), + BytesPerSec => rc.title("Bytes").format(ReadableSize).suffix("/s"), + PacketsPerSec => rc.title("Packets").suffix("/s"), + BacklogPerSec => rc.title("Backlog").suffix("/s"), + DropsPerSec => rc.title("Drops").suffix("/s"), + RequeuesPerSec => rc.title("Requeues").suffix("/s"), + OverlimitsPerSec => rc.title("Overlimits").suffix("/s"), + Qdisc(field_id) => model::QDiscModel::get_render_config_builder(field_id), + Xstats(field_id) => model::XStatsModel::get_render_config_builder(field_id), + } + } +} + +impl HasRenderConfigForDump for model::SingleTcModel { + fn get_openmetrics_config_for_dump( + &self, + field_id: &Self::FieldId, + ) -> Option { + use model::SingleTcModelFieldId::*; + let gauge = gauge() + .label("interface", &self.interface) + .label("qdisc", &self.kind); + match field_id { + Interface => None, + Kind => None, + Qlen => Some(gauge), + Bps => Some(gauge.unit("bytes_per_second")), + Pps => Some(gauge.unit("packets_per_second")), + BytesPerSec => Some(gauge.unit("bytes_per_second")), + PacketsPerSec => Some(gauge.unit("packets_per_second")), + BacklogPerSec => Some(gauge.unit("packets_per_second")), + DropsPerSec => Some(gauge.unit("packets_per_second")), + RequeuesPerSec => Some(gauge.unit("packets_per_second")), + OverlimitsPerSec => Some(gauge.unit("packets_per_second")), + Qdisc(field_id) => self + .qdisc + .as_ref() + .and_then(|qdisc| qdisc.get_openmetrics_config_for_dump(field_id)), + Xstats(field_id) => self + .xstats + .as_ref() + .and_then(|xstats| xstats.get_openmetrics_config_for_dump(field_id)), + } + } +} + +impl HasRenderConfig for model::QDiscModel { + fn get_render_config_builder(field_id: &Self::FieldId) -> RenderConfigBuilder { + use model::QDiscModelFieldId::*; + match field_id { + FqCodel(field_id) => model::FqCodelQDiscModel::get_render_config_builder(field_id), + } + } +} + +impl HasRenderConfigForDump for model::QDiscModel { + fn get_openmetrics_config_for_dump( + &self, + field_id: &Self::FieldId, + ) -> Option { + use model::QDiscModelFieldId::*; + match field_id { + FqCodel(field_id) => self + .fq_codel + .as_ref() + .and_then(|fq_codel| fq_codel.get_openmetrics_config_for_dump(field_id)), + } + } +} + +impl HasRenderConfig for model::FqCodelQDiscModel { + fn get_render_config_builder(field_id: &Self::FieldId) -> RenderConfigBuilder { + use model::FqCodelQDiscModelFieldId::*; + let rc = RenderConfigBuilder::new(); + match field_id { + Target => rc.title("Target"), + Limit => rc.title("Limit"), + Interval => rc.title("Interval"), + Ecn => rc.title("Ecn"), + Quantum => rc.title("Quantum"), + CeThreshold => rc.title("CeThreshold"), + DropBatchSize => rc.title("DropBatchSize"), + MemoryLimit => rc.title("MemoryLimit"), + FlowsPerSec => rc.title("Flows").suffix("/s"), + } + } +} + +impl HasRenderConfig for model::XStatsModel { + fn get_render_config_builder(field_id: &Self::FieldId) -> RenderConfigBuilder { + use model::XStatsModelFieldId::*; + match field_id { + FqCodel(field_id) => model::FqCodelXStatsModel::get_render_config_builder(field_id), + } + } +} + +impl HasRenderConfigForDump for model::XStatsModel { + fn get_openmetrics_config_for_dump( + &self, + field_id: &Self::FieldId, + ) -> Option { + use model::XStatsModelFieldId::*; + match field_id { + FqCodel(field_id) => self + .fq_codel + .as_ref() + .and_then(|fq_codel| fq_codel.get_openmetrics_config_for_dump(field_id)), + } + } +} + +impl HasRenderConfigForDump for model::FqCodelQDiscModel { + fn get_openmetrics_config_for_dump( + &self, + field_id: &Self::FieldId, + ) -> Option { + use model::FqCodelQDiscModelFieldId::*; + match field_id { + Target => Some(gauge()), + Limit => Some(gauge()), + Interval => Some(gauge()), + Ecn => Some(gauge()), + Quantum => Some(gauge()), + CeThreshold => Some(gauge()), + DropBatchSize => Some(gauge()), + MemoryLimit => Some(gauge()), + FlowsPerSec => Some(gauge()), + } + } +} + +impl HasRenderConfig for model::FqCodelXStatsModel { + fn get_render_config_builder(field_id: &Self::FieldId) -> RenderConfigBuilder { + use model::FqCodelXStatsModelFieldId::*; + let rc = RenderConfigBuilder::new(); + match field_id { + Maxpacket => rc.title("MaxPacket"), + EcnMark => rc.title("EcnMark"), + NewFlowsLen => rc.title("NewFlowsLen"), + OldFlowsLen => rc.title("OldFlowsLen"), + CeMark => rc.title("CeMark"), + DropOverlimitPerSec => rc.title("DropOverlimit").suffix("/s"), + NewFlowCountPerSec => rc.title("NewFlowCount").suffix("/s"), + MemoryUsagePerSec => rc.title("MemoryUsage").suffix("/s"), + DropOvermemoryPerSec => rc.title("DropOvermemory").suffix("/s"), + } + } +} + +impl HasRenderConfigForDump for model::FqCodelXStatsModel { + fn get_openmetrics_config_for_dump( + &self, + field_id: &Self::FieldId, + ) -> Option { + use model::FqCodelXStatsModelFieldId::*; + let gauge = gauge(); + match field_id { + Maxpacket => Some(gauge), + EcnMark => Some(gauge), + NewFlowsLen => Some(gauge), + OldFlowsLen => Some(gauge), + CeMark => Some(gauge), + DropOverlimitPerSec => Some(gauge), + NewFlowCountPerSec => Some(gauge), + MemoryUsagePerSec => Some(gauge), + DropOvermemoryPerSec => Some(gauge), + } + } +} From 28bc14f9a32ad457a94b7f8a525d2ec6d74a38ff Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:52:00 -0600 Subject: [PATCH 05/20] Add config knobs --- below/config/src/lib.rs | 2 ++ below/src/main.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/below/config/src/lib.rs b/below/config/src/lib.rs index 3298c597..1f82e9eb 100644 --- a/below/config/src/lib.rs +++ b/below/config/src/lib.rs @@ -47,6 +47,7 @@ pub struct BelowConfig { pub btrfs_min_pct: f64, pub enable_ethtool_stats: bool, pub enable_resctrl_stats: bool, + pub enable_tc_stats: bool, } impl Default for BelowConfig { @@ -63,6 +64,7 @@ impl Default for BelowConfig { btrfs_min_pct: btrfs::DEFAULT_MIN_PCT, enable_ethtool_stats: false, enable_resctrl_stats: false, + enable_tc_stats: false, } } } diff --git a/below/src/main.rs b/below/src/main.rs index d9e8b5a5..62c7001e 100644 --- a/below/src/main.rs +++ b/below/src/main.rs @@ -1012,6 +1012,7 @@ fn record( enable_btrfs_stats: below_config.enable_btrfs_stats, enable_ethtool_stats: below_config.enable_ethtool_stats, enable_resctrl_stats: below_config.enable_resctrl_stats, + enable_tc_stats: below_config.enable_tc_stats, btrfs_samples: below_config.btrfs_samples, btrfs_min_pct: below_config.btrfs_min_pct, cgroup_re, From ba6476d0900512bcae9d3521b36bda838b02ccc4 Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 30 Jan 2024 15:52:27 -0600 Subject: [PATCH 06/20] Sync dependencies --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 6dc8f046..316538d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "below/render", "below/resctrlfs", "below/store", + "below/tc", "below/view", ] resolver = "2" From 91221b32d59e7465d41a4c3d6972f33da36d5e16 Mon Sep 17 00:00:00 2001 From: mmynk Date: Wed, 31 Jan 2024 18:58:55 +0000 Subject: [PATCH 07/20] `rustfmt` on changed files --- below/dump/src/lib.rs | 2 +- below/dump/src/test.rs | 4 +- below/model/src/collector.rs | 2 +- below/model/src/collector_plugin.rs | 12 ++-- below/model/src/tc_model.rs | 19 +++---- below/tc/src/lib.rs | 45 +++++++++------ below/tc/src/test.rs | 87 +++++++++++++++-------------- below/tc/src/types.rs | 51 +++++++++-------- 8 files changed, 118 insertions(+), 104 deletions(-) diff --git a/below/dump/src/lib.rs b/below/dump/src/lib.rs index 15688af1..ba1a65f5 100644 --- a/below/dump/src/lib.rs +++ b/below/dump/src/lib.rs @@ -52,8 +52,8 @@ pub mod network; pub mod print; pub mod process; pub mod system; -pub mod tmain; pub mod tc; +pub mod tmain; pub mod transport; #[cfg(test)] diff --git a/below/dump/src/test.rs b/below/dump/src/test.rs index 3df76e78..dbad0ee7 100644 --- a/below/dump/src/test.rs +++ b/below/dump/src/test.rs @@ -1338,9 +1338,7 @@ fn test_dump_tc_content() { network: model::NetworkModel::default(), gpu: None, resctrl: None, - tc: Some(model::TcModel { - tc: tc_models, - }), + tc: Some(model::TcModel { tc: tc_models }), }; let mut opts: GeneralOpt = Default::default(); diff --git a/below/model/src/collector.rs b/below/model/src/collector.rs index d71dadfc..02c2fea6 100644 --- a/below/model/src/collector.rs +++ b/below/model/src/collector.rs @@ -318,7 +318,7 @@ fn collect_sample( Default::default() } } - } + }, }) } diff --git a/below/model/src/collector_plugin.rs b/below/model/src/collector_plugin.rs index 01355d3f..faffe4d9 100644 --- a/below/model/src/collector_plugin.rs +++ b/below/model/src/collector_plugin.rs @@ -155,14 +155,14 @@ mod test { // Test overwriting sample futures::executor::block_on(collector.collect_and_update()).unwrap(); c.wait(); // <-- 1 - // Consumer checking overwritten sample + // Consumer checking overwritten sample c.wait(); // <-- 2 - // Test sending None + // Test sending None futures::executor::block_on(collector.collect_and_update()).unwrap(); c.wait(); // <-- 3 - // Consumer checking None + // Consumer checking None c.wait(); // <-- 4 - // Test sending error. Will fail on both collector and consumer threads. + // Test sending error. Will fail on both collector and consumer threads. let is_error = matches!( futures::executor::block_on(collector.collect_and_update()), Err(_) @@ -174,11 +174,11 @@ mod test { barrier.wait(); // <-- 1 assert_eq!(Some(2), consumer.try_take().unwrap()); barrier.wait(); // <-- 2 - // Collector sending None + // Collector sending None barrier.wait(); // <-- 3 assert_eq!(None, consumer.try_take().unwrap()); barrier.wait(); // <-- 4 - // Collector sending error + // Collector sending error barrier.wait(); // <-- 5 assert!(matches!(consumer.try_take(), Err(_))); diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index 397c557d..10e921a5 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -28,18 +28,14 @@ pub struct TcModel { } impl TcModel { - pub fn new( - sample: &Vec, - last: Option<(&Vec, Duration)>, - ) -> Self { + pub fn new(sample: &Vec, last: Option<(&Vec, Duration)>) -> Self { // Assumption: sample and last are always ordered let tc = match last { - Some((last_tcs, d)) if last_tcs.len() == sample.len() => { - sample.iter() - .zip(last_tcs.iter()) - .map(|(sample, last)| SingleTcModel::new(sample, Some((last, d)))) - .collect::>() - } + Some((last_tcs, d)) if last_tcs.len() == sample.len() => sample + .iter() + .zip(last_tcs.iter()) + .map(|(sample, last)| SingleTcModel::new(sample, Some((last, d)))) + .collect::>(), _ => Vec::new(), }; @@ -110,8 +106,7 @@ impl SingleTcModel { }); if let Some(sample) = stats.xstats.as_ref() { - let last = - last.and_then(|(last, d)| last.stats.xstats.as_ref().map(|l| (l, d))); + let last = last.and_then(|(last, d)| last.stats.xstats.as_ref().map(|l| (l, d))); tc_model.xstats = Some(XStatsModel::new(sample, last)); } diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index beae7a31..5b194681 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -6,9 +6,11 @@ mod test; use std::collections::BTreeMap; -use netlink_packet_core::{NetlinkHeader, NetlinkMessage, NetlinkPayload, NLM_F_DUMP, NLM_F_REQUEST}; -use netlink_packet_route::RouteNetlinkMessage; +use netlink_packet_core::{ + NetlinkHeader, NetlinkMessage, NetlinkPayload, NLM_F_DUMP, NLM_F_REQUEST, +}; use netlink_packet_route::tc::TcMessage; +use netlink_packet_route::RouteNetlinkMessage; use netlink_sys::constants::NETLINK_ROUTE; use netlink_sys::{Socket, SocketAddr}; use nix::net::if_; @@ -25,7 +27,10 @@ pub fn tc_stats() -> Result { read_tc_stats(ifaces, &get_netlink_qdiscs) } -fn read_tc_stats(interfaces: BTreeMap, netlink_retriever: &dyn Fn() -> Result>) -> Result { +fn read_tc_stats( + interfaces: BTreeMap, + netlink_retriever: &dyn Fn() -> Result>, +) -> Result { let messages = netlink_retriever()?; let tc_stats = messages .into_iter() @@ -41,7 +46,9 @@ fn read_tc_stats(interfaces: BTreeMap, netlink_retriever: &dyn Fn() fn get_netlink_qdiscs() -> Result> { // open a socket let socket = Socket::new(NETLINK_ROUTE).map_err(|e| TcError::Netlink(e.to_string()))?; - socket.connect(&SocketAddr::new(0, 0)).map_err(|e| TcError::Netlink(e.to_string()))?; + socket + .connect(&SocketAddr::new(0, 0)) + .map_err(|e| TcError::Netlink(e.to_string()))?; // create a netlink request let mut nl_hdr = NetlinkHeader::default(); @@ -53,7 +60,9 @@ fn get_netlink_qdiscs() -> Result> { packet.serialize(&mut buf[..]); // send the request - socket.send(&buf[..], 0).map_err(|e| TcError::Netlink(e.to_string()))?; + socket + .send(&buf[..], 0) + .map_err(|e| TcError::Netlink(e.to_string()))?; // receive the response let mut recv_buf = vec![0; 4096]; @@ -83,7 +92,9 @@ fn get_netlink_qdiscs() -> Result> { let mut tc_msgs = Vec::new(); for msg in response { - if let NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewQueueDiscipline(tc)) = msg.payload { + if let NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewQueueDiscipline(tc)) = + msg.payload + { tc_msgs.push(tc); } } @@ -95,17 +106,17 @@ fn get_netlink_qdiscs() -> Result> { fn get_interfaces() -> Result> { let ifaces = if_::if_nameindex().map_err(|e| TcError::ReadInterfaces(e.to_string()))?; let if_map = ifaces - .iter() - .map(|iface| { - let index = iface.index(); - let name = if let Ok(name) = iface.name().to_str() { - name.to_string() - } else { - String::new() - }; - (index, name) - }) - .collect::>(); + .iter() + .map(|iface| { + let index = iface.index(); + let name = if let Ok(name) = iface.name().to_str() { + name.to_string() + } else { + String::new() + }; + (index, name) + }) + .collect::>(); Ok(if_map) } diff --git a/below/tc/src/test.rs b/below/tc/src/test.rs index d5e0aa83..a1549854 100644 --- a/below/tc/src/test.rs +++ b/below/tc/src/test.rs @@ -1,4 +1,7 @@ -use netlink_packet_route::tc::{TcAttribute, TcFqCodelQdStats, TcFqCodelXstats, TcHandle, TcHeader, TcMessage, TcOption, TcQdiscFqCodelOption, TcStats, TcStats2, TcStatsBasic, TcStatsQueue, TcXstats}; +use netlink_packet_route::tc::{ + TcAttribute, TcFqCodelQdStats, TcFqCodelXstats, TcHandle, TcHeader, TcMessage, TcOption, + TcQdiscFqCodelOption, TcStats, TcStats2, TcStatsBasic, TcStatsQueue, TcXstats, +}; use super::*; @@ -53,24 +56,20 @@ fn fake_netlink_qdiscs() -> Result> { queue }), ]), - TcAttribute::Xstats( - TcXstats::FqCodel( - TcFqCodelXstats::Qdisc({ - let mut fq_codel = TcFqCodelQdStats::default(); - fq_codel.maxpacket = 258; - fq_codel.drop_overlimit = 0; - fq_codel.ecn_mark = 0; - fq_codel.new_flow_count = 91; - fq_codel.new_flows_len = 0; - fq_codel.old_flows_len = 0; - fq_codel.ce_mark = 0; - fq_codel.memory_usage = 0; - fq_codel.drop_overmemory = 0; - fq_codel - } - ) - ) - ),], + TcAttribute::Xstats(TcXstats::FqCodel(TcFqCodelXstats::Qdisc({ + let mut fq_codel = TcFqCodelQdStats::default(); + fq_codel.maxpacket = 258; + fq_codel.drop_overlimit = 0; + fq_codel.ecn_mark = 0; + fq_codel.new_flow_count = 91; + fq_codel.new_flows_len = 0; + fq_codel.old_flows_len = 0; + fq_codel.ce_mark = 0; + fq_codel.memory_usage = 0; + fq_codel.drop_overmemory = 0; + fq_codel + }))), + ], ); tc_msgs.push(msg1); @@ -95,28 +94,34 @@ fn test_tc_stats() { assert_eq!(tc.stats.pps, Some(400)); // qdisc - assert_eq!(tc.qdisc, Some(QDisc::FqCodel(FqCodelQDisc { - target: 4999, - limit: 10240, - interval: 99999, - ecn: 1, - flows: 1024, - quantum: 1514, - ce_threshold: 0, - drop_batch_size: 64, - memory_limit: 33554432, - }))); + assert_eq!( + tc.qdisc, + Some(QDisc::FqCodel(FqCodelQDisc { + target: 4999, + limit: 10240, + interval: 99999, + ecn: 1, + flows: 1024, + quantum: 1514, + ce_threshold: 0, + drop_batch_size: 64, + memory_limit: 33554432, + })) + ); // xstats - assert_eq!(tc.stats.xstats, Some(XStats::FqCodel(FqCodelXStats { - maxpacket: 258, - drop_overlimit: 0, - ecn_mark: 0, - new_flow_count: 91, - new_flows_len: 0, - old_flows_len: 0, - ce_mark: 0, - memory_usage: 0, - drop_overmemory: 0, - }))); + assert_eq!( + tc.stats.xstats, + Some(XStats::FqCodel(FqCodelXStats { + maxpacket: 258, + drop_overlimit: 0, + ecn_mark: 0, + new_flow_count: 91, + new_flows_len: 0, + old_flows_len: 0, + ce_mark: 0, + memory_usage: 0, + drop_overmemory: 0, + })) + ); } diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs index 4e0461b1..c0d95041 100644 --- a/below/tc/src/types.rs +++ b/below/tc/src/types.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use netlink_packet_route::tc; -use netlink_packet_route::tc::{TcAttribute, TcFqCodelXstats, TcMessage, TcOption, TcQdiscFqCodelOption}; +use netlink_packet_route::tc::{ + TcAttribute, TcFqCodelXstats, TcMessage, TcOption, TcQdiscFqCodelOption, +}; use serde::{Deserialize, Serialize}; const FQ_CODEL: &str = "fq_codel"; @@ -21,7 +23,9 @@ pub struct Tc { impl Tc { pub fn new(interfaces: &BTreeMap, tc_msg: &TcMessage) -> Self { let if_index = tc_msg.header.index as u32; - let if_name = interfaces.get(&if_index).map_or_else(String::new, |iface| iface.to_string()); + let if_name = interfaces + .get(&if_index) + .map_or_else(String::new, |iface| iface.to_string()); let mut tc = Self { if_index, if_name, @@ -57,19 +61,16 @@ impl Tc { } } } - TcAttribute::Xstats(tc_xstats) => { - match tc_xstats { - tc::TcXstats::FqCodel(fq_codel_xstats) => { - tc.stats.xstats = Some(XStats::FqCodel(FqCodelXStats::new(fq_codel_xstats))) - } - _ => {} + TcAttribute::Xstats(tc_xstats) => match tc_xstats { + tc::TcXstats::FqCodel(fq_codel_xstats) => { + tc.stats.xstats = Some(XStats::FqCodel(FqCodelXStats::new(fq_codel_xstats))) } - } + _ => {} + }, _ => {} } } - tc.qdisc = QDisc::new(&tc.kind, opts); tc @@ -108,20 +109,24 @@ impl QDisc { let mut fq_codel = FqCodelQDisc::default(); for opt in opts { match opt { - TcOption::FqCodel(fq_codel_opt) => { - match fq_codel_opt { - TcQdiscFqCodelOption::Target(target) => fq_codel.target = target, - TcQdiscFqCodelOption::Limit(limit) => fq_codel.limit = limit, - TcQdiscFqCodelOption::Interval(interval) => fq_codel.interval = interval, - TcQdiscFqCodelOption::Ecn(ecn) => fq_codel.ecn = ecn, - TcQdiscFqCodelOption::Flows(flows) => fq_codel.flows = flows, - TcQdiscFqCodelOption::Quantum(quantum) => fq_codel.quantum = quantum, - TcQdiscFqCodelOption::CeThreshold(ce_threshold) => fq_codel.ce_threshold = ce_threshold, - TcQdiscFqCodelOption::DropBatchSize(drop_batch_size) => fq_codel.drop_batch_size = drop_batch_size, - TcQdiscFqCodelOption::MemoryLimit(memory_limit) => fq_codel.memory_limit = memory_limit, - _ => {} + TcOption::FqCodel(fq_codel_opt) => match fq_codel_opt { + TcQdiscFqCodelOption::Target(target) => fq_codel.target = target, + TcQdiscFqCodelOption::Limit(limit) => fq_codel.limit = limit, + TcQdiscFqCodelOption::Interval(interval) => fq_codel.interval = interval, + TcQdiscFqCodelOption::Ecn(ecn) => fq_codel.ecn = ecn, + TcQdiscFqCodelOption::Flows(flows) => fq_codel.flows = flows, + TcQdiscFqCodelOption::Quantum(quantum) => fq_codel.quantum = quantum, + TcQdiscFqCodelOption::CeThreshold(ce_threshold) => { + fq_codel.ce_threshold = ce_threshold } - } + TcQdiscFqCodelOption::DropBatchSize(drop_batch_size) => { + fq_codel.drop_batch_size = drop_batch_size + } + TcQdiscFqCodelOption::MemoryLimit(memory_limit) => { + fq_codel.memory_limit = memory_limit + } + _ => {} + }, _ => {} } } From 85b12422994a5afbd2a54e64a8d87725dee72763 Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 19 Feb 2024 08:38:51 +0000 Subject: [PATCH 08/20] Use `borrow` trait --- below/tc/src/lib.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index 5b194681..1f298f39 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -4,6 +4,7 @@ mod types; #[cfg(test)] mod test; +use std::borrow::{Borrow, BorrowMut}; use std::collections::BTreeMap; use netlink_packet_core::{ @@ -57,20 +58,20 @@ fn get_netlink_qdiscs() -> Result> { let mut packet = NetlinkMessage::new(nl_hdr, NetlinkPayload::from(msg)); packet.finalize(); let mut buf = vec![0; packet.header.length as usize]; - packet.serialize(&mut buf[..]); + packet.serialize(buf[..].borrow_mut()); // send the request socket - .send(&buf[..], 0) + .send(buf[..].borrow(), 0) .map_err(|e| TcError::Netlink(e.to_string()))?; // receive the response let mut recv_buf = vec![0; 4096]; let mut offset = 0; let mut response = Vec::new(); - 'out: while let Ok(size) = socket.recv(&mut &mut recv_buf[offset..], 0) { + 'out: while let Ok(size) = socket.recv(&mut recv_buf[..].borrow_mut(), 0) { loop { - let bytes = &recv_buf[offset..]; + let bytes = recv_buf[offset..].borrow(); let rx_packet = >::deserialize(bytes) .map_err(|e| TcError::Netlink(e.to_string()))?; response.push(rx_packet.clone()); From 0f448d2e24d91e3263c18f76eadca9bb56616011 Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 19 Feb 2024 09:26:39 +0000 Subject: [PATCH 09/20] Add documentation for public fields --- below/tc/src/types.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs index c0d95041..2467217e 100644 --- a/below/tc/src/types.rs +++ b/below/tc/src/types.rs @@ -11,12 +11,19 @@ const FQ_CODEL: &str = "fq_codel"; /// `Tc` represents a traffic control qdisc. #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct Tc { + /// Index of the network interface. pub if_index: u32, + /// Name of the network interface. pub if_name: String, + /// A unique identifier for the qdisc. pub handle: u32, + /// Identifier of the parent qdisc. pub parent: u32, + /// Type of the queueing discipline, e.g. `fq_codel`, `htb`, etc. pub kind: String, + /// Detailed statistics of the qdisc, such as bytes, packets, qlen, etc. pub stats: Stats, + /// qdisc wraps the specific qdisc type, e.g. `fq_codel`. pub qdisc: Option, } @@ -81,23 +88,33 @@ impl Tc { #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct Stats { // Stats2::StatsBasic + /// Number of enqueued bytes. pub bytes: Option, + /// Number of enqueued packets. pub packets: Option, // Stats2::StatsQueue + /// Length of the queue. pub qlen: Option, + /// Number of bytes pending in the queue. pub backlog: Option, + /// Packets dropped because of lack of resources. pub drops: Option, pub requeues: Option, + /// Number of throttle events when this flow goes out of allocated bandwidth. pub overlimits: Option, // XStats + /// xstats wraps extended statistics of the qdisc. pub xstats: Option, + /// Current flow byte rate. pub bps: Option, + /// Current flow packet rate. pub pps: Option, } +/// `QDisc` represents the queueing discipline of a network interface. #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub enum QDisc { FqCodel(FqCodelQDisc), @@ -143,27 +160,45 @@ pub enum XStats { #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct FqCodelQDisc { + /// Accceptable minimum standing/persistent queue delay. pub target: u32, + /// Hard limit on the real queue size. pub limit: u32, + /// Used to ensure that the measured minimum delay does not become too stale. pub interval: u32, + /// Used to mark packets instead of dropping them. pub ecn: u32, + /// Number of flows into which the incoming packets are classified. pub flows: u32, + /// Number of bytes used as 'deficit' in the fair queuing algorithm. pub quantum: u32, + /// Sets a threshold above which all packets are marked with ECN Congestion Experienced. pub ce_threshold: u32, + /// Sets the maximum number of packets to drop when limit or memory_limit is exceeded. pub drop_batch_size: u32, + /// Sets a limit on the total number of bytes that can be queued in this FQ-CoDel instance. pub memory_limit: u32, } #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct FqCodelXStats { + /// Largest packet we've seen so far pub maxpacket: u32, + /// Number of time max qdisc packet limit was hit. pub drop_overlimit: u32, + /// Number of packets ECN marked instead of being dropped. pub ecn_mark: u32, + /// Number of time packets created a 'new flow'. pub new_flow_count: u32, + /// Count of flows in new list. pub new_flows_len: u32, + /// Count of flows in old list. pub old_flows_len: u32, + /// Packets above ce_threshold. pub ce_mark: u32, + /// Memory usage (bytes). pub memory_usage: u32, + /// Number of time packets were dropped due to memory limit. pub drop_overmemory: u32, } From 54ae812377a1a0c9e32779bb6abc68db7920bb6b Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 19 Feb 2024 09:26:57 +0000 Subject: [PATCH 10/20] Explicit imports over star --- below/model/src/tc_model.rs | 6 +++++- below/tc/src/lib.rs | 6 +++--- below/tc/src/test.rs | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index 10e921a5..30ee718f 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -1,4 +1,8 @@ -use super::*; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use crate::{Field, FieldId, Nameable, Queriable}; use tc::{QDisc, Tc, XStats}; /// rate! macro calculates the rate of a field for given sample and last objects. diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index 1f298f39..58d54584 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -16,11 +16,11 @@ use netlink_sys::constants::NETLINK_ROUTE; use netlink_sys::{Socket, SocketAddr}; use nix::net::if_; -pub use errors::*; -pub use types::*; +use errors::TcError; +pub use types::{FqCodelQDisc, FqCodelXStats, QDisc, Tc, XStats}; pub type TcStats = Vec; -pub type Result = std::result::Result; +type Result = std::result::Result; /// Get list of all `tc` qdiscs. pub fn tc_stats() -> Result { diff --git a/below/tc/src/test.rs b/below/tc/src/test.rs index a1549854..c65c9680 100644 --- a/below/tc/src/test.rs +++ b/below/tc/src/test.rs @@ -1,9 +1,11 @@ +use std::collections::BTreeMap; + use netlink_packet_route::tc::{ TcAttribute, TcFqCodelQdStats, TcFqCodelXstats, TcHandle, TcHeader, TcMessage, TcOption, TcQdiscFqCodelOption, TcStats, TcStats2, TcStatsBasic, TcStatsQueue, TcXstats, }; -use super::*; +use crate::{types::XStats, FqCodelQDisc, FqCodelXStats, QDisc, Result}; fn fake_netlink_qdiscs() -> Result> { let mut tc_msgs = Vec::new(); @@ -79,7 +81,7 @@ fn fake_netlink_qdiscs() -> Result> { #[test] fn test_tc_stats() { let ifaces = BTreeMap::from_iter(vec![(2, "eth0".to_string())]); - let tc_map = read_tc_stats(ifaces, &fake_netlink_qdiscs).unwrap(); + let tc_map = crate::read_tc_stats(ifaces, &fake_netlink_qdiscs).unwrap(); let tc = tc_map.get(0).unwrap(); assert_eq!(tc.if_index, 2); From 564b43d2c8fe832575695d4a42f887988a24fe33 Mon Sep 17 00:00:00 2001 From: mmynk Date: Sat, 16 Mar 2024 18:49:33 -0500 Subject: [PATCH 11/20] Revert `rustfmt` change --- below/model/src/collector_plugin.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/below/model/src/collector_plugin.rs b/below/model/src/collector_plugin.rs index faffe4d9..44a3b49c 100644 --- a/below/model/src/collector_plugin.rs +++ b/below/model/src/collector_plugin.rs @@ -155,14 +155,14 @@ mod test { // Test overwriting sample futures::executor::block_on(collector.collect_and_update()).unwrap(); c.wait(); // <-- 1 - // Consumer checking overwritten sample + // Consumer checking overwritten sample c.wait(); // <-- 2 - // Test sending None + // Test sending None futures::executor::block_on(collector.collect_and_update()).unwrap(); c.wait(); // <-- 3 - // Consumer checking None + // Consumer checking None c.wait(); // <-- 4 - // Test sending error. Will fail on both collector and consumer threads. + // Test sending error. Will fail on both collector and consumer threads. let is_error = matches!( futures::executor::block_on(collector.collect_and_update()), Err(_) From 158d66a1068462d1354ca849548d9de6e7985e75 Mon Sep 17 00:00:00 2001 From: mmynk Date: Sun, 17 Mar 2024 00:32:49 +0000 Subject: [PATCH 12/20] Rename `Tc` type to `TcStat` --- below/model/src/tc_model.rs | 6 +++--- below/tc/src/lib.rs | 6 +++--- below/tc/src/types.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index 30ee718f..df02d639 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -3,7 +3,7 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use crate::{Field, FieldId, Nameable, Queriable}; -use tc::{QDisc, Tc, XStats}; +use tc::{QDisc, TcStat, TcStats, XStats}; /// rate! macro calculates the rate of a field for given sample and last objects. /// It basically calls count_per_sec! macro after extracting the field from the objects. @@ -32,7 +32,7 @@ pub struct TcModel { } impl TcModel { - pub fn new(sample: &Vec, last: Option<(&Vec, Duration)>) -> Self { + pub fn new(sample: &TcStats, last: Option<(&TcStats, Duration)>) -> Self { // Assumption: sample and last are always ordered let tc = match last { Some((last_tcs, d)) if last_tcs.len() == sample.len() => sample @@ -87,7 +87,7 @@ impl Nameable for SingleTcModel { } impl SingleTcModel { - pub fn new(sample: &Tc, last: Option<(&Tc, Duration)>) -> Self { + pub fn new(sample: &TcStat, last: Option<(&TcStat, Duration)>) -> Self { let mut tc_model = SingleTcModel { interface: sample.if_name.clone(), kind: sample.kind.clone(), diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index 58d54584..6789547b 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -17,9 +17,9 @@ use netlink_sys::{Socket, SocketAddr}; use nix::net::if_; use errors::TcError; -pub use types::{FqCodelQDisc, FqCodelXStats, QDisc, Tc, XStats}; +pub use types::{FqCodelQDisc, FqCodelXStats, QDisc, TcStat, XStats}; -pub type TcStats = Vec; +pub type TcStats = Vec; type Result = std::result::Result; /// Get list of all `tc` qdiscs. @@ -35,7 +35,7 @@ fn read_tc_stats( let messages = netlink_retriever()?; let tc_stats = messages .into_iter() - .map(|msg| Tc::new(&interfaces, &msg)) + .map(|msg| TcStat::new(&interfaces, &msg)) .collect(); Ok(tc_stats) diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs index 2467217e..00a0ca52 100644 --- a/below/tc/src/types.rs +++ b/below/tc/src/types.rs @@ -10,7 +10,7 @@ const FQ_CODEL: &str = "fq_codel"; /// `Tc` represents a traffic control qdisc. #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct Tc { +pub struct TcStat { /// Index of the network interface. pub if_index: u32, /// Name of the network interface. @@ -27,7 +27,7 @@ pub struct Tc { pub qdisc: Option, } -impl Tc { +impl TcStat { pub fn new(interfaces: &BTreeMap, tc_msg: &TcMessage) -> Self { let if_index = tc_msg.header.index as u32; let if_name = interfaces From 5b77fd9ca12a9e2365301b2d7f5b2203513b35d9 Mon Sep 17 00:00:00 2001 From: mmynk Date: Sun, 17 Mar 2024 00:34:28 +0000 Subject: [PATCH 13/20] Pass only the relevant interface to `TcStat::new` --- Cargo.lock | 10 +++++----- below/tc/src/lib.rs | 7 ++++++- below/tc/src/types.rs | 7 +------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cad5c717..6253873a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,9 +441,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0ba8f7aaa012f30d5b2861462f6708eccd49c3c39863fe083a308035f63d723" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" dependencies = [ "jobserver", "libc", @@ -1102,9 +1102,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hostname" @@ -1529,7 +1529,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", ] diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index 6789547b..9d5757fb 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -35,7 +35,12 @@ fn read_tc_stats( let messages = netlink_retriever()?; let tc_stats = messages .into_iter() - .map(|msg| TcStat::new(&interfaces, &msg)) + .filter_map(|msg| { + interfaces + .get(&(msg.header.index as u32)) + .cloned() + .map(|if_name| TcStat::new(if_name, &msg)) + }) .collect(); Ok(tc_stats) diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs index 00a0ca52..7d65b937 100644 --- a/below/tc/src/types.rs +++ b/below/tc/src/types.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use netlink_packet_route::tc; use netlink_packet_route::tc::{ TcAttribute, TcFqCodelXstats, TcMessage, TcOption, TcQdiscFqCodelOption, @@ -28,11 +26,8 @@ pub struct TcStat { } impl TcStat { - pub fn new(interfaces: &BTreeMap, tc_msg: &TcMessage) -> Self { + pub fn new(if_name: String, tc_msg: &TcMessage) -> Self { let if_index = tc_msg.header.index as u32; - let if_name = interfaces - .get(&if_index) - .map_or_else(String::new, |iface| iface.to_string()); let mut tc = Self { if_index, if_name, From 6985d1d8854393af0df7bc15449a6a30ac8903bc Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 1 Apr 2024 16:40:13 +0000 Subject: [PATCH 14/20] Add an async collector plugin for tc stats Tc stats is made best effort as netlink operations could be blocking. --- below/model/src/collector.rs | 20 ++++---- below/model/src/lib.rs | 1 + below/model/src/tc_collector_plugin.rs | 35 +++++++++++++ below/src/main.rs | 71 ++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 below/model/src/tc_collector_plugin.rs diff --git a/below/model/src/collector.rs b/below/model/src/collector.rs index 02c2fea6..bf577f09 100644 --- a/below/model/src/collector.rs +++ b/below/model/src/collector.rs @@ -37,6 +37,8 @@ pub struct CollectorOptions { pub cgroup_re: Option, pub gpu_stats_receiver: Option>, + pub tc_stats_receiver: + Option>, } impl Default for CollectorOptions { @@ -54,6 +56,7 @@ impl Default for CollectorOptions { btrfs_min_pct: btrfs::DEFAULT_MIN_PCT, cgroup_re: None, gpu_stats_receiver: None, + tc_stats_receiver: None, } } } @@ -308,16 +311,15 @@ fn collect_sample( } } }, - tc: if !options.enable_tc_stats { - None + tc: if let Some(tc_stats_receiver) = &options.tc_stats_receiver { + Some( + tc_stats_receiver + .try_take() + .context("TC stats collector had an error")? + .unwrap_or_default(), + ) } else { - match tc::tc_stats() { - Ok(tc_stats) => Some(tc_stats), - Err(e) => { - error!(logger, "{:#}", e); - Default::default() - } - } + None }, }) } diff --git a/below/model/src/lib.rs b/below/model/src/lib.rs index bcdb5505..c27ffd17 100644 --- a/below/model/src/lib.rs +++ b/below/model/src/lib.rs @@ -41,6 +41,7 @@ pub mod sample; mod sample_model; pub mod system; pub mod tc_model; +pub mod tc_collector_plugin; open_source_shim!(pub); diff --git a/below/model/src/tc_collector_plugin.rs b/below/model/src/tc_collector_plugin.rs new file mode 100644 index 00000000..7b06046a --- /dev/null +++ b/below/model/src/tc_collector_plugin.rs @@ -0,0 +1,35 @@ +use anyhow::Result; +use async_trait::async_trait; +use slog::error; +use tc::TcStats; + +use crate::collector_plugin::AsyncCollectorPlugin; + +pub type SampleType = TcStats; + +pub struct TcStatsCollectorPlugin { + logger: slog::Logger, +} + +impl TcStatsCollectorPlugin { + pub fn new(logger: slog::Logger) -> Result { + Ok(Self { logger }) + } +} + +#[async_trait] +impl AsyncCollectorPlugin for TcStatsCollectorPlugin { + type T = TcStats; + + async fn try_collect(&mut self) -> Result> { + let stats = match tc::tc_stats() { + Ok(tc_stats) => Some(tc_stats), + Err(e) => { + error!(self.logger, "{:#}", e); + Default::default() + } + }; + + Ok(stats) + } +} diff --git a/below/src/main.rs b/below/src/main.rs index 62c7001e..3dc8a0f3 100644 --- a/below/src/main.rs +++ b/below/src/main.rs @@ -475,6 +475,70 @@ pub fn start_gpu_stats_thread_and_get_stats_receiver( Ok(receiver) } +fn start_tc_stats_thread_and_get_stats_receiver( + logger: slog::Logger, + interval: Duration, +) -> Result> { + let tc_collector = model::tc_collector_plugin::TcStatsCollectorPlugin::new(logger.clone()) + .context("Failed to initialize TC stats collector")?; + let (mut collector, receiver) = model::collector_plugin::collector_consumer(tc_collector); + + // Start a thread to collect TC stats + let target_interval = interval.clone(); + thread::Builder::new() + .name("tc_stats_collector".to_owned()) + .spawn(move || { + // Exponential backoff on unrecoverable errors + const EXP_BACKOFF_FACTOR: u32 = 2; + const MAX_BACKOFF_SECS: u64 = 900; + let max_backoff = Duration::from_secs(MAX_BACKOFF_SECS); + let mut interval = target_interval; + loop { + let collect_instant = Instant::now(); + let rt = TB::new_current_thread() + .thread_name("create_fburl") + .build() + .expect("Failed to build tokio runtime."); + match rt.block_on(collector.collect_and_update()) { + Ok(_) => { + interval = target_interval; + } + Err(e) => { + interval = std::cmp::min( + interval + .saturating_mul(EXP_BACKOFF_FACTOR), + max_backoff, + ); + error!( + logger, + "TC stats collection backing off {:?} because of unrecoverable error: {:?}", + interval, + e + ); + } + } + let collect_duration = Instant::now().duration_since(collect_instant); + + const COLLECT_DURATION_WARN_THRESHOLD: u64 = 2; + if collect_duration > Duration::from_secs(COLLECT_DURATION_WARN_THRESHOLD) { + warn!( + logger, + "TC collection took {} > {}", + collect_duration.as_secs_f64(), + COLLECT_DURATION_WARN_THRESHOLD + ); + } + if interval > collect_duration { + let sleep_duration = interval - collect_duration; + std::thread::sleep(sleep_duration); + } + } + }) + .expect("Failed to spawn thread"); + + Ok(receiver) +} + /// Returns true if other end disconnected, false otherwise fn check_for_exitstat_errors(logger: &slog::Logger, receiver: &Receiver) -> bool { // Print an error but don't exit on bpf issues. Do this b/c we can't always @@ -1002,6 +1066,12 @@ fn record( None }; + let tc_stats_receiver = if below_config.enable_tc_stats { + Some(start_tc_stats_thread_and_get_stats_receiver(logger.clone(), interval)?) + } else { + None + }; + let mut collector = model::Collector::new( logger.clone(), model::CollectorOptions { @@ -1017,6 +1087,7 @@ fn record( btrfs_min_pct: below_config.btrfs_min_pct, cgroup_re, gpu_stats_receiver, + tc_stats_receiver, }, ); From 8fe69d080d00169e5962197bce2aa73ad2c65b7c Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 1 Apr 2024 19:23:46 +0000 Subject: [PATCH 15/20] Fix length of common field vec --- below/model/src/common_field_ids.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/below/model/src/common_field_ids.rs b/below/model/src/common_field_ids.rs index 2e29a3f2..3b752298 100644 --- a/below/model/src/common_field_ids.rs +++ b/below/model/src/common_field_ids.rs @@ -23,7 +23,7 @@ /// /// This list also servers as documentation for available field ids that could /// be used in other below crates. A test ensures that this list is up-to-date. -pub const COMMON_MODEL_FIELD_IDS: [&str; 445] = [ +pub const COMMON_MODEL_FIELD_IDS: [&str; 449] = [ "system.hostname", "system.kernel_version", "system.os_release", From c4eeb0eb4d258bf046465f2e5ff95078d6bb7011 Mon Sep 17 00:00:00 2001 From: mmynk Date: Mon, 15 Apr 2024 21:09:39 +0000 Subject: [PATCH 16/20] Add README for `tc` crate --- below/tc/README.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 below/tc/README.md diff --git a/below/tc/README.md b/below/tc/README.md new file mode 100644 index 00000000..d65e42e4 --- /dev/null +++ b/below/tc/README.md @@ -0,0 +1,5 @@ +# tc + +`tc` is a rust library that parses the queueing discipline ([qdisc](https://tldp.org/HOWTO/Traffic-Control-HOWTO/components.html#c-qdisc)) component of the Traffic Control ([`tc`](http://man7.org/linux/man-pages/man8/tc.8.html)) Linux subsystem. + +The library depends on another upstream library https://github.com/rust-netlink/netlink-packet-route for parsing the [netlink](https://www.kernel.org/doc/html/latest/userspace-api/netlink/intro.html) response which is then converted into an intermediate representation to be consumed by the `model` library. From 97ac6bf79e9c22db05d2443c781988d75d6ec54c Mon Sep 17 00:00:00 2001 From: mmynk Date: Tue, 16 Apr 2024 22:04:27 +0000 Subject: [PATCH 17/20] Refactor `fq_codel` xstats The struct `FqCodelXStats` is changed into enum. The enum has type `FqCodelQdiscStats` which represents `tc_fq_codel_qd_stats`. We can add type to represent `tc_fq_codel_cl_stats` when classful qdisc support is added. --- below/model/src/tc_model.rs | 27 ++++++++++++++------------- below/tc/src/lib.rs | 2 +- below/tc/src/test.rs | 26 ++++++++++++++------------ below/tc/src/types.rs | 37 ++++++++++++++++++++++--------------- 4 files changed, 51 insertions(+), 41 deletions(-) diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index df02d639..9c6e3ec0 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -112,7 +112,7 @@ impl SingleTcModel { if let Some(sample) = stats.xstats.as_ref() { let last = last.and_then(|(last, d)| last.stats.xstats.as_ref().map(|l| (l, d))); - tc_model.xstats = Some(XStatsModel::new(sample, last)); + tc_model.xstats = XStatsModel::new(sample, last); } if let Some(sample) = sample.qdisc.as_ref() { @@ -213,18 +213,18 @@ pub struct XStatsModel { } impl XStatsModel { - fn new(sample: &XStats, last: Option<(&XStats, Duration)>) -> Self { - match sample { - XStats::FqCodel(sample) => Self { - fq_codel: { - last.map(|(l, d)| match l { - XStats::FqCodel(last) => { - let last = Some((last, d)); - FqCodelXStatsModel::new(sample, last) - } - }) - }, + fn new(sample: &XStats, last: Option<(&XStats, Duration)>) -> Option { + match (sample, last) { + (XStats::FqCodel(sample), Some((XStats::FqCodel(last), d))) => { + match (sample, last) { + (tc::FqCodelXStats::FqCodelQdiscStats(sample), tc::FqCodelXStats::FqCodelQdiscStats(last)) => { + Some(Self { + fq_codel: Some(FqCodelXStatsModel::new(sample, Some((last, d)))), + }) + }, + } }, + _ => None, } } } @@ -239,6 +239,7 @@ impl XStatsModel { below_derive::Queriable )] pub struct FqCodelXStatsModel { + // FqCodelQdXStats pub maxpacket: u32, pub ecn_mark: u32, pub new_flows_len: u32, @@ -251,7 +252,7 @@ pub struct FqCodelXStatsModel { } impl FqCodelXStatsModel { - fn new(sample: &tc::FqCodelXStats, last: Option<(&tc::FqCodelXStats, Duration)>) -> Self { + fn new(sample: &tc::FqCodelQdStats, last: Option<(&tc::FqCodelQdStats, Duration)>) -> Self { Self { maxpacket: sample.maxpacket, ecn_mark: sample.ecn_mark, diff --git a/below/tc/src/lib.rs b/below/tc/src/lib.rs index 9d5757fb..d9468bd0 100644 --- a/below/tc/src/lib.rs +++ b/below/tc/src/lib.rs @@ -17,7 +17,7 @@ use netlink_sys::{Socket, SocketAddr}; use nix::net::if_; use errors::TcError; -pub use types::{FqCodelQDisc, FqCodelXStats, QDisc, TcStat, XStats}; +pub use types::{FqCodelQDisc, FqCodelQdStats, FqCodelXStats, QDisc, TcStat, XStats}; pub type TcStats = Vec; type Result = std::result::Result; diff --git a/below/tc/src/test.rs b/below/tc/src/test.rs index c65c9680..c318c2fd 100644 --- a/below/tc/src/test.rs +++ b/below/tc/src/test.rs @@ -5,7 +5,7 @@ use netlink_packet_route::tc::{ TcQdiscFqCodelOption, TcStats, TcStats2, TcStatsBasic, TcStatsQueue, TcXstats, }; -use crate::{types::XStats, FqCodelQDisc, FqCodelXStats, QDisc, Result}; +use crate::{types::XStats, FqCodelQDisc, FqCodelQdStats, FqCodelXStats, QDisc, Result}; fn fake_netlink_qdiscs() -> Result> { let mut tc_msgs = Vec::new(); @@ -114,16 +114,18 @@ fn test_tc_stats() { // xstats assert_eq!( tc.stats.xstats, - Some(XStats::FqCodel(FqCodelXStats { - maxpacket: 258, - drop_overlimit: 0, - ecn_mark: 0, - new_flow_count: 91, - new_flows_len: 0, - old_flows_len: 0, - ce_mark: 0, - memory_usage: 0, - drop_overmemory: 0, - })) + Some(XStats::FqCodel(FqCodelXStats::FqCodelQdiscStats( + FqCodelQdStats { + maxpacket: 258, + drop_overlimit: 0, + ecn_mark: 0, + new_flow_count: 91, + new_flows_len: 0, + old_flows_len: 0, + ce_mark: 0, + memory_usage: 0, + drop_overmemory: 0, + } + ))) ); } diff --git a/below/tc/src/types.rs b/below/tc/src/types.rs index 7d65b937..41fc5ae1 100644 --- a/below/tc/src/types.rs +++ b/below/tc/src/types.rs @@ -65,7 +65,7 @@ impl TcStat { } TcAttribute::Xstats(tc_xstats) => match tc_xstats { tc::TcXstats::FqCodel(fq_codel_xstats) => { - tc.stats.xstats = Some(XStats::FqCodel(FqCodelXStats::new(fq_codel_xstats))) + tc.stats.xstats = FqCodelXStats::new(fq_codel_xstats).map(XStats::FqCodel); } _ => {} }, @@ -175,8 +175,13 @@ pub struct FqCodelQDisc { pub memory_limit: u32, } +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub enum FqCodelXStats { + FqCodelQdiscStats(FqCodelQdStats), +} + #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct FqCodelXStats { +pub struct FqCodelQdStats { /// Largest packet we've seen so far pub maxpacket: u32, /// Number of time max qdisc packet limit was hit. @@ -198,20 +203,22 @@ pub struct FqCodelXStats { } impl FqCodelXStats { - pub fn new(xstats: &TcFqCodelXstats) -> Self { + pub fn new(xstats: &TcFqCodelXstats) -> Option { match xstats { - TcFqCodelXstats::Qdisc(qdisc) => Self { - maxpacket: qdisc.maxpacket, - drop_overlimit: qdisc.drop_overlimit, - ecn_mark: qdisc.ecn_mark, - new_flow_count: qdisc.new_flow_count, - new_flows_len: qdisc.new_flows_len, - old_flows_len: qdisc.old_flows_len, - ce_mark: qdisc.ce_mark, - memory_usage: qdisc.memory_usage, - drop_overmemory: qdisc.drop_overmemory, - }, - _ => Self::default(), + TcFqCodelXstats::Qdisc(qdisc) => { + Some(FqCodelXStats::FqCodelQdiscStats(FqCodelQdStats { + maxpacket: qdisc.maxpacket, + drop_overlimit: qdisc.drop_overlimit, + ecn_mark: qdisc.ecn_mark, + new_flow_count: qdisc.new_flow_count, + new_flows_len: qdisc.new_flows_len, + old_flows_len: qdisc.old_flows_len, + ce_mark: qdisc.ce_mark, + memory_usage: qdisc.memory_usage, + drop_overmemory: qdisc.drop_overmemory, + })) + } + _ => None, } } } From d11374fd98fd52ee1a3551a56945fe3bba510d4f Mon Sep 17 00:00:00 2001 From: mmynk Date: Wed, 17 Apr 2024 19:41:52 +0000 Subject: [PATCH 18/20] Use wrapper `below_derive::queriable_derives` --- below/model/src/tc_model.rs | 40 ++++--------------------------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index 9c6e3ec0..be811862 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -17,15 +17,7 @@ macro_rules! rate { }}; } -#[derive( - Clone, - Debug, - Default, - PartialEq, - Serialize, - Deserialize, - below_derive::Queriable -)] +#[below_derive::queriable_derives] pub struct TcModel { #[queriable(subquery)] pub tc: Vec, @@ -47,15 +39,7 @@ impl TcModel { } } -#[derive( - Clone, - Debug, - Default, - PartialEq, - Serialize, - Deserialize, - below_derive::Queriable -)] +#[below_derive::queriable_derives] pub struct SingleTcModel { /// Name of the interface pub interface: String, @@ -125,15 +109,7 @@ impl SingleTcModel { } } -#[derive( - Clone, - Debug, - Default, - PartialEq, - Serialize, - Deserialize, - below_derive::Queriable -)] +#[below_derive::queriable_derives] pub struct QDiscModel { #[queriable(subquery)] pub fq_codel: Option, @@ -156,15 +132,7 @@ impl QDiscModel { } } -#[derive( - Clone, - Debug, - Default, - PartialEq, - Serialize, - Deserialize, - below_derive::Queriable -)] +#[below_derive::queriable_derives] pub struct FqCodelQDiscModel { pub target: u32, pub limit: u32, From 76fef69b31de76a00637773520012deadd3e0c19 Mon Sep 17 00:00:00 2001 From: mmynk Date: Wed, 17 Apr 2024 19:44:07 +0000 Subject: [PATCH 19/20] Update deps from merge --- Cargo.lock | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edc7c717..e1f0f1f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,9 +400,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.2.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" dependencies = [ "serde", ] @@ -598,7 +598,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.6.5", "scopeguard", ] @@ -1355,6 +1355,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1434,7 +1443,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.5", "pin-utils", ] @@ -1447,6 +1456,7 @@ dependencies = [ "bitflags 2.4.0", "cfg-if", "libc", + "memoffset 0.9.1", ] [[package]] From 3d987a70b6fe65512f538d8d8e1efaa7a9164b26 Mon Sep 17 00:00:00 2001 From: mmynk Date: Wed, 17 Apr 2024 20:11:51 +0000 Subject: [PATCH 20/20] Fix lint issues --- below/model/src/lib.rs | 2 +- below/model/src/tc_model.rs | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/below/model/src/lib.rs b/below/model/src/lib.rs index 435869b5..307ccf01 100644 --- a/below/model/src/lib.rs +++ b/below/model/src/lib.rs @@ -41,8 +41,8 @@ pub mod resctrl; pub mod sample; mod sample_model; pub mod system; -pub mod tc_model; pub mod tc_collector_plugin; +pub mod tc_model; open_source_shim!(pub); diff --git a/below/model/src/tc_model.rs b/below/model/src/tc_model.rs index be811862..e09c3331 100644 --- a/below/model/src/tc_model.rs +++ b/below/model/src/tc_model.rs @@ -183,14 +183,13 @@ pub struct XStatsModel { impl XStatsModel { fn new(sample: &XStats, last: Option<(&XStats, Duration)>) -> Option { match (sample, last) { - (XStats::FqCodel(sample), Some((XStats::FqCodel(last), d))) => { - match (sample, last) { - (tc::FqCodelXStats::FqCodelQdiscStats(sample), tc::FqCodelXStats::FqCodelQdiscStats(last)) => { - Some(Self { - fq_codel: Some(FqCodelXStatsModel::new(sample, Some((last, d)))), - }) - }, - } + (XStats::FqCodel(sample), Some((XStats::FqCodel(last), d))) => match (sample, last) { + ( + tc::FqCodelXStats::FqCodelQdiscStats(sample), + tc::FqCodelXStats::FqCodelQdiscStats(last), + ) => Some(Self { + fq_codel: Some(FqCodelXStatsModel::new(sample, Some((last, d)))), + }), }, _ => None, }