From c7c5f29d263e8bc589589a9fd7186d033399f478 Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Wed, 17 May 2023 16:13:04 +0800 Subject: [PATCH 1/9] several tests added in recent years --- src/kvraft/tests.rs | 144 +++++++++++++++------ src/raft/tester.rs | 19 ++- src/raft/tests.rs | 307 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 377 insertions(+), 93 deletions(-) diff --git a/src/kvraft/tests.rs b/src/kvraft/tests.rs index bd077cf..514b08e 100644 --- a/src/kvraft/tests.rs +++ b/src/kvraft/tests.rs @@ -3,7 +3,7 @@ use futures::{future, select, FutureExt}; use madsim::{ rand::{self, Rng, SliceRandom}, task, - time::{self, Duration}, + time::{self, Duration, Instant}, }; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -65,10 +65,12 @@ fn check_concurrent_appends(v: &str, counts: &[usize]) { async fn generic_test( part: &str, nclients: usize, + nservers: usize, unreliable: bool, crash: bool, partitions: bool, maxraftstate: Option, + randomkeys: bool, ) { let mut title = "Test: ".to_owned(); if unreliable { @@ -86,6 +88,9 @@ async fn generic_test( if maxraftstate.is_some() { title += "snapshots, "; } + if randomkeys { + title += "random keys, "; + } if nclients > 1 { title += "many clients"; } else { @@ -93,8 +98,7 @@ async fn generic_test( } info!("{} ({})", title, part); - const NSERVERS: usize = 5; - let t = Arc::new(Tester::new(NSERVERS, unreliable, maxraftstate).await); + let t = Arc::new(Tester::new(nservers, unreliable, maxraftstate).await); let ck = t.make_client(&t.all()); @@ -110,21 +114,34 @@ async fn generic_test( // TODO: change the closure to a future. let mut j = 0; let mut rng = rand::rng(); - let mut last = String::new(); - let key = format!("{}", cli); - ck.put(&key, &last).await; + let mut last = String::new(); // only used when not randomkeys + let mut key = format!("{}", cli); + if !randomkeys { + ck.put(&key, &last).await; + } while !done.load(Ordering::Relaxed) { + if randomkeys { + key = format!("{}", rng.gen_range(0..nclients)); + } + let nv = format!("x {} {} y", cli, j); if rng.gen_bool(0.5) { - let nv = format!("x {} {} y", cli, j); debug!("{}: client new append {:?}", cli, nv); // predict effect of append(k, val) if old value is prev. - last += &nv; ck.append(&key, &nv).await; + if !randomkeys { + last += &nv; + } j += 1; + } else if randomkeys && rng.gen_bool(0.1) { + // we only do this when using random keys, because it would break the + // check done after Get() operations. + ck.put(&key, &nv).await; } else { debug!("{}: client new get {:?}", cli, key); let v = ck.get(&key).await; - assert_eq!(v, last, "get wrong value, key {:?}", key); + if !randomkeys { + assert_eq!(v, last, "get wrong value, key {:?}", key); + } } } j @@ -174,7 +191,7 @@ async fn generic_test( if crash { debug!("shutdown servers"); - for i in 0..NSERVERS { + for i in 0..nservers { t.shutdown_server(i); } // Wait for a while for servers to shutdown, since @@ -182,7 +199,7 @@ async fn generic_test( time::sleep(RAFT_ELECTION_TIMEOUT).await; debug!("restart servers"); // crash and re-start all - for i in 0..NSERVERS { + for i in 0..nservers { t.start_server(i).await; } t.connect_all(); @@ -201,7 +218,9 @@ async fn generic_test( let key = format!("{}", i); debug!("Check {:?} for client {}", j, i); let v = ck.get(&key).await; - check_clnt_appends(i, &v, j); + if !randomkeys { + check_clnt_appends(i, &v, j); + } } if let Some(maxraftstate) = maxraftstate { @@ -216,25 +235,71 @@ async fn generic_test( } } + // TODO linearizable check + + t.end(); +} + +/// Check that ops are committed fast enough, +/// better than 1 per heartbeat interval +async fn generic_test_speed(part: &str, maxraftstate: Option) { + const NSERVERS: usize = 3; + const NUM_OPS: u32 = 1000; + + info!("Test: ops complete fast enough ({})", part); + + let t = Tester::new(NSERVERS, false, maxraftstate).await; + let ck = t.make_client(&t.all()); + + // wait until first op completes, so we know a leader is elected and + // KV servers are ready to process client requests. + ck.get("x").await; + + let start = Instant::now(); + for i in 0..NUM_OPS { + let value = format!("x 0 {} y", i); + ck.append("x", &value).await; + } + let dur = start.elapsed(); + + let v = ck.get("x").await; + check_clnt_appends(0, &v, NUM_OPS as _); + + // heartbeat interval should be ~100ms, require at least 3 ops per heartbeat + const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100); + const OPS_PER_INTERVAL: u32 = 3; + let time_per_op = HEARTBEAT_INTERVAL / OPS_PER_INTERVAL; + assert!( + dur <= NUM_OPS * time_per_op, + "Operations completed too slowly {:?}/op > {:?}/op", + dur / NUM_OPS, + time_per_op + ); + t.end(); } #[madsim::test] async fn basic_3a() { // Test: one client (3A) ... - generic_test("3A", 1, false, false, false, None).await; + generic_test("3A", 1, 5, false, false, false, None, false).await; } #[madsim::test] async fn concurrent_3a() { // Test: many clients (3A) ... - generic_test("3A", 5, false, false, false, None).await; + generic_test("3A", 5, 5, false, false, false, None, false).await; +} + +#[madsim::test] +async fn speed_3a() { + generic_test_speed("3A", None).await; } #[madsim::test] async fn unreliable_3a() { // Test: unreliable net, many clients (3A) ... - generic_test("3A", 5, true, false, false, None).await; + generic_test("3A", 5, 5, true, false, false, None, false).await; } #[madsim::test] @@ -344,50 +409,50 @@ async fn one_partition_3a() { #[madsim::test] async fn many_partitions_one_client_3a() { // Test: partitions, one client (3A) ... - generic_test("3A", 1, false, false, true, None).await; + generic_test("3A", 1, 5, false, false, true, None, false).await; } #[madsim::test] async fn many_partitions_many_clients_3a() { // Test: partitions, many clients (3A) ... - generic_test("3A", 5, false, false, true, None).await; + generic_test("3A", 5, 5, false, false, true, None, false).await; } #[madsim::test] async fn persist_one_client_3a() { // Test: restarts, one client (3A) ... - generic_test("3A", 1, false, true, false, None).await; + generic_test("3A", 1, 5, false, true, false, None, false).await; } #[madsim::test] async fn persist_concurrent_3a() { // Test: restarts, many clients (3A) ... - generic_test("3A", 5, false, true, false, None).await; + generic_test("3A", 5, 5, false, true, false, None, false).await; } #[madsim::test] async fn persist_concurrent_unreliable_3a() { // Test: unreliable net, restarts, many clients (3A) ... - generic_test("3A", 5, true, true, false, None).await; + generic_test("3A", 5, 5, true, true, false, None, false).await; } #[madsim::test] async fn persist_partition_3a() { // Test: restarts, partitions, many clients (3A) ... - generic_test("3A", 5, false, true, true, None).await; + generic_test("3A", 5, 5, false, true, true, None, false).await; } #[madsim::test] async fn persist_partition_unreliable_3a() { // Test: unreliable net, restarts, partitions, many clients (3A) ... - generic_test("3A", 5, true, true, true, None).await; + generic_test("3A", 5, 5, true, true, true, None, false).await; } -// #[madsim::test] -// async fn persist_partition_unreliable_linearizable_3a() { -// // Test: unreliable net, restarts, partitions, linearizability checks (3A) ... -// generic_test_linearizability("3A", 15, 7, true, true, true, None) -// } +#[madsim::test] +async fn persist_partition_unreliable_linearizable_3a() { + // Test: unreliable net, restarts, partitions, linearizability checks (3A) ... + generic_test("3A", 15, 7, true, true, true, None, true).await; +} // if one server falls behind, then rejoins, does it // recover by using the InstallSnapshot RPC? @@ -491,38 +556,43 @@ async fn snapshot_size_3b() { t.end(); } +#[madsim::test] +async fn speed_3b() { + generic_test_speed("3B", Some(1000)).await; +} + #[madsim::test] async fn snapshot_recover_3b() { // Test: restarts, snapshots, one client (3B) ... - generic_test("3B", 1, false, true, false, Some(1000)).await; + generic_test("3B", 1, 5, false, true, false, Some(1000), false).await; } #[madsim::test] async fn snapshot_recover_many_clients_3b() { // Test: restarts, snapshots, many clients (3B) ... - generic_test("3B", 20, false, true, false, Some(1000)).await; + generic_test("3B", 20, 5, false, true, false, Some(1000), false).await; } #[madsim::test] async fn snapshot_unreliable_3b() { // Test: unreliable net, snapshots, many clients (3B) ... - generic_test("3B", 5, true, false, false, Some(1000)).await; + generic_test("3B", 5, 5, true, false, false, Some(1000), false).await; } #[madsim::test] async fn snapshot_unreliable_recover_3b() { // Test: unreliable net, restarts, snapshots, many clients (3B) ... - generic_test("3B", 5, true, true, false, Some(1000)).await; + generic_test("3B", 5, 5, true, true, false, Some(1000), false).await; } #[madsim::test] async fn snapshot_unreliable_recover_concurrent_partition_3b() { // Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ... - generic_test("3B", 5, true, true, true, Some(1000)).await; + generic_test("3B", 5, 5, true, true, true, Some(1000), false).await; } -// #[madsim::test] -// async fn snapshot_unreliable_recover_concurrent_partition_linearizable_3b() { -// // Test: unreliable net, restarts, partitions, snapshots, linearizability checks (3B) ... -// generic_test_linearizability("3B", 15, 7, true, true, true, Some(1000)).await; -// } +#[madsim::test] +async fn snapshot_unreliable_recover_concurrent_partition_linearizable_3b() { + // Test: unreliable net, restarts, partitions, snapshots, linearizability checks (3B) ... + generic_test("3B", 15, 7, true, true, true, Some(1000), true).await; +} diff --git a/src/raft/tester.rs b/src/raft/tester.rs index eb39994..4d9ef3c 100644 --- a/src/raft/tester.rs +++ b/src/raft/tester.rs @@ -162,7 +162,10 @@ impl RaftTester { self.storage.n_committed(index) } - pub async fn start(&self, i: usize, cmd: Entry) -> Result { + pub async fn start(&self, i: usize, cmd: C) -> Result + where + C: 'static + Send + Sync + Serialize, + { let raft = self.rafts.lock().unwrap()[i].as_ref().unwrap().clone(); self.handle .local_handle(self.addrs[i]) @@ -358,9 +361,10 @@ impl RaftTester { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub struct Entry { - pub x: u64, +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum Entry { + X(u64), + Str(String), } #[derive(Clone)] @@ -405,8 +409,9 @@ impl StorageHandle { fn n_committed(&self, index: u64) -> (usize, Option) { let mut count = 0; let mut cmd = None; - for log in self.logs.lock().unwrap().iter() { - if let Some(&Some(cmd1)) = log.get(index as usize) { + let logs = self.logs.lock().unwrap(); + for log in logs.iter() { + if let Some(Some(cmd1)) = &log.get(index as usize) { if let Some(cmd) = cmd { assert_eq!( cmd, cmd1, @@ -418,7 +423,7 @@ impl StorageHandle { cmd = Some(cmd1); } } - (count, cmd) + (count, cmd.cloned()) } fn max_index(&self) -> usize { diff --git a/src/raft/tests.rs b/src/raft/tests.rs index 0d1433c..c78db5b 100644 --- a/src/raft/tests.rs +++ b/src/raft/tests.rs @@ -13,6 +13,13 @@ use std::{ time::Duration, }; +fn rand_string(rng: &mut R, len: usize) -> String { + (0..len) + .into_iter() + .map(|_| rng.gen_range('a'..'z')) + .collect() +} + /// The tester generously allows solutions to complete elections in one second /// (much more than the paper's range of timeouts). const RAFT_ELECTION_TIMEOUT: Duration = Duration::from_millis(1000); @@ -122,10 +129,123 @@ async fn basic_agree_2b() { let (nd, _) = t.n_committed(index); assert_eq!(nd, 0, "some have committed before start()"); - let xindex = t.one(Entry { x: index * 100 }, servers, false).await; + let xindex = t.one(Entry::X(index * 100), servers, false).await; + assert_eq!(xindex, index, "got index {} but expected {}", xindex, index); + } + + t.end(); +} + +/// check, based on counting bytes of RPCs, that each command is sent to each +/// peer just once. +#[madsim::test] +#[ignore] +async fn rpc_bytes_2b() { + let servers = 3; + + let t = RaftTester::new(servers).await; + info!("Test (2B): RPC byte count"); + let mut random = rand::rng(); + + t.one(Entry::X(99), servers, false).await; + let bytes0 = 0; // TODO bytes metrics + + let iters = 10; + let mut sent = 0; + for index in 2..(iters + 2) { + let cmd = rand_string(&mut random, 5000); + let xindex = t.one(Entry::Str(cmd), servers, false).await; assert_eq!(xindex, index, "got index {} but expected {}", xindex, index); + sent += 5000; } + let bytes1 = 100; + let got = bytes1 - bytes0; + let expected = servers * sent; + assert!( + got <= expected + 50000, + "too many RPC bytes; got {}, expected {}", + got, + expected + ); + + t.end(); +} + +/// test failure of follower +#[madsim::test] +async fn follower_failure_2b() { + let servers = 3; + let t = RaftTester::new(servers).await; + info!("Test (2B): test progressive failure of followers"); + + t.one(Entry::X(101), servers, false).await; + + // disconnect one follower from the network. + let leader1 = t.check_one_leader().await; + t.disconnect((leader1 + 1) % servers); + + // the leader and remaining follower should be + // able to agree despite the disconnected follower. + t.one(Entry::X(102), servers - 1, false).await; + time::sleep(RAFT_ELECTION_TIMEOUT).await; + t.one(Entry::X(103), servers - 1, false).await; + + // disconnect the remaining follower. + let leader2 = t.check_one_leader().await; + t.disconnect((leader2 + 1) % servers); + t.disconnect((leader2 + 2) % servers); + + // submit a command. + let index = t + .start(leader2, Entry::X(104)) + .await + .expect("leader rejected start") + .index; + assert_eq!(index, 4, "expected index 4, got {}", index); + + time::sleep(2 * RAFT_ELECTION_TIMEOUT).await; + + // check that command 104 did not commit. + let (n, _) = t.n_committed(index); + assert_eq!(n, 0, "{} committed but no majority", n); + + t.end(); +} + +/// test failure of leaders +#[madsim::test] +async fn leader_failure_2b() { + let servers = 3; + let t = RaftTester::new(servers).await; + + info!("Test (2B): test failure of leaders"); + + t.one(Entry::X(101), servers, false).await; + + // disconnect the first leader. + let leader1 = t.check_one_leader().await; + t.disconnect(leader1); + + // the remain followers should elect a new leader. + t.one(Entry::X(102), servers - 1, false).await; + time::sleep(RAFT_ELECTION_TIMEOUT).await; + t.one(Entry::X(103), servers - 1, false).await; + + // disconnect the new leader. + let leader2 = t.check_one_leader().await; + t.disconnect(leader2); + + // submit a command to each server. + for i in 0..servers { + t.start(i, Entry::X(104)).await.ok(); + } + time::sleep(2 * RAFT_ELECTION_TIMEOUT).await; + + // check that command 104 did not commit. + let (n, _) = t.n_committed(4); + assert_eq!(n, 0, "{} committed but no majority", n); + t.end(); } @@ -136,26 +256,26 @@ async fn fail_agree_2b() { info!("Test (2B): agreement despite follower disconnection"); - t.one(Entry { x: 101 }, servers, false).await; + t.one(Entry::X(101), servers, false).await; // follower network disconnection let leader = t.check_one_leader().await; t.disconnect((leader + 1) % servers); // agree despite one disconnected server? - t.one(Entry { x: 102 }, servers - 1, false).await; - t.one(Entry { x: 103 }, servers - 1, false).await; + t.one(Entry::X(102), servers - 1, false).await; + t.one(Entry::X(103), servers - 1, false).await; time::sleep(RAFT_ELECTION_TIMEOUT).await; - t.one(Entry { x: 104 }, servers - 1, false).await; - t.one(Entry { x: 105 }, servers - 1, false).await; + t.one(Entry::X(104), servers - 1, false).await; + t.one(Entry::X(105), servers - 1, false).await; // re-connect t.connect((leader + 1) % servers); // agree with full set of servers? - t.one(Entry { x: 106 }, servers, true).await; + t.one(Entry::X(106), servers, true).await; time::sleep(RAFT_ELECTION_TIMEOUT).await; - t.one(Entry { x: 107 }, servers, true).await; + t.one(Entry::X(107), servers, true).await; t.end(); } @@ -167,7 +287,7 @@ async fn fail_no_agree_2b() { info!("Test (2B): no agreement if too many followers disconnect"); - t.one(Entry { x: 10 }, servers, false).await; + t.one(Entry::X(10), servers, false).await; // 3 of 5 followers disconnect let leader = t.check_one_leader().await; @@ -175,7 +295,7 @@ async fn fail_no_agree_2b() { t.disconnect((leader + 2) % servers); t.disconnect((leader + 3) % servers); let index = t - .start(leader, Entry { x: 20 }) + .start(leader, Entry::X(20)) .await .expect("leader rejected start") .index; @@ -197,13 +317,13 @@ async fn fail_no_agree_2b() { // among their own ranks, forgetting index 2. let leader2 = t.check_one_leader().await; let index2 = t - .start(leader2, Entry { x: 30 }) + .start(leader2, Entry::X(30)) .await .expect("leader2 rejected start") .index; assert!((2..=3).contains(&index2), "unexpected index {}", index2); - t.one(Entry { x: 1000 }, servers, true).await; + t.one(Entry::X(1000), servers, true).await; t.end(); } @@ -222,7 +342,7 @@ async fn concurrent_starts_2b() { } let leader = t.check_one_leader().await; - let term = match t.start(leader, Entry { x: 1 }).await { + let term = match t.start(leader, Entry::X(1)).await { Err(err) => { warn!("start leader {} meet error {:?}", leader, err); continue; @@ -232,7 +352,7 @@ async fn concurrent_starts_2b() { let mut idxes = vec![]; for ii in 0..5 { - match t.start(leader, Entry { x: 100 + ii }).await { + match t.start(leader, Entry::X(100 + ii)).await { Err(err) => { warn!("start leader {} meet error {:?}", leader, err); } @@ -251,8 +371,8 @@ async fn concurrent_starts_2b() { let mut cmds = vec![]; for index in idxes { - if let Some(cmd) = t.wait(index, servers, Some(term)).await { - cmds.push(cmd.x); + if let Some(Entry::X(x)) = t.wait(index, servers, Some(term)).await { + cmds.push(x); } else { // peers have moved on to later terms // so we can't expect all Start()s to @@ -281,19 +401,19 @@ async fn rejoin_2b() { info!("Test (2B): rejoin of partitioned leader"); - t.one(Entry { x: 101 }, servers, true).await; + t.one(Entry::X(101), servers, true).await; // leader network failure let leader1 = t.check_one_leader().await; t.disconnect(leader1); // make old leader try to agree on some entries - let _ = t.start(leader1, Entry { x: 102 }).await; - let _ = t.start(leader1, Entry { x: 103 }).await; - let _ = t.start(leader1, Entry { x: 104 }).await; + let _ = t.start(leader1, Entry::X(102)).await; + let _ = t.start(leader1, Entry::X(103)).await; + let _ = t.start(leader1, Entry::X(104)).await; // new leader commits, also for index=2 - t.one(Entry { x: 103 }, 2, true).await; + t.one(Entry::X(103), 2, true).await; // new leader network failure let leader2 = t.check_one_leader().await; @@ -302,12 +422,12 @@ async fn rejoin_2b() { // old leader connected again t.connect(leader1); - t.one(Entry { x: 104 }, 2, true).await; + t.one(Entry::X(104), 2, true).await; // all together now t.connect(leader2); - t.one(Entry { x: 105 }, servers, true).await; + t.one(Entry::X(105), servers, true).await; t.end(); } @@ -413,7 +533,7 @@ async fn count_2b() { total1 = t.rpc_total(); let iters = 10; - let (starti, term) = match t.start(leader, Entry { x: 1 }).await { + let (starti, term) = match t.start(leader, Entry::X(1)).await { Ok(s) => (s.index, s.term), Err(err) => { warn!("start leader {} meet error {:?}", leader, err); @@ -425,7 +545,7 @@ async fn count_2b() { for i in 1..iters + 2 { let x = random.gen::(); cmds.push(x); - match t.start(leader, Entry { x }).await { + match t.start(leader, Entry::X(x)).await { Ok(s) => { if s.term != term { // Term changed while starting @@ -441,9 +561,9 @@ async fn count_2b() { } for i in 1..=iters { - if let Some(ix) = t.wait(starti + i, servers, Some(term)).await { + if let Some(Entry::X(ix)) = t.wait(starti + i, servers, Some(term)).await { assert_eq!( - ix.x, + ix, cmds[(i - 1) as usize], "wrong value {:?} committed for index {}; expected {:?}", ix, @@ -485,7 +605,7 @@ async fn persist1_2c() { info!("Test (2C): basic persistence"); - t.one(Entry { x: 11 }, servers, true).await; + t.one(Entry::X(11), servers, true).await; // crash and re-start all for i in 0..servers { @@ -496,18 +616,18 @@ async fn persist1_2c() { t.connect(i); } - t.one(Entry { x: 12 }, servers, true).await; + t.one(Entry::X(12), servers, true).await; let leader1 = t.check_one_leader().await; t.disconnect(leader1); t.start1(leader1).await; t.connect(leader1); - t.one(Entry { x: 13 }, servers, true).await; + t.one(Entry::X(13), servers, true).await; let leader2 = t.check_one_leader().await; t.disconnect(leader2); - t.one(Entry { x: 14 }, servers - 1, true).await; + t.one(Entry::X(14), servers - 1, true).await; t.start1(leader2).await; t.connect(leader2); @@ -516,11 +636,11 @@ async fn persist1_2c() { let i3 = (t.check_one_leader().await + 1) % servers; t.disconnect(i3); - t.one(Entry { x: 15 }, servers - 1, true).await; + t.one(Entry::X(15), servers - 1, true).await; t.start1(i3).await; t.connect(i3); - t.one(Entry { x: 16 }, servers, true).await; + t.one(Entry::X(16), servers, true).await; t.end(); } @@ -534,7 +654,7 @@ async fn persist2_2c() { let mut index = 1; for _ in 0..5 { - t.one(Entry { x: 10 + index }, servers, true).await; + t.one(Entry::X(10 + index), servers, true).await; index += 1; let leader1 = t.check_one_leader().await; @@ -542,7 +662,7 @@ async fn persist2_2c() { t.disconnect((leader1 + 1) % servers); t.disconnect((leader1 + 2) % servers); - t.one(Entry { x: 10 + index }, servers - 2, true).await; + t.one(Entry::X(10 + index), servers - 2, true).await; index += 1; t.disconnect((leader1 + 0) % servers); @@ -559,14 +679,14 @@ async fn persist2_2c() { t.start1((leader1 + 3) % servers).await; t.connect((leader1 + 3) % servers); - t.one(Entry { x: 10 + index }, servers - 2, true).await; + t.one(Entry::X(10 + index), servers - 2, true).await; index += 1; t.connect((leader1 + 4) % servers); t.connect((leader1 + 0) % servers); } - t.one(Entry { x: 1000 }, servers, true).await; + t.one(Entry::X(1000), servers, true).await; t.end(); } @@ -578,12 +698,12 @@ async fn persist3_2c() { info!("Test (2C): partitioned leader and one follower crash, leader restarts"); - t.one(Entry { x: 101 }, 3, true).await; + t.one(Entry::X(101), 3, true).await; let leader = t.check_one_leader().await; t.disconnect((leader + 2) % servers); - t.one(Entry { x: 102 }, 2, true).await; + t.one(Entry::X(102), 2, true).await; t.crash1((leader + 0) % servers); t.crash1((leader + 1) % servers); @@ -591,12 +711,12 @@ async fn persist3_2c() { t.start1((leader + 0) % servers).await; t.connect((leader + 0) % servers); - t.one(Entry { x: 103 }, 2, true).await; + t.one(Entry::X(103), 2, true).await; t.start1((leader + 1) % servers).await; t.connect((leader + 1) % servers); - t.one(Entry { x: 104 }, servers, true).await; + t.one(Entry::X(104), servers, true).await; t.end(); } @@ -672,15 +792,15 @@ async fn unreliable_agree_2c() { for j in 0..4 { let x = (100 * iters) + j; let t = t.clone(); - let future = async move { t.one(Entry { x }, 1, true).await }; + let future = async move { t.one(Entry::X(x), 1, true).await }; dones.push(task::spawn_local(future)); } - t.one(Entry { x: iters }, 1, true).await; + t.one(Entry::X(iters), 1, true).await; } t.set_unreliable(false); future::join_all(dones).await; - t.one(Entry { x: 100 }, servers, true).await; + t.one(Entry::X(100), servers, true).await; t.end(); } @@ -781,9 +901,9 @@ async fn internal_churn(unreliable: bool) { // but don't wait forever. for to in [10, 20, 50, 100, 200] { let (_, cmd) = t.n_committed(index); - if let Some(cmd) = cmd { - if cmd == x { - values.push(cmd.x); + if let Some(Entry::X(cx)) = cmd { + if Entry::X(cx) == x { + values.push(cx); } break; } @@ -846,7 +966,9 @@ async fn internal_churn(unreliable: bool) { let mut really = vec![]; for index in 1..=last_index { let v = t.wait(index, servers, None).await.unwrap(); - really.push(v.x); + if let Entry::X(x) = v { + really.push(x); + } } for v1 in future::join_all(nrec).await.iter().flatten() { assert!(really.contains(v1), "didn't find a value"); @@ -940,12 +1062,99 @@ async fn snapshot_install_unreliable_crash_2d() { snap_common(false, false, true).await; } +/// do the servers persist the snapshots, and restart using snapshot along +/// with the tail of the log? +#[madsim::test] +async fn snapshot_all_crash_2d() { + let servers = 3; + let iters = 5; + + let t = RaftTester::new_with_snapshot(servers).await; + info!("Test (2D): crash and restart all servers"); + let mut random = rand::rng(); + + t.one(random.gen_entry(), servers, true).await; + + for _iter in 0..iters { + // enough to get a snapshot + for _ in 0..=SNAPSHOT_INTERVAL { + t.one(random.gen_entry(), servers, true).await; + } + let index1 = t.one(random.gen_entry(), servers, true).await; + + // crash all + for i in 0..servers { + t.crash1(i); + } + // revive all + for i in 0..servers { + t.start1(i).await; + t.connect(i); + } + + let index2 = t.one(random.gen_entry(), servers, true).await; + assert!( + index2 > index1, + "index decreased from {} to {}", + index1, + index2 + ); + } + + t.end(); +} + +/// do servers correctly initialize their in-memory copy of the snapshot, +/// making sure that future writes to persistent state don't lose state? +#[madsim::test] +async fn snapshot_init_2d() { + let servers = 3; + let t = RaftTester::new_with_snapshot(servers).await; + + info!("Test (2D): snapshot initialization after crash"); + let mut random = rand::rng(); + t.one(random.gen_entry(), servers, true).await; + + // enough ops to make a snapshot + for _ in 0..=SNAPSHOT_INTERVAL { + t.one(random.gen_entry(), servers, true).await; + } + + // crash all + for i in 0..servers { + t.crash1(i); + } + // revive all + for i in 0..servers { + t.start1(i).await; + t.connect(i); + } + + // a single op, to get something to be written back to + // persistent storage. + t.one(random.gen_entry(), servers, true).await; + + // crash all + for i in 0..servers { + t.crash1(i); + } + // revive all + for i in 0..servers { + t.start1(i).await; + t.connect(i); + } + + // do anothor op to trigger potential bug + t.one(random.gen_entry(), servers, true).await; + t.end(); +} + trait GenEntry { fn gen_entry(&mut self) -> Entry; } impl GenEntry for R { fn gen_entry(&mut self) -> Entry { - Entry { x: self.gen() } + Entry::X(self.gen()) } } From 1186598a4baca041c26babedbce090daa519cc7e Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Thu, 18 May 2023 15:25:37 +0800 Subject: [PATCH 2/9] init porcupine --- src/lib.rs | 1 + src/porcupine/checker.rs | 20 ++++++++++++++ src/porcupine/kv.rs | 60 ++++++++++++++++++++++++++++++++++++++++ src/porcupine/mod.rs | 49 ++++++++++++++++++++++++++++++++ src/porcupine/model.rs | 42 ++++++++++++++++++++++++++++ src/raft/tester.rs | 9 ++++++ src/raft/tests.rs | 18 ++++-------- 7 files changed, 186 insertions(+), 13 deletions(-) create mode 100644 src/porcupine/checker.rs create mode 100644 src/porcupine/kv.rs create mode 100644 src/porcupine/mod.rs create mode 100644 src/porcupine/model.rs diff --git a/src/lib.rs b/src/lib.rs index 2f90e8f..53a3f3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ extern crate log; pub mod kvraft; +mod porcupine; pub mod raft; pub mod shard_ctrler; pub mod shardkv; diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs new file mode 100644 index 0000000..4ce6ab7 --- /dev/null +++ b/src/porcupine/checker.rs @@ -0,0 +1,20 @@ +//! Parallel linearizability checker. + +use std::time::Duration; + +use crate::porcupine::{CheckResult, Model, Operation}; + +pub(crate) struct LinearizationInfo {} + +pub(super) fn check_operations( + model: M, + history: Vec>, + verbose: bool, + timeout: Option, +) -> (CheckResult, LinearizationInfo) +where + M: Model, +{ + let partition = ::partition(history); + todo!() +} diff --git a/src/porcupine/kv.rs b/src/porcupine/kv.rs new file mode 100644 index 0000000..c5a8ed3 --- /dev/null +++ b/src/porcupine/kv.rs @@ -0,0 +1,60 @@ +//! A key-value model. + +use crate::porcupine::model::{Model, Operation}; + +#[derive(Debug, Clone, Copy)] +pub(crate) enum KvOp { + Get, + Put, + Append, +} + +#[derive(Debug)] +pub(crate) struct KvInput { + op: KvOp, + key: String, + value: String, +} + +#[derive(Debug)] +pub(crate) struct KvOutput { + value: String, +} + +pub(crate) struct KvModel { + state: String, +} + +impl Model for KvModel { + type In = KvInput; + type Out = KvOutput; + + fn partition( + history: Vec>, + ) -> Vec>> { + todo!("partition by key") + } + + fn init() -> Self { + KvModel { + state: "".to_string(), + } + } + + fn step(self, input: KvInput, output: KvOutput) -> (bool, Self) { + match input.op { + KvOp::Get => (output.value == self.state, self), + KvOp::Put => (true, Self { state: input.value }), + KvOp::Append => ( + true, + Self { + state: self.state + input.value.as_str(), + }, + ), + } + } + + fn equal(&self, other: &Self) -> bool { + self.state == other.state + } +} diff --git a/src/porcupine/mod.rs b/src/porcupine/mod.rs new file mode 100644 index 0000000..67e1600 --- /dev/null +++ b/src/porcupine/mod.rs @@ -0,0 +1,49 @@ +//! A lib checking linearizability. + +#![deny(clippy::all)] + +#![allow(dead_code)] // TODO + +use std::time::Duration; + +use checker::LinearizationInfo; +use model::{Model, Operation}; + +mod checker; +pub mod kv; +pub mod model; + +/// Linearizability check result. +pub(crate) enum CheckResult { + /// Timeout + Unknown, + /// Ok + Ok, + /// Unlinearizable + Illegal, +} + +pub(crate) fn check_operations( + model: impl Model, + history: Vec>, +) -> bool { + let (res, _) = checker::check_operations(model, history, false, None); + matches!(res, CheckResult::Ok) +} + +pub(crate) fn check_operations_timeout( + model: impl Model, + history: Vec>, + timeout: Duration, +) -> CheckResult { + let (res, _) = checker::check_operations(model, history, false, Some(timeout)); + res +} + +pub(crate) fn check_operation_verbose( + model: impl Model, + history: Vec>, + timeout: Duration, +) -> (CheckResult, LinearizationInfo) { + checker::check_operations(model, history, true, Some(timeout)) +} diff --git a/src/porcupine/model.rs b/src/porcupine/model.rs new file mode 100644 index 0000000..b0158e1 --- /dev/null +++ b/src/porcupine/model.rs @@ -0,0 +1,42 @@ +//! History model. + +/// Operation +pub(crate) struct Operation { + /// optional, unless you want a visualization + pub client_id: Option, + pub input: In, + /// invocation time + pub call: u64, + pub output: Out, + /// response time + pub ret: u64, +} + +/// Model. +/// +/// - Eq trait needs to be implemented to represent equality on states. +pub(crate) trait Model { + /// Input type + type In; + + /// Output type + type Out; + + /// Partition operations, such that a history is linearizable if and only if + /// each partition is linearzable. + fn partition( + history: Vec>, + ) -> Vec>>; + + /// Initial state of the system. + fn init() -> Self; + + /// Step functions for the system. + /// + /// Returns whether or not the system could take this step with the given + /// inputs and outputs. + fn step(self, input: Self::In, output: Self::Out) -> (bool, Self); + + /// Equality on states. + fn equal(&self, other: &Self) -> bool; +} diff --git a/src/raft/tester.rs b/src/raft/tester.rs index 4d9ef3c..992263e 100644 --- a/src/raft/tester.rs +++ b/src/raft/tester.rs @@ -6,6 +6,7 @@ use madsim::{ time::{self, Instant}, Handle, }; +use ::rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -431,3 +432,11 @@ impl StorageHandle { logs.iter().map(|log| log.len() - 1).max().unwrap() } } + +pub fn rand_string(len: usize) -> String { + rand::rng() + .sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect() +} diff --git a/src/raft/tests.rs b/src/raft/tests.rs index c78db5b..ce5d63e 100644 --- a/src/raft/tests.rs +++ b/src/raft/tests.rs @@ -13,13 +13,6 @@ use std::{ time::Duration, }; -fn rand_string(rng: &mut R, len: usize) -> String { - (0..len) - .into_iter() - .map(|_| rng.gen_range('a'..'z')) - .collect() -} - /// The tester generously allows solutions to complete elections in one second /// (much more than the paper's range of timeouts). const RAFT_ELECTION_TIMEOUT: Duration = Duration::from_millis(1000); @@ -145,23 +138,22 @@ async fn rpc_bytes_2b() { let t = RaftTester::new(servers).await; info!("Test (2B): RPC byte count"); - let mut random = rand::rng(); t.one(Entry::X(99), servers, false).await; - let bytes0 = 0; // TODO bytes metrics + let rpc0 = t.rpc_total(); let iters = 10; let mut sent = 0; for index in 2..(iters + 2) { - let cmd = rand_string(&mut random, 5000); + let cmd = rand_string(5000); let xindex = t.one(Entry::Str(cmd), servers, false).await; assert_eq!(xindex, index, "got index {} but expected {}", xindex, index); sent += 5000; } - let bytes1 = 100; - let got = bytes1 - bytes0; - let expected = servers * sent; + let rpc1 = t.rpc_total(); + let got = (rpc1 - rpc0) * 5000; + let expected = (servers * sent) as u64; assert!( got <= expected + 50000, "too many RPC bytes; got {}, expected {}", From 09ddc9ecf1e6b51985a9faa06d55f2b17d77620a Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Sat, 20 May 2023 16:08:56 +0800 Subject: [PATCH 3/9] feat: add a linearizability checker --- Cargo.toml | 2 + src/kvraft/tester.rs | 90 ++++++++++++++++++++- src/kvraft/tests.rs | 29 +++++-- src/porcupine/checker.rs | 130 ++++++++++++++++++++++++++++--- src/porcupine/kv.rs | 84 +++++++++++++++----- src/porcupine/mod.rs | 31 +++----- src/porcupine/model.rs | 53 +++++++++---- src/porcupine/utils.rs | 164 +++++++++++++++++++++++++++++++++++++++ src/raft/tester.rs | 2 +- src/shardkv/tests.rs | 51 +++++++++++- 10 files changed, 557 insertions(+), 79 deletions(-) create mode 100644 src/porcupine/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 0872fa0..8333066 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,5 @@ futures = "0.3" serde = { version = "1.0", features = ["derive"] } bincode = "1.3" thiserror = "1.0" +bit-vec = "0.6" +lazy_static = "1.4" diff --git a/src/kvraft/tester.rs b/src/kvraft/tester.rs index 055c77f..27e4ba0 100644 --- a/src/kvraft/tester.rs +++ b/src/kvraft/tester.rs @@ -1,3 +1,8 @@ +use crate::porcupine::{ + kv::{KvInput, KvModel, KvOp, KvOutput}, + model::Operation, +}; +use lazy_static::lazy_static; use madsim::{time::*, Handle, LocalHandle}; use std::{ net::SocketAddr, @@ -211,6 +216,29 @@ impl Tester { } } +lazy_static! { + static ref T0: Instant = Instant::now(); +} + +#[derive(Debug)] +pub struct OpLog { + operations: Mutex>>, +} + +impl OpLog { + pub(super) fn new() -> Self { + Self { + operations: Mutex::new(Vec::new()), + } + } + fn append(&self, op: Operation) { + self.operations.lock().unwrap().push(op); + } + pub(super) fn read(&self) -> Vec> { + self.operations.lock().unwrap().clone() + } +} + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ClerkId(usize); @@ -239,11 +267,30 @@ impl Clerk { let value = value.to_owned(); self.handle .spawn(async move { - ck.put(key, value).await; + ck.put(key.clone(), value.clone()).await; }) .await } + pub async fn put_and_log(&self, key: &str, value: &str, log: &Arc) { + let start = T0.elapsed().as_micros(); + self.put(key, value).await; + let end = T0.elapsed().as_micros(); + log.append(Operation { + client_id: Some(self.id.0), + input: KvInput { + op: KvOp::Put, + key: key.to_owned(), + value: value.to_owned(), + }, + call: start, + output: KvOutput { + value: "".to_string(), + }, + ret: end, + }); + } + pub async fn append(&self, key: &str, value: &str) { self.op(); let ck = self.ck.clone(); @@ -251,11 +298,30 @@ impl Clerk { let value = value.to_owned(); self.handle .spawn(async move { - ck.append(key, value).await; + ck.append(key.clone(), value.clone()).await; }) .await } + pub async fn append_and_log(&self, key: &str, value: &str, log: &Arc) { + let start = T0.elapsed().as_micros(); + self.append(key, value).await; + let end = T0.elapsed().as_micros(); + log.append(Operation { + client_id: Some(self.id.0), + input: KvInput { + op: KvOp::Append, + key: key.to_owned(), + value: value.to_owned(), + }, + call: start, + output: KvOutput { + value: "".to_string(), + }, + ret: end, + }); + } + pub async fn get(&self, key: &str) -> String { self.op(); let ck = self.ck.clone(); @@ -263,6 +329,26 @@ impl Clerk { self.handle.spawn(async move { ck.get(key).await }).await } + pub async fn get_and_log(&self, key: &str, log: &Arc) -> String { + let start = T0.elapsed().as_micros(); + let value = self.get(key).await; + let end = T0.elapsed().as_micros(); + log.append(Operation { + client_id: Some(self.id.0), + input: KvInput { + op: KvOp::Get, + key: key.to_owned(), + value: "".to_string(), + }, + call: start, + output: KvOutput { + value: value.to_owned(), + }, + ret: end, + }); + value + } + pub async fn check(&self, key: &str, value: &str) { let ck = self.ck.clone(); let key1 = key.to_owned(); diff --git a/src/kvraft/tests.rs b/src/kvraft/tests.rs index 514b08e..e09112b 100644 --- a/src/kvraft/tests.rs +++ b/src/kvraft/tests.rs @@ -1,3 +1,8 @@ +use crate::{ + kvraft::tester::OpLog, + porcupine::{self, kv::KvModel, CheckResult}, +}; + use super::tester::Tester; use futures::{future, select, FutureExt}; use madsim::{ @@ -14,7 +19,7 @@ use std::sync::{ /// (much more than the paper's range of timeouts). const RAFT_ELECTION_TIMEOUT: Duration = Duration::from_millis(1000); -// const LINEARIZABILITY_CHECK_TIMEOUT: Duration = Duration::from_millis(1000); +const LINEARIZABILITY_CHECK_TIMEOUT: Duration = Duration::from_millis(1000); // check that for a specific client all known appends are present in a value, // and in order @@ -99,6 +104,7 @@ async fn generic_test( info!("{} ({})", title, part); let t = Arc::new(Tester::new(nservers, unreliable, maxraftstate).await); + let op_log = Arc::new(OpLog::new()); let ck = t.make_client(&t.all()); @@ -110,6 +116,7 @@ async fn generic_test( for cli in 0..nclients { let ck = t.make_client(&t.all()); let done = done.clone(); + let log = Arc::clone(&op_log); cas.push(task::spawn_local(async move { // TODO: change the closure to a future. let mut j = 0; @@ -117,7 +124,7 @@ async fn generic_test( let mut last = String::new(); // only used when not randomkeys let mut key = format!("{}", cli); if !randomkeys { - ck.put(&key, &last).await; + ck.put_and_log(&key, &last, &log).await; } while !done.load(Ordering::Relaxed) { if randomkeys { @@ -127,7 +134,7 @@ async fn generic_test( if rng.gen_bool(0.5) { debug!("{}: client new append {:?}", cli, nv); // predict effect of append(k, val) if old value is prev. - ck.append(&key, &nv).await; + ck.append_and_log(&key, &nv, &log).await; if !randomkeys { last += &nv; } @@ -135,10 +142,10 @@ async fn generic_test( } else if randomkeys && rng.gen_bool(0.1) { // we only do this when using random keys, because it would break the // check done after Get() operations. - ck.put(&key, &nv).await; + ck.put_and_log(&key, &nv, &log).await; } else { debug!("{}: client new get {:?}", cli, key); - let v = ck.get(&key).await; + let v = ck.get_and_log(&key, &log).await; if !randomkeys { assert_eq!(v, last, "get wrong value, key {:?}", key); } @@ -217,7 +224,7 @@ async fn generic_test( } let key = format!("{}", i); debug!("Check {:?} for client {}", j, i); - let v = ck.get(&key).await; + let v = ck.get_and_log(&key, &op_log).await; if !randomkeys { check_clnt_appends(i, &v, j); } @@ -236,6 +243,16 @@ async fn generic_test( } // TODO linearizable check + let (res, _info) = + porcupine::check_operation_verbose::(op_log.read(), LINEARIZABILITY_CHECK_TIMEOUT) + .await; + assert!( + !matches!(res, CheckResult::Illegal), + "history is not linearizable" + ); + if matches!(res, CheckResult::Unknown) { + warn!("linearizability check timed out, assuming history is ok"); + } t.end(); } diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 4ce6ab7..1c97bfe 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -1,20 +1,126 @@ //! Parallel linearizability checker. -use std::time::Duration; - -use crate::porcupine::{CheckResult, Model, Operation}; +use crate::porcupine::{ + model::EntryValue, + utils::{EntryNode, LinkedEntries}, + CheckResult, Entry, Model, Operation, +}; +use bit_vec::BitVec; +use futures::{stream::FuturesUnordered, StreamExt}; +use madsim::time; +use std::{collections::HashMap, mem, time::Duration}; pub(crate) struct LinearizationInfo {} -pub(super) fn check_operations( - model: M, - history: Vec>, +fn cache_contains(cache: &HashMap>, bv: &BitVec, m: &M) -> bool { + if let Some(entries) = cache.get(bv) { + return entries.contains(m); + } + return false; +} + +struct CallEntry { + call: Box>, + ret: Box>, + state: M, +} + +/// Check single sub-history. Return Some() if it's linearizable. +fn check_single(history: Vec>, _verbose: bool) -> Option<()> { + let n = history.len() / 2; // number of operations + + // calls considered done + let mut linearized = BitVec::with_capacity(n); // call set + let mut cache = HashMap::>::new(); // call set -> state + let mut calls: Vec> = vec![]; // sort in time + let undecided = LinkedEntries::from(history); + + // cursor + let mut entry = undecided.front_mut().unwrap(); + + let mut state = M::init(); + + while !undecided.is_empty() { + if matches!(entry.value, EntryValue::Call(_)) { + // the matched return entry + let matched = entry.matched_mut().unwrap(); + let (ok, new_state) = state.step(entry.unwrap_in(), matched.unwrap_out()); + if ok { + let mut new_linearized = linearized.clone(); + new_linearized.set(entry.id, true); + if !cache_contains(&cache, &new_linearized, &new_state) { + linearized.set(entry.id, true); + cache + .get_mut(&new_linearized) + .unwrap() + .push(new_state.clone()); + let (call, ret) = entry.lift(); + calls.push(CallEntry { + call, + ret, + state: mem::replace(&mut state, new_state), + }); + entry = undecided.front_mut().unwrap(); + } else { + // this state is visited before + entry = entry.next_mut().unwrap(); + } + } else { + entry = entry.next_mut().unwrap(); + } + } else { + // back track + let CallEntry { + call, + ret, + state: state0, + } = calls.pop()?; + state = state0; + linearized.set(call.id as _, false); + call.unlift(ret); + entry = entry.next_mut().unwrap(); + } + } + Some(()) +} + +/// Check history in parallel. +/// +/// For each sub-history, spawn a task to test its linearizability. +async fn check_parallel( + histories: Vec>>, + verbose: bool, +) -> (CheckResult, LinearizationInfo) { + let mut futures: FuturesUnordered<_> = histories + .into_iter() + .map(|subhistory| async move { check_single::(subhistory, verbose) }) + .collect(); + let mut check_result = CheckResult::Ok; + while let Some(res) = futures.next().await { + if res.is_none() { + check_result = CheckResult::Illegal; + if !verbose { + break; // collect linearizable prefix under verbose mode + } + } + } + // TODO support verbose print + (check_result, LinearizationInfo {}) +} + +pub(super) async fn check_operations( + history: Vec>, verbose: bool, timeout: Option, -) -> (CheckResult, LinearizationInfo) -where - M: Model, -{ - let partition = ::partition(history); - todo!() +) -> (CheckResult, LinearizationInfo) { + let histories = ::partition(history); + if let Some(dur) = timeout { + // XXX I'm not sure + match time::timeout(dur, check_parallel::(histories, verbose)).await { + Ok(v) => v, + Err(_) => (CheckResult::Unknown, LinearizationInfo {}), + } + } else { + check_parallel::(histories, verbose).await + } } diff --git a/src/porcupine/kv.rs b/src/porcupine/kv.rs index c5a8ed3..938b953 100644 --- a/src/porcupine/kv.rs +++ b/src/porcupine/kv.rs @@ -1,6 +1,7 @@ //! A key-value model. -use crate::porcupine::model::{Model, Operation}; +use crate::porcupine::model::{Entry, EntryValue, Model, Operation}; +use std::{cmp::Ordering, collections::HashMap}; #[derive(Debug, Clone, Copy)] pub(crate) enum KvOp { @@ -9,18 +10,23 @@ pub(crate) enum KvOp { Append, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct KvInput { - op: KvOp, - key: String, - value: String, + pub(crate) op: KvOp, + pub(crate) key: String, + pub(crate) value: String, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct KvOutput { - value: String, + pub(crate) value: String, } +/// Model for single-version key-value store. +/// +/// A single instance of `KvModel` indicates the state of a single entry +/// in the KV store. +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct KvModel { state: String, } @@ -29,10 +35,49 @@ impl Model for KvModel { type In = KvInput; type Out = KvOutput; - fn partition( - history: Vec>, - ) -> Vec>> { - todo!("partition by key") + // partition the history by key, and then sort by time + fn partition(history: Vec>) -> Vec>> { + // key -> history of a single key + let mut map = HashMap::>)>::new(); + + for op in history { + let key = op.input.key.clone(); + let (id, key_hist) = map.entry(key).or_default(); + + // turn operation into a call entry and a return entry + let call_entry = Entry { + value: EntryValue::Call::(op.input), + id: *id, + time: op.call, + client_id: op.client_id, + }; + let return_entry = Entry { + value: EntryValue::Return::(op.output), + id: *id, + time: op.ret, + client_id: op.client_id, + }; + + *id += 1; + key_hist.push(call_entry); + key_hist.push(return_entry); + } + + map.into_values() + .map(|(_, mut v)| { + // sort by time and then entry type + v.sort_by(|x, y| { + x.time.cmp(&y.time).then_with(|| { + if matches!(x.value, EntryValue::Call(_)) { + Ordering::Less + } else { + Ordering::Greater + } + }) + }); + v + }) + .collect() } fn init() -> Self { @@ -41,20 +86,21 @@ impl Model for KvModel { } } - fn step(self, input: KvInput, output: KvOutput) -> (bool, Self) { + fn step(&self, input: &KvInput, output: &KvOutput) -> (bool, Self) { match input.op { - KvOp::Get => (output.value == self.state, self), - KvOp::Put => (true, Self { state: input.value }), + KvOp::Get => (output.value == self.state, self.to_owned()), + KvOp::Put => ( + true, + Self { + state: input.value.clone(), + }, + ), KvOp::Append => ( true, Self { - state: self.state + input.value.as_str(), + state: self.state.to_owned() + input.value.as_str(), }, ), } } - - fn equal(&self, other: &Self) -> bool { - self.state == other.state - } } diff --git a/src/porcupine/mod.rs b/src/porcupine/mod.rs index 67e1600..5a5475f 100644 --- a/src/porcupine/mod.rs +++ b/src/porcupine/mod.rs @@ -1,17 +1,16 @@ //! A lib checking linearizability. - +#![cfg(test)] #![deny(clippy::all)] -#![allow(dead_code)] // TODO - use std::time::Duration; use checker::LinearizationInfo; -use model::{Model, Operation}; +use model::{Entry, Model, Operation}; mod checker; pub mod kv; pub mod model; +mod utils; /// Linearizability check result. pub(crate) enum CheckResult { @@ -23,27 +22,19 @@ pub(crate) enum CheckResult { Illegal, } -pub(crate) fn check_operations( - model: impl Model, - history: Vec>, -) -> bool { - let (res, _) = checker::check_operations(model, history, false, None); - matches!(res, CheckResult::Ok) -} - -pub(crate) fn check_operations_timeout( - model: impl Model, - history: Vec>, +/// If this operation times out, then a false positive is possible. +pub(crate) async fn check_operations_timeout( + history: Vec>, timeout: Duration, ) -> CheckResult { - let (res, _) = checker::check_operations(model, history, false, Some(timeout)); + let (res, _) = checker::check_operations::(history, false, Some(timeout)).await; res } -pub(crate) fn check_operation_verbose( - model: impl Model, - history: Vec>, +/// If this operation times out, then a false positive is possible. +pub(crate) async fn check_operation_verbose( + history: Vec>, timeout: Duration, ) -> (CheckResult, LinearizationInfo) { - checker::check_operations(model, history, true, Some(timeout)) + checker::check_operations::(history, true, Some(timeout)).await } diff --git a/src/porcupine/model.rs b/src/porcupine/model.rs index b0158e1..6d46ec7 100644 --- a/src/porcupine/model.rs +++ b/src/porcupine/model.rs @@ -1,32 +1,53 @@ //! History model. /// Operation -pub(crate) struct Operation { +#[derive(Debug, Clone)] +pub(crate) struct Operation { /// optional, unless you want a visualization pub client_id: Option, - pub input: In, + /// input value + pub input: M::In, /// invocation time - pub call: u64, - pub output: Out, + pub call: u128, + /// output value + pub output: M::Out, /// response time - pub ret: u64, + pub ret: u128, +} + +/// Entry type, could be call or return. +#[derive(Debug)] +pub(crate) enum EntryValue { + Call(In), + Return(Out), +} + +/// Entry +#[derive(Debug)] +pub(crate) struct Entry { + pub value: EntryValue, + pub id: usize, + pub time: u128, + #[allow(dead_code)] // used in verbose mode + pub client_id: Option, } /// Model. /// /// - Eq trait needs to be implemented to represent equality on states. -pub(crate) trait Model { +pub(crate) trait Model: Eq + Clone { /// Input type - type In; + type In: Clone; /// Output type - type Out; + type Out: Clone; /// Partition operations, such that a history is linearizable if and only if - /// each partition is linearzable. - fn partition( - history: Vec>, - ) -> Vec>>; + /// each partition is linearizable. + /// + /// Each partition should be sorted by time. If two entries are of the same time, + /// calls should always be placed before returns. + fn partition(history: Vec>) -> Vec>>; /// Initial state of the system. fn init() -> Self; @@ -34,9 +55,7 @@ pub(crate) trait Model { /// Step functions for the system. /// /// Returns whether or not the system could take this step with the given - /// inputs and outputs. - fn step(self, input: Self::In, output: Self::Out) -> (bool, Self); - - /// Equality on states. - fn equal(&self, other: &Self) -> bool; + /// inputs and outputs, and the new state. This should not mutate the + /// existing state. + fn step(&self, input: &Self::In, output: &Self::Out) -> (bool, Self); } diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs new file mode 100644 index 0000000..4a2888c --- /dev/null +++ b/src/porcupine/utils.rs @@ -0,0 +1,164 @@ +//! Some data structures utils. + +use super::model::{Entry, EntryValue, Model}; +use std::{collections::HashMap, ptr::NonNull}; + +/// a pointer type +pub(super) type MaybeNull = Option>; + +fn maybe_null(elem: &mut T) -> MaybeNull { + unsafe { Some(NonNull::new_unchecked(elem as _)) } +} + +#[derive(Debug)] +struct Sentinel { + next: MaybeNull>, +} + +#[derive(Debug)] +pub(super) struct EntryNode { + pub(super) value: EntryValue, + matched: MaybeNull>, + pub(super) id: usize, + prev: MaybeNull>, + next: MaybeNull>, +} + +impl EntryNode { + pub(super) fn next_mut(&self) -> Option<&mut EntryNode> { + Some(unsafe { self.next?.as_mut() }) + } + + pub(super) fn matched_mut(&self) -> Option<&mut EntryNode> { + Some(unsafe { self.matched?.as_mut() }) + } + + pub(super) fn unwrap_in(&self) -> &In { + match &self.value { + EntryValue::Call(v) => v, + EntryValue::Return(_) => panic!("type mismatch"), + } + } + pub(super) fn unwrap_out(&self) -> &Out { + match &self.value { + EntryValue::Call(_) => panic!("type mismatch"), + EntryValue::Return(v) => v, + } + } + + /// detach self and self's match from list + pub(super) fn lift(&mut self) -> (Box>, Box>) { + assert!(matches!(self.value, EntryValue::Call(_))); + unsafe { + self.prev.unwrap().as_mut().next = self.next; + // since calls and returns are paired, call.next won't be None. + self.next.unwrap().as_mut().prev = self.prev; + let self_box = Box::from_raw(self as _); + let matched = self.matched_mut().unwrap(); + matched.prev.unwrap().as_mut().next = matched.next; + if let Some(mut n) = matched.next { + n.as_mut().prev = matched.prev; + } + (self_box, Box::from_raw(matched as _)) + } + } + + /// re-attach self and self's match to their original places + pub(super) fn unlift(self: Box, matched: Box) { + assert!(matches!(self.value, EntryValue::Call(_))); + unsafe { + let self_ptr = NonNull::new_unchecked(Box::into_raw(self)); + let matched_ptr = NonNull::new_unchecked(Box::into_raw(matched)); + matched_ptr.as_ref().prev.unwrap().as_mut().next = Some(matched_ptr); + if let Some(mut n) = matched_ptr.as_ref().next { + n.as_mut().prev = Some(matched_ptr); + } + self_ptr.as_ref().prev.unwrap().as_mut().next = Some(self_ptr); + // since calls and returns are paired, call.next won't be None. + self_ptr.as_ref().next.unwrap().as_mut().prev = Some(self_ptr); + } + } +} + +/// A linked list. +#[derive(Debug)] +pub(super) struct LinkedEntries { + sentinel: Sentinel, +} + +impl LinkedEntries { + pub(super) fn new() -> Self { + Self { + sentinel: Sentinel { next: None }, + } + } + + pub(super) const fn is_empty(&self) -> bool { + self.sentinel.next.is_none() + } + + fn push_front_node(&mut self, node: EntryNode) { + assert!(node.prev.is_none()); + assert!(node.next.is_none()); + unsafe { + let mut node_ptr = NonNull::new_unchecked(Box::into_raw(Box::new(node))); + if let Some(mut prev_head) = self.sentinel.next { + node_ptr.as_mut().next = Some(prev_head); + prev_head.as_mut().prev = Some(node_ptr); + } else { + self.sentinel.next = Some(node_ptr); + } + } + } + + pub(super) fn front_mut(&self) -> Option<&mut EntryNode> { + Some(unsafe { self.sentinel.next?.as_mut() }) + } + + fn pop_front_node(&mut self) -> Option>> { + self.sentinel.next.map(|head| unsafe { + let node = Box::from_raw(head.as_ptr()); + self.sentinel.next = node.next; + node + }) + } +} + +impl Drop for LinkedEntries { + fn drop(&mut self) { + while self.pop_front_node().is_some() {} + } +} + +impl From>> for LinkedEntries { + fn from(value: Vec>) -> Self { + let mut me = LinkedEntries::new(); + // id -> return entry of this id + let mut matches: HashMap>> = + HashMap::with_capacity((value.len() + 1) / 2); + for entry in value.into_iter().rev() { + let node = match entry.value { + EntryValue::Call(v) => EntryNode { + value: EntryValue::Call(v), + matched: matches[&entry.id], // call -> return + id: entry.id, + prev: None, + next: None, + }, + EntryValue::Return(v) => { + let mut node = EntryNode { + value: EntryValue::Return(v), + matched: None, + id: entry.id, + prev: None, + next: None, + }; + matches.insert(entry.id, maybe_null(&mut node)); + node + } + }; + me.push_front_node(node); + } + me + } +} diff --git a/src/raft/tester.rs b/src/raft/tester.rs index 992263e..5e4ac63 100644 --- a/src/raft/tester.rs +++ b/src/raft/tester.rs @@ -1,4 +1,5 @@ use super::raft::*; +use ::rand::distributions::Alphanumeric; use futures::StreamExt; use log::*; use madsim::{ @@ -6,7 +7,6 @@ use madsim::{ time::{self, Instant}, Handle, }; -use ::rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, diff --git a/src/shardkv/tests.rs b/src/shardkv/tests.rs index a5fb133..c1af0fb 100644 --- a/src/shardkv/tests.rs +++ b/src/shardkv/tests.rs @@ -1,3 +1,5 @@ +use crate::porcupine::{check_operation_verbose, kv::KvModel, model::Operation, CheckResult}; + use super::tester::*; use futures::future; use log::*; @@ -10,10 +12,12 @@ use std::{ future::Future, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }, }; +const LINEARIZABILITY_CHECK_TIMEOUT: Duration = Duration::from_millis(1000); + /// test static 2-way sharding, without shard movement. #[madsim::test] async fn static_shards_4b() { @@ -429,7 +433,50 @@ async fn unreliable2_4b() { #[ignore] #[madsim::test] async fn unreliable3_4b() { - // TODO: linearizable + info!("Test: unreliable 3...\n"); + + let t = Tester::new(3, true, Some(100)).await; + let begin = Instant::now(); + let operations = Arc::new(Mutex::new(Vec::>::new())); + + let ck = t.make_client(); + t.join(0).await; + + let n = 10; + let kvs = (0..n) + .map(|i| (i.to_string(), rand_string(5))) + .collect::>(); + ck.put_kvs(&kvs).await; // TODO with log + + let kvs_fut = t.spawn_concurrent_append(kvs, 5, 0); // TODO with log + + time::sleep(Duration::from_millis(150)).await; + t.join(1).await; + time::sleep(Duration::from_millis(500)).await; + t.join(2).await; + time::sleep(Duration::from_millis(500)).await; + t.leave(0).await; + time::sleep(Duration::from_millis(500)).await; + t.leave(1).await; + time::sleep(Duration::from_millis(500)).await; + t.join(1).await; + t.join(0).await; + + time::sleep(Duration::from_secs(2)).await; + let kvs = kvs_fut.await; + + let history = operations.lock().unwrap().clone(); + let (res, _info) = + check_operation_verbose::(history, LINEARIZABILITY_CHECK_TIMEOUT).await; + assert!( + !matches!(res, CheckResult::Illegal), + "history is not linearizable" + ); + if matches!(res, CheckResult::Unknown) { + warn!("linearizability check timed out, assuming history is ok"); + } + + t.end(); } /// optional test to see whether servers are deleting From e164b23e14bc31acc0257c606c47899c4c5be2ac Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Sat, 20 May 2023 21:40:38 +0800 Subject: [PATCH 4/9] fix: bugs in linearizability test --- src/kvraft/tests.rs | 9 ++-- src/porcupine/checker.rs | 27 +++++++--- src/porcupine/model.rs | 12 +++-- src/porcupine/utils.rs | 105 +++++++++++++++++++++------------------ src/shardkv/tests.rs | 4 +- 5 files changed, 92 insertions(+), 65 deletions(-) diff --git a/src/kvraft/tests.rs b/src/kvraft/tests.rs index e09112b..75aa722 100644 --- a/src/kvraft/tests.rs +++ b/src/kvraft/tests.rs @@ -242,10 +242,11 @@ async fn generic_test( } } - // TODO linearizable check - let (res, _info) = - porcupine::check_operation_verbose::(op_log.read(), LINEARIZABILITY_CHECK_TIMEOUT) - .await; + let res = porcupine::check_operations_timeout::( + op_log.read(), + LINEARIZABILITY_CHECK_TIMEOUT, + ) + .await; assert!( !matches!(res, CheckResult::Illegal), "history is not linearizable" diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 1c97bfe..89b27f4 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -28,20 +28,20 @@ struct CallEntry { /// Check single sub-history. Return Some() if it's linearizable. fn check_single(history: Vec>, _verbose: bool) -> Option<()> { let n = history.len() / 2; // number of operations + debug!("history {:?}", history); - // calls considered done - let mut linearized = BitVec::with_capacity(n); // call set + let mut linearized = BitVec::from_elem(n, false); // call set let mut cache = HashMap::>::new(); // call set -> state let mut calls: Vec> = vec![]; // sort in time let undecided = LinkedEntries::from(history); // cursor let mut entry = undecided.front_mut().unwrap(); - let mut state = M::init(); while !undecided.is_empty() { if matches!(entry.value, EntryValue::Call(_)) { + debug!("id={} call", entry.id); // the matched return entry let matched = entry.matched_mut().unwrap(); let (ok, new_state) = state.step(entry.unwrap_in(), matched.unwrap_out()); @@ -49,10 +49,11 @@ fn check_single(history: Vec>, _verbose: bool) -> Option<()> let mut new_linearized = linearized.clone(); new_linearized.set(entry.id, true); if !cache_contains(&cache, &new_linearized, &new_state) { + debug!("cache miss, push {} into calls", entry.id); linearized.set(entry.id, true); cache - .get_mut(&new_linearized) - .unwrap() + .entry(new_linearized) + .or_default() .push(new_state.clone()); let (call, ret) = entry.lift(); calls.push(CallEntry { @@ -60,24 +61,34 @@ fn check_single(history: Vec>, _verbose: bool) -> Option<()> ret, state: mem::replace(&mut state, new_state), }); - entry = undecided.front_mut().unwrap(); + if let Some(front) = undecided.front_mut() { + entry = front; + } else { + break; + } } else { // this state is visited before entry = entry.next_mut().unwrap(); } } else { + // call entry has next entry = entry.next_mut().unwrap(); } } else { - // back track + // an undecided return found, meaning that a call considered done before this + // time point has to be revoked. + debug!("id={} return", entry.id); let CallEntry { - call, + mut call, ret, state: state0, } = calls.pop()?; + debug!("revoke call {}", call.id); state = state0; linearized.set(call.id as _, false); + entry = call.leak(); call.unlift(ret); + // call entry has next entry = entry.next_mut().unwrap(); } } diff --git a/src/porcupine/model.rs b/src/porcupine/model.rs index 6d46ec7..b84c857 100644 --- a/src/porcupine/model.rs +++ b/src/porcupine/model.rs @@ -1,5 +1,7 @@ //! History model. +use std::fmt::Debug; + /// Operation #[derive(Debug, Clone)] pub(crate) struct Operation { @@ -16,8 +18,10 @@ pub(crate) struct Operation { } /// Entry type, could be call or return. -#[derive(Debug)] +#[derive(Debug, Default)] pub(crate) enum EntryValue { + #[default] // only used in sentinel node + Null, Call(In), Return(Out), } @@ -35,12 +39,12 @@ pub(crate) struct Entry { /// Model. /// /// - Eq trait needs to be implemented to represent equality on states. -pub(crate) trait Model: Eq + Clone { +pub(crate) trait Model: Eq + Clone + Debug { /// Input type - type In: Clone; + type In: Clone + Debug; /// Output type - type Out: Clone; + type Out: Clone + Debug; /// Partition operations, such that a history is linearizable if and only if /// each partition is linearizable. diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs index 4a2888c..34165c4 100644 --- a/src/porcupine/utils.rs +++ b/src/porcupine/utils.rs @@ -6,15 +6,6 @@ use std::{collections::HashMap, ptr::NonNull}; /// a pointer type pub(super) type MaybeNull = Option>; -fn maybe_null(elem: &mut T) -> MaybeNull { - unsafe { Some(NonNull::new_unchecked(elem as _)) } -} - -#[derive(Debug)] -struct Sentinel { - next: MaybeNull>, -} - #[derive(Debug)] pub(super) struct EntryNode { pub(super) value: EntryValue, @@ -25,24 +16,28 @@ pub(super) struct EntryNode { } impl EntryNode { + pub(super) fn leak<'a, 'b>(self: &'a mut Box) -> &'b mut Self { + // SAFETY: self will not be dropped + unsafe { &mut *(self.as_mut() as *mut Self) } + } pub(super) fn next_mut(&self) -> Option<&mut EntryNode> { - Some(unsafe { self.next?.as_mut() }) + unsafe { Some(self.next?.as_mut()) } } pub(super) fn matched_mut(&self) -> Option<&mut EntryNode> { - Some(unsafe { self.matched?.as_mut() }) + unsafe { Some(self.matched?.as_mut()) } } pub(super) fn unwrap_in(&self) -> &In { match &self.value { EntryValue::Call(v) => v, - EntryValue::Return(_) => panic!("type mismatch"), + _ => panic!("type mismatch"), } } pub(super) fn unwrap_out(&self) -> &Out { match &self.value { - EntryValue::Call(_) => panic!("type mismatch"), EntryValue::Return(v) => v, + _ => panic!("type mismatch"), } } @@ -83,50 +78,69 @@ impl EntryNode { /// A linked list. #[derive(Debug)] pub(super) struct LinkedEntries { - sentinel: Sentinel, + sentinel: NonNull>, } impl LinkedEntries { pub(super) fn new() -> Self { - Self { - sentinel: Sentinel { next: None }, + let node = Box::new(EntryNode { + value: EntryValue::Null, + matched: None, + id: usize::MAX, + prev: None, + next: None, + }); + unsafe { + Self { + sentinel: NonNull::new_unchecked(Box::into_raw(node)), + } } } - pub(super) const fn is_empty(&self) -> bool { - self.sentinel.next.is_none() + pub(super) fn is_empty(&self) -> bool { + unsafe { self.sentinel.as_ref().next.is_none() } } - fn push_front_node(&mut self, node: EntryNode) { + pub(super) fn front_mut(&self) -> Option<&mut EntryNode> { + unsafe { Some(self.sentinel.as_ref().next?.as_mut()) } + } + + fn push_front_node( + &mut self, + node: EntryNode, + matches: &mut HashMap>>, + ) { assert!(node.prev.is_none()); assert!(node.next.is_none()); + let is_return = matches!(node.value, EntryValue::Return(_)); unsafe { let mut node_ptr = NonNull::new_unchecked(Box::into_raw(Box::new(node))); - if let Some(mut prev_head) = self.sentinel.next { - node_ptr.as_mut().next = Some(prev_head); - prev_head.as_mut().prev = Some(node_ptr); - } else { - self.sentinel.next = Some(node_ptr); + if is_return { + matches.insert(node_ptr.as_ref().id, node_ptr.clone()); } + let prev_head_ptr = self.sentinel.as_ref().next; + node_ptr.as_mut().prev = Some(self.sentinel); + node_ptr.as_mut().next = prev_head_ptr; + self.sentinel.as_mut().next = Some(node_ptr); + prev_head_ptr.map(|mut head| head.as_mut().prev = Some(node_ptr)); } } - pub(super) fn front_mut(&self) -> Option<&mut EntryNode> { - Some(unsafe { self.sentinel.next?.as_mut() }) - } - fn pop_front_node(&mut self) -> Option>> { - self.sentinel.next.map(|head| unsafe { - let node = Box::from_raw(head.as_ptr()); - self.sentinel.next = node.next; - node - }) + unsafe { + self.sentinel.as_ref().next.map(|head| { + let node = Box::from_raw(head.as_ptr()); + self.sentinel.as_mut().next = node.next; + node + }) + } } } impl Drop for LinkedEntries { fn drop(&mut self) { while self.pop_front_node().is_some() {} + _ = unsafe { Box::from_raw(self.sentinel.as_ptr()) }; } } @@ -134,30 +148,27 @@ impl From>> for LinkedEntries { fn from(value: Vec>) -> Self { let mut me = LinkedEntries::new(); // id -> return entry of this id - let mut matches: HashMap>> = + let mut matches: HashMap>> = HashMap::with_capacity((value.len() + 1) / 2); for entry in value.into_iter().rev() { let node = match entry.value { EntryValue::Call(v) => EntryNode { value: EntryValue::Call(v), - matched: matches[&entry.id], // call -> return + matched: Some(matches[&entry.id]), // call -> return + id: entry.id, + prev: None, + next: None, + }, + EntryValue::Return(v) => EntryNode { + value: EntryValue::Return(v), + matched: None, id: entry.id, prev: None, next: None, }, - EntryValue::Return(v) => { - let mut node = EntryNode { - value: EntryValue::Return(v), - matched: None, - id: entry.id, - prev: None, - next: None, - }; - matches.insert(entry.id, maybe_null(&mut node)); - node - } + _ => unreachable!("EntryValue::Null is only used in senital"), }; - me.push_front_node(node); + me.push_front_node(node, &mut matches); } me } diff --git a/src/shardkv/tests.rs b/src/shardkv/tests.rs index c1af0fb..11712ef 100644 --- a/src/shardkv/tests.rs +++ b/src/shardkv/tests.rs @@ -436,7 +436,7 @@ async fn unreliable3_4b() { info!("Test: unreliable 3...\n"); let t = Tester::new(3, true, Some(100)).await; - let begin = Instant::now(); + let _begin = Instant::now(); let operations = Arc::new(Mutex::new(Vec::>::new())); let ck = t.make_client(); @@ -463,7 +463,7 @@ async fn unreliable3_4b() { t.join(0).await; time::sleep(Duration::from_secs(2)).await; - let kvs = kvs_fut.await; + let _kvs = kvs_fut.await; let history = operations.lock().unwrap().clone(); let (res, _info) = From 66f8a85e90d03ce35a84a380e87fb8aa50f1372c Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Sun, 21 May 2023 14:45:10 +0800 Subject: [PATCH 5/9] add unreliable3_4b --- src/porcupine/checker.rs | 2 +- src/porcupine/mod.rs | 1 + src/porcupine/model.rs | 4 +- src/porcupine/utils.rs | 1 + src/raft/tests.rs | 1 + src/shardkv/tester.rs | 38 ++++++++++++++++- src/shardkv/tests.rs | 90 ++++++++++++++++++++++++++++++++++++---- 7 files changed, 124 insertions(+), 13 deletions(-) diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 89b27f4..8556d44 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -16,7 +16,7 @@ fn cache_contains(cache: &HashMap>, bv: &BitVec, m: &M) if let Some(entries) = cache.get(bv) { return entries.contains(m); } - return false; + false } struct CallEntry { diff --git a/src/porcupine/mod.rs b/src/porcupine/mod.rs index 5a5475f..0abd36c 100644 --- a/src/porcupine/mod.rs +++ b/src/porcupine/mod.rs @@ -36,5 +36,6 @@ pub(crate) async fn check_operation_verbose( history: Vec>, timeout: Duration, ) -> (CheckResult, LinearizationInfo) { + // TODO support verbose checker::check_operations::(history, true, Some(timeout)).await } diff --git a/src/porcupine/model.rs b/src/porcupine/model.rs index b84c857..bb49bc1 100644 --- a/src/porcupine/model.rs +++ b/src/porcupine/model.rs @@ -20,8 +20,8 @@ pub(crate) struct Operation { /// Entry type, could be call or return. #[derive(Debug, Default)] pub(crate) enum EntryValue { - #[default] // only used in sentinel node - Null, + #[default] + Null, // only used in sentinel node Call(In), Return(Out), } diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs index 34165c4..9440a7d 100644 --- a/src/porcupine/utils.rs +++ b/src/porcupine/utils.rs @@ -34,6 +34,7 @@ impl EntryNode { _ => panic!("type mismatch"), } } + pub(super) fn unwrap_out(&self) -> &Out { match &self.value { EntryValue::Return(v) => v, diff --git a/src/raft/tests.rs b/src/raft/tests.rs index ce5d63e..02c9166 100644 --- a/src/raft/tests.rs +++ b/src/raft/tests.rs @@ -152,6 +152,7 @@ async fn rpc_bytes_2b() { } let rpc1 = t.rpc_total(); + // XXX should count bytes instead, but madsim does not provide such functionality let got = (rpc1 - rpc0) * 5000; let expected = (servers * sent) as u64; assert!( diff --git a/src/shardkv/tester.rs b/src/shardkv/tester.rs index dad106b..a0ca1c1 100644 --- a/src/shardkv/tester.rs +++ b/src/shardkv/tester.rs @@ -1,5 +1,11 @@ use super::{client::Clerk, server::ShardKvServer}; -use crate::shard_ctrler::{client::Clerk as CtrlerClerk, server::ShardCtrler, N_SHARDS}; +use crate::{ + porcupine::{ + kv::{KvInput, KvModel, KvOp, KvOutput}, + model::Operation, + }, + shard_ctrler::{client::Clerk as CtrlerClerk, server::ShardCtrler, N_SHARDS}, +}; use ::rand::distributions::Alphanumeric; use madsim::{ rand::{self, Rng}, @@ -25,7 +31,7 @@ pub struct Tester { max_raft_state: Option, // begin()/end() statistics - t0: Instant, + pub t0: Instant, } struct Group { @@ -238,6 +244,34 @@ impl Clerk { } } + pub(super) async fn put_kvs_with_log( + &self, + begin: Instant, + kvs: &[(String, String)], + log: &Arc>>>, + ) { + for (k, v) in kvs { + let start = begin.elapsed().as_micros(); + self.put(k.clone(), v.clone()).await; + let end = begin.elapsed().as_micros(); + let input = KvInput { + op: KvOp::Put, + key: k.to_owned(), + value: v.to_owned(), + }; + let output = KvOutput { + value: "".to_string(), + }; + log.lock().unwrap().push(Operation { + client_id: None, + input, + call: start, + output, + ret: end, + }); + } + } + pub async fn check(&self, k: String, v: String) { let v0 = self.get(k.clone()).await; assert_eq!(v0, v, "check failed: key={:?}", k); diff --git a/src/shardkv/tests.rs b/src/shardkv/tests.rs index 11712ef..e356b9a 100644 --- a/src/shardkv/tests.rs +++ b/src/shardkv/tests.rs @@ -1,4 +1,9 @@ -use crate::porcupine::{check_operation_verbose, kv::KvModel, model::Operation, CheckResult}; +use crate::porcupine::{ + check_operation_verbose, check_operations_timeout, + kv::{KvInput, KvModel, KvOp, KvOutput}, + model::Operation, + CheckResult, +}; use super::tester::*; use futures::future; @@ -221,6 +226,77 @@ impl Tester { future::join_all(handles).await } } + + fn spawn_concurrent_ops_with_log( + &self, + n: usize, + begin: Instant, + kvs: Arc>, + value_len: usize, + log: &Arc>>>, + ) -> impl Future { + let done = Arc::new(AtomicBool::new(false)); + let mut handles = vec![]; + for _ in 0..n { + let ck = self.make_client(); + let done = done.clone(); + let kvs = kvs.clone(); + let log = log.clone(); + let t0 = begin.clone(); + handles.push(task::spawn_local(async move { + let mut rng = rand::rng(); + while !done.load(Ordering::SeqCst) { + let ki = rng.gen_range(0..n); + let nv = rand_string(value_len); + let start = t0.elapsed().as_micros(); + let (input, output) = if rng.gen_bool(0.5) { + ck.append(kvs[ki].0.clone(), nv.clone()).await; + let input = KvInput { + op: KvOp::Append, + key: kvs[ki].0.clone(), + value: nv, + }; + let output = KvOutput { + value: "".to_string(), + }; + (input, output) + } else if rng.gen_bool(0.1) { + ck.put(kvs[ki].0.clone(), nv.clone()).await; + let input = KvInput { + op: KvOp::Put, + key: kvs[ki].0.clone(), + value: nv, + }; + let output = KvOutput { + value: "".to_string(), + }; + (input, output) + } else { + let v = ck.get(kvs[ki].0.clone()).await; + let input = KvInput { + op: KvOp::Get, + key: kvs[ki].0.clone(), + value: "".to_string(), + }; + let output = KvOutput { value: v }; + (input, output) + }; + let end = t0.elapsed().as_micros(); + log.lock().unwrap().push(Operation { + client_id: None, + input, + call: start, + output, + ret: end, + }); + } + })); + } + async move { + done.store(true, Ordering::SeqCst); + future::join_all(handles).await; + } + } } #[madsim::test] @@ -430,13 +506,12 @@ async fn unreliable2_4b() { t.end(); } -#[ignore] #[madsim::test] async fn unreliable3_4b() { info!("Test: unreliable 3...\n"); let t = Tester::new(3, true, Some(100)).await; - let _begin = Instant::now(); + let begin = Instant::now(); let operations = Arc::new(Mutex::new(Vec::>::new())); let ck = t.make_client(); @@ -446,9 +521,9 @@ async fn unreliable3_4b() { let kvs = (0..n) .map(|i| (i.to_string(), rand_string(5))) .collect::>(); - ck.put_kvs(&kvs).await; // TODO with log + ck.put_kvs_with_log(begin, &kvs, &operations).await; - let kvs_fut = t.spawn_concurrent_append(kvs, 5, 0); // TODO with log + let fut = t.spawn_concurrent_ops_with_log(n, begin, Arc::new(kvs), 5, &operations); time::sleep(Duration::from_millis(150)).await; t.join(1).await; @@ -463,11 +538,10 @@ async fn unreliable3_4b() { t.join(0).await; time::sleep(Duration::from_secs(2)).await; - let _kvs = kvs_fut.await; + fut.await; let history = operations.lock().unwrap().clone(); - let (res, _info) = - check_operation_verbose::(history, LINEARIZABILITY_CHECK_TIMEOUT).await; + let res = check_operations_timeout::(history, LINEARIZABILITY_CHECK_TIMEOUT).await; assert!( !matches!(res, CheckResult::Illegal), "history is not linearizable" From d8bb8ef6fe6a1f5216451c8ca102bbe5f7f0df38 Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Sun, 21 May 2023 20:08:41 +0800 Subject: [PATCH 6/9] fix timeout --- src/porcupine/checker.rs | 61 +++++++++++++++++++++++++++------------- src/porcupine/mod.rs | 23 +++++++-------- src/porcupine/model.rs | 4 +-- src/porcupine/utils.rs | 2 +- 4 files changed, 56 insertions(+), 34 deletions(-) diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 8556d44..c16a8e2 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -7,8 +7,16 @@ use crate::porcupine::{ }; use bit_vec::BitVec; use futures::{stream::FuturesUnordered, StreamExt}; -use madsim::time; -use std::{collections::HashMap, mem, time::Duration}; +use std::{ + collections::HashMap, + mem, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread, + time::Duration, +}; pub(crate) struct LinearizationInfo {} @@ -26,7 +34,11 @@ struct CallEntry { } /// Check single sub-history. Return Some() if it's linearizable. -fn check_single(history: Vec>, _verbose: bool) -> Option<()> { +fn check_single( + history: Vec>, + _verbose: bool, + killed: Arc, +) -> CheckResult { let n = history.len() / 2; // number of operations debug!("history {:?}", history); @@ -40,6 +52,9 @@ fn check_single(history: Vec>, _verbose: bool) -> Option<()> let mut state = M::init(); while !undecided.is_empty() { + if killed.load(Ordering::Relaxed) { + return CheckResult::Unknown; + } if matches!(entry.value, EntryValue::Call(_)) { debug!("id={} call", entry.id); // the matched return entry @@ -78,21 +93,24 @@ fn check_single(history: Vec>, _verbose: bool) -> Option<()> // an undecided return found, meaning that a call considered done before this // time point has to be revoked. debug!("id={} return", entry.id); + if calls.is_empty() { + return CheckResult::Illegal; + } let CallEntry { mut call, ret, state: state0, - } = calls.pop()?; + } = calls.pop().unwrap(); debug!("revoke call {}", call.id); state = state0; linearized.set(call.id as _, false); - entry = call.leak(); + entry = call.ref_mut(); call.unlift(ret); // call entry has next entry = entry.next_mut().unwrap(); } } - Some(()) + CheckResult::Ok } /// Check history in parallel. @@ -101,21 +119,25 @@ fn check_single(history: Vec>, _verbose: bool) -> Option<()> async fn check_parallel( histories: Vec>>, verbose: bool, + killed: Arc, ) -> (CheckResult, LinearizationInfo) { let mut futures: FuturesUnordered<_> = histories .into_iter() - .map(|subhistory| async move { check_single::(subhistory, verbose) }) + .map(|subhistory| { + let killed = killed.clone(); + async move { check_single::(subhistory, verbose, killed) } + }) .collect(); let mut check_result = CheckResult::Ok; while let Some(res) = futures.next().await { - if res.is_none() { - check_result = CheckResult::Illegal; - if !verbose { + if res as u8 > check_result as u8 { + check_result = res; + if matches!(check_result, CheckResult::Illegal) && !verbose { + killed.store(true, Ordering::Relaxed); break; // collect linearizable prefix under verbose mode } } } - // TODO support verbose print (check_result, LinearizationInfo {}) } @@ -125,13 +147,14 @@ pub(super) async fn check_operations( timeout: Option, ) -> (CheckResult, LinearizationInfo) { let histories = ::partition(history); - if let Some(dur) = timeout { - // XXX I'm not sure - match time::timeout(dur, check_parallel::(histories, verbose)).await { - Ok(v) => v, - Err(_) => (CheckResult::Unknown, LinearizationInfo {}), - } - } else { - check_parallel::(histories, verbose).await + let killed = Arc::new(AtomicBool::new(false)); + if let Some(duration) = timeout { + let killed1 = killed.clone(); + thread::spawn(move || { + thread::sleep(duration); + killed1.store(true, Ordering::Relaxed); + }); } + check_parallel(histories, verbose, killed.clone()).await } + diff --git a/src/porcupine/mod.rs b/src/porcupine/mod.rs index 0abd36c..0cc01dc 100644 --- a/src/porcupine/mod.rs +++ b/src/porcupine/mod.rs @@ -1,25 +1,25 @@ -//! A lib checking linearizability. +//! A tool to check linearizability. #![cfg(test)] -#![deny(clippy::all)] - -use std::time::Duration; - -use checker::LinearizationInfo; -use model::{Entry, Model, Operation}; mod checker; pub mod kv; pub mod model; mod utils; +use std::time::Duration; +use checker::LinearizationInfo; +use model::{Entry, Model, Operation}; + /// Linearizability check result. +#[derive(Debug, Clone, Copy)] +#[repr(u8)] pub(crate) enum CheckResult { - /// Timeout - Unknown, /// Ok - Ok, + Ok = 0, + /// Timeout + Unknown = 1, /// Unlinearizable - Illegal, + Illegal = 2, } /// If this operation times out, then a false positive is possible. @@ -32,6 +32,7 @@ pub(crate) async fn check_operations_timeout( } /// If this operation times out, then a false positive is possible. +#[allow(dead_code)] // TODO support verbose pub(crate) async fn check_operation_verbose( history: Vec>, timeout: Duration, diff --git a/src/porcupine/model.rs b/src/porcupine/model.rs index bb49bc1..8de979a 100644 --- a/src/porcupine/model.rs +++ b/src/porcupine/model.rs @@ -36,9 +36,7 @@ pub(crate) struct Entry { pub client_id: Option, } -/// Model. -/// -/// - Eq trait needs to be implemented to represent equality on states. +/// Model. Eq trait needs to be implemented to represent equality on states. pub(crate) trait Model: Eq + Clone + Debug { /// Input type type In: Clone + Debug; diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs index 9440a7d..992d970 100644 --- a/src/porcupine/utils.rs +++ b/src/porcupine/utils.rs @@ -16,7 +16,7 @@ pub(super) struct EntryNode { } impl EntryNode { - pub(super) fn leak<'a, 'b>(self: &'a mut Box) -> &'b mut Self { + pub(super) fn ref_mut<'a, 'b>(self: &'a mut Box) -> &'b mut Self { // SAFETY: self will not be dropped unsafe { &mut *(self.as_mut() as *mut Self) } } From c3be11af6cc0ac2e4fb6b502a70fd623d36ee8b2 Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Thu, 25 May 2023 21:46:32 +0800 Subject: [PATCH 7/9] refactor: remove timeout, use Result as return value in porcupine refactor: remove timeout, use Result as return value in porcupine --- src/kvraft/tests.rs | 17 +-------- src/porcupine/checker.rs | 81 +++++++++------------------------------- src/porcupine/mod.rs | 43 ++++++++++----------- src/raft/tests.rs | 6 +-- src/shardkv/tests.rs | 14 +------ 5 files changed, 44 insertions(+), 117 deletions(-) diff --git a/src/kvraft/tests.rs b/src/kvraft/tests.rs index 75aa722..663efa5 100644 --- a/src/kvraft/tests.rs +++ b/src/kvraft/tests.rs @@ -1,6 +1,6 @@ use crate::{ kvraft::tester::OpLog, - porcupine::{self, kv::KvModel, CheckResult}, + porcupine::{self, kv::KvModel}, }; use super::tester::Tester; @@ -19,8 +19,6 @@ use std::sync::{ /// (much more than the paper's range of timeouts). const RAFT_ELECTION_TIMEOUT: Duration = Duration::from_millis(1000); -const LINEARIZABILITY_CHECK_TIMEOUT: Duration = Duration::from_millis(1000); - // check that for a specific client all known appends are present in a value, // and in order fn check_clnt_appends(clnt: usize, v: &str, count: usize) { @@ -242,18 +240,7 @@ async fn generic_test( } } - let res = porcupine::check_operations_timeout::( - op_log.read(), - LINEARIZABILITY_CHECK_TIMEOUT, - ) - .await; - assert!( - !matches!(res, CheckResult::Illegal), - "history is not linearizable" - ); - if matches!(res, CheckResult::Unknown) { - warn!("linearizability check timed out, assuming history is ok"); - } + porcupine::check_operations::(op_log.read()).expect("history is not linearizable"); t.end(); } diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index c16a8e2..49a6ca9 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -3,23 +3,20 @@ use crate::porcupine::{ model::EntryValue, utils::{EntryNode, LinkedEntries}, - CheckResult, Entry, Model, Operation, + Entry, Error, Model, Operation, Result, }; use bit_vec::BitVec; -use futures::{stream::FuturesUnordered, StreamExt}; -use std::{ - collections::HashMap, - mem, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; +use std::{collections::HashMap, fmt, mem}; +#[derive(Debug)] pub(crate) struct LinearizationInfo {} +impl fmt::Display for LinearizationInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("LinearizationInfo") + } +} + fn cache_contains(cache: &HashMap>, bv: &BitVec, m: &M) -> bool { if let Some(entries) = cache.get(bv) { return entries.contains(m); @@ -34,11 +31,7 @@ struct CallEntry { } /// Check single sub-history. Return Some() if it's linearizable. -fn check_single( - history: Vec>, - _verbose: bool, - killed: Arc, -) -> CheckResult { +fn check_single(history: Vec>, _verbose: bool) -> Result<()> { let n = history.len() / 2; // number of operations debug!("history {:?}", history); @@ -52,9 +45,6 @@ fn check_single( let mut state = M::init(); while !undecided.is_empty() { - if killed.load(Ordering::Relaxed) { - return CheckResult::Unknown; - } if matches!(entry.value, EntryValue::Call(_)) { debug!("id={} call", entry.id); // the matched return entry @@ -94,7 +84,7 @@ fn check_single( // time point has to be revoked. debug!("id={} return", entry.id); if calls.is_empty() { - return CheckResult::Illegal; + return Err(Error::Illegal(LinearizationInfo {})); } let CallEntry { mut call, @@ -110,51 +100,14 @@ fn check_single( entry = entry.next_mut().unwrap(); } } - CheckResult::Ok + Ok(()) } -/// Check history in parallel. -/// -/// For each sub-history, spawn a task to test its linearizability. -async fn check_parallel( - histories: Vec>>, - verbose: bool, - killed: Arc, -) -> (CheckResult, LinearizationInfo) { - let mut futures: FuturesUnordered<_> = histories - .into_iter() - .map(|subhistory| { - let killed = killed.clone(); - async move { check_single::(subhistory, verbose, killed) } - }) - .collect(); - let mut check_result = CheckResult::Ok; - while let Some(res) = futures.next().await { - if res as u8 > check_result as u8 { - check_result = res; - if matches!(check_result, CheckResult::Illegal) && !verbose { - killed.store(true, Ordering::Relaxed); - break; // collect linearizable prefix under verbose mode - } - } - } - (check_result, LinearizationInfo {}) -} - -pub(super) async fn check_operations( - history: Vec>, - verbose: bool, - timeout: Option, -) -> (CheckResult, LinearizationInfo) { +pub(super) fn check_operations(history: Vec>, verbose: bool) -> Result<()> { let histories = ::partition(history); - let killed = Arc::new(AtomicBool::new(false)); - if let Some(duration) = timeout { - let killed1 = killed.clone(); - thread::spawn(move || { - thread::sleep(duration); - killed1.store(true, Ordering::Relaxed); - }); + for history in histories { + // TODO get linearized prefix under verbose mode + check_single(history, verbose)?; } - check_parallel(histories, verbose, killed.clone()).await + Ok(()) } - diff --git a/src/porcupine/mod.rs b/src/porcupine/mod.rs index 0cc01dc..5320fc8 100644 --- a/src/porcupine/mod.rs +++ b/src/porcupine/mod.rs @@ -6,37 +6,34 @@ pub mod kv; pub mod model; mod utils; -use std::time::Duration; use checker::LinearizationInfo; use model::{Entry, Model, Operation}; +use std::fmt; -/// Linearizability check result. -#[derive(Debug, Clone, Copy)] -#[repr(u8)] -pub(crate) enum CheckResult { - /// Ok - Ok = 0, - /// Timeout - Unknown = 1, - /// Unlinearizable - Illegal = 2, +#[derive(Debug)] +pub(crate) enum Error { + Illegal(LinearizationInfo), } +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Illegal(info) => f.write_fmt(format_args!("Illegal result: {}", info)), + } + } +} + +impl std::error::Error for Error {} + +pub(crate) type Result = std::result::Result; + /// If this operation times out, then a false positive is possible. -pub(crate) async fn check_operations_timeout( - history: Vec>, - timeout: Duration, -) -> CheckResult { - let (res, _) = checker::check_operations::(history, false, Some(timeout)).await; - res +pub(crate) fn check_operations(history: Vec>) -> Result<()> { + checker::check_operations::(history, false) } /// If this operation times out, then a false positive is possible. #[allow(dead_code)] // TODO support verbose -pub(crate) async fn check_operation_verbose( - history: Vec>, - timeout: Duration, -) -> (CheckResult, LinearizationInfo) { - // TODO support verbose - checker::check_operations::(history, true, Some(timeout)).await +pub(crate) fn check_operation_verbose(history: Vec>) -> Result<()> { + checker::check_operations::(history, true) } diff --git a/src/raft/tests.rs b/src/raft/tests.rs index 02c9166..ee26313 100644 --- a/src/raft/tests.rs +++ b/src/raft/tests.rs @@ -132,7 +132,6 @@ async fn basic_agree_2b() { /// check, based on counting bytes of RPCs, that each command is sent to each /// peer just once. #[madsim::test] -#[ignore] async fn rpc_bytes_2b() { let servers = 3; @@ -1070,7 +1069,8 @@ async fn snapshot_all_crash_2d() { for _iter in 0..iters { // enough to get a snapshot - for _ in 0..=SNAPSHOT_INTERVAL { + let nn = SNAPSHOT_INTERVAL / 2 + random.gen_range(0..SNAPSHOT_INTERVAL); + for _ in 0..nn { t.one(random.gen_entry(), servers, true).await; } let index1 = t.one(random.gen_entry(), servers, true).await; @@ -1087,7 +1087,7 @@ async fn snapshot_all_crash_2d() { let index2 = t.one(random.gen_entry(), servers, true).await; assert!( - index2 > index1, + index2 >= index1 + 1, "index decreased from {} to {}", index1, index2 diff --git a/src/shardkv/tests.rs b/src/shardkv/tests.rs index e356b9a..725baec 100644 --- a/src/shardkv/tests.rs +++ b/src/shardkv/tests.rs @@ -1,8 +1,7 @@ use crate::porcupine::{ - check_operation_verbose, check_operations_timeout, + check_operations, kv::{KvInput, KvModel, KvOp, KvOutput}, model::Operation, - CheckResult, }; use super::tester::*; @@ -21,8 +20,6 @@ use std::{ }, }; -const LINEARIZABILITY_CHECK_TIMEOUT: Duration = Duration::from_millis(1000); - /// test static 2-way sharding, without shard movement. #[madsim::test] async fn static_shards_4b() { @@ -541,14 +538,7 @@ async fn unreliable3_4b() { fut.await; let history = operations.lock().unwrap().clone(); - let res = check_operations_timeout::(history, LINEARIZABILITY_CHECK_TIMEOUT).await; - assert!( - !matches!(res, CheckResult::Illegal), - "history is not linearizable" - ); - if matches!(res, CheckResult::Unknown) { - warn!("linearizability check timed out, assuming history is ok"); - } + check_operations::(history).expect("history is not linearizable"); t.end(); } From 9ebbdca674c03d49aa573962a6c966e98474bb00 Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Mon, 29 May 2023 17:46:06 +0800 Subject: [PATCH 8/9] refactor: reduce unsafe code in LinkedEntries --- Cargo.toml | 1 - src/kvraft/tester.rs | 19 ++-- src/porcupine/checker.rs | 16 ++-- src/porcupine/utils.rs | 182 ++++++++++++++++++++++++--------------- 4 files changed, 128 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8333066..5df532b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,3 @@ serde = { version = "1.0", features = ["derive"] } bincode = "1.3" thiserror = "1.0" bit-vec = "0.6" -lazy_static = "1.4" diff --git a/src/kvraft/tester.rs b/src/kvraft/tester.rs index 27e4ba0..b5b93fb 100644 --- a/src/kvraft/tester.rs +++ b/src/kvraft/tester.rs @@ -2,7 +2,6 @@ use crate::porcupine::{ kv::{KvInput, KvModel, KvOp, KvOutput}, model::Operation, }; -use lazy_static::lazy_static; use madsim::{time::*, Handle, LocalHandle}; use std::{ net::SocketAddr, @@ -139,6 +138,7 @@ impl Tester { handle: self.handle.local_handle(id.to_addr()), ck: Arc::new(client::Clerk::new(self.addrs.clone())), ops: self.ops.clone(), + t0: self.t0, } } @@ -216,10 +216,6 @@ impl Tester { } } -lazy_static! { - static ref T0: Instant = Instant::now(); -} - #[derive(Debug)] pub struct OpLog { operations: Mutex>>, @@ -253,6 +249,7 @@ pub struct Clerk { id: ClerkId, ck: Arc, ops: Arc, + t0: Instant, } impl Clerk { @@ -273,9 +270,9 @@ impl Clerk { } pub async fn put_and_log(&self, key: &str, value: &str, log: &Arc) { - let start = T0.elapsed().as_micros(); + let start = self.t0.elapsed().as_micros(); self.put(key, value).await; - let end = T0.elapsed().as_micros(); + let end = self.t0.elapsed().as_micros(); log.append(Operation { client_id: Some(self.id.0), input: KvInput { @@ -304,9 +301,9 @@ impl Clerk { } pub async fn append_and_log(&self, key: &str, value: &str, log: &Arc) { - let start = T0.elapsed().as_micros(); + let start = self.t0.elapsed().as_micros(); self.append(key, value).await; - let end = T0.elapsed().as_micros(); + let end = self.t0.elapsed().as_micros(); log.append(Operation { client_id: Some(self.id.0), input: KvInput { @@ -330,9 +327,9 @@ impl Clerk { } pub async fn get_and_log(&self, key: &str, log: &Arc) -> String { - let start = T0.elapsed().as_micros(); + let start = self.t0.elapsed().as_micros(); let value = self.get(key).await; - let end = T0.elapsed().as_micros(); + let end = self.t0.elapsed().as_micros(); log.append(Operation { client_id: Some(self.id.0), input: KvInput { diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 49a6ca9..1f4ccb8 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -41,14 +41,14 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> let undecided = LinkedEntries::from(history); // cursor - let mut entry = undecided.front_mut().unwrap(); + let mut entry = undecided.front().unwrap(); let mut state = M::init(); while !undecided.is_empty() { if matches!(entry.value, EntryValue::Call(_)) { debug!("id={} call", entry.id); // the matched return entry - let matched = entry.matched_mut().unwrap(); + let matched = entry.matched().unwrap(); let (ok, new_state) = state.step(entry.unwrap_in(), matched.unwrap_out()); if ok { let mut new_linearized = linearized.clone(); @@ -66,18 +66,18 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> ret, state: mem::replace(&mut state, new_state), }); - if let Some(front) = undecided.front_mut() { + if let Some(front) = undecided.front() { entry = front; } else { break; } } else { // this state is visited before - entry = entry.next_mut().unwrap(); + entry = entry.next().unwrap(); } } else { // call entry has next - entry = entry.next_mut().unwrap(); + entry = entry.next().unwrap(); } } else { // an undecided return found, meaning that a call considered done before this @@ -87,17 +87,17 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> return Err(Error::Illegal(LinearizationInfo {})); } let CallEntry { - mut call, + call, ret, state: state0, } = calls.pop().unwrap(); debug!("revoke call {}", call.id); state = state0; linearized.set(call.id as _, false); - entry = call.ref_mut(); + entry = call.get_ref(); call.unlift(ret); // call entry has next - entry = entry.next_mut().unwrap(); + entry = entry.next().unwrap(); } } Ok(()) diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs index 992d970..de85198 100644 --- a/src/porcupine/utils.rs +++ b/src/porcupine/utils.rs @@ -1,31 +1,76 @@ //! Some data structures utils. use super::model::{Entry, EntryValue, Model}; -use std::{collections::HashMap, ptr::NonNull}; +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + ptr::NonNull, +}; -/// a pointer type -pub(super) type MaybeNull = Option>; +/// Pointer type used in the linked list. +#[derive(Debug)] +pub(super) struct Ptr(NonNull); + +impl Ptr { + fn new(item: Box) -> Self { + // SAFETY: pointer is get from a `Box`. + unsafe { Self(NonNull::new_unchecked(Box::into_raw(item))) } + } + + /// Reclaim ownership from the linked list. + /// + /// # Safety + /// + /// The pointer must not be accessed later. + unsafe fn reclaim(self) -> Box { + Box::from_raw(self.0.as_ptr()) + } +} + +impl Copy for Ptr {} + +impl Clone for Ptr { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Deref for Ptr { + type Target = T; + + fn deref(&self) -> &Self::Target { + // SAFETY: `Ptr` initialized with a valid pointer. + unsafe { self.0.as_ref() } + } +} + +impl DerefMut for Ptr { + fn deref_mut(&mut self) -> &mut Self::Target { + // SAFETY: `Ptr` initialized with a valid pointer. + unsafe { self.0.as_mut() } + } +} #[derive(Debug)] pub(super) struct EntryNode { pub(super) value: EntryValue, - matched: MaybeNull>, + matched: Option>>, pub(super) id: usize, - prev: MaybeNull>, - next: MaybeNull>, + prev: Option>>, + next: Option>>, } impl EntryNode { - pub(super) fn ref_mut<'a, 'b>(self: &'a mut Box) -> &'b mut Self { + pub(super) fn get_ref<'a, 'b>(self: &'a Box) -> &'b Self { // SAFETY: self will not be dropped - unsafe { &mut *(self.as_mut() as *mut Self) } + unsafe { &*(self.as_ref() as *const Self) } } - pub(super) fn next_mut(&self) -> Option<&mut EntryNode> { - unsafe { Some(self.next?.as_mut()) } + pub(super) fn next(&self) -> Option<&EntryNode> { + self.next.as_deref() } - pub(super) fn matched_mut(&self) -> Option<&mut EntryNode> { - unsafe { Some(self.matched?.as_mut()) } + pub(super) fn matched(&self) -> Option<&EntryNode> { + self.matched.as_deref() } pub(super) fn unwrap_in(&self) -> &In { @@ -43,105 +88,102 @@ impl EntryNode { } /// detach self and self's match from list - pub(super) fn lift(&mut self) -> (Box>, Box>) { + pub(super) fn lift(&self) -> (Box>, Box>) { assert!(matches!(self.value, EntryValue::Call(_))); - unsafe { - self.prev.unwrap().as_mut().next = self.next; - // since calls and returns are paired, call.next won't be None. - self.next.unwrap().as_mut().prev = self.prev; - let self_box = Box::from_raw(self as _); - let matched = self.matched_mut().unwrap(); - matched.prev.unwrap().as_mut().next = matched.next; - if let Some(mut n) = matched.next { - n.as_mut().prev = matched.prev; - } - (self_box, Box::from_raw(matched as _)) + self.prev.unwrap().next = self.next; + // since calls and returns are paired, call.next won't be None. + self.next.unwrap().prev = self.prev; + // SAFETY: `Box` is used to transfer ownership, the `EntryNode` in it is inaccessible + // from the linked list anymore. + let self_box = unsafe { Box::from_raw(self as *const _ as *mut _) }; + let matched = self.matched().unwrap(); + matched.prev.unwrap().next = matched.next; + if let Some(mut n) = matched.next { + n.prev = matched.prev; } + // SAFETY: `Box` is used to transfer ownership, the `EntryNode` in it is inaccessible + // from the linked list anymore. + let match_box = unsafe { Box::from_raw(matched as *const _ as *mut _) }; + (self_box, match_box) } /// re-attach self and self's match to their original places pub(super) fn unlift(self: Box, matched: Box) { assert!(matches!(self.value, EntryValue::Call(_))); - unsafe { - let self_ptr = NonNull::new_unchecked(Box::into_raw(self)); - let matched_ptr = NonNull::new_unchecked(Box::into_raw(matched)); - matched_ptr.as_ref().prev.unwrap().as_mut().next = Some(matched_ptr); - if let Some(mut n) = matched_ptr.as_ref().next { - n.as_mut().prev = Some(matched_ptr); - } - self_ptr.as_ref().prev.unwrap().as_mut().next = Some(self_ptr); - // since calls and returns are paired, call.next won't be None. - self_ptr.as_ref().next.unwrap().as_mut().prev = Some(self_ptr); + let self_ptr = Ptr::new(self); + let matched_ptr = Ptr::new(matched); + matched_ptr.prev.unwrap().next = Some(matched_ptr); + if let Some(mut n) = matched_ptr.next { + n.prev = Some(matched_ptr); } + self_ptr.prev.unwrap().next = Some(self_ptr); + // since calls and returns are paired, call.next won't be None. + self_ptr.next.unwrap().prev = Some(self_ptr); } } /// A linked list. #[derive(Debug)] pub(super) struct LinkedEntries { - sentinel: NonNull>, + sentinel: Ptr>, } impl LinkedEntries { pub(super) fn new() -> Self { - let node = Box::new(EntryNode { - value: EntryValue::Null, - matched: None, - id: usize::MAX, - prev: None, - next: None, - }); - unsafe { - Self { - sentinel: NonNull::new_unchecked(Box::into_raw(node)), - } + Self { + sentinel: Ptr::new(Box::new(EntryNode { + value: EntryValue::Null, + matched: None, + id: usize::MAX, + prev: None, + next: None, + })), } } pub(super) fn is_empty(&self) -> bool { - unsafe { self.sentinel.as_ref().next.is_none() } + self.sentinel.next.is_none() } - pub(super) fn front_mut(&self) -> Option<&mut EntryNode> { - unsafe { Some(self.sentinel.as_ref().next?.as_mut()) } + pub(super) fn front(&self) -> Option<&EntryNode> { + self.sentinel.next.as_deref() } fn push_front_node( &mut self, node: EntryNode, - matches: &mut HashMap>>, + matches: &mut HashMap>>, ) { assert!(node.prev.is_none()); assert!(node.next.is_none()); - let is_return = matches!(node.value, EntryValue::Return(_)); - unsafe { - let mut node_ptr = NonNull::new_unchecked(Box::into_raw(Box::new(node))); - if is_return { - matches.insert(node_ptr.as_ref().id, node_ptr.clone()); - } - let prev_head_ptr = self.sentinel.as_ref().next; - node_ptr.as_mut().prev = Some(self.sentinel); - node_ptr.as_mut().next = prev_head_ptr; - self.sentinel.as_mut().next = Some(node_ptr); - prev_head_ptr.map(|mut head| head.as_mut().prev = Some(node_ptr)); + let mut node_ptr = Ptr::new(Box::new(node)); + if matches!(node_ptr.value, EntryValue::Return(_)) { + matches.insert(node_ptr.id, node_ptr.clone()); } + let prev_head_ptr = self.sentinel.next; + node_ptr.prev = Some(self.sentinel); + node_ptr.next = prev_head_ptr; + self.sentinel.next = Some(node_ptr); + prev_head_ptr.map(|mut head| head.prev = Some(node_ptr)); } fn pop_front_node(&mut self) -> Option>> { - unsafe { - self.sentinel.as_ref().next.map(|head| { - let node = Box::from_raw(head.as_ptr()); - self.sentinel.as_mut().next = node.next; - node - }) - } + self.sentinel.next.map(|head| { + // SAFETY: used in drop + let node = unsafe { head.reclaim() }; + self.sentinel.next = node.next; + node + }) } } impl Drop for LinkedEntries { fn drop(&mut self) { while self.pop_front_node().is_some() {} - _ = unsafe { Box::from_raw(self.sentinel.as_ptr()) }; + // SAFETY: used in drop + unsafe { + _ = self.sentinel.reclaim(); + } } } @@ -149,7 +191,7 @@ impl From>> for LinkedEntries { fn from(value: Vec>) -> Self { let mut me = LinkedEntries::new(); // id -> return entry of this id - let mut matches: HashMap>> = + let mut matches: HashMap>> = HashMap::with_capacity((value.len() + 1) / 2); for entry in value.into_iter().rev() { let node = match entry.value { From d345384a415b290138956aa752227f705b7879cc Mon Sep 17 00:00:00 2001 From: Jingjia Luo Date: Mon, 5 Jun 2023 22:14:50 +0800 Subject: [PATCH 9/9] refactor: make the linked list totally safe --- src/porcupine/checker.rs | 37 +++--- src/porcupine/utils.rs | 247 +++++++++++++++++++-------------------- 2 files changed, 138 insertions(+), 146 deletions(-) diff --git a/src/porcupine/checker.rs b/src/porcupine/checker.rs index 1f4ccb8..6ce9144 100644 --- a/src/porcupine/checker.rs +++ b/src/porcupine/checker.rs @@ -2,7 +2,7 @@ use crate::porcupine::{ model::EntryValue, - utils::{EntryNode, LinkedEntries}, + utils::{EntryNode, EntryView, LinkedEntries}, Entry, Error, Model, Operation, Result, }; use bit_vec::BitVec; @@ -25,8 +25,7 @@ fn cache_contains(cache: &HashMap>, bv: &BitVec, m: &M) } struct CallEntry { - call: Box>, - ret: Box>, + call: EntryView, state: M, } @@ -45,25 +44,25 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> let mut state = M::init(); while !undecided.is_empty() { - if matches!(entry.value, EntryValue::Call(_)) { - debug!("id={} call", entry.id); + if matches!(*entry.borrow().value, EntryValue::Call(_)) { + debug!("id={} call", entry.borrow().id); // the matched return entry - let matched = entry.matched().unwrap(); - let (ok, new_state) = state.step(entry.unwrap_in(), matched.unwrap_out()); + let matched = entry.borrow().matched().unwrap(); + let (ok, new_state) = + state.step(entry.borrow().unwrap_in(), matched.borrow().unwrap_out()); if ok { let mut new_linearized = linearized.clone(); - new_linearized.set(entry.id, true); + new_linearized.set(entry.borrow().id, true); if !cache_contains(&cache, &new_linearized, &new_state) { - debug!("cache miss, push {} into calls", entry.id); - linearized.set(entry.id, true); + debug!("cache miss, push {} into calls", entry.borrow().id); + linearized.set(entry.borrow().id, true); cache .entry(new_linearized) .or_default() .push(new_state.clone()); - let (call, ret) = entry.lift(); + let call = entry.borrow().lift(); calls.push(CallEntry { call, - ret, state: mem::replace(&mut state, new_state), }); if let Some(front) = undecided.front() { @@ -73,31 +72,30 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> } } else { // this state is visited before - entry = entry.next().unwrap(); + entry = EntryNode::next(entry).unwrap(); } } else { // call entry has next - entry = entry.next().unwrap(); + entry = EntryNode::next(entry).unwrap(); } } else { // an undecided return found, meaning that a call considered done before this // time point has to be revoked. - debug!("id={} return", entry.id); + debug!("id={} return", entry.borrow().id); if calls.is_empty() { return Err(Error::Illegal(LinearizationInfo {})); } let CallEntry { call, - ret, state: state0, } = calls.pop().unwrap(); debug!("revoke call {}", call.id); state = state0; linearized.set(call.id as _, false); - entry = call.get_ref(); - call.unlift(ret); + // entry = call.get_ref(); + entry = call.unlift(); // call entry has next - entry = entry.next().unwrap(); + entry = EntryNode::next(entry).unwrap(); } } Ok(()) @@ -106,7 +104,6 @@ fn check_single(history: Vec>, _verbose: bool) -> Result<()> pub(super) fn check_operations(history: Vec>, verbose: bool) -> Result<()> { let histories = ::partition(history); for history in histories { - // TODO get linearized prefix under verbose mode check_single(history, verbose)?; } Ok(()) diff --git a/src/porcupine/utils.rs b/src/porcupine/utils.rs index de85198..e7afdb7 100644 --- a/src/porcupine/utils.rs +++ b/src/porcupine/utils.rs @@ -2,137 +2,151 @@ use super::model::{Entry, EntryValue, Model}; use std::{ + cell::RefCell, collections::HashMap, - ops::{Deref, DerefMut}, - ptr::NonNull, + rc::{Rc, Weak}, }; -/// Pointer type used in the linked list. -#[derive(Debug)] -pub(super) struct Ptr(NonNull); - -impl Ptr { - fn new(item: Box) -> Self { - // SAFETY: pointer is get from a `Box`. - unsafe { Self(NonNull::new_unchecked(Box::into_raw(item))) } - } - - /// Reclaim ownership from the linked list. - /// - /// # Safety - /// - /// The pointer must not be accessed later. - unsafe fn reclaim(self) -> Box { - Box::from_raw(self.0.as_ptr()) - } -} - -impl Copy for Ptr {} - -impl Clone for Ptr { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Deref for Ptr { - type Target = T; - - fn deref(&self) -> &Self::Target { - // SAFETY: `Ptr` initialized with a valid pointer. - unsafe { self.0.as_ref() } - } -} +type SharedPtr = Option>>; +type WeakPtr = Option>>; -impl DerefMut for Ptr { - fn deref_mut(&mut self) -> &mut Self::Target { - // SAFETY: `Ptr` initialized with a valid pointer. - unsafe { self.0.as_mut() } - } +#[derive(Debug)] +pub(super) struct EntryView { + value: Rc>, + matched: SharedPtr>, + pub(super) id: usize, + prev: WeakPtr>, + next: WeakPtr>, } #[derive(Debug)] pub(super) struct EntryNode { - pub(super) value: EntryValue, - matched: Option>>, + pub(super) value: Rc>, + matched: WeakPtr>, pub(super) id: usize, - prev: Option>>, - next: Option>>, + prev: WeakPtr>, + next: SharedPtr>, } -impl EntryNode { - pub(super) fn get_ref<'a, 'b>(self: &'a Box) -> &'b Self { - // SAFETY: self will not be dropped - unsafe { &*(self.as_ref() as *const Self) } +impl EntryView { + /// re-attach self and self's match to their original places + pub(super) fn unlift<'a>(self: Self) -> Rc>> { + assert!(matches!(*self.value, EntryValue::Call(_))); + let matched_ptr = self.matched.clone().unwrap(); + let self_ptr = Rc::new(RefCell::new(EntryNode { + value: self.value.clone(), + matched: Some(Rc::downgrade(&matched_ptr)), + id: self.id, + prev: self.prev.clone(), + next: Some(self.next.unwrap().upgrade().unwrap()), + })); + matched_ptr + .as_ref() + .borrow() + .prev + .as_ref() + .unwrap() + .upgrade() + .unwrap() + .borrow_mut() + .next = Some(matched_ptr.clone()); + if let Some(n) = matched_ptr.as_ref().borrow().next.as_ref() { + n.borrow_mut().prev = Some(Rc::downgrade(&matched_ptr)); + } + self_ptr + .as_ref() + .borrow() + .prev + .as_ref() + .unwrap() + .upgrade() + .unwrap() + .borrow_mut() + .next = Some(self_ptr.clone()); + // since calls and returns are paired, call.next won't be None. + self_ptr + .as_ref() + .borrow() + .next + .as_ref() + .unwrap() + .borrow_mut() + .prev = Some(Rc::downgrade(&self_ptr)); + self_ptr } - pub(super) fn next(&self) -> Option<&EntryNode> { - self.next.as_deref() +} + +impl EntryNode { + pub(super) fn next(this: Rc>) -> SharedPtr> { + this.as_ref().borrow().next.clone() } - pub(super) fn matched(&self) -> Option<&EntryNode> { - self.matched.as_deref() + pub(super) fn matched(&self) -> SharedPtr> { + Some(self.matched.as_ref()?.clone().upgrade().unwrap()) } pub(super) fn unwrap_in(&self) -> &In { - match &self.value { - EntryValue::Call(v) => v, + match *self.value { + EntryValue::Call(ref v) => v, _ => panic!("type mismatch"), } } pub(super) fn unwrap_out(&self) -> &Out { - match &self.value { - EntryValue::Return(v) => v, + match *self.value { + EntryValue::Return(ref v) => v, _ => panic!("type mismatch"), } } /// detach self and self's match from list - pub(super) fn lift(&self) -> (Box>, Box>) { - assert!(matches!(self.value, EntryValue::Call(_))); - self.prev.unwrap().next = self.next; + pub(super) fn lift(&self) -> EntryView { + assert!(matches!(*self.value, EntryValue::Call(_))); + self.prev + .as_ref() + .unwrap() + .upgrade() + .unwrap() + .borrow_mut() + .next = self.next.clone(); // since calls and returns are paired, call.next won't be None. - self.next.unwrap().prev = self.prev; - // SAFETY: `Box` is used to transfer ownership, the `EntryNode` in it is inaccessible - // from the linked list anymore. - let self_box = unsafe { Box::from_raw(self as *const _ as *mut _) }; + self.next.as_ref().unwrap().borrow_mut().prev = self.prev.clone(); + let self_view = EntryView { + value: self.value.clone(), + matched: self.matched(), + id: self.id, + prev: self.prev.clone(), + next: Some(Rc::downgrade(self.next.as_ref().unwrap())), + }; let matched = self.matched().unwrap(); - matched.prev.unwrap().next = matched.next; - if let Some(mut n) = matched.next { - n.prev = matched.prev; + matched + .as_ref() + .borrow() + .prev + .as_ref() + .unwrap() + .upgrade() + .unwrap() + .borrow_mut() + .next = matched.borrow().next.clone(); + if let Some(n) = matched.as_ref().borrow().next.as_ref() { + n.borrow_mut().prev = matched.borrow().prev.clone(); } - // SAFETY: `Box` is used to transfer ownership, the `EntryNode` in it is inaccessible - // from the linked list anymore. - let match_box = unsafe { Box::from_raw(matched as *const _ as *mut _) }; - (self_box, match_box) - } - - /// re-attach self and self's match to their original places - pub(super) fn unlift(self: Box, matched: Box) { - assert!(matches!(self.value, EntryValue::Call(_))); - let self_ptr = Ptr::new(self); - let matched_ptr = Ptr::new(matched); - matched_ptr.prev.unwrap().next = Some(matched_ptr); - if let Some(mut n) = matched_ptr.next { - n.prev = Some(matched_ptr); - } - self_ptr.prev.unwrap().next = Some(self_ptr); - // since calls and returns are paired, call.next won't be None. - self_ptr.next.unwrap().prev = Some(self_ptr); + self_view } } /// A linked list. #[derive(Debug)] pub(super) struct LinkedEntries { - sentinel: Ptr>, + sentinel: Rc>>, } impl LinkedEntries { pub(super) fn new() -> Self { Self { - sentinel: Ptr::new(Box::new(EntryNode { - value: EntryValue::Null, + sentinel: Rc::new(RefCell::new(EntryNode { + value: Rc::new(EntryValue::Null), matched: None, id: usize::MAX, prev: None, @@ -142,48 +156,29 @@ impl LinkedEntries { } pub(super) fn is_empty(&self) -> bool { - self.sentinel.next.is_none() + self.sentinel.borrow().next.is_some() } - pub(super) fn front(&self) -> Option<&EntryNode> { - self.sentinel.next.as_deref() + pub(super) fn front(&self) -> SharedPtr> { + self.sentinel.borrow().next.clone() } fn push_front_node( &mut self, node: EntryNode, - matches: &mut HashMap>>, + matches: &mut HashMap>>>, ) { assert!(node.prev.is_none()); assert!(node.next.is_none()); - let mut node_ptr = Ptr::new(Box::new(node)); - if matches!(node_ptr.value, EntryValue::Return(_)) { - matches.insert(node_ptr.id, node_ptr.clone()); - } - let prev_head_ptr = self.sentinel.next; - node_ptr.prev = Some(self.sentinel); - node_ptr.next = prev_head_ptr; - self.sentinel.next = Some(node_ptr); - prev_head_ptr.map(|mut head| head.prev = Some(node_ptr)); - } - - fn pop_front_node(&mut self) -> Option>> { - self.sentinel.next.map(|head| { - // SAFETY: used in drop - let node = unsafe { head.reclaim() }; - self.sentinel.next = node.next; - node - }) - } -} - -impl Drop for LinkedEntries { - fn drop(&mut self) { - while self.pop_front_node().is_some() {} - // SAFETY: used in drop - unsafe { - _ = self.sentinel.reclaim(); + let node_ptr = Rc::new(RefCell::new(node)); + if matches!(*node_ptr.borrow().value, EntryValue::Return(_)) { + matches.insert(node_ptr.borrow().id, Rc::downgrade(&node_ptr)); } + let prev_head_ptr = self.sentinel.borrow().next.clone(); + node_ptr.borrow_mut().prev = Some(Rc::downgrade(&self.sentinel)); + node_ptr.borrow_mut().next = prev_head_ptr.clone(); + self.sentinel.borrow_mut().next = Some(node_ptr.clone()); + prev_head_ptr.map(|head| head.borrow_mut().prev = Some(Rc::downgrade(&node_ptr))); } } @@ -191,25 +186,25 @@ impl From>> for LinkedEntries { fn from(value: Vec>) -> Self { let mut me = LinkedEntries::new(); // id -> return entry of this id - let mut matches: HashMap>> = + let mut matches: HashMap>>> = HashMap::with_capacity((value.len() + 1) / 2); for entry in value.into_iter().rev() { let node = match entry.value { EntryValue::Call(v) => EntryNode { - value: EntryValue::Call(v), - matched: Some(matches[&entry.id]), // call -> return + value: Rc::new(EntryValue::Call(v)), + matched: Some(matches[&entry.id].clone()), // call -> return id: entry.id, prev: None, next: None, }, EntryValue::Return(v) => EntryNode { - value: EntryValue::Return(v), + value: Rc::new(EntryValue::Return(v)), matched: None, id: entry.id, prev: None, next: None, }, - _ => unreachable!("EntryValue::Null is only used in senital"), + _ => unreachable!("EntryValue::Null is only used in sentinel"), }; me.push_front_node(node, &mut matches); }