feat(resharding): simple nayduck test for resharding (#10162)
- Added a simple nayduck test for resharding. It checks that the shard
layout version and number of shards change at the right block.
- Added configuration for the retry delay while waiting for the state
snapshot. Without it the test timing was too unpredictable.
- Refactored the common resharding test logic into a shared library
(pytest/lib/resharding_lib.py).
wacban authored Nov 14, 2023
1 parent b80f5b5 commit 5b3e69a
Showing 8 changed files with 272 additions and 79 deletions.
2 changes: 2 additions & 0 deletions chain/chain/src/chain.rs
@@ -2146,6 +2146,8 @@ impl Chain {
"start_process_block_impl",
height = block_height)
.entered();

tracing::debug!(target: "chain", "start process block");
// 0) Before we proceed with any further processing, we first check that the block
// hash and signature matches to make sure the block is indeed produced by the assigned
// block producer. If not, we drop the block immediately
6 changes: 0 additions & 6 deletions chain/client/src/client.rs
@@ -2386,10 +2386,6 @@ impl Client {
let state_sync_timeout = self.config.state_sync_timeout;
let epoch_id = self.chain.get_block(&sync_hash)?.header().epoch_id().clone();

// TODO(resharding) what happens to the shards_to_split here when
// catchup_state_syncs already contains an entry for the sync hash?
// Does it get overwritten? Are we guaranteed that the existing
// entry contains the same data?
let (state_sync, shards_to_split, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
notify_state_sync = true;
@@ -2516,8 +2512,6 @@
.iter()
.filter_map(|ShardInfo(shard_id, _)| self.should_split_shard(shard_id, me, prev_hash))
.collect();
// For colour decorators to work, they need to be printed directly. Otherwise the decorators get escaped, garble the output and don't add colours.
debug!(target: "catchup", progress_per_shard = ?format_shard_sync_phase_per_shard(&shards_to_split, false), "Need to split states for shards");
Ok(shards_to_split)
}

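The TODO removed above asked whether an existing catchup_state_syncs entry gets overwritten. It does not: HashMap::entry().or_insert_with() only runs the closure when the key is absent, much like Python's dict.setdefault. A minimal sketch of that behavior, in Python for brevity:

syncs = {}
syncs.setdefault("sync_hash", "first state")
# The second call is a no-op because the key already exists.
syncs.setdefault("sync_hash", "second state")
assert syncs["sync_hash"] == "first state"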
8 changes: 3 additions & 5 deletions chain/client/src/sync_jobs_actor.rs
@@ -13,9 +13,6 @@ use near_primitives::state_part::PartId;
use near_primitives::state_sync::StatePartKey;
use near_primitives::types::ShardId;
use near_store::DBCol;
use std::time::Duration;

const RESHARDING_RETRY_TIME: Duration = Duration::from_secs(30);

pub(crate) struct SyncJobsActor {
pub(crate) client_addr: actix::Addr<ClientActor>,
@@ -163,8 +160,9 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
// The Actix implementation lets us send a message to ourselves with a delay.
// In case snapshots are not ready yet, we will retry resharding later.
tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later");
state_split_request.curr_poll_time += RESHARDING_RETRY_TIME;
context.notify_later(state_split_request.with_span_context(), RESHARDING_RETRY_TIME);
let retry_delay = state_split_request.config.retry_delay;
state_split_request.curr_poll_time += retry_delay;
context.notify_later(state_split_request.with_span_context(), retry_delay);
} else {
tracing::debug!(target: "client", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
11 changes: 10 additions & 1 deletion core/chain-configs/src/client_config.rs
@@ -168,17 +168,26 @@ pub struct StateSplitConfig {
/// decreased if resharding is consuming too many resources and interfering
/// with regular node operation.
pub batch_size: bytesize::ByteSize,

/// The delay between writing batches to the db. The batch delay can be
/// increased if resharding is consuming too many resources and interfering
/// with regular node operation.
pub batch_delay: Duration,

/// The delay between attempts to start resharding while waiting for the
/// state snapshot to become available.
pub retry_delay: Duration,
}

impl Default for StateSplitConfig {
fn default() -> Self {
// Conservative default for a slower resharding that puts as little
// extra load on the node as possible.
Self { batch_size: bytesize::ByteSize::kb(500), batch_delay: Duration::from_millis(100) }
Self {
batch_size: bytesize::ByteSize::kb(500),
batch_delay: Duration::from_millis(100),
retry_delay: Duration::from_secs(10),
}
}
}

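For a rough sense of what these defaults allow: one 500 kB batch per 100 ms delay caps resharding writes at about 5 MB/s, ignoring the time spent on the writes themselves. A back-of-the-envelope sketch:

# Throughput implied by the default batch_size and batch_delay above.
batch_size_bytes = 500_000  # bytesize::ByteSize::kb(500)
batch_delay_secs = 0.100    # Duration::from_millis(100)
print(batch_size_bytes / batch_delay_secs / 1e6, "MB/s")  # 5.0 MB/s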
4 changes: 4 additions & 0 deletions nightly/pytest-sanity.txt
@@ -146,3 +146,7 @@ pytest sanity/meta_tx.py --features nightly
# Tests for split storage and split storage migration
pytest --timeout=600 sanity/split_storage.py
pytest --timeout=600 sanity/split_storage.py --features nightly

# Test for resharding
pytest --timeout=120 sanity/resharding.py
pytest --timeout=120 sanity/resharding.py --features nightly
115 changes: 115 additions & 0 deletions pytest/lib/resharding_lib.py
@@ -0,0 +1,115 @@
# A library with the common constants and functions for testing resharding.

V1_PROTOCOL_VERSION = 48
V2_PROTOCOL_VERSION = 135

V0_SHARD_LAYOUT = {
"V0": {
"num_shards": 1,
"version": 0
},
}
V1_SHARD_LAYOUT = {
"V1": {
"boundary_accounts": [
"aurora", "aurora-0", "kkuuue2akv_1630967379.near"
],
"shards_split_map": [[0, 1, 2, 3]],
"to_parent_shard_map": [0, 0, 0, 0],
"version": 1
}
}


# Append the genesis config changes that are required for testing resharding.
# This function sets the protocol version, shard layout and a few other
# configs so that they match the protocol configuration from just before the
# most recent resharding upgrade supported by the binary under test.
def append_shard_layout_config_changes(
genesis_config_changes,
binary_protocol_version,
logger=None,
):
genesis_config_changes.append(["use_production_config", True])

if binary_protocol_version >= V2_PROTOCOL_VERSION:
if logger:
logger.info("Testing migration from V1 to V2.")
# Set the initial protocol version to a version just before V2.
genesis_config_changes.append([
"protocol_version",
V2_PROTOCOL_VERSION - 1,
])
genesis_config_changes.append([
"shard_layout",
V1_SHARD_LAYOUT,
])
genesis_config_changes.append([
"num_block_producer_seats_per_shard",
[1, 1, 1, 1],
])
genesis_config_changes.append([
"avg_hidden_validator_seats_per_shard",
[0, 0, 0, 0],
])
return

if binary_protocol_version >= V1_PROTOCOL_VERSION:
if logger:
logger.info("Testing migration from V0 to V1.")
# Set the initial protocol version to a version just before V1.
genesis_config_changes.append([
"protocol_version",
V1_PROTOCOL_VERSION - 1,
])
genesis_config_changes.append([
"shard_layout",
V0_SHARD_LAYOUT,
])
genesis_config_changes.append([
"num_block_producer_seats_per_shard",
[100],
])
genesis_config_changes.append([
"avg_hidden_validator_seats_per_shard",
[0],
])
return

assert False


def get_genesis_shard_layout_version(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 1
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 0

assert False


def get_target_shard_layout_version(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 2
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 1

assert False


def get_genesis_num_shards(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 4
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 1

assert False


def get_target_num_shards(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 5
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 4

assert False
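A usage sketch of the helpers above, assuming a binary whose protocol version is at least V2_PROTOCOL_VERSION (135), so the test migrates from the V1 layout to V2:

from resharding_lib import (
    get_genesis_num_shards,
    get_genesis_shard_layout_version,
    get_target_num_shards,
    get_target_shard_layout_version,
)

binary_protocol_version = 135
# Expected before/after values for the V1 -> V2 migration.
assert get_genesis_shard_layout_version(binary_protocol_version) == 1
assert get_target_shard_layout_version(binary_protocol_version) == 2
assert get_genesis_num_shards(binary_protocol_version) == 4
assert get_target_num_shards(binary_protocol_version) == 5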
134 changes: 134 additions & 0 deletions pytest/tests/sanity/resharding.py
@@ -0,0 +1,134 @@
#!/usr/bin/env python3

# Small test for resharding. Spins up a few nodes from genesis with the previous
# shard layout, waits for a few epochs and verifies that the shard layout is
# upgraded.
# Usage:
# python3 pytest/tests/sanity/resharding.py

import unittest
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version


class ReshardingTest(unittest.TestCase):

def setUp(self) -> None:
self.epoch_length = 5
self.config = load_config()
self.binary_protocol_version = get_binary_protocol_version(self.config)
assert self.binary_protocol_version is not None

self.genesis_shard_layout_version = get_genesis_shard_layout_version(
self.binary_protocol_version)
self.target_shard_layout_version = get_target_shard_layout_version(
self.binary_protocol_version)

self.genesis_num_shards = get_genesis_num_shards(
self.binary_protocol_version)
self.target_num_shards = get_target_num_shards(
self.binary_protocol_version)

def __get_genesis_config_changes(self):
genesis_config_changes = [
["epoch_length", self.epoch_length],
]

append_shard_layout_config_changes(
genesis_config_changes,
self.binary_protocol_version,
logger,
)

return genesis_config_changes

def __get_client_config_changes(self, num_nodes):
single = {
"tracked_shards": [0],
"state_split_config": {
"batch_size": 1000000,
# don't throttle resharding
"batch_delay": {
"secs": 0,
"nanos": 0,
},
# retry often to start resharding as fast as possible
"retry_delay": {
"secs": 0,
"nanos": 500_000_000
}
}
}
return {i: single for i in range(num_nodes)}

def test_resharding(self):
logger.info("The resharding test is starting.")
num_nodes = 2

genesis_config_changes = self.__get_genesis_config_changes()
client_config_changes = self.__get_client_config_changes(num_nodes)

near_root, [node0_dir, node1_dir] = init_cluster(
num_nodes=num_nodes,
num_observers=0,
num_shards=1,
config=self.config,
genesis_config_changes=genesis_config_changes,
client_config_changes=client_config_changes,
)

node0 = spin_up_node(
self.config,
near_root,
node0_dir,
0,
)
node1 = spin_up_node(
self.config,
near_root,
node1_dir,
1,
boot_node=node0,
)

metrics_tracker = MetricsTracker(node0)

for height, _ in poll_blocks(node0):
version = metrics_tracker.get_int_metric_value(
"near_shard_layout_version")
num_shards = metrics_tracker.get_int_metric_value(
"near_shard_layout_num_shards")

logger.info(
f"#{height} shard layout version: {version}, num shards: {num_shards} "
)

# This may be flaky - it shouldn't be, but it may. We collect the metrics
# after the block is processed, so if collection is delayed past an epoch
# boundary the shard layout may have already changed and the assertions
# below will fail.

if height <= 2 * self.epoch_length + 1:
self.assertEqual(version, self.genesis_shard_layout_version)
self.assertEqual(num_shards, self.genesis_num_shards)
else:
self.assertEqual(version, self.target_shard_layout_version)
self.assertEqual(num_shards, self.target_num_shards)

if height >= 4 * self.epoch_length:
break

node0.kill()
node1.kill()

logger.info("The resharding test is finished.")


if __name__ == '__main__':
unittest.main()
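To make the assertion boundary concrete, here is a small worked example with the test's epoch_length of 5: the genesis layout is expected up to and including height 2 * 5 + 1 = 11, the target layout from height 12 onward, and the loop exits once the height reaches 4 * 5 = 20.

epoch_length = 5
boundary = 2 * epoch_length + 1  # height 11
for height in range(1, 4 * epoch_length + 1):
    layout = "genesis" if height <= boundary else "target"
    print(f"#{height:2d} expect {layout} shard layout")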