Skip to content

Commit

Permalink
mock-node: simplify and remove client setup code (#12838)
Browse files Browse the repository at this point in the history
Before this change, the mock-node tool starts one mock node, and one
normal neard node with `nearcore::start_with_config()`, and then waits
for the node to sync. It also modifies and sets up a home dir for this
client if `--client-height` is given on the command line.

However, nobody was really using this `--client-height` feature much.
Since we can get good test/benchmark results from just using a regular
neard home directory taken from mainnet or localnet testing, we don't
really need any code that manually sets up the database state.

Then if we get rid of that, the question arises: why does this code even
start a node with `nearcore::start_with_config()`? There's not a great
reason to do it in the same binary, and doing so restricts the use of
this tool, since we can't use it to test a standard neard binary, but
must build a custom `mock-node` binary that's not very intuitive unless
you've looked at the code.

So here we delete all the client set up code and just have the node
start a mock node listening on whatever addr is specified in the near
config. Then to use it to benchmark the sync performance of a node, you
just need to set the right boot nodes argument (and possibly also
manually wipe the peer store), and you can use a normal neard binary for
tests
  • Loading branch information
marcelo-gonzalez authored Jan 30, 2025
1 parent 54b225f commit 5d9d184
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 498 deletions.
115 changes: 0 additions & 115 deletions chain/chain/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1957,121 +1957,6 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

/// Only used in mock network
/// Create a new ChainStoreUpdate that copies the necessary chain state related to `block_hash`
/// from `source_store` to the current store.
pub fn copy_chain_state_as_of_block(
chain_store: &'a mut ChainStore,
block_hash: &CryptoHash,
source_epoch_manager: &dyn EpochManagerAdapter,
source_store: &ChainStore,
) -> Result<ChainStoreUpdate<'a>, Error> {
let mut chain_store_update = ChainStoreUpdate::new(chain_store);
let block = source_store.get_block(block_hash)?;
let header = block.header().clone();
let height = header.height();
let tip = Tip {
height,
last_block_hash: *block_hash,
prev_block_hash: *header.prev_hash(),
epoch_id: *header.epoch_id(),
next_epoch_id: *header.next_epoch_id(),
};
chain_store_update.head = Some(tip.clone());
chain_store_update.tail = Some(height);
chain_store_update.chunk_tail = Some(height);
chain_store_update.fork_tail = Some(height);
chain_store_update.header_head = Some(tip.clone());
chain_store_update.final_head = Some(tip);
chain_store_update.chain_store_cache_update.blocks.insert(*block_hash, block.clone());
chain_store_update.chain_store_cache_update.headers.insert(*block_hash, header.clone());
// store all headers until header.last_final_block
// needed to light client
let mut prev_hash = *header.prev_hash();
let last_final_hash = header.last_final_block();
loop {
let header = source_store.get_block_header(&prev_hash)?;
chain_store_update.chain_store_cache_update.headers.insert(prev_hash, header.clone());
if &prev_hash == last_final_hash {
break;
} else {
chain_store_update
.chain_store_cache_update
.next_block_hashes
.insert(*header.prev_hash(), prev_hash);
prev_hash = *header.prev_hash();
}
}
chain_store_update
.chain_store_cache_update
.block_extras
.insert(*block_hash, source_store.get_block_extra(block_hash)?);
let shard_layout = source_epoch_manager.get_shard_layout(&header.epoch_id())?;
for shard_uid in shard_layout.shard_uids() {
chain_store_update.chain_store_cache_update.chunk_extras.insert(
(*block_hash, shard_uid),
source_store.get_chunk_extra(block_hash, &shard_uid)?.clone(),
);
}
for (shard_index, chunk_header) in block.chunks().iter_deprecated().enumerate() {
let shard_id = shard_layout.get_shard_id(shard_index)?;
let chunk_hash = chunk_header.chunk_hash();
chain_store_update
.chain_store_cache_update
.chunks
.insert(chunk_hash.clone(), source_store.get_chunk(&chunk_hash)?.clone());
chain_store_update.chain_store_cache_update.outgoing_receipts.insert(
(*block_hash, shard_id),
source_store.get_outgoing_receipts(block_hash, shard_id)?.clone(),
);
chain_store_update.chain_store_cache_update.incoming_receipts.insert(
(*block_hash, shard_id),
source_store.get_incoming_receipts(block_hash, shard_id)?.clone(),
);
let outcome_ids =
source_store.get_outcomes_by_block_hash_and_shard_id(block_hash, shard_id)?;
for id in outcome_ids.iter() {
if let Some(existing_outcome) =
source_store.get_outcome_by_id_and_block_hash(id, block_hash)?
{
chain_store_update
.chain_store_cache_update
.outcomes
.insert((*id, *block_hash), existing_outcome);
}
}
chain_store_update
.chain_store_cache_update
.outcome_ids
.insert((*block_hash, shard_id), outcome_ids);
}
chain_store_update
.chain_store_cache_update
.height_to_hashes
.insert(height, Some(*block_hash));
chain_store_update
.chain_store_cache_update
.next_block_hashes
.insert(*header.prev_hash(), *block_hash);
let block_merkle_tree = source_store.get_block_merkle_tree(block_hash)?;
chain_store_update
.chain_store_cache_update
.block_merkle_tree
.insert(*block_hash, block_merkle_tree.clone());
chain_store_update
.chain_store_cache_update
.block_ordinal_to_hash
.insert(block_merkle_tree.size(), *block_hash);
chain_store_update.chain_store_cache_update.processed_block_heights.insert(height);

// other information not directly related to this block
chain_store_update.chain_store_cache_update.height_to_hashes.insert(
source_store.get_genesis_height(),
Some(source_store.get_block_hash_by_height(source_store.get_genesis_height())?),
);
Ok(chain_store_update)
}

#[tracing::instrument(level = "debug", target = "store", "ChainUpdate::finalize", skip_all)]
fn finalize(&mut self) -> Result<StoreUpdate, Error> {
let mut store_update = self.store().store_update();
Expand Down
1 change: 1 addition & 0 deletions tools/mock-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ impl MockPeer {
// listen on the addr passed to MockPeer::new() and wait til someone connects.
// Then respond to messages indefinitely until an error occurs
async fn run(mut self, target_height: BlockHeight) -> anyhow::Result<()> {
// TODO: should just keep accepting incoming conns
let mut conn = self.listener.accept().await?;
let messages = InFlightMessages::new(self.network_config.response_delay);
tokio::pin!(messages);
Expand Down
174 changes: 20 additions & 154 deletions tools/mock-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,127 +3,45 @@
//! network, responding to the client's network requests by reading from a
//! pre-generated chain history in storage.
use actix::System;
use anyhow::Context;
use mock_node::setup::{setup_mock_node, MockNode};
use mock_node::setup::setup_mock_node;
use mock_node::MockNetworkConfig;
use near_actix_test_utils::run_actix;
use near_chain_configs::{GenesisValidationMode, MutableConfigValue};
use near_crypto::{InMemorySigner, KeyType};
use near_jsonrpc_client::JsonRpcClient;
use near_network::tcp;
use near_actix_test_utils::{block_on_interruptible, setup_actix};
use near_o11y::testonly::init_integration_logger;
use near_primitives::types::BlockHeight;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use std::path::Path;
use std::time::Duration;

/// Program to start a mock node, which runs a regular client in a mock network environment.
/// The mock network simulates the entire network by replaying a pre-generated chain history
/// from storage and responds to the client's network requests.
/// Program to start a mock node, which starts a TCP server and accepts incoming
/// connections from NEAR nodes. Once connected, it will respond to block and chunk
/// requests, but not do anything else unless periodic outgoing messages are
/// are specified in $home/mock.json.
///
/// There are two ways to replay the stored history:
/// * catchup: client is behind the network and applies the blocks as fast as possible
/// * normal block production: client accept "new" blocks as they are produced
/// (in reality, blocks are just fetched from the pre-generated store).
///
/// This is controlled by two flags:
/// * `--client-height` specifies the height the client starts at. Defaults to 0.
/// * `--network-height` specifies the hight the rest of the (simulated)
/// network starts at. Defaults to the latest recorded height.
///
/// As a shortcut, `--start-height` sets both.
///
///
/// Examples
///
/// ```console
/// # Pure catchup from genesis height to the end of the recorded history.
/// $ mock-node ~/.near/localnet/node0
///
/// # Pure block production starting from block height 61.
/// $ mock-node ~/.near/localnet/node0 --start-height 61
///
/// # Mixed: client starts at genesis and tries to catch up with the network, which starts at height 20.
/// $ mock-node ~/.near/localnet/node0 --network-height 20
/// ```
#[derive(clap::Parser)]
struct Cli {
/// Existing home dir for the pre-generated chain history. For example, you can use
/// the home dir of a near node.
chain_history_home_dir: String,
/// Home dir for the new client that will be started. If not specified, the binary will
/// generate a temporary directory
client_home_dir: Option<PathBuf>,
/// Simulated network delay (in ms)
#[clap(long)]
home: String,
/// If set, the mock node will wait this many millis before sending messages
#[clap(short = 'd', long)]
network_delay: Option<u64>,
/// If specified, the binary will set up client home dir before starting the
/// client node so head of the client chain will be the specified height
/// when the client starts. The given height must be the last block in an
/// epoch.
#[clap(long, default_value = "0")]
client_height: BlockHeight,
/// The height at which the mock network starts. The client would have to
/// catch up to this height before participating in new block production.
///
/// Defaults to the largest height in history.
#[clap(long)]
network_height: Option<BlockHeight>,
/// Shortcut to set both `--client-height` and `--network-height`.
#[clap(long, conflicts_with_all(&["client-height", "network-height"]))]
start_height: Option<BlockHeight>,
/// Target height that the client should sync to before stopping. If not specified,
/// use the height of the last block in chain history
#[clap(long)]
target_height: Option<BlockHeight>,
/// If true, use in memory storage instead of rocksdb for the client
#[clap(short = 'i', long)]
in_memory_storage: bool,
/// port the mock node should listen on
#[clap(long)]
mock_port: Option<u16>,
}

async fn target_height_reached(client: &JsonRpcClient, target_height: BlockHeight) -> bool {
let t = Instant::now();
let status = client.status().await;
let latency = t.elapsed();
if latency > Duration::from_millis(100) {
tracing::warn!(
target: "mock_node", latency = %format_args!("{latency:0.2?}"),
"client is unresponsive, took too long to handle status request"
);
}
match status {
Ok(status) => status.sync_info.latest_block_height >= target_height,
Err(_) => false,
}
}

fn main() -> anyhow::Result<()> {
init_integration_logger();
let args: Cli = clap::Parser::parse();
let home_dir = Path::new(&args.chain_history_home_dir);
let mut near_config = nearcore::config::load_config(home_dir, GenesisValidationMode::Full)
.context("Error loading config")?;
near_config.validator_signer = MutableConfigValue::new(None, "validator_signer");
near_config.client_config.min_num_peers = 1;
let signer = InMemorySigner::from_random("mock_node".parse().unwrap(), KeyType::ED25519);
near_config.network_config.node_key = signer.secret_key;
near_config.client_config.tracked_shards =
near_config.genesis.config.shard_layout.shard_ids().collect();
if near_config.rpc_config.is_none() {
near_config.rpc_config = Some(near_jsonrpc::RpcConfig::default());
}
let tempdir;
let client_home_dir = match &args.client_home_dir {
Some(it) => it.as_path(),
None => {
tempdir = tempfile::Builder::new().prefix("mock_node").tempdir().unwrap();
tempdir.path()
}
};
let home_dir = Path::new(&args.home);

let mock_config_path = home_dir.join("mock.json");
let mut network_config = if mock_config_path.exists() {
Expand All @@ -137,66 +55,14 @@ fn main() -> anyhow::Result<()> {
network_config.response_delay = Duration::from_millis(delay);
}

let client_height = args.start_height.unwrap_or(args.client_height);
let network_height = args.start_height.or(args.network_height);
let addr = tcp::ListenerAddr::new(SocketAddr::new(
"127.0.0.1".parse().unwrap(),
args.mock_port.unwrap_or(24566),
));
let sys = setup_actix();
let res = block_on_interruptible(&sys, async move {
let mock_peer =
setup_mock_node(home_dir, network_config, args.network_height, args.target_height)
.context("failed setting up mock node")?;

run_actix(async move {
let MockNode { target_height, mut mock_peer, rpc_client } = setup_mock_node(
Path::new(&client_home_dir),
home_dir,
near_config,
&network_config,
client_height,
network_height,
args.target_height,
args.in_memory_storage,
addr,
);

// TODO: would be nice to be able to somehow quit right after the target block
// is applied rather than polling like this
let mut interval = tokio::time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let start = Instant::now();
// Let's set the timeout to 5 seconds per block - just in case we test on very full blocks.
let timeout = target_height * 5;
let timeout = u32::try_from(timeout).unwrap_or(u32::MAX) * Duration::from_secs(1);

loop {
if start.elapsed() > timeout {
tracing::error!(
"node still hasn't made it to #{} after {:?}",
target_height,
timeout
);
mock_peer.abort();
break;
}
tokio::select! {
_ = interval.tick() => {
if target_height_reached(&rpc_client, target_height).await {
tracing::info!("node reached target height");
mock_peer.abort();
break;
}
}
result = &mut mock_peer => {
match result {
Ok(Ok(_)) => tracing::info!("mock peer exited"),
Ok(Err(e)) => tracing::error!("mock peer exited with error: {:?}", e),
Err(e) => tracing::error!("failed running mock peer task: {:?}", e),
};
break;
}
}
}

System::current().stop();
mock_peer.await.context("failed running mock peer task")?.context("mock peer failed")
});
Ok(())

res
}
Loading

0 comments on commit 5d9d184

Please sign in to comment.