This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

see time on ci based on number of stackslib mutants #62

Open
wants to merge 21 commits into base: nr-mutants/targer
Changes from all commits
21 commits
2f7aedd
chore: move existing relayer tests into net/tests/relay/epoch2x.rs, t…
jcnelson Jun 6, 2024
b256858
chore: move unsolicited message handling logic into its own file (net…
jcnelson Jun 6, 2024
f6dd43c
feat: nakamoto block-push logic for handling newly-received Nakamoto …
jcnelson Jun 6, 2024
4934b34
feat: unit test coverage for Nakamoto unsolicited message handling an…
jcnelson Jun 6, 2024
a794490
fix: log when we process a new nakamoto block
jcnelson Jun 6, 2024
b139a42
chore: implement NakamotoBlocks push message for p2p stack
jcnelson Jun 6, 2024
4e2f82c
chore: document all fault-injection flags, and expand the maximum num…
jcnelson Jun 6, 2024
58173a0
chore: fault-injection for Nakamoto block download
jcnelson Jun 6, 2024
991487c
chore: track pushed NakamotoBlocks in the NetworkResult struct, so th…
jcnelson Jun 6, 2024
175f5b8
chore: need Clone for StackerDBSyncResult (since we need it for Netwo…
jcnelson Jun 6, 2024
ad7f255
chore: relay test module
jcnelson Jun 6, 2024
f1c5463
Merge branch 'develop' into feat/nakamoto-block-push
jcnelson Jun 6, 2024
f2e8b88
chore: fmt
jcnelson Jun 6, 2024
a7c5a1f
fix: verify on receipt of the blocks that it's signed by the signers …
jcnelson Jun 6, 2024
ff8cfe7
chore: fix failing tests and expand test coverage
jcnelson Jun 11, 2024
442c6fe
chore: resource accounting for pushed nakamoto blocks
jcnelson Jun 11, 2024
c80b2c9
chore: option for maximum nakamoto block push bandwidth
jcnelson Jun 11, 2024
868be4c
chore: take &NakamotoBlock instead of NakamotoBlock
jcnelson Jun 11, 2024
afdbaad
chore: address PR feedback
jcnelson Jun 11, 2024
93b53dc
fix: build error
jcnelson Jun 12, 2024
04a9f8d
skip timeout network mutants stackslib
ASuciuX Jun 12, 2024
1 change: 1 addition & 0 deletions stackslib/src/chainstate/nakamoto/coordinator/mod.rs
@@ -681,6 +681,7 @@ impl<
/// with Some(pox-anchor-block-hash) until the reward cycle info is processed in the sortition
/// DB.
pub fn handle_new_nakamoto_stacks_block(&mut self) -> Result<Option<BlockHeaderHash>, Error> {
debug!("Handle new Nakamoto block");
let canonical_sortition_tip = self.canonical_sortition_tip.clone().expect(
"FAIL: processing a new Stacks block, but don't have a canonical sortition tip",
);
2 changes: 1 addition & 1 deletion stackslib/src/chainstate/nakamoto/coordinator/tests.rs
@@ -404,7 +404,7 @@ fn replay_reward_cycle(
&sortdb,
&mut sort_handle,
&mut node.chainstate,
block.clone(),
&block,
None,
)
.unwrap_or(false);
11 changes: 5 additions & 6 deletions stackslib/src/chainstate/nakamoto/mod.rs
@@ -1835,7 +1835,7 @@ impl NakamotoChainState {
/// Insert a Nakamoto block into the staging blocks DB
pub(crate) fn store_block(
staging_db_tx: &NakamotoStagingBlocksTx,
block: NakamotoBlock,
block: &NakamotoBlock,
burn_attachable: bool,
) -> Result<(), ChainstateError> {
let block_id = block.block_id();
@@ -1894,7 +1894,7 @@
/// Returns true if we stored the block; false if not.
pub fn accept_block(
config: &ChainstateConfig,
block: NakamotoBlock,
block: &NakamotoBlock,
db_handle: &mut SortitionHandleConn,
staging_db_tx: &NakamotoStagingBlocksTx,
headers_conn: &Connection,
@@ -1927,14 +1927,14 @@

// it's okay if this fails because we might not have the parent block yet. It will be
// checked on `::append_block()`
let expected_burn_opt = Self::get_expected_burns(db_handle, headers_conn, &block)?;
let expected_burn_opt = Self::get_expected_burns(db_handle, headers_conn, block)?;

// this block must be consistent with its miner's leader-key and block-commit, and must
// contain only transactions that are valid in this epoch.
if let Err(e) = Self::validate_nakamoto_block_burnchain(
db_handle,
expected_burn_opt,
&block,
block,
config.mainnet,
config.chain_id,
) {
@@ -1958,9 +1958,8 @@
// same sortition history as `db_handle` (and thus it must be burn_attachable)
let burn_attachable = true;

let _block_id = block.block_id();
Self::store_block(staging_db_tx, block, burn_attachable)?;
test_debug!("Stored Nakamoto block {}", &_block_id);
test_debug!("Stored Nakamoto block {}", &block.block_id());
Ok(true)
}

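The hunks above change store_block and accept_block to take &NakamotoBlock instead of a NakamotoBlock by value, which is why the call sites in the test files below drop their .clone() calls. A minimal standalone sketch of that borrow-instead-of-consume pattern, using a hypothetical Block type and store function rather than the real chainstate API:

#[derive(Clone, Debug)]
struct Block {
    id: u64,
}

// Before the change, a by-value parameter forced callers that still needed the
// block to write store(&mut db, block.clone()). Borrowing avoids that copy.
fn store(db: &mut Vec<u64>, block: &Block) {
    db.push(block.id);
}

fn main() {
    let mut db = Vec::new();
    let block = Block { id: 42 };
    store(&mut db, &block); // no clone; caller keeps ownership
    println!("stored {:?}, still own {:?}", db, block);
}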
6 changes: 3 additions & 3 deletions stackslib/src/chainstate/nakamoto/tests/mod.rs
@@ -1069,7 +1069,7 @@ pub fn test_load_store_update_nakamoto_blocks() {
300,
)
.unwrap();
NakamotoChainState::store_block(&staging_tx, nakamoto_block.clone(), false).unwrap();
NakamotoChainState::store_block(&staging_tx, &nakamoto_block, false).unwrap();

// tenure has one block
assert_eq!(
@@ -1102,7 +1102,7 @@
)
.unwrap();

NakamotoChainState::store_block(&staging_tx, nakamoto_block_2.clone(), false).unwrap();
NakamotoChainState::store_block(&staging_tx, &nakamoto_block_2, false).unwrap();

// tenure has two blocks
assert_eq!(
@@ -1123,7 +1123,7 @@
);

// store, but do not process, a block
NakamotoChainState::store_block(&staging_tx, nakamoto_block_3.clone(), false).unwrap();
NakamotoChainState::store_block(&staging_tx, &nakamoto_block_3, false).unwrap();

staging_tx.commit().unwrap();
tx.commit().unwrap();
4 changes: 2 additions & 2 deletions stackslib/src/chainstate/nakamoto/tests/node.rs
@@ -632,7 +632,7 @@ impl TestStacksNode {
sortdb,
&mut sort_handle,
chainstate,
nakamoto_block.clone(),
&nakamoto_block,
None,
) {
Ok(accepted) => accepted,
@@ -1159,7 +1159,7 @@ impl<'a> TestPeer<'a> {
&sortdb,
&mut sort_handle,
&mut node.chainstate,
block,
&block,
None,
)
.unwrap();
124 changes: 124 additions & 0 deletions stackslib/src/net/chat.rs
@@ -128,6 +128,8 @@ pub struct NeighborStats {
pub transaction_push_rx_counts: VecDeque<(u64, u64)>,
/// (timestamp, num bytes)
pub stackerdb_push_rx_counts: VecDeque<(u64, u64)>,
/// (timestamp, num bytes)
pub nakamoto_block_push_rx_counts: VecDeque<(u64, u64)>,
pub relayed_messages: HashMap<NeighborAddress, RelayStats>,
}

@@ -152,6 +154,7 @@ impl NeighborStats {
microblocks_push_rx_counts: VecDeque::new(),
transaction_push_rx_counts: VecDeque::new(),
stackerdb_push_rx_counts: VecDeque::new(),
nakamoto_block_push_rx_counts: VecDeque::new(),
relayed_messages: HashMap::new(),
}
}
@@ -214,6 +217,17 @@
}
}

/// Record that we recently received a Nakamoto block push of the given size.
/// Keeps track of the last `NUM_BANDWIDTH_POINTS` such events, so we can estimate the current
/// bandwidth consumed by Nakamoto block pushes
pub fn add_nakamoto_block_push(&mut self, message_size: u64) -> () {
self.nakamoto_block_push_rx_counts
.push_back((get_epoch_time_secs(), message_size));
while self.nakamoto_block_push_rx_counts.len() > NUM_BANDWIDTH_POINTS {
self.nakamoto_block_push_rx_counts.pop_front();
}
}

pub fn add_relayer(&mut self, addr: &NeighborAddress, num_bytes: u64) -> () {
if let Some(stats) = self.relayed_messages.get_mut(addr) {
stats.num_messages += 1;
@@ -298,6 +312,14 @@ impl NeighborStats {
NeighborStats::get_bandwidth(&self.stackerdb_push_rx_counts, BANDWIDTH_POINT_LIFETIME)
}

/// Get a peer's total nakamoto block bandwidth usage
pub fn get_nakamoto_block_push_bandwidth(&self) -> f64 {
NeighborStats::get_bandwidth(
&self.nakamoto_block_push_rx_counts,
BANDWIDTH_POINT_LIFETIME,
)
}

/// Determine how many of a particular message this peer has received
pub fn get_message_recv_count(&self, msg_id: StacksMessageID) -> u64 {
*(self.msg_rx_counts.get(&msg_id).unwrap_or(&0))
@@ -2217,6 +2239,45 @@ impl ConversationP2P {
Ok(None)
}

/// Validate a pushed Nakamoto block list.
/// Update bandwidth accounting, but forward the blocks along if we can accept them.
/// Possibly return a reply handle for a NACK if we throttle the remote sender
fn validate_nakamoto_block_push(
&mut self,
network: &PeerNetwork,
preamble: &Preamble,
relayers: Vec<RelayData>,
) -> Result<Option<ReplyHandleP2P>, net_error> {
assert!(preamble.payload_len > 1); // don't count 1-byte type prefix

let local_peer = network.get_local_peer();
let chain_view = network.get_chain_view();

if !self.process_relayers(local_peer, preamble, &relayers) {
warn!(
"Drop pushed Nakamoto blocks -- invalid relayers {:?}",
&relayers
);
self.stats.msgs_err += 1;
return Err(net_error::InvalidMessage);
}

self.stats
.add_nakamoto_block_push((preamble.payload_len as u64) - 1);

if self.connection.options.max_nakamoto_block_push_bandwidth > 0
&& self.stats.get_nakamoto_block_push_bandwidth()
> (self.connection.options.max_nakamoto_block_push_bandwidth as f64)
{
debug!("Neighbor {:?} exceeded max Nakamoto block push bandwidth of {} bytes/sec (currently at {})", &self.to_neighbor_key(), self.connection.options.max_nakamoto_block_push_bandwidth, self.stats.get_nakamoto_block_push_bandwidth());
return self
.reply_nack(local_peer, chain_view, preamble, NackErrorCodes::Throttled)
.and_then(|handle| Ok(Some(handle)));
}

Ok(None)
}

/// Handle an inbound authenticated p2p data-plane message.
/// Return the message if not handled
fn handle_data_message(
@@ -2305,6 +2366,21 @@
}
}
}
StacksMessageType::NakamotoBlocks(_) => {
// not handled here, but do some accounting -- we can't receive too many
// Nakamoto blocks per second
match self.validate_nakamoto_block_push(
network,
&msg.preamble,
msg.relayers.clone(),
)? {
Some(handle) => Ok(handle),
None => {
// will forward upstream
return Ok(Some(msg));
}
}
}
_ => {
// all else will forward upstream
return Ok(Some(msg));
@@ -6603,6 +6679,54 @@
assert_eq!(bw_stats.get_stackerdb_push_bandwidth(), 110.0);
}

#[test]
fn test_neighbor_stats_nakamoto_block_push_bandwidth() {
let mut stats = NeighborStats::new(false);

assert_eq!(stats.get_nakamoto_block_push_bandwidth(), 0.0);

stats.add_nakamoto_block_push(100);
assert_eq!(stats.get_nakamoto_block_push_bandwidth(), 0.0);

// this should all happen in one second
let bw_stats = loop {
let mut bw_stats = stats.clone();
let start = get_epoch_time_secs();

for _ in 0..(NUM_BANDWIDTH_POINTS - 1) {
bw_stats.add_nakamoto_block_push(100);
}

let end = get_epoch_time_secs();
if end == start {
break bw_stats;
}
};

assert_eq!(
bw_stats.get_nakamoto_block_push_bandwidth(),
(NUM_BANDWIDTH_POINTS as f64) * 100.0
);

// space some out; make sure it takes 11 seconds
let bw_stats = loop {
let mut bw_stats = NeighborStats::new(false);
let start = get_epoch_time_secs();
for _ in 0..11 {
bw_stats.add_nakamoto_block_push(100);
sleep_ms(1001);
}

let end = get_epoch_time_secs();
if end == start + 11 {
break bw_stats;
}
};

// 100 bytes/sec
assert_eq!(bw_stats.get_nakamoto_block_push_bandwidth(), 110.0);
}

#[test]
fn test_sign_relay_forward_message() {
let conn_opts = ConnectionOptions::default();
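The chat.rs changes above extend the per-neighbor accounting already used for block, microblock, transaction, and StackerDB pushes: every received NakamotoBlocks push is recorded as a (timestamp, size) sample in a bounded deque, and validate_nakamoto_block_push replies with a Throttled NACK once the estimated rate exceeds max_nakamoto_block_push_bandwidth. The sketch below is a self-contained approximation of that rolling-window estimate; NUM_BANDWIDTH_POINTS, BANDWIDTH_POINT_LIFETIME, and the averaging done by NeighborStats::get_bandwidth live elsewhere in chat.rs and are only stand-ins here:

use std::collections::VecDeque;
use std::time::{SystemTime, UNIX_EPOCH};

const MAX_POINTS: usize = 100; // stand-in for NUM_BANDWIDTH_POINTS
const LIFETIME_SECS: u64 = 10; // stand-in for BANDWIDTH_POINT_LIFETIME

struct PushStats {
    rx_counts: VecDeque<(u64, u64)>, // (timestamp, num bytes)
}

impl PushStats {
    fn now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    // Mirrors add_nakamoto_block_push: append a sample, keep the window bounded.
    fn add_push(&mut self, bytes: u64) {
        self.rx_counts.push_back((Self::now(), bytes));
        while self.rx_counts.len() > MAX_POINTS {
            self.rx_counts.pop_front();
        }
    }

    // Rough bytes/sec over the last LIFETIME_SECS from the retained samples.
    fn bandwidth(&self) -> f64 {
        let cutoff = Self::now().saturating_sub(LIFETIME_SECS);
        let total: u64 = self
            .rx_counts
            .iter()
            .filter(|(ts, _)| *ts >= cutoff)
            .map(|(_, sz)| *sz)
            .sum();
        (total as f64) / (LIFETIME_SECS as f64)
    }
}

fn main() {
    let mut stats = PushStats {
        rx_counts: VecDeque::new(),
    };
    for _ in 0..5 {
        stats.add_push(1000);
    }
    let max_bw = 400.0; // stand-in for max_nakamoto_block_push_bandwidth
    if max_bw > 0.0 && stats.bandwidth() > max_bw {
        println!("over budget: would NACK the peer with Throttled");
    } else {
        println!("within budget: {} bytes/sec", stats.bandwidth());
    }
}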
49 changes: 49 additions & 0 deletions stackslib/src/net/codec.rs
@@ -41,6 +41,7 @@ use stacks_common::util::secp256k1::{

use crate::burnchains::{BurnchainView, PrivateKey, PublicKey};
use crate::chainstate::burn::ConsensusHash;
use crate::chainstate::nakamoto::NakamotoBlock;
use crate::chainstate::stacks::{
StacksBlock, StacksMicroblock, StacksPublicKey, StacksTransaction, MAX_BLOCK_LEN,
};
@@ -353,6 +354,37 @@
}
}

impl StacksMessageCodec for NakamotoBlocksData {
#[cfg_attr(test, mutants::skip)]
fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), codec_error> {
write_next(fd, &self.blocks)?;
Ok(())
}

fn consensus_deserialize<R: Read>(fd: &mut R) -> Result<Self, codec_error> {
let blocks: Vec<NakamotoBlock> = {
// loose upper-bound
let mut bound_read = BoundReader::from_reader(fd, MAX_MESSAGE_LEN as u64);
read_next_at_most::<_, NakamotoBlock>(&mut bound_read, NAKAMOTO_BLOCKS_PUSHED_MAX)
}?;

// only valid if there are no dups
let mut present = HashSet::new();
for block in blocks.iter() {
if present.contains(&block.block_id()) {
// no dups allowed
return Err(codec_error::DeserializeError(
"Invalid NakamotoBlocksData: duplicate block".to_string(),
));
}

present.insert(block.block_id());
}

Ok(NakamotoBlocksData { blocks })
}
}

impl StacksMessageCodec for GetPoxInv {
fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), codec_error> {
write_next(fd, &self.consensus_hash)?;
@@ -930,6 +962,7 @@ impl StacksMessageType {
StacksMessageType::StackerDBPushChunk(ref _m) => StacksMessageID::StackerDBPushChunk,
StacksMessageType::GetNakamotoInv(ref _m) => StacksMessageID::GetNakamotoInv,
StacksMessageType::NakamotoInv(ref _m) => StacksMessageID::NakamotoInv,
StacksMessageType::NakamotoBlocks(ref _m) => StacksMessageID::NakamotoBlocks,
}
}

@@ -964,6 +997,7 @@
StacksMessageType::StackerDBPushChunk(ref _m) => "StackerDBPushChunk",
StacksMessageType::GetNakamotoInv(ref _m) => "GetNakamotoInv",
StacksMessageType::NakamotoInv(ref _m) => "NakamotoInv",
StacksMessageType::NakamotoBlocks(ref _m) => "NakamotoBlocks",
}
}

@@ -1071,6 +1105,15 @@
StacksMessageType::NakamotoInv(ref m) => {
format!("NakamotoInv({:?})", &m.tenures)
}
StacksMessageType::NakamotoBlocks(ref m) => {
format!(
"NakamotoBlocks({:?})",
m.blocks
.iter()
.map(|block| block.block_id())
.collect::<Vec<_>>()
)
}
}
}
}
@@ -1122,6 +1165,7 @@ impl StacksMessageCodec for StacksMessageID {
}
x if x == StacksMessageID::GetNakamotoInv as u8 => StacksMessageID::GetNakamotoInv,
x if x == StacksMessageID::NakamotoInv as u8 => StacksMessageID::NakamotoInv,
x if x == StacksMessageID::NakamotoBlocks as u8 => StacksMessageID::NakamotoBlocks,
_ => {
return Err(codec_error::DeserializeError(
"Unknown message ID".to_string(),
@@ -1166,6 +1210,7 @@ impl StacksMessageCodec for StacksMessageType {
StacksMessageType::StackerDBPushChunk(ref m) => write_next(fd, m)?,
StacksMessageType::GetNakamotoInv(ref m) => write_next(fd, m)?,
StacksMessageType::NakamotoInv(ref m) => write_next(fd, m)?,
StacksMessageType::NakamotoBlocks(ref m) => write_next(fd, m)?,
}
Ok(())
}
@@ -1276,6 +1321,10 @@
let m: NakamotoInvData = read_next(fd)?;
StacksMessageType::NakamotoInv(m)
}
StacksMessageID::NakamotoBlocks => {
let m: NakamotoBlocksData = read_next(fd)?;
StacksMessageType::NakamotoBlocks(m)
}
StacksMessageID::Reserved => {
return Err(codec_error::DeserializeError(
"Unsupported message ID 'reserved'".to_string(),
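NakamotoBlocksData::consensus_deserialize above reads at most NAKAMOTO_BLOCKS_PUSHED_MAX blocks under a loose MAX_MESSAGE_LEN bound, then rejects the whole batch if any block ID appears twice. A stripped-down sketch of that duplicate check, using plain u64 IDs in place of the real block types:

use std::collections::HashSet;

// Returns Err if any ID repeats, mirroring the dup check in consensus_deserialize.
fn reject_duplicates(ids: &[u64]) -> Result<(), String> {
    let mut present = HashSet::new();
    for id in ids {
        // HashSet::insert returns false when the value was already present
        if !present.insert(*id) {
            return Err(format!("duplicate block id {}", id));
        }
    }
    Ok(())
}

fn main() {
    assert!(reject_duplicates(&[1, 2, 3]).is_ok());
    assert!(reject_duplicates(&[1, 2, 1]).is_err());
}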