diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 11440dae58c17..bf62dc02f9320 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -21,7 +21,6 @@ use arc_swap::ArcSwap; use fastcrypto_zkp::bn254::zk_login::JwkId; use fastcrypto_zkp::bn254::zk_login::OIDCProvider; use futures::TryFutureExt; -use mysten_common::sync::async_once_cell::AsyncOnceCell; use prometheus::Registry; use sui_core::authority::CHAIN_IDENTIFIER; use sui_core::consensus_adapter::LazyNarwhalClient; @@ -120,7 +119,7 @@ use sui_types::sui_system_state::SuiSystemStateTrait; use typed_store::rocks::default_db_options; use typed_store::DBMetrics; -use crate::metrics::GrpcMetrics; +use crate::metrics::{GrpcMetrics, SuiNodeMetrics}; pub mod admin; mod handle; @@ -204,6 +203,7 @@ pub struct SuiNode { state: Arc, transaction_orchestrator: Option>>, registry_service: RegistryService, + metrics: Arc, _discovery: discovery::Handle, state_sync: state_sync::Handle, @@ -244,19 +244,12 @@ impl SuiNode { registry_service: RegistryService, custom_rpc_runtime: Option, ) -> Result> { - let node_one_cell = Arc::new(AsyncOnceCell::>::new()); - Self::start_async( - config, - registry_service, - node_one_cell.clone(), - custom_rpc_runtime, - ) - .await?; - Ok(node_one_cell.get().await) + Self::start_async(config, registry_service, custom_rpc_runtime).await } fn start_jwk_updater( config: &NodeConfig, + metrics: Arc, authority: AuthorityName, epoch_store: Arc, consensus_adapter: Arc, @@ -276,13 +269,19 @@ impl SuiNode { "Starting JWK updater tasks with supported providers: {:?}", supported_providers ); - fn validate_jwk(provider: &OIDCProvider, id: &JwkId, jwk: &JWK) -> bool { + fn validate_jwk( + metrics: &Arc, + provider: &OIDCProvider, + id: &JwkId, + jwk: &JWK, + ) -> bool { let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else { warn!( "JWK iss {:?} (retrieved from {:?}) is not a valid provider", id.iss, provider ); + metrics.invalid_jwks.with_label_values(&[&provider.to_string()]).inc(); return false; }; @@ -291,20 +290,38 @@ impl SuiNode { "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}", id.iss, provider, iss_provider ); + metrics + .invalid_jwks + .with_label_values(&[&provider.to_string()]) + .inc(); return false; } if !check_total_jwk_size(id, jwk) { warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider); + metrics + .invalid_jwks + .with_label_values(&[&provider.to_string()]) + .inc(); return false; } true } + // metrics is: + // pub struct SuiNodeMetrics { + // pub jwk_requests: IntCounterVec, + // pub jwk_request_errors: IntCounterVec, + // pub total_jwks: IntCounterVec, + // pub unique_jwks: IntCounterVec, + // } + for p in supported_providers.into_iter() { + let provider_str = p.to_string(); let epoch_store = epoch_store.clone(); let consensus_adapter = consensus_adapter.clone(); + let metrics = metrics.clone(); spawn_monitored_task!(epoch_store.clone().within_alive_epoch( async move { // note: restart-safe de-duplication happens after consensus, this is @@ -312,20 +329,30 @@ impl SuiNode { let mut seen = HashSet::new(); loop { info!("fetching JWK for provider {:?}", p); + metrics.jwk_requests.with_label_values(&[&provider_str]).inc(); match Self::fetch_jwks(authority, &p).await { Err(e) => { + metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc(); warn!("Error when fetching JWK {:?}", e); // Retry in 30 seconds tokio::time::sleep(Duration::from_secs(30)).await; continue; } Ok(mut keys) => { + metrics.total_jwks + .with_label_values(&[&provider_str]) + .inc_by(keys.len() as u64); + keys.retain(|(id, jwk)| { - validate_jwk(&p, id, jwk) && + validate_jwk(&metrics, &p, id, jwk) && !epoch_store.jwk_active_in_current_epoch(id, jwk) && seen.insert((id.clone(), jwk.clone())) }); + metrics.unique_jwks + .with_label_values(&[&provider_str]) + .inc_by(keys.len() as u64); + // prevent oauth providers from sending too many keys, // inadvertently or otherwise if keys.len() > MAX_JWK_KEYS_PER_FETCH { @@ -354,9 +381,8 @@ impl SuiNode { pub async fn start_async( config: &NodeConfig, registry_service: RegistryService, - node_once_cell: Arc>>, custom_rpc_runtime: Option, - ) -> Result<()> { + ) -> Result> { NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(config); let mut config = config.clone(); if config.supported_protocol_versions.is_none() { @@ -627,6 +653,7 @@ impl SuiNode { }; let connection_monitor_status = Arc::new(connection_monitor_status); + let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry())); let validator_components = if state.is_validator(&epoch_store) { let components = Self::construct_validator_components( @@ -639,6 +666,7 @@ impl SuiNode { accumulator.clone(), connection_monitor_status.clone(), ®istry_service, + sui_node_metrics.clone(), ) .await?; // This is only needed during cold start. @@ -656,6 +684,7 @@ impl SuiNode { state, transaction_orchestrator, registry_service, + metrics: sui_node_metrics, _discovery: discovery_handle, state_sync: state_sync_handle, @@ -680,10 +709,7 @@ impl SuiNode { let node_copy = node.clone(); spawn_monitored_task!(async move { Self::monitor_reconfiguration(node_copy).await }); - node_once_cell - .set(node) - .expect("Failed to set Arc in node_once_cell"); - Ok(()) + Ok(node) } pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver { @@ -944,6 +970,7 @@ impl SuiNode { accumulator: Arc, connection_monitor_status: Arc, registry_service: &RegistryService, + sui_node_metrics: Arc, ) -> Result { let consensus_config = config .consensus_config() @@ -989,6 +1016,7 @@ impl SuiNode { accumulator, validator_server_handle, checkpoint_metrics, + sui_node_metrics, sui_tx_validator_metrics, ) .await @@ -1006,6 +1034,7 @@ impl SuiNode { accumulator: Arc, validator_server_handle: JoinHandle>, checkpoint_metrics: Arc, + sui_node_metrics: Arc, sui_tx_validator_metrics: Arc, ) -> Result { let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service( @@ -1066,6 +1095,7 @@ impl SuiNode { if epoch_store.authenticator_state_enabled() { Self::start_jwk_updater( config, + sui_node_metrics, state.name, epoch_store.clone(), consensus_adapter.clone(), @@ -1402,6 +1432,7 @@ impl SuiNode { self.accumulator.clone(), validator_server_handle, checkpoint_metrics, + self.metrics.clone(), sui_tx_validator_metrics, ) .await?, @@ -1435,6 +1466,7 @@ impl SuiNode { self.accumulator.clone(), self.connection_monitor_status.clone(), &self.registry_service, + self.metrics.clone(), ) .await?, ) diff --git a/crates/sui-node/src/main.rs b/crates/sui-node/src/main.rs index 877b5bd113204..503b5ff04fe04 100644 --- a/crates/sui-node/src/main.rs +++ b/crates/sui-node/src/main.rs @@ -104,16 +104,15 @@ fn main() { let rpc_runtime = runtimes.json_rpc.handle().clone(); runtimes.sui_node.spawn(async move { - if let Err(e) = sui_node::SuiNode::start_async( - &config, - registry_service, - node_once_cell_clone, - Some(rpc_runtime), - ) - .await - { - error!("Failed to start node: {e:?}"); - std::process::exit(1) + match sui_node::SuiNode::start_async(&config, registry_service, Some(rpc_runtime)).await { + Ok(sui_node) => node_once_cell_clone + .set(sui_node) + .expect("Failed to set node in AsyncOnceCell"), + + Err(e) => { + error!("Failed to start node: {e:?}"); + std::process::exit(1); + } } // TODO: Do we want to provide a way for the node to gracefully shutdown? loop { diff --git a/crates/sui-node/src/metrics.rs b/crates/sui-node/src/metrics.rs index 058d11668fd31..7328f20a55114 100644 --- a/crates/sui-node/src/metrics.rs +++ b/crates/sui-node/src/metrics.rs @@ -191,6 +191,57 @@ pub fn start_metrics_push_task(config: &sui_config::NodeConfig, registry: Regist }); } +pub struct SuiNodeMetrics { + pub jwk_requests: IntCounterVec, + pub jwk_request_errors: IntCounterVec, + + pub total_jwks: IntCounterVec, + pub invalid_jwks: IntCounterVec, + pub unique_jwks: IntCounterVec, +} + +impl SuiNodeMetrics { + pub fn new(registry: &Registry) -> Self { + Self { + jwk_requests: register_int_counter_vec_with_registry!( + "jwk_requests", + "Total number of JWK requests", + &["provider"], + registry, + ) + .unwrap(), + jwk_request_errors: register_int_counter_vec_with_registry!( + "jwk_request_errors", + "Total number of JWK request errors", + &["provider"], + registry, + ) + .unwrap(), + total_jwks: register_int_counter_vec_with_registry!( + "total_jwks", + "Total number of JWKs", + &["provider"], + registry, + ) + .unwrap(), + invalid_jwks: register_int_counter_vec_with_registry!( + "invalid_jwks", + "Total number of invalid JWKs", + &["provider"], + registry, + ) + .unwrap(), + unique_jwks: register_int_counter_vec_with_registry!( + "unique_jwks", + "Total number of unique JWKs", + &["provider"], + registry, + ) + .unwrap(), + } + } +} + #[derive(Clone)] pub struct GrpcMetrics { inflight_grpc: IntGaugeVec,