diff --git a/CHANGELOG.md b/CHANGELOG.md
index fb6b907814..fdafc32ec2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE
 
 - Added new `ValidateRejectCode` values to the `/v3/block_proposal` endpoint
 - Added `StateMachineUpdateContent::V1` to support a vector of `StacksTransaction` expected to be replayed in subsequent Stacks blocks
+- Updated `StackerDBListener` to listen for signers' state machine updates and store replay info to the side to enable a miner to perform transaction replay
 
 ### Changed
 
diff --git a/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs
index 4e6482c25a..34b80d0374 100644
--- a/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs
+++ b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs
@@ -28,7 +28,7 @@ use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash};
 use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState};
 use stacks::chainstate::stacks::boot::{RewardSet, MINERS_NAME};
 use stacks::chainstate::stacks::db::StacksChainState;
-use stacks::chainstate::stacks::Error as ChainstateError;
+use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
 use stacks::codec::StacksMessageCodec;
 use stacks::libstackerdb::StackerDBChunkData;
 use stacks::net::stackerdb::StackerDBs;
@@ -97,6 +97,7 @@ impl SignerCoordinator {
             reward_set,
             election_block,
             burnchain,
+            config,
         )?;
         let is_mainnet = config.is_mainnet();
         let rpc_socket = config
@@ -484,6 +485,13 @@ impl SignerCoordinator {
             .get_tenure_extend_timestamp(self.weight_threshold)
     }
 
+    /// Get the transactions that at least 70% of the signing power are
+    /// expecting to be replayed.
+    pub fn get_replay_transactions(&self) -> Vec<StacksTransaction> {
+        self.stackerdb_comms
+            .get_replay_transactions(self.weight_threshold)
+    }
+
     /// Check if the tenure needs to change
     fn check_burn_tip_changed(&self, sortdb: &SortitionDB) -> bool {
         let cur_burn_chain_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn())
diff --git a/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs b/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs
index b0a317c9e1..5d80497ec8 100644
--- a/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs
+++ b/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs
@@ -22,14 +22,19 @@ use std::sync::{Arc, Condvar, Mutex};
 use std::time::Duration;
 
 use hashbrown::{HashMap, HashSet};
-use libsigner::v0::messages::{BlockAccepted, BlockResponse, SignerMessage as SignerMessageV0};
-use libsigner::SignerEvent;
+use libsigner::v0::messages::{
+    BlockAccepted, BlockResponse, MessageSlotID, SignerMessage as SignerMessageV0,
+    StateMachineUpdate, StateMachineUpdateContent,
+};
+use libsigner::{SignerEvent, SignerSession, StackerDBSession};
 use stacks::burnchains::Burnchain;
 use stacks::chainstate::burn::BlockSnapshot;
 use stacks::chainstate::nakamoto::NakamotoBlockHeader;
 use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, SIGNERS_NAME};
 use stacks::chainstate::stacks::events::StackerDBChunksEvent;
-use stacks::chainstate::stacks::Error as ChainstateError;
+use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
+use stacks::codec::StacksMessageCodec;
+use stacks::net::stackerdb::StackerDBs;
 use stacks::types::chainstate::StacksPublicKey;
 use stacks::types::PublicKey;
 use stacks::util::get_epoch_time_secs;
@@ -40,6 +45,7 @@ use stacks_common::util::tests::TestFlag;
 
 use super::Error as NakamotoNodeError;
 use crate::event_dispatcher::StackerDBChannel;
+use crate::Config;
 
 #[cfg(test)]
 /// Fault injection flag to prevent the miner from seeing enough signer signatures.
@@ -68,6 +74,12 @@ pub(crate) struct TimestampInfo {
     pub weight: u32,
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct ReplayInfo {
+    pub transactions: Vec<StacksTransaction>,
+    pub weight: u32,
+}
+
 /// The listener for the StackerDB, which listens for messages from the
 /// signers and tracks the state of block signatures and idle timestamps.
 pub struct StackerDBListener {
@@ -96,6 +108,11 @@ pub struct StackerDBListener {
     /// - key: StacksPublicKey
     /// - value: TimestampInfo
    pub(crate) signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
+    /// Tracks any replay transactions from signers to decide when the miner should
+    /// attempt to replay reorged blocks
+    /// - key: StacksPublicKey
+    /// - value: ReplayInfo
+    pub(crate) replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
 }
 
 /// Interface for other threads to retrieve info from the StackerDBListener
@@ -109,6 +126,11 @@ pub struct StackerDBListenerComms {
     /// - key: StacksPublicKey
     /// - value: TimestampInfo
     signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
+    /// Tracks any replay transactions from signers to decide when the miner should
+    /// attempt to replay reorged blocks
+    /// - key: StacksPublicKey
+    /// - value: ReplayInfo
+    replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
 }
 
 impl StackerDBListener {
@@ -119,6 +141,7 @@
         reward_set: &RewardSet,
         burn_tip: &BlockSnapshot,
         burnchain: &Burnchain,
+        config: &Config,
     ) -> Result<Self, ChainstateError> {
         let (receiver, replaced_other) = stackerdb_channel
             .lock()
@@ -161,6 +184,60 @@ impl StackerDBListener {
             })
             .collect::<Result<HashMap<_, _>, ChainstateError>>()?;
 
+        let reward_cycle = burnchain
+            .block_height_to_reward_cycle(burn_tip.block_height)
+            .expect("BUG: unknown reward cycle");
+        let signers_contract_id = MessageSlotID::StateMachineUpdate
+            .stacker_db_contract(config.is_mainnet(), reward_cycle);
+        let rpc_socket = config
+            .node
+            .get_rpc_loopback()
+            .ok_or_else(|| ChainstateError::MinerAborted)?;
+        let mut signers_session =
+            StackerDBSession::new(&rpc_socket.to_string(), signers_contract_id.clone());
+        let stackerdbs = StackerDBs::connect(&config.get_stacker_db_file_path(), false)?;
+        let slot_ids: Vec<_> = stackerdbs
+            .get_signers(&signers_contract_id)
+            .expect("FATAL: could not get signers from stacker DB")
+            .into_iter()
+            .enumerate()
+            .map(|(slot_id, _)| {
+                u32::try_from(slot_id).expect("FATAL: too many signers to fit into u32 range")
+            })
+            .collect();
+        let chunks = signers_session
+            .get_latest_chunks(&slot_ids)
+            .inspect_err(|e| warn!("Unable to read the latest signer state from signer db: {e}."))
+            .unwrap_or_default();
+        let mut replay_infos = HashMap::new();
+        for (chunk, slot_id) in chunks.into_iter().zip(slot_ids) {
+            let Some(chunk) = chunk else {
+                continue;
+            };
+            let Some(signer_entry) = &signer_entries.get(&slot_id) else {
+                continue;
+            };
+            let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) else {
+                continue;
+            };
+            if let Ok(SignerMessageV0::StateMachineUpdate(update)) =
+                SignerMessageV0::consensus_deserialize(&mut chunk.as_slice())
+            {
+                let transactions = match update.content {
+                    StateMachineUpdateContent::V0 { .. } => vec![],
+                    StateMachineUpdateContent::V1 {
+                        replay_transactions,
+                        ..
+                    } => replay_transactions,
+                };
+                let replay_info = ReplayInfo {
+                    transactions,
+                    weight: signer_entry.weight,
+                };
+                replay_infos.insert(signer_pubkey, replay_info);
+            }
+        }
+
         Ok(Self {
             stackerdb_channel,
             receiver: Some(receiver),
@@ -172,6 +249,7 @@
             signer_entries,
             blocks: Arc::new((Mutex::new(HashMap::new()), Condvar::new())),
             signer_idle_timestamps: Arc::new(Mutex::new(HashMap::new())),
+            replay_info: Arc::new(Mutex::new(replay_infos)),
         })
     }
 
@@ -179,6 +257,7 @@
         StackerDBListenerComms {
             blocks: self.blocks.clone(),
             signer_idle_timestamps: self.signer_idle_timestamps.clone(),
+            replay_info: self.replay_info.clone(),
         }
     }
 
@@ -445,8 +524,8 @@ impl StackerDBListener {
                 | SignerMessageV0::MockBlock(_) => {
                     debug!("Received mock message. Ignoring.");
                 }
-                SignerMessageV0::StateMachineUpdate(_) => {
-                    debug!("Received state machine update message. Ignoring.");
+                SignerMessageV0::StateMachineUpdate(update) => {
+                    self.update_replay_info(signer_pubkey, signer_entry.weight, update);
                 }
             };
         }
@@ -472,6 +551,32 @@ impl StackerDBListener {
         idle_timestamps.insert(signer_pubkey, timestamp_info);
     }
 
+    fn update_replay_info(
+        &self,
+        signer_pubkey: StacksPublicKey,
+        weight: u32,
+        update: StateMachineUpdate,
+    ) {
+        let transactions = match update.content {
+            StateMachineUpdateContent::V0 { .. } => vec![],
+            StateMachineUpdateContent::V1 {
+                replay_transactions,
+                ..
+            } => replay_transactions,
+        };
+        let mut replay_infos = self
+            .replay_info
+            .lock()
+            .expect("FATAL: failed to lock replay info");
+
+        // Update the map with the replay info and weight
+        let replay_info = ReplayInfo {
+            transactions,
+            weight,
+        };
+        replay_infos.insert(signer_pubkey, replay_info);
+    }
+
     /// Do we ignore signer signatures?
     #[cfg(test)]
     fn fault_injection_ignore_signatures() -> bool {
@@ -597,4 +702,31 @@ impl StackerDBListenerComms {
         // tenure.
         u64::MAX
     }
+
+    /// Get the transactions that at least 70% of the signing power expect to be replayed in
+    /// the next stacks block
+    pub fn get_replay_transactions(&self, weight_threshold: u32) -> Vec<StacksTransaction> {
+        let replay_info = self
+            .replay_info
+            .lock()
+            .expect("FATAL: failed to lock replay transactions");
+
+        let replay_info = replay_info.values().collect::<Vec<_>>();
+        let mut weights: HashMap<&Vec<StacksTransaction>, u32> = HashMap::new();
+        for info in replay_info {
+            // We only care about signers voting for us to replay a specific set of transactions
+            if info.transactions.is_empty() {
+                continue;
+            }
+            let entry = weights.entry(&info.transactions).or_default();
+            *entry += info.weight;
+            if *entry >= weight_threshold {
+                debug!("SignerCoordinator: 70% threshold reached to attempt replay transactions";
+                    "replay_transactions" => ?info.transactions,
+                );
+                return info.transactions.clone();
+            }
+        }
+        vec![]
+    }
 }