Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] TPU receiver prometheus metrics #76

Draft
wants to merge 2 commits into
base: prometheus
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ solana-streamer = { path = "../streamer", version = "=1.10.39" }
solana-transaction-status = { path = "../transaction-status", version = "=1.10.39" }
solana-version = { path = "../version", version = "=1.10.39" }
solana-vote-program = { path = "../programs/vote", version = "=1.10.39" }
solana-prometheus = { path = "../prometheus" }
sys-info = "0.9.1"
tempfile = "3.3.0"
thiserror = "1.0"
Expand Down
13 changes: 13 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
time::Duration,
},
};
use solana_prometheus::collector::{PrometheusCollector};

pub struct FetchStage {
thread_hdls: Vec<JoinHandle<()>>,
Expand All @@ -39,6 +40,7 @@ impl FetchStage {
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
prometheus_collector: Option<PrometheusCollector>,
coalesce_ms: u64,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
let (sender, receiver) = unbounded();
Expand All @@ -56,6 +58,7 @@ impl FetchStage {
forward_receiver,
poh_recorder,
coalesce_ms,
prometheus_collector,
None,
),
receiver,
Expand All @@ -75,6 +78,7 @@ impl FetchStage {
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
prometheus_collector: Option<PrometheusCollector>,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
Expand All @@ -91,6 +95,7 @@ impl FetchStage {
forward_receiver,
poh_recorder,
coalesce_ms,
prometheus_collector,
in_vote_only_mode,
)
}
Expand Down Expand Up @@ -149,6 +154,7 @@ impl FetchStage {
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
prometheus_collector: Option<PrometheusCollector>,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
Expand Down Expand Up @@ -234,6 +240,13 @@ impl FetchStage {
tpu_vote_stats.report();
tpu_forward_stats.report();

if let Some(collector) = &prometheus_collector {
let stats = {
tpu_stats.total_stats.lock().unwrap().clone()
};
collector.lock().unwrap().save_tpu_receiver_stats(stats)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this will keep one copy of the metrics in with the Prometheus config, and periodically “push” to it, and then the endpoint serves whatever the latest value is that it had.

This creates a polling delay, the value that we serve on /metrics can be outdated by the cycle interval of this loop. It looks like that internal is 1 second, so it’s not too bad, but it would be nicer if we can get the actual current values. If we can get the StreamerReceiveStats into the Prometheus endpoint, it could do the atomic reads there, but it means we would need to make them totals. We can handle the resetting here at this point then.

Something like this:

struct GoatCounter<T> {
    num_goats_teleported: T,
    num_goats_received: T,
}

type GoatCounterCurrent = GoatCounter<AtomicU64>;
type GoatCounterSnapshot = GoatCounter<u64>;

impl GoatCounterCurrent {
    pub fn snapshot(&self) -> GoatCounterSnapshot {
        GoatCounterSnapshot {
            num_goats_teleported: self.num_goats_teleported.load(Ordering::Relaxed),
            num_goats_received: self.num_goats_received.load(Ordering::Relaxed),
        }
    }
}

impl std::ops::Sub for GoatCounterSnapshot {
    fn sub(self, rhs: Self) -> Self {
        Self {
            num_goats_teleported: self.num_goats_teleported - rhs.num_goats_teleported,
            num_goats_received: self.num_goats_received - rhs.num_goats_received,
        }
    }
}

// In the place where we print the metrics:
let mut last_logged: GoatCounterSnapshot = todo!();
let current = goat_counter.snapshot();
log(current - last_logged);
last_logged = current;

// In the Prometheus endpoint:
let current = goat_counter.snapshot();
write_metrics(&current, ...);

Not sure if this is feasible in practice though, it looks like there is something going on with accumulation over multiple threads? Also, having the separate fields as atomic ints can lead to surprising results. For example, if you are trying to compute an error rate, and you have counters num_requests and num_errors, where it should be the case that num_errors <= num_requests, then if we load those counters with Ordering::Relaxed, that inequality could be violated. It requires some thought both at the place where the counters are read and the place where they are incremented to get it right, I’m not sure Solana does this, since they prioritize speed over correctness, and the Ordering::Relaxed does give the compiler and CPU more freedom to optimize than some of the other orderings ...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I see, so that would basically result in changing how the logic of current Solana metrics are implemented. I would be a bit hesitant to change this Solana metrics logic because to make it consistent we would likely need to do it in all places this pattern is followed (which seems to be a lot).

Not sure if this is feasible in practice though, it looks like there is something going on with accumulation over multiple threads?

Yes, that is what is happening.

Also, having the separate fields as atomic ints can lead to surprising results. For example, if you are trying to compute an error rate, and you have counters num_requests and num_errors

Yeah there are definitely possible inconsistencies, but I guess that is really common in metrics. Using the example of num_requests and num_errors in some other application, num_requests is likely to be incremented at the beginning of handling the request while num_errors for obvious reasons can only be incremented later, so it won't be atomic operation.

P.S. I now want to see this whole application for teleporting goats 🤣

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_requests is likely to be incremented at the beginning of handling the request while num_errors for obvious reasons can only be incremented later, so it won't be atomic operation.

That by itself is not an issue, since you still couldn’t get a >100% error rate. If you increment the error counter before the request counter, then you could get a >100% error rate, which is really counter-intuitive. (Or if you read the request counter before the error counter.)

I now want to see this whole application for teleporting goats

You might enjoy these :-)
https://crbug.com/31482
https://codereview.chromium.org/543045

}

if exit.load(Ordering::Relaxed) {
return;
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use {
time::Duration,
},
};
use solana_prometheus::collector::PrometheusCollector;

pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;

Expand Down Expand Up @@ -99,6 +100,7 @@ impl Tpu {
cost_model: &Arc<RwLock<CostModel>>,
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
prometheus_collector: Option<PrometheusCollector>,
enable_quic_servers: bool,
) -> Self {
let TpuSockets {
Expand All @@ -124,6 +126,7 @@ impl Tpu {
forwarded_packet_receiver,
poh_recorder,
tpu_coalesce_ms,
prometheus_collector,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);

Expand Down
4 changes: 4 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ use {
time::{Duration, Instant},
},
};
use solana_prometheus::collector::PrometheusCollector;

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
Expand Down Expand Up @@ -379,6 +380,7 @@ impl Validator {
socket_addr_space: SocketAddrSpace,
use_quic: bool,
tpu_connection_pool_size: usize,
prometheus_collector: Option<PrometheusCollector>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it makes sense to put new arguments at the end of the function parameters, to avoid conflicts, it's better to put new arguments in the middle of the other arguments, that makes rebasing easier if something changes in the arguments.

) -> Self {
let id = identity_keypair.pubkey();
assert_eq!(id, node.info.id);
Expand Down Expand Up @@ -740,6 +742,7 @@ impl Validator {
connection_cache.clone(),
max_complete_transaction_status_slot,
config.vote_accounts_to_monitor.clone(),
prometheus_collector.clone(),
)),
if !config.rpc_config.full_api {
None
Expand Down Expand Up @@ -996,6 +999,7 @@ impl Validator {
&cost_model,
&connection_cache,
&identity_keypair,
prometheus_collector,
config.enable_quic_servers,
);

Expand Down
1 change: 1 addition & 0 deletions prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
solana-gossip = { path = "../gossip" }
solana-runtime = { path = "../runtime" }
solana-streamer = { path = "../streamer" }
solana-sdk = { path = "../sdk" }
solana-vote-program = { path = "../programs/vote" }
solana-config-program = { path = "../programs/config" }
Expand Down
42 changes: 42 additions & 0 deletions prometheus/src/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::utils::{write_metric, Metric, MetricFamily};
use solana_streamer::streamer::StreamerReceiveStatsTotal;
use std::io;
use std::sync::{Arc, Mutex};

pub type PrometheusCollector = Arc<Mutex<MetricsCollector>>;

pub struct MetricsCollector {
tpu_receiver_stats: Option<StreamerReceiveStatsTotal>,
}

impl MetricsCollector {
pub fn new() -> Self {
Self {
tpu_receiver_stats: None,
}
}

pub fn save_tpu_receiver_stats(&mut self, stats: StreamerReceiveStatsTotal) {
self.tpu_receiver_stats = Some(stats)
}

pub fn write_metrics<W: io::Write>(&self, out: &mut W) -> io::Result<()> {
if self.tpu_receiver_stats.is_none() {
return Ok(());
}

let tpu_metrics = self.tpu_receiver_stats.as_ref().unwrap();

write_metric(
out,
&MetricFamily {
name: "solana_validator_tpu_packets_count_total",
help: "Packets received by Transaction Processing Unit",
type_: "counter",
metrics: vec![Metric::new(tpu_metrics.packets_count_total as u64)],
},
)?;

Ok(())
}
}
12 changes: 12 additions & 0 deletions prometheus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod bank_metrics;
pub mod banks_with_commitments;
mod cluster_metrics;
pub mod collector;
pub mod identity_info;
mod snapshot_metrics;
mod utils;

use crate::collector::PrometheusCollector;
use banks_with_commitments::BanksWithCommitments;
use identity_info::IdentityInfoMap;
use solana_gossip::cluster_info::ClusterInfo;
Expand All @@ -21,6 +23,7 @@ pub fn render_prometheus(
vote_accounts: &Arc<HashSet<Pubkey>>,
identity_config: &Arc<IdentityInfoMap>,
snapshot_config: &Option<SnapshotConfig>,
collector: &Option<PrometheusCollector>,
) -> Vec<u8> {
// There are 3 levels of commitment for a bank:
// - finalized: most recent block *confirmed* by supermajority of the
Expand All @@ -41,5 +44,14 @@ pub fn render_prometheus(
if let Some(snapshot_config) = snapshot_config {
snapshot_metrics::write_snapshot_metrics(snapshot_config, &mut out).expect("IO error");
}

if let Some(collector) = collector {
collector
.lock()
.unwrap()
.write_metrics(&mut out)
.expect("IO error");
}

out
}
5 changes: 5 additions & 0 deletions replica-node/src/replica_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub struct ReplicaNodeConfig {
pub replica_exit: Arc<RwLock<Exit>>,
pub socket_addr_space: SocketAddrSpace,
pub vote_accounts_to_monitor: Arc<HashSet<Pubkey>>,
// TODO: for now it does not makes sense to pass it to RPC as we
// only get TPU metrics.
// pub prometheus_collector: PrometheusCollector,
}

pub struct ReplicaNode {
Expand Down Expand Up @@ -257,6 +260,8 @@ fn start_client_rpc_services(
connection_cache,
max_complete_transaction_status_slot,
replica_config.vote_accounts_to_monitor.clone(),
// replica_config.prometheus_collector.clone(),
None,
)),
Some(pubsub_service),
Some(OptimisticallyConfirmedBankTracker::new(
Expand Down
7 changes: 7 additions & 0 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use {
},
tokio_util::codec::{BytesCodec, FramedRead},
};
use solana_prometheus::collector::PrometheusCollector;

const FULL_SNAPSHOT_REQUEST_PATH: &str = "/snapshot.tar.bz2";
const INCREMENTAL_SNAPSHOT_REQUEST_PATH: &str = "/incremental-snapshot.tar.bz2";
Expand All @@ -82,6 +83,7 @@ struct RpcRequestMiddleware {
/// Initialized based on vote_accounts_to_monitor, maps identity
/// pubkey associated with the vote account to the validator info.
identity_info_map: Arc<IdentityInfoMap>,
prometheus_collector: Option<PrometheusCollector>,
}

impl RpcRequestMiddleware {
Expand All @@ -92,6 +94,7 @@ impl RpcRequestMiddleware {
health: Arc<RpcHealth>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
vote_accounts_to_monitor: Arc<HashSet<Pubkey>>,
prometheus_collector: Option<PrometheusCollector>,
) -> Self {
Self {
ledger_path,
Expand All @@ -112,6 +115,7 @@ impl RpcRequestMiddleware {
health,
block_commitment_cache,
vote_accounts_to_monitor,
prometheus_collector,
}
}

Expand Down Expand Up @@ -319,6 +323,7 @@ impl RequestMiddleware for RpcRequestMiddleware {
&self.vote_accounts_to_monitor,
&self.identity_info_map,
&self.snapshot_config,
&self.prometheus_collector,
)))
.unwrap()
.into()
Expand Down Expand Up @@ -375,6 +380,7 @@ impl JsonRpcService {
connection_cache: Arc<ConnectionCache>,
current_transaction_status_slot: Arc<AtomicU64>,
vote_accounts_to_monitor: Arc<HashSet<Pubkey>>,
prometheus_collector: Option<PrometheusCollector>,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
Expand Down Expand Up @@ -524,6 +530,7 @@ impl JsonRpcService {
health.clone(),
block_commitment_cache.clone(),
vote_accounts_to_monitor,
prometheus_collector,
);
let server = ServerBuilder::with_meta_extractor(
io,
Expand Down
Loading