Skip to content

Commit

Permalink
Implement 'show memory_breakdown --node [node]''
Browse files Browse the repository at this point in the history
michaelklishin committed Jan 2, 2025
1 parent 7293ff6 commit f4390b4
Showing 8 changed files with 241 additions and 16 deletions.
28 changes: 20 additions & 8 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -754,14 +754,26 @@ fn declare_subcommands() -> [Command; 11] {
]
}

fn show_subcommands() -> [Command; 3] {
[
Command::new("overview")
.about("displays a essential information about target node and its cluster"),
Command::new("churn").about("displays object churn metrics"),
Command::new("endpoint")
.about("for troubleshooting: displays the computed HTTP API endpoint URI"),
]
fn show_subcommands() -> [Command; 4] {
let overview_cmd = Command::new("overview")
.about("displays a essential information about target node and its cluster");
let churn_cmd = Command::new("churn").about("displays object churn metrics");
let endpoint_cmd = Command::new("endpoint")
.about("for troubleshooting: displays the computed HTTP API endpoint URI");
let memory_breakdown_cmd = Command::new("memory_breakdown")
.about("use it to understand what consumes memory on the target node")
.arg(
Arg::new("node")
.long("node")
.help("target node, must be a cluster member")
.required(true),
)
.after_long_help(color_print::cformat!(
"<bold>Doc guide:</bold>: {}",
MEMORY_FOOTPRINT_GUIDE_URL
));

[overview_cmd, churn_cmd, endpoint_cmd, memory_breakdown_cmd]
}

fn delete_subcommands() -> [Command; 11] {
15 changes: 12 additions & 3 deletions src/commands.rs
Original file line number Diff line number Diff line change
@@ -28,15 +28,24 @@ use rabbitmq_http_client::requests::EnforcedLimitParams;
use crate::constants::DEFAULT_QUEUE_TYPE;
use rabbitmq_http_client::commons::BindingDestinationType;
use rabbitmq_http_client::commons::QueueType;
use rabbitmq_http_client::responses::{FeatureFlagList, Overview};
use rabbitmq_http_client::{password_hashing, requests, responses};

type APIClient<'a> = Client<&'a str, &'a str, &'a str>;

pub fn show_overview(client: APIClient) -> ClientResult<Overview> {
pub fn show_overview(client: APIClient) -> ClientResult<responses::Overview> {
client.overview()
}

pub fn show_memory_breakdown(
client: APIClient,
command_args: &ArgMatches,
) -> ClientResult<responses::NodeMemoryBreakdown> {
let node = command_args.get_one::<String>("node").unwrap();
client
.get_node_memory_footprint(node)
.map(|footprint| footprint.breakdown)
}

pub fn list_nodes(client: APIClient) -> ClientResult<Vec<responses::ClusterNode>> {
client.list_nodes()
}
@@ -122,7 +131,7 @@ pub fn list_parameters(
}
}

pub fn list_feature_flags(client: APIClient) -> ClientResult<FeatureFlagList> {
pub fn list_feature_flags(client: APIClient) -> ClientResult<responses::FeatureFlagList> {
client.list_feature_flags()
}

4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -253,6 +253,10 @@ fn dispatch_subcommand(
println!("Using endpoint: {}", endpoint);
res_handler.no_output_on_success(Ok(()))
}
("show", "memory_breakdown") => {
let result = commands::show_memory_breakdown(client, command_args);
res_handler.memory_breakdown_result(result)
}

("list", "nodes") => {
let result = commands::list_nodes(client);
18 changes: 16 additions & 2 deletions src/output.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::errors::CommandRunError;
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,11 +12,12 @@ use crate::errors::CommandRunError;
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::config::SharedSettings;
use crate::errors::CommandRunError;
use crate::tables;
use clap::ArgMatches;
use rabbitmq_http_client::blocking_api::{HttpClientError, Result as ClientResult};
use rabbitmq_http_client::error::Error as ClientError;
use rabbitmq_http_client::responses::Overview;
use rabbitmq_http_client::responses::{NodeMemoryBreakdown, Overview};
use reqwest::StatusCode;
use std::fmt;
use sysexits::ExitCode;
@@ -130,6 +130,20 @@ impl ResultHandler {
}
}

pub fn memory_breakdown_result(&mut self, result: ClientResult<NodeMemoryBreakdown>) {
match result {
Ok(output) => {
self.exit_code = Some(ExitCode::Ok);

let mut table = tables::memory_breakdown(output);
self.table_styler.apply(&mut table);

println!("{}", table);
}
Err(error) => self.report_command_run_error(&error),
}
}

pub fn no_output_on_success<T>(&mut self, result: ClientResult<T>) {
match result {
Ok(_) => {
1 change: 1 addition & 0 deletions src/static_urls.rs
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ pub(crate) const DEPRECATED_FEATURE_GUIDE_URL: &str =
pub(crate) const ACCESS_CONTROL_GUIDE_URL: &str = "https://rabbitmq.com/docs/access-control";
pub(crate) const HTTP_API_ACCESS_PERMISSIONS_GUIDE_URL: &str =
"https://rabbitmq.com/docs/management#permissions";
pub(crate) const MEMORY_FOOTPRINT_GUIDE_URL: &str = "https://www.rabbitmq.com/docs/memory-use";
pub(crate) const DEFINITION_GUIDE_URL: &str = "https://rabbitmq.com/docs/definitions";
pub(crate) const CONSUMER_GUIDE_URL: &str = "https://rabbitmq.com/docs/consumers";
pub(crate) const POLLING_CONSUMER_GUIDE_URL: &str = "https://rabbitmq.com/docs/consumers#polling";
142 changes: 139 additions & 3 deletions src/tables.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,8 @@
// limitations under the License.
use rabbitmq_http_client::blocking_api::HttpClientError;
use rabbitmq_http_client::responses::{
ClusterAlarmCheckDetails, HealthCheckFailureDetails, Overview, QuorumCriticalityCheckDetails,
ClusterAlarmCheckDetails, HealthCheckFailureDetails, NodeMemoryBreakdown, Overview,
QuorumCriticalityCheckDetails,
};
use reqwest::StatusCode;
use tabled::settings::Panel;
@@ -27,9 +28,9 @@ struct OverviewRow<'a> {
}

#[derive(Debug, Tabled)]
struct RowOfTwoStrings<'a> {
struct RowOfTwoStrings<'a, T: ?Sized + std::fmt::Display> {
key: &'a str,
value: &'a str,
value: &'a T,
}

pub fn overview(ov: Overview) -> Table {
@@ -336,3 +337,138 @@ pub fn health_check_failure(

tb.build()
}

pub(crate) fn memory_breakdown(breakdown: NodeMemoryBreakdown) -> Table {
// There is no easy way to transpose an existing table in Tabled, so…
let atom_table_val = breakdown.atom_table;
let allocated_but_unused_val = breakdown.allocated_but_unused;
let binary_heap_val = breakdown.binary_heap;
let classic_queue_procs_val = breakdown.classic_queue_procs;
let code_val = breakdown.code;
let connection_channels_val = breakdown.connection_channels;
let connection_readers_val = breakdown.connection_readers;
let connection_writers_val = breakdown.connection_writers;
let connection_other_val = breakdown.connection_other;
let management_db_val = breakdown.management_db;
let message_indices_val = breakdown.message_indices;
let metadata_store_val = breakdown.metadata_store;
let metadata_store_ets_tables_val = breakdown.metadata_store_ets_tables;
let metrics_val = breakdown.metrics;
let mnesia_val = breakdown.mnesia;
let other_ets_tables_val = breakdown.other_ets_tables;
let other_system_val = breakdown.other_system;
let other_procs_val = breakdown.other_procs;
let quorum_queue_procs_val = breakdown.quorum_queue_procs;
let quorum_queue_ets_tables_val = breakdown.quorum_queue_ets_tables;
let plugins_val = breakdown.plugins;
let reserved_but_unallocated_val = breakdown.reserved_but_unallocated;
let stream_queue_procs_val = breakdown.stream_queue_procs;
let stream_queue_replica_reader_procs_val = breakdown.stream_queue_replica_reader_procs;
let stream_queue_coordinator_procs_val = breakdown.stream_queue_coordinator_procs;
let mut data: Vec<RowOfTwoStrings<u64>> = vec![
RowOfTwoStrings {
key: "Atom table",
value: &atom_table_val,
},
RowOfTwoStrings {
key: "Allocated but unused",
value: &allocated_but_unused_val,
},
RowOfTwoStrings {
key: "Binary heap",
value: &binary_heap_val,
},
RowOfTwoStrings {
key: "Classic queue processes",
value: &classic_queue_procs_val,
},
RowOfTwoStrings {
key: "Code ",
value: &code_val,
},
RowOfTwoStrings {
key: "AMQP 0-9-1 channels",
value: &connection_channels_val,
},
RowOfTwoStrings {
key: "Client connections: reader processes",
value: &connection_readers_val,
},
RowOfTwoStrings {
key: "Client connections: writer processes",
value: &connection_writers_val,
},
RowOfTwoStrings {
key: "Client connections: others processes",
value: &connection_other_val,
},
RowOfTwoStrings {
key: "Management stats database",
value: &management_db_val,
},
RowOfTwoStrings {
key: "Message store indices",
value: &message_indices_val,
},
RowOfTwoStrings {
key: "Metadata store",
value: &metadata_store_val,
},
RowOfTwoStrings {
key: "Metadata store ETS tables",
value: &metadata_store_ets_tables_val,
},
RowOfTwoStrings {
key: "Metrics data",
value: &metrics_val,
},
RowOfTwoStrings {
key: "Mnesia",
value: &mnesia_val,
},
RowOfTwoStrings {
key: "Other (ETS tables)",
value: &other_ets_tables_val,
},
RowOfTwoStrings {
key: "Other (used by the runtime)",
value: &other_system_val,
},
RowOfTwoStrings {
key: "Other processes",
value: &other_procs_val,
},
RowOfTwoStrings {
key: "Quorum queue replica processes",
value: &quorum_queue_procs_val,
},
RowOfTwoStrings {
key: "Quorum queue ETS tables",
value: &quorum_queue_ets_tables_val,
},
RowOfTwoStrings {
key: "Plugins and their data",
value: &plugins_val,
},
RowOfTwoStrings {
key: "Reserved by the kernel but unallocated",
value: &reserved_but_unallocated_val,
},
RowOfTwoStrings {
key: "Stream replica processes",
value: &stream_queue_procs_val,
},
RowOfTwoStrings {
key: "Stream replica reader processes",
value: &stream_queue_replica_reader_procs_val,
},
RowOfTwoStrings {
key: "Stream coordinator processes",
value: &stream_queue_coordinator_procs_val,
},
];
// Note: this is descending ordering
data.sort_by(|a, b| b.value.cmp(a.value));
let tb = Table::builder(data);
tb.build()
}
33 changes: 33 additions & 0 deletions tests/memory_breakdown_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use predicates::prelude::*;

mod test_helpers;
use crate::test_helpers::*;

#[test]
fn test_memory_breakdown_succeeds() -> Result<(), Box<dyn std::error::Error>> {
let rc = api_client();
let nodes = rc.list_nodes()?;
let first = nodes.get(0).unwrap();

run_succeeds(["show", "memory_breakdown", "--node", first.name.as_str()]).stdout(
predicates::str::contains("Allocated but unused")
.and(predicates::str::contains("Quorum queue ETS tables"))
.and(predicates::str::contains("Client connections"))
.and(predicates::str::contains("Metadata store")),
);

Ok(())
}
16 changes: 16 additions & 0 deletions tests/test_helpers.rs
Original file line number Diff line number Diff line change
@@ -21,8 +21,24 @@ use assert_cmd::assert::Assert;
use assert_cmd::prelude::*;
use std::process::Command;

use rabbitmq_http_client::blocking_api::Client as GenericAPIClient;

type APIClient<'a> = GenericAPIClient<&'a str, &'a str, &'a str>;

type CommandRunResult = Result<(), Box<dyn std::error::Error>>;

pub const ENDPOINT: &str = "http://localhost:15672/api";
pub const USERNAME: &str = "guest";
pub const PASSWORD: &str = "guest";

pub fn endpoint() -> String {
ENDPOINT.to_owned()
}

pub fn api_client() -> APIClient<'static> {
APIClient::new(ENDPOINT, USERNAME, PASSWORD)
}

pub fn await_metric_emission(ms: u64) {
std::thread::sleep(Duration::from_millis(ms));
}

0 comments on commit f4390b4

Please sign in to comment.