Skip to content

Feat/miners read signer state machine updates #6064

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE

- Added new `ValidateRejectCode` values to the `/v3/block_proposal` endpoint
- Added `StateMachineUpdateContent::V1` to support a vector of `StacksTransaction` expected to be replayed in subsequent Stacks blocks
- Updated `StackerDBListener` to listen for signers' state machine updates and store replay info to the side to enable a miner to perform transaction replay

### Changed

Expand Down
10 changes: 9 additions & 1 deletion testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash};
use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState};
use stacks::chainstate::stacks::boot::{RewardSet, MINERS_NAME};
use stacks::chainstate::stacks::db::StacksChainState;
use stacks::chainstate::stacks::Error as ChainstateError;
use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
use stacks::codec::StacksMessageCodec;
use stacks::libstackerdb::StackerDBChunkData;
use stacks::net::stackerdb::StackerDBs;
Expand Down Expand Up @@ -97,6 +97,7 @@ impl SignerCoordinator {
reward_set,
election_block,
burnchain,
config,
)?;
let is_mainnet = config.is_mainnet();
let rpc_socket = config
Expand Down Expand Up @@ -484,6 +485,13 @@ impl SignerCoordinator {
.get_tenure_extend_timestamp(self.weight_threshold)
}

/// Return the transactions that a sufficient share of the signing power
/// (at least `self.weight_threshold`) expects to be replayed in upcoming
/// Stacks blocks. Returns an empty vector when no transaction set has
/// reached the threshold.
pub fn get_replay_transactions(&self) -> Vec<StacksTransaction> {
    let threshold = self.weight_threshold;
    self.stackerdb_comms.get_replay_transactions(threshold)
}

/// Check if the tenure needs to change
fn check_burn_tip_changed(&self, sortdb: &SortitionDB) -> bool {
let cur_burn_chain_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn())
Expand Down
142 changes: 137 additions & 5 deletions testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

use hashbrown::{HashMap, HashSet};
use libsigner::v0::messages::{BlockAccepted, BlockResponse, SignerMessage as SignerMessageV0};
use libsigner::SignerEvent;
use libsigner::v0::messages::{
BlockAccepted, BlockResponse, MessageSlotID, SignerMessage as SignerMessageV0,
StateMachineUpdate, StateMachineUpdateContent,
};
use libsigner::{SignerEvent, SignerSession, StackerDBSession};
use stacks::burnchains::Burnchain;
use stacks::chainstate::burn::BlockSnapshot;
use stacks::chainstate::nakamoto::NakamotoBlockHeader;
use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, SIGNERS_NAME};
use stacks::chainstate::stacks::events::StackerDBChunksEvent;
use stacks::chainstate::stacks::Error as ChainstateError;
use stacks::chainstate::stacks::{Error as ChainstateError, StacksTransaction};
use stacks::codec::StacksMessageCodec;
use stacks::net::stackerdb::StackerDBs;
use stacks::types::chainstate::StacksPublicKey;
use stacks::types::PublicKey;
use stacks::util::get_epoch_time_secs;
Expand All @@ -40,6 +45,7 @@ use stacks_common::util::tests::TestFlag;

use super::Error as NakamotoNodeError;
use crate::event_dispatcher::StackerDBChannel;
use crate::Config;

#[cfg(test)]
/// Fault injection flag to prevent the miner from seeing enough signer signatures.
Expand Down Expand Up @@ -68,6 +74,12 @@ pub(crate) struct TimestampInfo {
pub weight: u32,
}

/// A single signer's most recent transaction-replay vote, extracted from
/// its state-machine update message.
#[derive(Debug, Clone)]
pub(crate) struct ReplayInfo {
    // The transactions this signer expects to be replayed (empty for V0
    // updates, which carry no replay set).
    pub transactions: Vec<StacksTransaction>,
    // The signer's voting weight, used when summing support per
    // transaction set against the threshold.
    pub weight: u32,
}

/// The listener for the StackerDB, which listens for messages from the
/// signers and tracks the state of block signatures and idle timestamps.
pub struct StackerDBListener {
Expand Down Expand Up @@ -96,6 +108,11 @@ pub struct StackerDBListener {
/// - key: StacksPublicKey
/// - value: TimestampInfo
pub(crate) signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
/// Tracks any replay transactions from signers to decide when the miner should
/// attempt to replay reorged blocks
/// - key: StacksPublicKey
/// - value: ReplayInfo
pub(crate) replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
}

/// Interface for other threads to retrieve info from the StackerDBListener
Expand All @@ -109,6 +126,11 @@ pub struct StackerDBListenerComms {
/// - key: StacksPublicKey
/// - value: TimestampInfo
signer_idle_timestamps: Arc<Mutex<HashMap<StacksPublicKey, TimestampInfo>>>,
/// Tracks any replay transactions from signers to decide when the miner should
/// attempt to replay reorged blocks
/// - key: StacksPublicKey
/// - value: ReplayInfo
replay_info: Arc<Mutex<HashMap<StacksPublicKey, ReplayInfo>>>,
}

impl StackerDBListener {
Expand All @@ -119,6 +141,7 @@ impl StackerDBListener {
reward_set: &RewardSet,
burn_tip: &BlockSnapshot,
burnchain: &Burnchain,
config: &Config,
) -> Result<Self, ChainstateError> {
let (receiver, replaced_other) = stackerdb_channel
.lock()
Expand Down Expand Up @@ -161,6 +184,60 @@ impl StackerDBListener {
})
.collect::<Result<HashMap<_, _>, ChainstateError>>()?;

let reward_cycle = burnchain
.block_height_to_reward_cycle(burn_tip.block_height)
.expect("BUG: unknown reward cycle");
let signers_contract_id = MessageSlotID::StateMachineUpdate
.stacker_db_contract(config.is_mainnet(), reward_cycle);
let rpc_socket = config
.node
.get_rpc_loopback()
.ok_or_else(|| ChainstateError::MinerAborted)?;
let mut signers_session =
StackerDBSession::new(&rpc_socket.to_string(), signers_contract_id.clone());
let stackerdbs = StackerDBs::connect(&config.get_stacker_db_file_path(), false)?;
let slot_ids: Vec<_> = stackerdbs
.get_signers(&signers_contract_id)
.expect("FATAL: could not get signers from stacker DB")
.into_iter()
.enumerate()
.map(|(slot_id, _)| {
u32::try_from(slot_id).expect("FATAL: too many signers to fit into u32 range")
})
.collect();
let chunks = signers_session
.get_latest_chunks(&slot_ids)
.inspect_err(|e| warn!("Unable to read the latest signer state from signer db: {e}."))
.unwrap_or_default();
let mut replay_infos = HashMap::new();
for (chunk, slot_id) in chunks.into_iter().zip(slot_ids) {
let Some(chunk) = chunk else {
continue;
};
let Some(signer_entry) = &signer_entries.get(&slot_id) else {
continue;
};
let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) else {
continue;
};
if let Ok(SignerMessageV0::StateMachineUpdate(update)) =
SignerMessageV0::consensus_deserialize(&mut chunk.as_slice())
{
let transactions = match update.content {
StateMachineUpdateContent::V0 { .. } => vec![],
StateMachineUpdateContent::V1 {
replay_transactions,
..
} => replay_transactions,
};
let replay_info = ReplayInfo {
transactions,
weight: signer_entry.weight,
};
replay_infos.insert(signer_pubkey, replay_info);
}
}

Ok(Self {
stackerdb_channel,
receiver: Some(receiver),
Expand All @@ -172,13 +249,15 @@ impl StackerDBListener {
signer_entries,
blocks: Arc::new((Mutex::new(HashMap::new()), Condvar::new())),
signer_idle_timestamps: Arc::new(Mutex::new(HashMap::new())),
replay_info: Arc::new(Mutex::new(replay_infos)),
})
}

pub fn get_comms(&self) -> StackerDBListenerComms {
StackerDBListenerComms {
blocks: self.blocks.clone(),
signer_idle_timestamps: self.signer_idle_timestamps.clone(),
replay_info: self.replay_info.clone(),
}
}

Expand Down Expand Up @@ -445,8 +524,8 @@ impl StackerDBListener {
| SignerMessageV0::MockBlock(_) => {
debug!("Received mock message. Ignoring.");
}
SignerMessageV0::StateMachineUpdate(_) => {
debug!("Received state machine update message. Ignoring.");
SignerMessageV0::StateMachineUpdate(update) => {
self.update_replay_info(signer_pubkey, signer_entry.weight, update);
}
};
}
Expand All @@ -472,6 +551,32 @@ impl StackerDBListener {
idle_timestamps.insert(signer_pubkey, timestamp_info);
}

/// Record the latest state-machine update received from `signer_pubkey`.
///
/// Extracts the replay transactions carried by the update (V0 updates
/// predate transaction replay and never carry any) and stores them,
/// together with the signer's voting `weight`, in the shared
/// `replay_info` map, overwriting any previous entry for that signer.
fn update_replay_info(
    &self,
    signer_pubkey: StacksPublicKey,
    weight: u32,
    update: StateMachineUpdate,
) {
    let transactions = match update.content {
        StateMachineUpdateContent::V0 { .. } => vec![],
        StateMachineUpdateContent::V1 {
            replay_transactions,
            ..
        } => replay_transactions,
    };
    let mut replay_infos = self
        .replay_info
        .lock()
        // Fixed: the message previously said "idle timestamps", copied
        // from update_idle_timestamp; this mutex guards replay info.
        .expect("FATAL: failed to lock replay info");

    // Latest update wins: replace any prior entry for this signer.
    let replay_info = ReplayInfo {
        transactions,
        weight,
    };
    replay_infos.insert(signer_pubkey, replay_info);
}

/// Do we ignore signer signatures?
#[cfg(test)]
fn fault_injection_ignore_signatures() -> bool {
Expand Down Expand Up @@ -597,4 +702,31 @@ impl StackerDBListenerComms {
// tenure.
u64::MAX
}

/// Get the transactions that at least 70% of the signing power expect to be replayed in
/// the next stacks block
pub fn get_replay_transactions(&self, weight_threshold: u32) -> Vec<StacksTransaction> {
let replay_info = self
.replay_info
.lock()
.expect("FATAL: failed to lock replay transactions");

let replay_info = replay_info.values().collect::<Vec<_>>();
let mut weights: HashMap<&Vec<StacksTransaction>, u32> = HashMap::new();
for info in replay_info {
// We only care about signers voting for us to replay a specific set of transactions
if info.transactions.is_empty() {
continue;
}
let entry = weights.entry(&info.transactions).or_default();
*entry += info.weight;
if *entry >= weight_threshold {
debug!("SignerCoordinator: 70% threshold reached to attempt replay transactions";
"replay_transactions" => ?info.transactions,
);
return info.transactions.clone();
}
}
vec![]
}
}
Loading