Skip to content

Commit

Permalink
test(state-sync): add test case for forks of the sync block (#12698)
Browse files Browse the repository at this point in the history
This adds a test that currently fails because the state sync code
doesn't handle the case where the first sync block seen doesn't end up
finalized. We add a drop_blocks_by_height() function to the builder that
will have the network layer drop any block messages for the given
height. The result of this is that the block producer for the sync block
will start state syncing with that sync_hash, but no other nodes will
ever see that block, so they'll all know about a different sync_hash.
This test case relies on the fact that this block producer will need to
state sync in that epoch. This happens to be true, but in the future it
would be good to more explicitly define what shards will be tracked up
front.
  • Loading branch information
marcelo-gonzalez authored Jan 9, 2025
1 parent 869ed26 commit 3564f4b
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 23 deletions.
20 changes: 17 additions & 3 deletions integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use near_primitives::epoch_manager::EpochConfigStore;
use near_primitives::network::PeerId;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::test_utils::create_test_signer;
use near_primitives::types::{AccountId, ShardId, ShardIndex};
use near_primitives::types::{AccountId, BlockHeight, ShardId, ShardIndex};
use near_primitives::upgrade_schedule::ProtocolUpgradeVotingSchedule;
use near_primitives::version::PROTOCOL_UPGRADE_SCHEDULE;
use near_store::adapter::StoreAdapter;
Expand All @@ -44,7 +44,9 @@ use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache};
use nearcore::state_sync::StateSyncDumper;

use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv};
use super::utils::network::{chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash};
use super::utils::network::{
block_dropper_by_height, chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash,
};
use near_chain::resharding::resharding_actor::ReshardingActor;

enum DropConditionKind {
Expand All @@ -62,6 +64,8 @@ enum DropConditionKind {
/// self.0[`shard_id`][`height_created` - `epoch_start`] is true, or if
/// `height_created` - `epoch_start` > self.0[`shard_id`].len()
ChunksProducedByHeight(HashMap<ShardId, Vec<bool>>),
/// Drops Block broadcast messages with height in `self.0`
BlocksByHeight(HashSet<BlockHeight>),
}

pub(crate) struct TestLoopBuilder {
Expand All @@ -82,7 +86,7 @@ pub(crate) struct TestLoopBuilder {
archival_clients: HashSet<AccountId>,
/// Will store all chunks produced within the test loop.
chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
/// Conditions under which chunks/endorsements are dropped.
/// Conditions under which chunks/endorsements/blocks are dropped.
drop_condition_kinds: Vec<DropConditionKind>,
/// Number of latest epochs to keep before garbage collecting associated data.
gc_num_epochs_to_keep: Option<u64>,
Expand Down Expand Up @@ -278,6 +282,9 @@ fn register_drop_condition(
drop_chunks_condition,
));
}
DropConditionKind::BlocksByHeight(heights) => {
peer_manager_actor.register_override_handler(block_dropper_by_height(heights.clone()));
}
}
}

Expand Down Expand Up @@ -388,6 +395,13 @@ impl TestLoopBuilder {
self
}

/// Tells the network layer to drop any Block broadcast whose height is in `heights`.
/// Passing an empty set leaves the builder unchanged.
pub(crate) fn drop_blocks_by_height(mut self, heights: HashSet<BlockHeight>) -> Self {
    if heights.is_empty() {
        return self;
    }
    self.drop_condition_kinds.push(DropConditionKind::BlocksByHeight(heights));
    self
}

pub(crate) fn gc_num_epochs_to_keep(mut self, num_epochs: u64) -> Self {
self.gc_num_epochs_to_keep = Some(num_epochs);
self
Expand Down
144 changes: 125 additions & 19 deletions integration-tests/src/test_loop/tests/state_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use near_async::messaging::{Handler, SendAsync};
use near_async::test_loop::TestLoopV2;
use near_async::time::Duration;
use near_chain::ChainStoreAccess;
use near_chain_configs::test_genesis::{
TestEpochConfigBuilder, TestGenesisBuilder, ValidatorsSpec,
};
Expand All @@ -11,8 +12,10 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::test_utils::create_user_test_signer;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, AccountInfo, BlockHeightDelta, Nonce, NumSeats, ShardId};
use near_primitives::version::PROTOCOL_VERSION;
use near_primitives::types::{
AccountId, AccountInfo, BlockHeight, BlockHeightDelta, Nonce, NumSeats, ShardId,
};
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};

use crate::test_loop::builder::TestLoopBuilder;
use crate::test_loop::env::{TestData, TestLoopEnv};
Expand Down Expand Up @@ -67,18 +70,20 @@ fn generate_accounts(boundary_accounts: &[String]) -> Vec<Vec<(AccountId, Nonce)
/// Everything a single state-sync test run needs: the test-loop environment plus
/// the data used to drive transactions and verify the outcome afterwards.
struct TestState {
env: TestLoopEnv,
// Per-shard lists of (account, nonce) pairs generated at setup time, used to send
// transactions during the run; `None` when the test runs without extra accounts.
accounts: Option<Vec<Vec<(AccountId, Nonce)>>>,
// Height of the block the network layer was told to drop (the would-be sync block),
// set only for the fork test so we can assert afterwards that the fork happened.
skip_sync_block_height: Option<BlockHeight>,
}

fn setup_initial_blockchain(
num_validators: usize,
num_block_producer_seats: usize,
num_chunk_producer_seats: usize,
num_shards: usize,
generate_shard_accounts: bool,
chunks_produced: HashMap<ShardId, Vec<bool>>,
skip_sync_block: bool,
) -> TestState {
let builder = TestLoopBuilder::new();
let mut builder = TestLoopBuilder::new();

let num_block_producer_seats = 1;
let num_chunk_producer_seats = num_shards;
let validators = (0..num_validators)
.map(|i| {
let account_id = format!("node{}", i);
Expand All @@ -97,19 +102,20 @@ fn setup_initial_blockchain(
if generate_shard_accounts { Some(generate_accounts(&boundary_accounts)) } else { None };

let epoch_length = 10;
let genesis_height = 10000;
let shard_layout =
ShardLayout::simple_v1(&boundary_accounts.iter().map(|s| s.as_str()).collect::<Vec<_>>());
let validators_spec = ValidatorsSpec::raw(
validators,
num_block_producer_seats as NumSeats,
num_chunk_producer_seats as NumSeats,
0,
num_validators as NumSeats,
);

let mut genesis_builder = TestGenesisBuilder::new()
.genesis_time_from_clock(&builder.clock())
.protocol_version(PROTOCOL_VERSION)
.genesis_height(10000)
.genesis_height(genesis_height)
.epoch_length(epoch_length)
.shard_layout(shard_layout.clone())
.transaction_validity_period(1000)
Expand All @@ -136,14 +142,30 @@ fn setup_initial_blockchain(
let epoch_config_store =
EpochConfigStore::test(BTreeMap::from([(PROTOCOL_VERSION, Arc::new(epoch_config))]));

let skip_sync_block_height = if skip_sync_block {
// It would probably be better not to rely on this height calculation, since that makes
// some assumptions about the state sync protocol that ideally tests wouldn't make. In the future
// it would be nice to modify `drop_blocks_by_height()` to allow for more complex logic to decide
// whether to drop the block, and be more robust to state sync protocol changes. But for now this
// will trigger the behavior we want and it's quite a bit easier.
let sync_height = if ProtocolFeature::CurrentEpochStateSync.enabled(PROTOCOL_VERSION) {
genesis_height + epoch_length + 4
} else {
genesis_height + epoch_length + 1
};
builder = builder.drop_blocks_by_height([sync_height].into_iter().collect());
Some(sync_height)
} else {
None
};
let env = builder
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(clients)
.drop_chunks_by_height(chunks_produced)
.build();

TestState { env, accounts }
TestState { env, accounts, skip_sync_block_height }
}

fn get_wrapped<T>(s: &[T], idx: usize) -> &T {
Expand Down Expand Up @@ -213,16 +235,34 @@ fn send_txs_between_shards(
}
}

// Check that no block with height `skip_sync_block_height` made it onto the canonical
// chain, i.e. that the fork we set out to create actually happened and we're testing
// what we think we should be.
fn assert_fork_happened(env: &TestLoopEnv, skip_sync_block_height: BlockHeight) {
    let client_handle = env.datas[0].client_sender.actor_handle();
    let client = &env.test_loop.data.get(&client_handle).client;

    // We assume the block just below the skipped height exists on the canonical chain,
    // since that's easier and should hold in this test. Its canonical successor must
    // then have jumped past the skipped height.
    let parent_hash = client.chain.get_block_hash_by_height(skip_sync_block_height - 1).unwrap();
    let successor_hash = client.chain.chain_store.get_next_block_hash(&parent_hash).unwrap();
    let successor_header = client.chain.get_block_header(&successor_hash).unwrap();
    assert!(successor_header.height() > skip_sync_block_height);
}

/// runs the network and sends transactions at the beginning of each epoch. At the end the condition we're
/// looking for is just that a few epochs have passed, because that should only be possible if state sync was successful
/// (which will be required because we enable chunk producer shard shuffling on this chain)
fn produce_chunks(env: &mut TestLoopEnv, mut accounts: Option<Vec<Vec<(AccountId, Nonce)>>>) {
fn produce_chunks(
env: &mut TestLoopEnv,
mut accounts: Option<Vec<Vec<(AccountId, Nonce)>>>,
skip_sync_block_height: Option<BlockHeight>,
) {
let handle = env.datas[0].client_sender.actor_handle();
let client = &env.test_loop.data.get(&handle).client;
let mut tip = client.chain.head().unwrap();
// TODO: make this more precise. We don't have to wait 2 whole seconds, but the amount we wait will
// depend on whether this block is meant to have skipped chunks.
let timeout = client.config.min_block_production_delay + Duration::seconds(2);
// TODO: make this more precise. We don't have to wait 20 whole seconds, but the amount we wait will
// depend on whether this block is meant to have skipped chunks or whether we're generating skipped blocks.
let timeout = client.config.min_block_production_delay + Duration::seconds(20);

let mut epoch_id_switches = 0;
loop {
Expand Down Expand Up @@ -252,10 +292,14 @@ fn produce_chunks(env: &mut TestLoopEnv, mut accounts: Option<Vec<Vec<(AccountId
}
tip = new_tip;
}

if let Some(skip_sync_block_height) = skip_sync_block_height {
assert_fork_happened(env, skip_sync_block_height);
}
}

fn run_test(state: TestState) {
let TestState { mut env, mut accounts } = state;
let TestState { mut env, mut accounts, skip_sync_block_height } = state;
let handle = env.datas[0].client_sender.actor_handle();
let client = &env.test_loop.data.get(&handle).client;
let first_epoch_time = client.config.min_block_production_delay
Expand All @@ -276,46 +320,60 @@ fn run_test(state: TestState) {
first_epoch_time,
);

produce_chunks(&mut env, accounts);
produce_chunks(&mut env, accounts, skip_sync_block_height);
env.shutdown_and_drain_remaining_events(Duration::seconds(3));
}

/// Parameters for one state-sync test case, fed to `setup_initial_blockchain()`.
#[derive(Debug)]
struct StateSyncTest {
num_validators: usize,
num_block_producer_seats: usize,
num_chunk_producer_seats: usize,
num_shards: usize,
// If true, generate several extra accounts per shard. We have a test with this disabled
// to test state syncing shards without any account data
generate_shard_accounts: bool,
// Per-shard pattern of which chunks should be produced at the start of the epoch;
// presumably heights past the end of a list are produced normally — see the
// `ChunksProducedByHeight` drop condition in the builder. Empty means all produced.
chunks_produced: &'static [(ShardId, &'static [bool])],
// If true, have the network drop the expected sync-hash block so the chain forks
// around it (see `test_state_sync_forks`).
skip_sync_block: bool,
}

static TEST_CASES: &[StateSyncTest] = &[
// The first two make no modifications to chunks_produced, and all chunks should be produced. This is the normal case
StateSyncTest {
num_validators: 2,
num_block_producer_seats: 2,
num_chunk_producer_seats: 2,
num_shards: 2,
generate_shard_accounts: true,
chunks_produced: &[],
skip_sync_block: false,
},
StateSyncTest {
num_validators: 4,
num_validators: 5,
num_block_producer_seats: 4,
num_chunk_producer_seats: 4,
num_shards: 4,
generate_shard_accounts: true,
chunks_produced: &[],
skip_sync_block: false,
},
// In this test we have 2 validators and 4 shards, and we don't generate any extra accounts.
// That makes 3 accounts including the "near" account. This means at least one shard will have no
// accounts in it, so we check that corner case here.
StateSyncTest {
num_validators: 2,
num_block_producer_seats: 2,
num_chunk_producer_seats: 2,
num_shards: 4,
generate_shard_accounts: false,
chunks_produced: &[],
skip_sync_block: false,
},
// Now we miss some chunks at the beginning of the epoch
StateSyncTest {
num_validators: 4,
num_validators: 5,
num_block_producer_seats: 4,
num_chunk_producer_seats: 4,
num_shards: 4,
generate_shard_accounts: true,
chunks_produced: &[
Expand All @@ -324,43 +382,90 @@ static TEST_CASES: &[StateSyncTest] = &[
(ShardId::new(2), &[true]),
(ShardId::new(3), &[true]),
],
skip_sync_block: false,
},
StateSyncTest {
num_validators: 4,
num_validators: 5,
num_block_producer_seats: 4,
num_chunk_producer_seats: 4,
num_shards: 4,
generate_shard_accounts: true,
chunks_produced: &[(ShardId::new(0), &[true, false]), (ShardId::new(1), &[true, false])],
skip_sync_block: false,
},
StateSyncTest {
num_validators: 4,
num_validators: 5,
num_block_producer_seats: 4,
num_chunk_producer_seats: 4,
num_shards: 4,
generate_shard_accounts: true,
chunks_produced: &[
(ShardId::new(0), &[false, true]),
(ShardId::new(2), &[true, false, true]),
],
skip_sync_block: false,
},
];

#[test]
fn slow_test_state_sync_current_epoch() {
    init_test_logger();

    // TODO: turn these into separate #[test]s, because running them in one loop like this
    // forces them to run in succession instead of letting cargo test parallelize them.
    for test_case in TEST_CASES {
        tracing::info!("run test: {:?}", test_case);
        let chunks_produced = test_case
            .chunks_produced
            .iter()
            .map(|(shard_id, produced)| (*shard_id, produced.to_vec()))
            .collect();
        let state = setup_initial_blockchain(
            test_case.num_validators,
            test_case.num_block_producer_seats,
            test_case.num_chunk_producer_seats,
            test_case.num_shards,
            test_case.generate_shard_accounts,
            chunks_produced,
            test_case.skip_sync_block,
        );
        run_test(state);
    }
}

// Drop the block that's supposed to become the sync hash after the first full epoch, so
// that it gets produced but then skipped on the final chain. If the state sync code is
// unaware of the possibility of forks, the producer of the dropped block will believe that
// block is the sync hash block while every other node believes it should be the next one.
#[test]
#[ignore]
fn test_state_sync_forks() {
    init_test_logger();

    let params = StateSyncTest {
        num_validators: 5,
        num_block_producer_seats: 4,
        num_chunk_producer_seats: 4,
        num_shards: 5,
        generate_shard_accounts: true,
        chunks_produced: &[],
        skip_sync_block: true,
    };
    let chunks_produced = params
        .chunks_produced
        .iter()
        .map(|(shard_id, produced)| (*shard_id, produced.to_vec()))
        .collect();
    let state = setup_initial_blockchain(
        params.num_validators,
        params.num_block_producer_seats,
        params.num_chunk_producer_seats,
        params.num_shards,
        params.generate_shard_accounts,
        chunks_produced,
        params.skip_sync_block,
    );
    run_test(state);
}

fn await_sync_hash(env: &mut TestLoopEnv) -> CryptoHash {
env.test_loop.run_until(
|data| {
Expand Down Expand Up @@ -410,7 +515,8 @@ fn spam_state_sync_header_reqs(env: &mut TestLoopEnv) {
fn slow_test_state_request() {
init_test_logger();

let TestState { mut env, .. } = setup_initial_blockchain(4, 4, false, HashMap::default());
let TestState { mut env, .. } =
setup_initial_blockchain(4, 4, 4, 4, false, HashMap::default(), false);

spam_state_sync_header_reqs(&mut env);
env.shutdown_and_drain_remaining_events(Duration::seconds(3));
Expand Down
Loading

0 comments on commit 3564f4b

Please sign in to comment.