Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tests): fix state sync test errors #12150

Merged
merged 5 commits into from
Oct 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 67 additions & 28 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ fn sync_state_nodes() {
let mut near1 = load_test_config("test1", port1, genesis.clone());
near1.network_config.peer_store.boot_nodes = convert_boot_nodes(vec![]);
near1.client_config.min_num_peers = 0;

// In this test and the ones below, each temporary directory is held in an Arc<TempDir>. We pass a clone
// to run_actix() and keep the original alive outside of it; otherwise the TempDir would be dropped early
// and the directories would be removed while the nodes are still running.
let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to leave a comment here explaining the observed issues with directories being cleaned up too soon.

let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");

Expand All @@ -61,6 +69,7 @@ fn sync_state_nodes() {
let view_client2_holder2 = view_client2_holder.clone();
let arbiters_holder2 = arbiters_holder2.clone();
let genesis2 = genesis.clone();
let dir2 = dir2.clone();

let actor = view_client1.send(GetBlock::latest().with_span_context());
let actor = actor.then(move |res| {
Expand All @@ -78,10 +87,6 @@ fn sync_state_nodes() {
near2.network_config.peer_store.boot_nodes =
convert_boot_nodes(vec![("test1", *port1)]);

let dir2 = tempfile::Builder::new()
.prefix("sync_nodes_2")
.tempdir()
.unwrap();
let nearcore::NearNode {
view_client: view_client2,
arbiters,
Expand Down Expand Up @@ -125,6 +130,8 @@ fn sync_state_nodes() {
)
.start();
});
drop(_dir1);
drop(_dir2);
});
}

Expand All @@ -148,6 +155,15 @@ fn sync_state_nodes_multishard() {
);
genesis.config.epoch_length = 150; // so that by the time test2 joins it is not kicked out yet

let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();
let _dir3 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_3").tempdir().unwrap());
let dir3 = _dir3.clone();
let _dir4 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_4").tempdir().unwrap());
let dir4 = _dir4.clone();

run_actix(async move {
let (port1, port2, port3, port4) = (
tcp::ListenerAddr::reserve_for_test(),
Expand Down Expand Up @@ -181,14 +197,10 @@ fn sync_state_nodes_multishard() {
near4.client_config.max_block_production_delay =
near1.client_config.max_block_production_delay;

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");

let dir3 = tempfile::Builder::new().prefix("sync_nodes_3").tempdir().unwrap();
start_with_config(dir3.path(), near3).expect("start_with_config");

let dir4 = tempfile::Builder::new().prefix("sync_nodes_4").tempdir().unwrap();
start_with_config(dir4.path(), near4).expect("start_with_config");

let view_client2_holder = Arc::new(RwLock::new(None));
Expand All @@ -201,6 +213,7 @@ fn sync_state_nodes_multishard() {
let view_client2_holder2 = view_client2_holder.clone();
let arbiter_holder2 = arbiter_holder2.clone();
let genesis2 = genesis.clone();
let dir2 = dir2.clone();

let actor = view_client1.send(GetBlock::latest().with_span_context());
let actor = actor.then(move |res| {
Expand All @@ -225,10 +238,6 @@ fn sync_state_nodes_multishard() {
("test4", *port4),
]);

let dir2 = tempfile::Builder::new()
.prefix("sync_nodes_2")
.tempdir()
.unwrap();
let nearcore::NearNode {
view_client: view_client2,
arbiters,
Expand Down Expand Up @@ -280,6 +289,10 @@ fn sync_state_nodes_multishard() {
)
.start();
});
drop(_dir1);
drop(_dir2);
drop(_dir3);
drop(_dir4);
});
}

Expand All @@ -298,9 +311,15 @@ fn sync_empty_state() {
);
genesis.config.epoch_length = 20;

let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let (port1, port2) =
(tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test());

// State sync triggers when header head is two epochs in the future.
// Produce more blocks to make sure that state sync gets triggered when the second node starts.
let state_sync_horizon = 10;
Expand All @@ -312,10 +331,8 @@ fn sync_empty_state() {
near1.client_config.min_block_production_delay = Duration::milliseconds(200);
near1.client_config.max_block_production_delay = Duration::milliseconds(400);

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");
let dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());

let view_client2_holder = Arc::new(RwLock::new(None));
let arbiters_holder = Arc::new(RwLock::new(vec![]));
Expand Down Expand Up @@ -403,6 +420,8 @@ fn sync_empty_state() {
)
.start();
});
drop(_dir1);
drop(_dir2);
});
}

Expand All @@ -426,6 +445,14 @@ fn sync_state_dump() {
// start, sync headers and find a dump of state.
genesis.config.epoch_length = 30;

let _dump_dir =
Arc::new(tempfile::Builder::new().prefix("state_dump_1").tempdir().unwrap());
let dump_dir = _dump_dir.clone();
let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let (port1, port2) =
(tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test());
Expand All @@ -440,7 +467,7 @@ fn sync_state_dump() {
near1.client_config.min_block_production_delay = Duration::milliseconds(300);
near1.client_config.max_block_production_delay = Duration::milliseconds(600);
near1.client_config.tracked_shards = vec![0]; // Track all shards.
let dump_dir = tempfile::Builder::new().prefix("state_dump_1").tempdir().unwrap();

near1.client_config.state_sync.dump = Some(DumpConfig {
location: Filesystem { root_dir: dump_dir.path().to_path_buf() },
restart_dump_for_shards: None,
Expand All @@ -449,14 +476,12 @@ fn sync_state_dump() {
});
near1.config.store.state_snapshot_enabled = true;

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode {
view_client: view_client1,
// Keep the state sync dumper in scope: dropping it stops the state dumper loop.
state_sync_dumper: _dumper,
..
} = start_with_config(dir1.path(), near1).expect("start_with_config");
let dir2 = tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap();

let view_client2_holder = Arc::new(RwLock::new(None));
let arbiters_holder = Arc::new(RwLock::new(vec![]));
Expand Down Expand Up @@ -542,6 +567,9 @@ fn sync_state_dump() {
.unwrap();
System::current().stop();
});
drop(_dump_dir);
drop(_dir1);
drop(_dir2);
});
}

Expand Down Expand Up @@ -741,6 +769,10 @@ fn test_state_sync_headers() {
heavy_test(|| {
init_test_logger();

let _dir1 =
Arc::new(tempfile::Builder::new().prefix("test_state_sync_headers").tempdir().unwrap());
let dir1 = _dir1.clone();

run_actix(async {
let mut genesis = Genesis::test(vec!["test1".parse().unwrap()], 1);
// Increase epoch_length if the test is flaky.
Expand All @@ -750,8 +782,6 @@ fn test_state_sync_headers() {
load_test_config("test1", tcp::ListenerAddr::reserve_for_test(), genesis.clone());
near1.client_config.min_num_peers = 0;
near1.client_config.tracked_shards = vec![0]; // Track all shards.
let dir1 =
tempfile::Builder::new().prefix("test_state_sync_headers").tempdir().unwrap();
near1.config.store.state_snapshot_enabled = true;

let nearcore::NearNode { view_client: view_client1, .. } =
Expand Down Expand Up @@ -924,6 +954,7 @@ fn test_state_sync_headers() {
.unwrap();
System::current().stop();
});
drop(_dir1);
});
}

Expand All @@ -933,6 +964,20 @@ fn test_state_sync_headers_no_tracked_shards() {
heavy_test(|| {
init_test_logger();

let _dir1 = Arc::new(
tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_1")
.tempdir()
.unwrap(),
);
let dir1 = _dir1.clone();
let _dir2 = Arc::new(
tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_2")
.tempdir()
.unwrap(),
);
let dir2 = _dir2.clone();
run_actix(async {
let mut genesis = Genesis::test(vec!["test1".parse().unwrap()], 1);
// Increase epoch_length if the test is flaky.
Expand All @@ -943,10 +988,6 @@ fn test_state_sync_headers_no_tracked_shards() {
let mut near1 = load_test_config("test1", port1, genesis.clone());
near1.client_config.min_num_peers = 0;
near1.client_config.tracked_shards = vec![0]; // Track all shards, it is a validator.
let dir1 = tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_1")
.tempdir()
.unwrap();
near1.config.store.state_snapshot_enabled = false;
near1.config.state_sync_enabled = false;
near1.client_config.state_sync_enabled = false;
Expand All @@ -959,10 +1000,6 @@ fn test_state_sync_headers_no_tracked_shards() {
convert_boot_nodes(vec![("test1", *port1)]);
near2.client_config.min_num_peers = 0;
near2.client_config.tracked_shards = vec![]; // Track no shards.
let dir2 = tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_2")
.tempdir()
.unwrap();
near2.config.store.state_snapshot_enabled = true;
near2.config.state_sync_enabled = false;
near2.client_config.state_sync_enabled = false;
Expand Down Expand Up @@ -1082,5 +1119,7 @@ fn test_state_sync_headers_no_tracked_shards() {
.unwrap();
System::current().stop();
});
drop(_dir1);
drop(_dir2);
});
}
Loading