Skip to content

Commit

Permalink
Add jwk-fetching related metrics (MystenLabs#14021)
Browse files Browse the repository at this point in the history
- Add JWK fetch metrics
- Remove unnecessary AsyncOnceCell
  • Loading branch information
mystenmark authored Sep 29, 2023
1 parent 16e1c8e commit 3aa6cdc
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 29 deletions.
70 changes: 51 additions & 19 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::TryFutureExt;
use mysten_common::sync::async_once_cell::AsyncOnceCell;
use prometheus::Registry;
use sui_core::authority::CHAIN_IDENTIFIER;
use sui_core::consensus_adapter::LazyNarwhalClient;
Expand Down Expand Up @@ -120,7 +119,7 @@ use sui_types::sui_system_state::SuiSystemStateTrait;
use typed_store::rocks::default_db_options;
use typed_store::DBMetrics;

use crate::metrics::GrpcMetrics;
use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
Expand Down Expand Up @@ -204,6 +203,7 @@ pub struct SuiNode {
state: Arc<AuthorityState>,
transaction_orchestrator: Option<Arc<TransactiondOrchestrator<NetworkAuthorityClient>>>,
registry_service: RegistryService,
metrics: Arc<SuiNodeMetrics>,

_discovery: discovery::Handle,
state_sync: state_sync::Handle,
Expand Down Expand Up @@ -244,19 +244,12 @@ impl SuiNode {
registry_service: RegistryService,
custom_rpc_runtime: Option<Handle>,
) -> Result<Arc<SuiNode>> {
let node_one_cell = Arc::new(AsyncOnceCell::<Arc<SuiNode>>::new());
Self::start_async(
config,
registry_service,
node_one_cell.clone(),
custom_rpc_runtime,
)
.await?;
Ok(node_one_cell.get().await)
Self::start_async(config, registry_service, custom_rpc_runtime).await
}

fn start_jwk_updater(
config: &NodeConfig,
metrics: Arc<SuiNodeMetrics>,
authority: AuthorityName,
epoch_store: Arc<AuthorityPerEpochStore>,
consensus_adapter: Arc<ConsensusAdapter>,
Expand All @@ -276,13 +269,19 @@ impl SuiNode {
"Starting JWK updater tasks with supported providers: {:?}", supported_providers
);

fn validate_jwk(provider: &OIDCProvider, id: &JwkId, jwk: &JWK) -> bool {
fn validate_jwk(
metrics: &Arc<SuiNodeMetrics>,
provider: &OIDCProvider,
id: &JwkId,
jwk: &JWK,
) -> bool {
let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
warn!(
"JWK iss {:?} (retrieved from {:?}) is not a valid provider",
id.iss,
provider
);
metrics.invalid_jwks.with_label_values(&[&provider.to_string()]).inc();
return false;
};

Expand All @@ -291,41 +290,69 @@ impl SuiNode {
"JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
id.iss, provider, iss_provider
);
metrics
.invalid_jwks
.with_label_values(&[&provider.to_string()])
.inc();
return false;
}

if !check_total_jwk_size(id, jwk) {
warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
metrics
.invalid_jwks
.with_label_values(&[&provider.to_string()])
.inc();
return false;
}

true
}

// metrics is:
// pub struct SuiNodeMetrics {
//     pub jwk_requests: IntCounterVec,
//     pub jwk_request_errors: IntCounterVec,
//     pub total_jwks: IntCounterVec,
//     pub invalid_jwks: IntCounterVec,
//     pub unique_jwks: IntCounterVec,
// }

for p in supported_providers.into_iter() {
let provider_str = p.to_string();
let epoch_store = epoch_store.clone();
let consensus_adapter = consensus_adapter.clone();
let metrics = metrics.clone();
spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
async move {
// note: restart-safe de-duplication happens after consensus, this is
// just best-effort to reduce unneeded submissions.
let mut seen = HashSet::new();
loop {
info!("fetching JWK for provider {:?}", p);
metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
match Self::fetch_jwks(authority, &p).await {
Err(e) => {
metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
warn!("Error when fetching JWK {:?}", e);
// Retry in 30 seconds
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
Ok(mut keys) => {
metrics.total_jwks
.with_label_values(&[&provider_str])
.inc_by(keys.len() as u64);

keys.retain(|(id, jwk)| {
validate_jwk(&p, id, jwk) &&
validate_jwk(&metrics, &p, id, jwk) &&
!epoch_store.jwk_active_in_current_epoch(id, jwk) &&
seen.insert((id.clone(), jwk.clone()))
});

metrics.unique_jwks
.with_label_values(&[&provider_str])
.inc_by(keys.len() as u64);

// prevent oauth providers from sending too many keys,
// inadvertently or otherwise
if keys.len() > MAX_JWK_KEYS_PER_FETCH {
Expand Down Expand Up @@ -354,9 +381,8 @@ impl SuiNode {
pub async fn start_async(
config: &NodeConfig,
registry_service: RegistryService,
node_once_cell: Arc<AsyncOnceCell<Arc<SuiNode>>>,
custom_rpc_runtime: Option<Handle>,
) -> Result<()> {
) -> Result<Arc<SuiNode>> {
NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(config);
let mut config = config.clone();
if config.supported_protocol_versions.is_none() {
Expand Down Expand Up @@ -627,6 +653,7 @@ impl SuiNode {
};

let connection_monitor_status = Arc::new(connection_monitor_status);
let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

let validator_components = if state.is_validator(&epoch_store) {
let components = Self::construct_validator_components(
Expand All @@ -639,6 +666,7 @@ impl SuiNode {
accumulator.clone(),
connection_monitor_status.clone(),
&registry_service,
sui_node_metrics.clone(),
)
.await?;
// This is only needed during cold start.
Expand All @@ -656,6 +684,7 @@ impl SuiNode {
state,
transaction_orchestrator,
registry_service,
metrics: sui_node_metrics,

_discovery: discovery_handle,
state_sync: state_sync_handle,
Expand All @@ -680,10 +709,7 @@ impl SuiNode {
let node_copy = node.clone();
spawn_monitored_task!(async move { Self::monitor_reconfiguration(node_copy).await });

node_once_cell
.set(node)
.expect("Failed to set Arc<Node> in node_once_cell");
Ok(())
Ok(node)
}

pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
Expand Down Expand Up @@ -944,6 +970,7 @@ impl SuiNode {
accumulator: Arc<StateAccumulator>,
connection_monitor_status: Arc<ConnectionMonitorStatus>,
registry_service: &RegistryService,
sui_node_metrics: Arc<SuiNodeMetrics>,
) -> Result<ValidatorComponents> {
let consensus_config = config
.consensus_config()
Expand Down Expand Up @@ -989,6 +1016,7 @@ impl SuiNode {
accumulator,
validator_server_handle,
checkpoint_metrics,
sui_node_metrics,
sui_tx_validator_metrics,
)
.await
Expand All @@ -1006,6 +1034,7 @@ impl SuiNode {
accumulator: Arc<StateAccumulator>,
validator_server_handle: JoinHandle<Result<()>>,
checkpoint_metrics: Arc<CheckpointMetrics>,
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service(
Expand Down Expand Up @@ -1066,6 +1095,7 @@ impl SuiNode {
if epoch_store.authenticator_state_enabled() {
Self::start_jwk_updater(
config,
sui_node_metrics,
state.name,
epoch_store.clone(),
consensus_adapter.clone(),
Expand Down Expand Up @@ -1402,6 +1432,7 @@ impl SuiNode {
self.accumulator.clone(),
validator_server_handle,
checkpoint_metrics,
self.metrics.clone(),
sui_tx_validator_metrics,
)
.await?,
Expand Down Expand Up @@ -1435,6 +1466,7 @@ impl SuiNode {
self.accumulator.clone(),
self.connection_monitor_status.clone(),
&self.registry_service,
self.metrics.clone(),
)
.await?,
)
Expand Down
19 changes: 9 additions & 10 deletions crates/sui-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,15 @@ fn main() {
let rpc_runtime = runtimes.json_rpc.handle().clone();

runtimes.sui_node.spawn(async move {
if let Err(e) = sui_node::SuiNode::start_async(
&config,
registry_service,
node_once_cell_clone,
Some(rpc_runtime),
)
.await
{
error!("Failed to start node: {e:?}");
std::process::exit(1)
match sui_node::SuiNode::start_async(&config, registry_service, Some(rpc_runtime)).await {
Ok(sui_node) => node_once_cell_clone
.set(sui_node)
.expect("Failed to set node in AsyncOnceCell"),

Err(e) => {
error!("Failed to start node: {e:?}");
std::process::exit(1);
}
}
// TODO: Do we want to provide a way for the node to gracefully shutdown?
loop {
Expand Down
51 changes: 51 additions & 0 deletions crates/sui-node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,57 @@ pub fn start_metrics_push_task(config: &sui_config::NodeConfig, registry: Regist
});
}

/// Node-level Prometheus metrics for the JWK-fetching pipeline.
///
/// Every counter carries a single `provider` label identifying the OIDC
/// provider the JWKs were fetched from.
pub struct SuiNodeMetrics {
    /// Number of JWK fetch attempts started, per provider.
    pub jwk_requests: IntCounterVec,
    /// Number of JWK fetch attempts that returned an error, per provider.
    pub jwk_request_errors: IntCounterVec,

    /// Total number of JWKs returned by successful fetches, per provider.
    pub total_jwks: IntCounterVec,
    /// JWKs rejected by validation (unparseable/mismatched `iss`, or
    /// oversized key), per provider.
    pub invalid_jwks: IntCounterVec,
    /// JWKs that survived filtering (valid, not already active this epoch,
    /// not previously seen), per provider.
    pub unique_jwks: IntCounterVec,
}

impl SuiNodeMetrics {
    /// Registers all JWK-related counters against `registry` and returns the
    /// populated metrics struct.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap`) if any metric fails to register, e.g. when a
    /// metric with the same name has already been registered — the usual
    /// register-once-at-startup convention.
    pub fn new(registry: &Registry) -> Self {
        // All five counters share the same shape: an IntCounterVec with a
        // single "provider" label. Build them through one local helper so
        // each field registration is a single line.
        let provider_counter = |name: &str, help: &str| {
            register_int_counter_vec_with_registry!(name, help, &["provider"], registry).unwrap()
        };

        Self {
            jwk_requests: provider_counter("jwk_requests", "Total number of JWK requests"),
            jwk_request_errors: provider_counter(
                "jwk_request_errors",
                "Total number of JWK request errors",
            ),
            total_jwks: provider_counter("total_jwks", "Total number of JWKs"),
            invalid_jwks: provider_counter("invalid_jwks", "Total number of invalid JWKs"),
            unique_jwks: provider_counter("unique_jwks", "Total number of unique JWKs"),
        }
    }
}

#[derive(Clone)]
pub struct GrpcMetrics {
inflight_grpc: IntGaugeVec,
Expand Down

0 comments on commit 3aa6cdc

Please sign in to comment.