From 632815298906ac2144fd610276429ce3e2f05fb4 Mon Sep 17 00:00:00 2001 From: viquezclaudio Date: Thu, 28 Nov 2024 10:14:56 -0600 Subject: [PATCH 1/3] Validator health status --- .../src/account/staking_contract/validator.rs | 6 + test-utils/src/validator.rs | 2 +- validator/src/micro.rs | 19 ++ validator/src/validator.rs | 100 ++++++++- validator/tests/mock.rs | 209 +++++++++++++++++- 5 files changed, 329 insertions(+), 7 deletions(-) diff --git a/primitives/account/src/account/staking_contract/validator.rs b/primitives/account/src/account/staking_contract/validator.rs index eab42ad87c..93ce458dca 100644 --- a/primitives/account/src/account/staking_contract/validator.rs +++ b/primitives/account/src/account/staking_contract/validator.rs @@ -46,6 +46,12 @@ use crate::{ /// in the first place. /// (**) The validator may be set to automatically reactivate itself upon inactivation. /// If this setting is not enabled the state change can only be triggered manually. +/// However, there is a validator health status with the following states: +/// -> Green: Everything is working as expected, if the validator is deactivated its status changes to Yellow +/// -> Yellow: If the validator is deactivated again, its status is changed to Red +/// -> Red: If the validator is deactivated again, the automatic reactivate (if enabled) has no effect +/// Human intervention is required at this point +/// To go from Red to Yellow or Yellow to Green, the validator needs to be active for at least a quarter of an epoch /// /// Create, Update, Deactivate, Retire and Re-activate are incoming transactions to the staking contract. /// Delete is an outgoing transaction from the staking contract. 
diff --git a/test-utils/src/validator.rs b/test-utils/src/validator.rs index 7be7081fb0..f7969ee95f 100644 --- a/test-utils/src/validator.rs +++ b/test-utils/src/validator.rs @@ -113,7 +113,7 @@ where let (v, c) = build_validator( peer_ids[i], Address::from(&validator_keys[i]), - false, + true, signing_keys[i].clone(), voting_keys[i].clone(), fee_keys[i].clone(), diff --git a/validator/src/micro.rs b/validator/src/micro.rs index 4d3e60494c..61dce72177 100644 --- a/validator/src/micro.rs +++ b/validator/src/micro.rs @@ -10,6 +10,7 @@ use futures::{future::BoxFuture, ready, FutureExt, Stream}; use nimiq_block::{Block, EquivocationProof, MicroBlock, SkipBlockInfo}; use nimiq_blockchain::{BlockProducer, BlockProducerError, Blockchain}; use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_keys::Address; use nimiq_mempool::mempool::Mempool; use nimiq_primitives::policy::Policy; use nimiq_time::sleep; @@ -36,6 +37,8 @@ struct NextProduceMicroBlockEvent { block_number: u32, producer_timeout: Duration, block_separation_time: Duration, + validator_address: Address, + publish_block: bool, } impl NextProduceMicroBlockEvent { @@ -53,6 +56,8 @@ impl NextProduceMicroBlockEvent Self { Self { blockchain, @@ -65,6 +70,8 @@ impl NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent ProduceMicroBlock Self { let next_event = NextProduceMicroBlockEvent::new( blockchain, @@ -416,6 +433,8 @@ impl ProduceMicroBlock, pub slot_band: Arc>>, pub consensus_state: Arc>, + pub validator_health: Arc>, } impl Clone for ValidatorProxy { @@ -88,6 +111,7 @@ impl Clone for ValidatorProxy { automatic_reactivate: Arc::clone(&self.automatic_reactivate), slot_band: Arc::clone(&self.slot_band), consensus_state: Arc::clone(&self.consensus_state), + validator_health: Arc::clone(&self.validator_health), } } } @@ -119,6 +143,8 @@ where slot_band: Arc>>, consensus_state: Arc>, validator_state: Option, + health_state: Arc>, + automatic_reactivate: 
Arc, macro_producer: Option>, @@ -200,6 +226,11 @@ where .await }); + let health_state = HealthState { + health: ValidatorHealth::Green, + publish: true, + }; + Self { consensus: consensus.proxy(), blockchain, @@ -222,6 +253,8 @@ where slot_band: Arc::new(RwLock::new(None)), consensus_state: Arc::new(RwLock::new(blockchain_state)), validator_state: None, + health_state: Arc::new(RwLock::new(health_state)), + automatic_reactivate, macro_producer: None, @@ -440,6 +473,8 @@ where next_block_number, Self::compute_micro_block_producer_timeout(head, &blockchain), Self::BLOCK_SEPARATION_TIME, + self.validator_address.read().clone(), + self.health_state.read().publish, )); } } @@ -729,6 +764,7 @@ where automatic_reactivate: Arc::clone(&self.automatic_reactivate), slot_band: Arc::clone(&self.slot_band), consensus_state: Arc::clone(&self.consensus_state), + validator_health: Arc::clone(&self.health_state), } } @@ -825,6 +861,7 @@ where // Once the validator can be active is established, check the validator staking state. 
if self.is_synced() { let blockchain = self.blockchain.read(); + let block_number = blockchain.block_number(); match self.get_staking_state(&blockchain) { ValidatorStakingState::Active => { drop(blockchain); @@ -832,6 +869,36 @@ where self.validator_state = None; info!("Automatically reactivated."); } + + let validator_health = self.health_state.read().health; + match validator_health { + ValidatorHealth::Green => {} + ValidatorHealth::Yellow(yellow_block_number) => { + let blocks_diff = block_number - yellow_block_number; + debug!( + "Current validator health {} is yellow, blocks diff: {} ", + self.validator_address.read(), + blocks_diff + ); + if blocks_diff >= Policy::blocks_per_epoch() / 4 { + log::info!("Changing the validator health back to green"); + self.health_state.write().health = ValidatorHealth::Green; + } + } + ValidatorHealth::Red(red_block_number) => { + let blocks_diff = block_number - red_block_number; + debug!( + "Current validator health {} is red, blocks diff: {} ", + self.validator_address.read(), + blocks_diff + ); + if blocks_diff >= Policy::blocks_per_epoch() / 4 { + log::info!("Changing the validator health back to yellow"); + self.health_state.write().health = + ValidatorHealth::Yellow(block_number); + } + } + } } ValidatorStakingState::Inactive(jailed_from) => { if self.validator_state.is_none() @@ -842,9 +909,36 @@ where .unwrap_or(true) && self.automatic_reactivate.load(Ordering::Acquire) { - let inactivity_state = self.reactivate(&blockchain); - drop(blockchain); - self.validator_state = Some(inactivity_state); + let validator_health = self.health_state.read().health; + match validator_health { + ValidatorHealth::Green => { + log::warn!( + "The validator {} was inactivated, changing its health to Yellow", + self.validator_address.read() + ); + let inactivity_state = self.reactivate(&blockchain); + drop(blockchain); + self.validator_state = Some(inactivity_state); + self.health_state.write().health = + 
ValidatorHealth::Yellow(block_number); + } + ValidatorHealth::Yellow(_) => { + log::warn!( + "The validator {} was inactivated again, changing its health to Red", + self.validator_address.read() + ); + let inactivity_state = self.reactivate(&blockchain); + drop(blockchain); + self.validator_state = Some(inactivity_state); + self.health_state.write().health = + ValidatorHealth::Red(block_number); + } + ValidatorHealth::Red(_) => { + log::warn!( + "The validator needs human intervention, no automatic reactivate" + ); + } + } } } ValidatorStakingState::UnknownOrNoStake => {} diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index 23a5b8bd81..2aced264b4 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -15,7 +15,7 @@ use nimiq_network_interface::{ }; use nimiq_network_libp2p::Network; use nimiq_network_mock::{MockHub, MockNetwork}; -use nimiq_primitives::{networks::NetworkId, policy::Policy}; +use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; use nimiq_test_log::test; use nimiq_test_utils::{ test_network::TestNetwork, @@ -24,9 +24,11 @@ use nimiq_test_utils::{ }, }; use nimiq_time::{sleep, timeout}; +use nimiq_transaction_builder::TransactionBuilder; use nimiq_utils::spawn; -use nimiq_validator::aggregation::{ - skip_block::SignedSkipBlockMessage, update::SerializableLevelUpdate, +use nimiq_validator::{ + aggregation::{skip_block::SignedSkipBlockMessage, update::SerializableLevelUpdate}, + validator::ValidatorHealth, }; use serde::{Deserialize, Serialize}; @@ -176,6 +178,207 @@ async fn validators_can_do_skip_block() { assert!(block.block_number() > Policy::genesis_block_number()); } +#[test(tokio::test)] +async fn validator_can_recover_from_yellow_health() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = + build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) + .await; + + // Listen for blockchain events from the 
new block producer (after a skip block). + let validator = validators.first().unwrap(); + let validator_proxy = validator.proxy(); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + events.take(10).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow(block_number) => { + log::info!( + "Current validator health is yellow, as expected, inactivated block {}", + block_number + ) + } + _ => panic!("Validator Health different than expected"), + }; + + // The validator should no longer be skip blocked: + validator_proxy.validator_health.write().publish = true; + + let events = blockchain.read().notifier_as_stream(); + events.take(30).for_each(|_| future::ready(())).await; + + assert_eq!( + validator_proxy.validator_health.read().health, + ValidatorHealth::Green + ); +} + +#[test(tokio::test)] +async fn validator_health_to_red() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = + build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) + .await; + + // Listen for blockchain events from the new block producer (after a skip block). 
+ let validator = validators.first().unwrap(); + let validator_proxy = validator.proxy(); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + events.take(10).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow(block_number) => { + log::info!( + "Current validator health is yellow, as expected, inactivated block {}", + block_number + ) + } + _ => panic!("Validator Health different than expected"), + }; + + let events = blockchain.read().notifier_as_stream(); + + // Now we produce more blocks, and the validator should be inactivated again + events.take(20).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Red(block_number) => { + log::info!( + "Current validator health is red, as expected, inactivated block {}", + block_number + ) + } + _ => panic!("Validator Health different than expected"), + }; +} + +#[test(tokio::test)] +async fn validator_health_fully_recover() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = + build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) + .await; + + // Listen for blockchain events from the new block producer (after a skip block). 
+ let validator = validators.first().unwrap(); + let consensus = validator.consensus.clone(); + let validator_proxy = validator.proxy(); + let validator_address = validator.validator_address(); + + log::info!( + "Listening to blockchain events from validator {} ", + validator_address, + ); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + events.take(10).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow(block_number) => { + log::info!( + "Current validator health is yellow, as expected, inactivated block {}", + block_number + ) + } + _ => panic!("Validator Health different than expected"), + }; + + let events = blockchain.read().notifier_as_stream(); + + // Now we produce more blocks, and the validator should be inactivated again + events.take(20).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Red(block_number) => { + log::info!( + "Current validator health is red, as expected, inactivated block {}", + block_number + ) + } + _ => panic!("Validator Health different than expected"), + }; + + // Since the validator needs manual intervention, we are going to send the reactivate transaction + + let reactivate_transaction = TransactionBuilder::new_reactivate_validator( + &validator_proxy.fee_key.read(), + validator_address, + &validator_proxy.signing_key.read(), + Coin::ZERO, + Policy::genesis_block_number(), + NetworkId::UnitAlbatross, + ); + + spawn(async move { + log::info!("Sending reactivate transaction to 
the network"); + if consensus + .send_transaction(reactivate_transaction.clone()) + .await + .is_err() + { + log::error!("Failed to send reactivate transaction"); + } + }); + + validator_proxy.validator_health.write().publish = true; + + let events = blockchain.read().notifier_as_stream(); + events.take(70).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + assert_eq!(current_validator_health, ValidatorHealth::Green); +} + fn create_skip_block_update( skip_block_info: SkipBlockInfo, key_pair: BlsKeyPair, From bc57e2186157a9bf0afb125414b30f1835b5fa94 Mon Sep 17 00:00:00 2001 From: viquezclaudio Date: Mon, 23 Dec 2024 14:19:24 -0600 Subject: [PATCH 2/3] Track blocks that are produced by validator Track the number of blocks that are successfully produced when changing the validator health --- validator/src/micro.rs | 21 ++++++++++++-------- validator/src/validator.rs | 39 +++++++++++++++++++++++--------------- validator/tests/mock.rs | 4 ++-- 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/validator/src/micro.rs b/validator/src/micro.rs index 61dce72177..e81f55aebf 100644 --- a/validator/src/micro.rs +++ b/validator/src/micro.rs @@ -19,7 +19,10 @@ use nimiq_validator_network::ValidatorNetwork; use nimiq_vrf::VrfSeed; use parking_lot::RwLock; -use crate::{aggregation::skip_block::SkipBlockAggregation, validator::Validator}; +use crate::{ + aggregation::skip_block::SkipBlockAggregation, + validator::{HealthState, Validator}, +}; pub(crate) enum ProduceMicroBlockEvent { MicroBlock, @@ -38,7 +41,7 @@ struct NextProduceMicroBlockEvent { producer_timeout: Duration, block_separation_time: Duration, validator_address: Address, - publish_block: bool, + health_state: Arc>, } impl NextProduceMicroBlockEvent { @@ -57,7 +60,7 @@ impl NextProduceMicroBlockEvent>, ) -> Self { Self { blockchain, @@ -71,7 +74,7 @@ impl NextProduceMicroBlockEvent NextProduceMicroBlockEvent 
NextProduceMicroBlockEvent ProduceMicroBlock>, ) -> Self { let next_event = NextProduceMicroBlockEvent::new( blockchain, @@ -434,7 +439,7 @@ impl ProduceMicroBlock {} ValidatorHealth::Yellow(yellow_block_number) => { - let blocks_diff = block_number - yellow_block_number; debug!( - "Current validator health {} is yellow, blocks diff: {} ", - self.validator_address.read(), - blocks_diff + address = %self.validator_address.read(), + inactivated = yellow_block_number, + good_blocks = %self.health_state.read().blk_cnt, + "Current validator health is yellow", ); - if blocks_diff >= Policy::blocks_per_epoch() / 4 { + if self.health_state.read().blk_cnt >= VALIDATOR_HEALTH_THRESHOLD { log::info!("Changing the validator health back to green"); self.health_state.write().health = ValidatorHealth::Green; + self.health_state.write().blk_cnt = 0; } } ValidatorHealth::Red(red_block_number) => { - let blocks_diff = block_number - red_block_number; debug!( - "Current validator health {} is red, blocks diff: {} ", - self.validator_address.read(), - blocks_diff + address = %self.validator_address.read(), + inactivated = red_block_number, + "Current validator health is red", ); - if blocks_diff >= Policy::blocks_per_epoch() / 4 { + if self.health_state.read().blk_cnt >= VALIDATOR_HEALTH_THRESHOLD { log::info!("Changing the validator health back to yellow"); self.health_state.write().health = ValidatorHealth::Yellow(block_number); + self.health_state.write().blk_cnt = 0; } } } @@ -913,25 +920,27 @@ where match validator_health { ValidatorHealth::Green => { log::warn!( - "The validator {} was inactivated, changing its health to Yellow", - self.validator_address.read() + address=%self.validator_address.read(), + "The validator was inactivated, changing its health to Yellow", ); let inactivity_state = self.reactivate(&blockchain); drop(blockchain); self.validator_state = Some(inactivity_state); self.health_state.write().health = ValidatorHealth::Yellow(block_number); + 
self.health_state.write().blk_cnt = 0; } ValidatorHealth::Yellow(_) => { log::warn!( - "The validator {} was inactivated again, changing its health to Red", - self.validator_address.read() + address=%self.validator_address.read(), + "The validator was inactivated again, changing its health to Red", ); let inactivity_state = self.reactivate(&blockchain); drop(blockchain); self.validator_state = Some(inactivity_state); self.health_state.write().health = ValidatorHealth::Red(block_number); + self.health_state.write().blk_cnt = 0; } ValidatorHealth::Red(_) => { log::warn!( diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index 2aced264b4..8c0da48898 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -220,7 +220,7 @@ async fn validator_can_recover_from_yellow_health() { validator_proxy.validator_health.write().publish = true; let events = blockchain.read().notifier_as_stream(); - events.take(30).for_each(|_| future::ready(())).await; + events.take(40).for_each(|_| future::ready(())).await; assert_eq!( validator_proxy.validator_health.read().health, @@ -372,7 +372,7 @@ async fn validator_health_fully_recover() { validator_proxy.validator_health.write().publish = true; let events = blockchain.read().notifier_as_stream(); - events.take(70).for_each(|_| future::ready(())).await; + events.take(100).for_each(|_| future::ready(())).await; let current_validator_health = validator_proxy.validator_health.read().health; From 265abb11d5f659147b24358fbc045c2180ec1050 Mon Sep 17 00:00:00 2001 From: viquezclaudio Date: Mon, 13 Jan 2025 15:12:50 -0600 Subject: [PATCH 3/3] Use a quadratic backoff curve for reactivate transactions When a validator is deactivated, the reactivate transaction will have a delay based on the number of consecutive deactivations the validator has experienced in the current epoch. 
The amount of delayed blocks is of quadratic nature --- .../src/account/staking_contract/validator.rs | 9 +- test-utils/src/validator.rs | 3 +- validator/src/health.rs | 156 ++++++++++++ validator/src/lib.rs | 1 + validator/src/micro.rs | 21 +- validator/src/validator.rs | 126 ++-------- validator/tests/integration.rs | 9 +- validator/tests/mock.rs | 227 +++++++----------- 8 files changed, 286 insertions(+), 266 deletions(-) create mode 100644 validator/src/health.rs diff --git a/primitives/account/src/account/staking_contract/validator.rs b/primitives/account/src/account/staking_contract/validator.rs index 93ce458dca..ac4660063e 100644 --- a/primitives/account/src/account/staking_contract/validator.rs +++ b/primitives/account/src/account/staking_contract/validator.rs @@ -46,12 +46,9 @@ use crate::{ /// in the first place. /// (**) The validator may be set to automatically reactivate itself upon inactivation. /// If this setting is not enabled the state change can only be triggered manually. -/// However, there is a validator health status with the following states: -/// -> Green: Everything is working as expected, if the validator is deactivated its status changes to Yellow -/// -> Yellow: If the validator is deactivated again, its status is changed to Red -/// -> Red: If the validator is deactivated again, the automatic reactivate (if enabled) has no effect -/// Human intervention is required at this point -/// To go from Red to Yellow or Yellow to Green, the validator needs to be active for at least a quarter of an epoch +/// However, there is a delay incurred if the validator is deactivated multiple consecutive times +/// in the current epoch. The delay is a function of the number of deactivations and is reduced +/// if the validator starts producing blocks in time. /// /// Create, Update, Deactivate, Retire and Re-activate are incoming transactions to the staking contract. /// Delete is an outgoing transaction from the staking contract. 
diff --git a/test-utils/src/validator.rs b/test-utils/src/validator.rs index f7969ee95f..3fe39dde7b 100644 --- a/test-utils/src/validator.rs +++ b/test-utils/src/validator.rs @@ -66,6 +66,7 @@ pub async fn build_validators( peer_ids: &[u64], hub: &mut Option, is_prover_active: bool, + automatic_reactivate: bool, ) -> Vec>> where N::Error: Send + Sync, @@ -113,7 +114,7 @@ where let (v, c) = build_validator( peer_ids[i], Address::from(&validator_keys[i]), - true, + automatic_reactivate, signing_keys[i].clone(), voting_keys[i].clone(), fee_keys[i].clone(), diff --git a/validator/src/health.rs b/validator/src/health.rs new file mode 100644 index 0000000000..f262fd550c --- /dev/null +++ b/validator/src/health.rs @@ -0,0 +1,156 @@ +use std::cmp; + +use nimiq_keys::Address; + +/// Enum that represents the overall health of a validator +/// - Green means the Validator is working as expected. +/// - Yellow means there have been at least 'VALIDATOR_YELLOW_HEALTH_INACTIVATIONS' consecutive inactivations +/// in the current epoch. +/// - Red means there have been at least 'VALIDATOR_RED_HEALTH_INACTIVATIONS' consecutive inactivations +/// in the current epoch. 
+#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ValidatorHealthState { + Green, + Yellow, + Red, +} + +/// Struct that represents the overall Validator Health +pub struct ValidatorHealth { + /// The current validator health + health: ValidatorHealthState, + /// Validator address + address: Address, + /// Only used for testing purposes controls whether blocks are published by the validator + publish: bool, + /// Number of consecutive inactivations that have occurred in the current epoch + inactivations: u32, + /// Next block number indicating when the re-activate transaction should be sent + reactivate_block_number: u32, + /// Flag that indicates an ongoing reactivation + pending_reactivate: bool, +} + +impl ValidatorHealth { + /// The number of consecutive inactivations from which a validator is considered with yellow health + const VALIDATOR_YELLOW_HEALTH_INACTIVATIONS: u32 = 2; + /// The number of consecutive inactivations from which a validator is considered with red health + const VALIDATOR_RED_HEALTH_INACTIVATIONS: u32 = 4; + // The maximum number of blocks the reactivate transaction can be delayed + const MAX_REACTIVATE_DELAY: u32 = 10_000; + + /// Creates a new instance of the validator health structure + pub fn new(validator_address: &Address) -> ValidatorHealth { + ValidatorHealth { + health: ValidatorHealthState::Green, + publish: true, + inactivations: 0, + reactivate_block_number: 0, + pending_reactivate: false, + address: validator_address.clone(), + } + } + + /// Computes the associated delay based on the current number of inactivations + fn get_reactivate_delay(&self) -> u32 { + cmp::min(self.inactivations.pow(2), Self::MAX_REACTIVATE_DELAY) + } + + /// Returns the current health state of the validator + pub fn health(&self) -> ValidatorHealthState { + self.health + } + + /// This flag should only be set in testing environments. 
+ pub fn _set_publish_flag(&mut self, publish: bool) { + self.publish = publish + } + + /// Decides if blocks should be published by the validator (only used for testing) + pub fn publish_block(&self) -> bool { + self.publish + } + + /// Recomputes the current validator health state + pub fn refresh_validator_health_status(&mut self) { + log::debug!( + address=%self.address, + self.inactivations, + "Current validator Inactivations counter" + ); + + match self.health { + ValidatorHealthState::Green => {} + ValidatorHealthState::Yellow => { + if self.inactivations < Self::VALIDATOR_YELLOW_HEALTH_INACTIVATIONS { + log::info!(self.inactivations, "Changed validator health to green"); + self.health = ValidatorHealthState::Green; + } + } + ValidatorHealthState::Red => { + if self.inactivations < Self::VALIDATOR_RED_HEALTH_INACTIVATIONS { + log::info!(self.inactivations, "Changed validator health to yellow"); + self.health = ValidatorHealthState::Yellow; + } + } + } + } + + /// Decreases the number of consecutive deactivations + pub fn block_produced(&mut self) { + self.inactivations = self.inactivations.saturating_sub(1); + } + + /// Increases the number of consecutive deactivations + pub fn inactivate(&mut self, block_number: u32) { + if !self.pending_reactivate { + self.inactivations = self.inactivations.saturating_add(1); + self.reactivate_block_number = block_number + self.get_reactivate_delay(); + self.pending_reactivate = true; + + match self.health { + ValidatorHealthState::Green => { + if self.inactivations >= Self::VALIDATOR_YELLOW_HEALTH_INACTIVATIONS { + log::warn!(self.inactivations, "Changed validator health to yellow"); + self.health = ValidatorHealthState::Yellow; + } + } + ValidatorHealthState::Yellow => { + if self.inactivations >= Self::VALIDATOR_RED_HEALTH_INACTIVATIONS { + log::warn!(self.inactivations, "Changed validator health to red"); + self.health = ValidatorHealthState::Red; + } + } + ValidatorHealthState::Red => { + log::warn!("Validator health 
is still red") + } + } + + log::debug!( + address=%self.address, + self.inactivations, + self.reactivate_block_number, + block_number, + "New inactivation, current status", + ); + } + } + + /// Resets the internal counters and validator health state for a new epoch + pub fn reset_epoch(&mut self) { + // Reset the inactivations counter + self.inactivations = 0; + // Reset the validator health every epoch + self.health = ValidatorHealthState::Green; + } + + /// Resets the pending reactivation flag + pub fn reset_reactivation(&mut self) { + self.pending_reactivate = false; + } + + /// Returns the block number for the next re-activate transaction + pub fn get_reactivate_block_number(&self) -> u32 { + self.reactivate_block_number + } +} diff --git a/validator/src/lib.rs b/validator/src/lib.rs index 19fe7e0c81..8171d338b7 100644 --- a/validator/src/lib.rs +++ b/validator/src/lib.rs @@ -2,6 +2,7 @@ extern crate log; pub mod aggregation; +pub mod health; mod jail; pub mod key_utils; mod r#macro; diff --git a/validator/src/micro.rs b/validator/src/micro.rs index e81f55aebf..60076a8d32 100644 --- a/validator/src/micro.rs +++ b/validator/src/micro.rs @@ -20,8 +20,7 @@ use nimiq_vrf::VrfSeed; use parking_lot::RwLock; use crate::{ - aggregation::skip_block::SkipBlockAggregation, - validator::{HealthState, Validator}, + aggregation::skip_block::SkipBlockAggregation, health::ValidatorHealth, validator::Validator, }; pub(crate) enum ProduceMicroBlockEvent { @@ -41,7 +40,7 @@ struct NextProduceMicroBlockEvent { producer_timeout: Duration, block_separation_time: Duration, validator_address: Address, - health_state: Arc>, + validator_health: Arc>, } impl NextProduceMicroBlockEvent { @@ -60,7 +59,7 @@ impl NextProduceMicroBlockEvent>, + validator_health: Arc>, ) -> Self { Self { blockchain, @@ -74,7 +73,7 @@ impl NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent ProduceMicroBlock>, + validator_health: Arc>, ) -> Self { let next_event = 
NextProduceMicroBlockEvent::new( blockchain, @@ -439,7 +438,7 @@ impl ProduceMicroBlock, pub slot_band: Arc>>, pub consensus_state: Arc>, - pub validator_health: Arc>, + pub validator_health: Arc>, } impl Clone for ValidatorProxy { @@ -148,7 +122,7 @@ where slot_band: Arc>>, consensus_state: Arc>, validator_state: Option, - health_state: Arc>, + validator_health: Arc>, automatic_reactivate: Arc, @@ -231,11 +205,7 @@ where .await }); - let health_state = HealthState { - health: ValidatorHealth::Green, - publish: true, - blk_cnt: 0, - }; + let validator_health = ValidatorHealth::new(&validator_address); Self { consensus: consensus.proxy(), @@ -259,7 +229,7 @@ where slot_band: Arc::new(RwLock::new(None)), consensus_state: Arc::new(RwLock::new(blockchain_state)), validator_state: None, - health_state: Arc::new(RwLock::new(health_state)), + validator_health: Arc::new(RwLock::new(validator_health)), automatic_reactivate, @@ -480,7 +450,7 @@ where Self::compute_micro_block_producer_timeout(head, &blockchain), Self::BLOCK_SEPARATION_TIME, self.validator_address.read().clone(), - Arc::clone(&self.health_state), + Arc::clone(&self.validator_health), )); } } @@ -500,6 +470,9 @@ where self.on_blockchain_extended(hash); } BlockchainEvent::EpochFinalized(ref hash) => { + // Reset the validator health for the new epoch + self.validator_health.write().reset_epoch(); + self.init_epoch(); // The on_blockchain_extended is necessary for the order of events to not matter. 
self.on_blockchain_extended(hash); @@ -529,6 +502,14 @@ where self.check_reactivate(block.block_number()); self.init_block_producer(Some(hash)); + + let block_number = block.block_number(); + let blockchain = self.blockchain.read(); + + if block_number == self.validator_health.read().get_reactivate_block_number() { + let inactivity_state = self.reactivate(&blockchain); + self.validator_state = Some(inactivity_state); + } } fn on_blockchain_rebranched( @@ -725,7 +706,7 @@ where let cn = self.consensus.clone(); spawn(async move { - debug!("Sending reactivate transaction to the network"); + info!("Sending reactivate transaction to the network"); if cn .send_transaction(reactivate_transaction.clone()) .await @@ -770,7 +751,7 @@ where automatic_reactivate: Arc::clone(&self.automatic_reactivate), slot_band: Arc::clone(&self.slot_band), consensus_state: Arc::clone(&self.consensus_state), - validator_health: Arc::clone(&self.health_state), + validator_health: Arc::clone(&self.validator_health), } } @@ -870,42 +851,16 @@ where let block_number = blockchain.block_number(); match self.get_staking_state(&blockchain) { ValidatorStakingState::Active => { - drop(blockchain); if self.validator_state.is_some() { + drop(blockchain); self.validator_state = None; + self.validator_health.write().reset_reactivation(); info!("Automatically reactivated."); } - let validator_health = self.health_state.read().health; - match validator_health { - ValidatorHealth::Green => {} - ValidatorHealth::Yellow(yellow_block_number) => { - debug!( - address = %self.validator_address.read(), - inactivated = yellow_block_number, - good_blocks = %self.health_state.read().blk_cnt, - "Current validator health is yellow", - ); - if self.health_state.read().blk_cnt >= VALIDATOR_HEALTH_THRESHOLD { - log::info!("Changing the validator health back to green"); - self.health_state.write().health = ValidatorHealth::Green; - self.health_state.write().blk_cnt = 0; - } - } - ValidatorHealth::Red(red_block_number) => { 
- debug!( - address = %self.validator_address.read(), - inactivated = red_block_number, - "Current validator health is red", - ); - if self.health_state.read().blk_cnt >= VALIDATOR_HEALTH_THRESHOLD { - log::info!("Changing the validator health back to yellow"); - self.health_state.write().health = - ValidatorHealth::Yellow(block_number); - self.health_state.write().blk_cnt = 0; - } - } - } + self.validator_health + .write() + .refresh_validator_health_status(); } ValidatorStakingState::Inactive(jailed_from) => { if self.validator_state.is_none() @@ -916,38 +871,7 @@ where .unwrap_or(true) && self.automatic_reactivate.load(Ordering::Acquire) { - let validator_health = self.health_state.read().health; - match validator_health { - ValidatorHealth::Green => { - log::warn!( - address=%self.validator_address.read(), - "The validator was inactivated, changing its health to Yellow", - ); - let inactivity_state = self.reactivate(&blockchain); - drop(blockchain); - self.validator_state = Some(inactivity_state); - self.health_state.write().health = - ValidatorHealth::Yellow(block_number); - self.health_state.write().blk_cnt = 0; - } - ValidatorHealth::Yellow(_) => { - log::warn!( - address=%self.validator_address.read(), - "The validator was inactivated again, changing its health to Red", - ); - let inactivity_state = self.reactivate(&blockchain); - drop(blockchain); - self.validator_state = Some(inactivity_state); - self.health_state.write().health = - ValidatorHealth::Red(block_number); - self.health_state.write().blk_cnt = 0; - } - ValidatorHealth::Red(_) => { - log::warn!( - "The validator needs human intervention, no automatic reactivate" - ); - } - } + self.validator_health.write().inactivate(block_number); } } ValidatorStakingState::UnknownOrNoStake => {} diff --git a/validator/tests/integration.rs b/validator/tests/integration.rs index 84c696ebd4..8858701d67 100644 --- a/validator/tests/integration.rs +++ b/validator/tests/integration.rs @@ -1,6 +1,5 @@ use 
std::sync::Arc; -use futures::{future, StreamExt}; use nimiq_block::Block; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::{AbstractBlockchain, PushResult}; @@ -8,15 +7,13 @@ use nimiq_bls::KeyPair as BlsKeyPair; use nimiq_database::mdbx::MdbxDatabase; use nimiq_genesis::NetworkId; use nimiq_keys::KeyPair; -use nimiq_network_libp2p::Network; use nimiq_primitives::{coin::Coin, policy::Policy}; use nimiq_test_log::test; -use nimiq_test_utils::{ - blockchain::{produce_macro_blocks_with_txns, signing_key, validator_key, voting_key}, - validator::build_validators, +use nimiq_test_utils::blockchain::{ + produce_macro_blocks_with_txns, signing_key, validator_key, voting_key, }; use nimiq_transaction_builder::TransactionBuilder; -use nimiq_utils::{key_rng::SecureGenerate, spawn, time::OffsetTime}; +use nimiq_utils::{key_rng::SecureGenerate, time::OffsetTime}; use parking_lot::RwLock; #[test(tokio::test)] diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index 8c0da48898..c6afc5efcf 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -1,34 +1,24 @@ use std::{sync::Arc, task::Poll, time::Duration}; use futures::{future, StreamExt}; -use nimiq_block::{MultiSignature, SignedSkipBlockInfo, SkipBlockInfo}; -use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; -use nimiq_bls::{AggregateSignature, KeyPair as BlsKeyPair}; -use nimiq_collections::BitSet; +use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_bls::KeyPair as BlsKeyPair; use nimiq_database::mdbx::MdbxDatabase; use nimiq_genesis_builder::GenesisBuilder; -use nimiq_handel::update::LevelUpdate; use nimiq_keys::{Address, KeyPair, SecureGenerate}; -use nimiq_network_interface::{ - network::{CloseReason, Network as NetworkInterface}, - request::{MessageMarker, RequestCommon}, -}; +use nimiq_network_interface::request::{MessageMarker, RequestCommon}; use nimiq_network_libp2p::Network; use 
nimiq_network_mock::{MockHub, MockNetwork}; -use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; +use nimiq_primitives::{networks::NetworkId, policy::Policy}; use nimiq_test_log::test; -use nimiq_test_utils::{ - test_network::TestNetwork, - validator::{ - build_validator, build_validators, pop_validator_for_slot, seeded_rng, validator_for_slot, - }, +use nimiq_test_utils::validator::{ + build_validator, build_validators, pop_validator_for_slot, seeded_rng, }; -use nimiq_time::{sleep, timeout}; -use nimiq_transaction_builder::TransactionBuilder; +use nimiq_time::timeout; use nimiq_utils::spawn; use nimiq_validator::{ aggregation::{skip_block::SignedSkipBlockMessage, update::SerializableLevelUpdate}, - validator::ValidatorHealth, + health::ValidatorHealthState, }; use serde::{Deserialize, Serialize}; @@ -107,6 +97,7 @@ async fn four_validators_can_create_micro_blocks() { &(1u64..=4u64).collect::>(), &mut Some(hub), false, + false, ) .await; @@ -145,9 +136,14 @@ async fn validators_can_do_skip_block() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let mut validators = - build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) - .await; + let mut validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + false, + ) + .await; // Disconnect the next block producer. let _validator = pop_validator_for_slot( @@ -183,9 +179,14 @@ async fn validator_can_recover_from_yellow_health() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let validators = - build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) - .await; + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; // Listen for blockchain events from the new block producer (after a skip block). 
let validator = validators.first().unwrap(); @@ -200,31 +201,39 @@ async fn validator_can_recover_from_yellow_health() { spawn(validator); } - validator_proxy.validator_health.write().publish = false; + validator_proxy + .validator_health + .write() + ._set_publish_flag(false); + + log::info!( + "Validator proxy address {}", + validator_proxy.validator_address.read() + ); - events.take(10).for_each(|_| future::ready(())).await; + events.take(30).for_each(|_| future::ready(())).await; - let current_validator_health = validator_proxy.validator_health.read().health; + let current_validator_health = validator_proxy.validator_health.read().health(); match current_validator_health { - ValidatorHealth::Yellow(block_number) => { - log::info!( - "Current validator health is yellow, as expected, inactivated block {}", - block_number - ) + ValidatorHealthState::Yellow => { + log::info!("Current validator health is yellow, as expected",) } _ => panic!("Validator Health different than expected"), }; // The validator should no longer be skip blocked: - validator_proxy.validator_health.write().publish = true; + validator_proxy + .validator_health + .write() + ._set_publish_flag(true); let events = blockchain.read().notifier_as_stream(); - events.take(40).for_each(|_| future::ready(())).await; + events.take(30).for_each(|_| future::ready(())).await; assert_eq!( - validator_proxy.validator_health.read().health, - ValidatorHealth::Green + validator_proxy.validator_health.read().health(), + ValidatorHealthState::Green ); } @@ -233,9 +242,14 @@ async fn validator_health_to_red() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let validators = - build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) - .await; + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; // Listen for blockchain events from the new block producer (after a skip block). 
let validator = validators.first().unwrap(); @@ -250,18 +264,18 @@ async fn validator_health_to_red() { spawn(validator); } - validator_proxy.validator_health.write().publish = false; + validator_proxy + .validator_health + .write() + ._set_publish_flag(false); - events.take(10).for_each(|_| future::ready(())).await; + events.take(30).for_each(|_| future::ready(())).await; - let current_validator_health = validator_proxy.validator_health.read().health; + let current_validator_health = validator_proxy.validator_health.read().health(); match current_validator_health { - ValidatorHealth::Yellow(block_number) => { - log::info!( - "Current validator health is yellow, as expected, inactivated block {}", - block_number - ) + ValidatorHealthState::Yellow => { + log::info!("Current validator health is yellow, as expected",) } _ => panic!("Validator Health different than expected"), }; @@ -271,14 +285,11 @@ async fn validator_health_to_red() { // Now we produce more blocks, and the validator should be inactivated again events.take(20).for_each(|_| future::ready(())).await; - let current_validator_health = validator_proxy.validator_health.read().health; + let current_validator_health = validator_proxy.validator_health.read().health(); match current_validator_health { - ValidatorHealth::Red(block_number) => { - log::info!( - "Current validator health is red, as expected, inactivated block {}", - block_number - ) + ValidatorHealthState::Red => { + log::info!("Current validator health is red, as expected",) } _ => panic!("Validator Health different than expected"), }; @@ -289,13 +300,17 @@ async fn validator_health_fully_recover() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let validators = - build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) - .await; + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; // Listen for blockchain 
events from the new block producer (after a skip block). let validator = validators.first().unwrap(); - let consensus = validator.consensus.clone(); let validator_proxy = validator.proxy(); let validator_address = validator.validator_address(); @@ -314,101 +329,31 @@ async fn validator_health_fully_recover() { spawn(validator); } - validator_proxy.validator_health.write().publish = false; + validator_proxy + .validator_health + .write() + ._set_publish_flag(false); - events.take(10).for_each(|_| future::ready(())).await; + events.take(50).for_each(|_| future::ready(())).await; - let current_validator_health = validator_proxy.validator_health.read().health; + let current_validator_health = validator_proxy.validator_health.read().health(); match current_validator_health { - ValidatorHealth::Yellow(block_number) => { - log::info!( - "Current validator health is yellow, as expected, inactivated block {}", - block_number - ) + ValidatorHealthState::Red => { + log::info!("Current validator health is red, as expected") } _ => panic!("Validator Health different than expected"), }; - let events = blockchain.read().notifier_as_stream(); - - // Now we produce more blocks, and the validator should be inactivated again - events.take(20).for_each(|_| future::ready(())).await; - - let current_validator_health = validator_proxy.validator_health.read().health; - - match current_validator_health { - ValidatorHealth::Red(block_number) => { - log::info!( - "Current validator health is red, as expected, inactivated block {}", - block_number - ) - } - _ => panic!("Validator Health different than expected"), - }; - - // Since the validator needs manual intervention, we are going to send the reactivate transaction - - let reactivate_transaction = TransactionBuilder::new_reactivate_validator( - &validator_proxy.fee_key.read(), - validator_address, - &validator_proxy.signing_key.read(), - Coin::ZERO, - Policy::genesis_block_number(), - NetworkId::UnitAlbatross, - ); - - spawn(async move { - 
log::info!("Sending reactivate transaction to the network"); - if consensus - .send_transaction(reactivate_transaction.clone()) - .await - .is_err() - { - log::error!("Failed to send reactivate transaction"); - } - }); - - validator_proxy.validator_health.write().publish = true; + validator_proxy + .validator_health + .write() + ._set_publish_flag(true); let events = blockchain.read().notifier_as_stream(); events.take(100).for_each(|_| future::ready(())).await; - let current_validator_health = validator_proxy.validator_health.read().health; - - assert_eq!(current_validator_health, ValidatorHealth::Green); -} + let current_validator_health = validator_proxy.validator_health.read().health(); -fn create_skip_block_update( - skip_block_info: SkipBlockInfo, - key_pair: BlsKeyPair, - validator_id: u16, - slots: &[u16], -) -> LevelUpdate { - // get a single signature for this skip block data - let signed_skip_block_info = - SignedSkipBlockInfo::from_message(skip_block_info, &key_pair.secret_key, validator_id); - - // multiply with number of slots to get a signature representing all the slots of this public_key - let signature = AggregateSignature::from_signatures(&[signed_skip_block_info - .signature - .multiply(slots.len() as u16)]); - - // compute the signers bitset (which is just all the slots) - let mut signers = BitSet::new(); - for &slot in slots { - signers.insert(slot as usize); - } - - // the contribution is composed of the signers bitset with the signature already multiplied by the number of slots. - let contribution = SignedSkipBlockMessage { - proof: MultiSignature::new(signature, signers), - }; - - LevelUpdate::new( - contribution.clone(), - Some(contribution), - 1, - validator_id as usize, - ) + assert_eq!(current_validator_health, ValidatorHealthState::Green); }