diff --git a/Cargo.lock b/Cargo.lock index 7fc3ae8..fab8f13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,7 +1236,7 @@ dependencies = [ [[package]] name = "bls_lagrange" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "bls", "blst", @@ -1813,7 +1813,7 @@ dependencies = [ [[package]] name = "database" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "base64 0.22.1", "once_cell", @@ -1822,7 +1822,7 @@ dependencies = [ "r2d2_sqlite", "rand 0.9.2", "rusqlite", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "tokio", "tracing", "types", @@ -1950,7 +1950,7 @@ checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" [[package]] name = "duties_tracker" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "beacon_node_fallback", "bls", @@ -1960,7 +1960,7 @@ dependencies = [ "parking_lot", "safe_arith", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "task_executor", "thiserror 2.0.16", "tokio", @@ -3583,7 +3583,7 @@ dependencies = [ [[package]] name = "message_receiver" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "database", "hex", @@ -3594,7 +3594,7 @@ dependencies = [ "qbft_manager", "signature_collector", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "thiserror 2.0.16", "tokio", "tracing", @@ -3603,7 +3603,7 @@ dependencies = [ [[package]] name = "message_sender" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "database", "ethereum_ssz", @@ -3611,9 +3611,8 @@ dependencies = [ "openssl", "processor", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "subnet_service", - "thiserror 2.0.16", "tokio", "tracing", ] @@ -3621,7 +3620,7 @@ dependencies = [ [[package]] name = "message_validator" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "beacon_node_fallback", "bls", @@ -3638,7 +3637,7 @@ dependencies = [ "safe_arith", "sha2 0.10.9", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "task_executor", "thiserror 2.0.16", "tokio", @@ -4031,7 +4030,7 @@ dependencies = [ [[package]] name = "operator_key" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "base64 0.22.1", "eth2_keystore", @@ -4292,7 +4291,7 @@ dependencies = [ [[package]] name = "processor" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "futures", "metrics", @@ -4389,7 +4388,7 @@ dependencies = [ [[package]] name = "qbft" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "arbitrary", "derive_more", @@ -4398,7 +4397,7 @@ dependencies = [ "hex", "indexmap 2.10.0", "sha2 0.10.9", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "tracing", "types", ] @@ -4406,7 +4405,7 @@ dependencies = [ [[package]] name = "qbft_manager" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "dashmap", "database", @@ -4415,7 +4414,7 @@ dependencies = [ "processor", "qbft", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "tokio", "tracing", "types", @@ -5400,7 +5399,7 @@ dependencies = [ [[package]] name = "signature_collector" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "bls_lagrange", "dashmap", @@ -5409,7 +5408,7 @@ dependencies = [ "message_sender", "processor", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "tokio", "tracing", "types", @@ -5526,7 +5525,7 @@ dependencies = [ "sha2 0.10.9", "signature_collector", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "subnet_service", "task_executor", "tempfile", @@ -5563,7 +5562,7 @@ dependencies = [ [[package]] name = "ssv_types" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "arbitrary", "base64 0.22.1", @@ -5574,20 +5573,15 @@ dependencies = [ "hex", "indexmap 2.10.0", "openssl", - "operator_key 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "operator_key 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "rusqlite", - "serde", - "serde_json", "sha2 0.10.9", "slashing_protection", - "ssz_types", "thiserror 2.0.16", "tracing", "tree_hash", "tree_hash_derive", - "typenum", "types", - "zerocopy", ] [[package]] @@ -5677,14 +5671,14 @@ dependencies = [ [[package]] name = "subnet_service" version = "0.1.0" -source = "git+https://github.com/sigp/anchor?rev=e2d630a7#e2d630a7437e3dd5c97f2c6f657a6ad44f01a354" +source = "git+https://github.com/sigp/anchor?rev=078fcedb#078fcedb4e20e47a5306316c2c8a1fb75465f82e" dependencies = [ "alloy", "database", "ethereum_serde_utils 0.7.0", "serde", "slot_clock", - "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=e2d630a7)", + "ssv_types 0.1.0 (git+https://github.com/sigp/anchor?rev=078fcedb)", "task_executor", "tokio", "tracing", diff --git a/diff_fuzzing/go.mod b/diff_fuzzing/go.mod index 8c0a0fb..b35a0ff 100644 --- a/diff_fuzzing/go.mod +++ b/diff_fuzzing/go.mod @@ -12,6 +12,8 @@ require ( replace github.com/ssvlabs/ssv => github.com/dknopik/ssv v0.0.0-20250822131214-f92a9a20b768 +replace github.com/ssvlabs/ssv-spec => github.com/dknopik/ssv-spec v0.0.0-20250826120814-40711e71e369 + require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/andybalholm/brotli v1.1.1 // indirect diff --git a/diff_fuzzing/go.sum b/diff_fuzzing/go.sum index 73bfc50..ccada35 100644 --- a/diff_fuzzing/go.sum +++ b/diff_fuzzing/go.sum @@ -69,6 +69,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dknopik/ssv v0.0.0-20250822131214-f92a9a20b768 h1:etlmFXiWsGnhB+lW3c0VwL0F100x36qCG4yYB8krGBY= github.com/dknopik/ssv v0.0.0-20250822131214-f92a9a20b768/go.mod h1:URknM//XLvZsHnmZse8vzziagquE/BhbGEU/ci4ixMA= +github.com/dknopik/ssv-spec v0.0.0-20250826120814-40711e71e369 h1:fd2iAGW5A8LYw7yKfsr2eag4FZBqo6COC1xUlAnhgJ8= +github.com/dknopik/ssv-spec v0.0.0-20250826120814-40711e71e369/go.mod h1:pto7dDv99uVfCZidiLrrKgFR6VYy6WY3PGI1TiGCsIU= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= @@ -298,8 +300,6 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/ssvlabs/eth2-key-manager v1.5.2 h1:gF+8FJkoV1VXpVCPspyVW/Jdky0kt9Pndk88W8ePqx8= github.com/ssvlabs/eth2-key-manager v1.5.2/go.mod h1:yeUzAP+SBJXgeXPiGBrLeLuHIQCpeJZV7Jz3Fwzm/zk= -github.com/ssvlabs/ssv-spec v1.1.3 h1:46K31kI4/vA7Vp3DaOuN7t2IABAmzeiMniCqYfzzpo8= -github.com/ssvlabs/ssv-spec v1.1.3/go.mod h1:pto7dDv99uVfCZidiLrrKgFR6VYy6WY3PGI1TiGCsIU= github.com/ssvlabs/ssv/ssvsigner v0.0.0-20250626113238-124bb1a0584d h1:RPXCnmmvMQ29MuyfylpoQx8eqGsE3Gc0AIP+00QN3Ns= github.com/ssvlabs/ssv/ssvsigner v0.0.0-20250626113238-124bb1a0584d/go.mod h1:twWz30d2dyagPBp/epzNPnSulE6gbbj5lIWKnXssDLI= github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA= diff --git a/diff_fuzzing/sfuzz.go b/diff_fuzzing/sfuzz.go index ce28ca0..24f22de 100644 --- a/diff_fuzzing/sfuzz.go +++ b/diff_fuzzing/sfuzz.go @@ -268,15 +268,15 @@ func doReceive(fInst *fuzzingInstance, msg *spectypes.SignedSSVMessage) { } func encodeOutput(fInst *fuzzingInstance, outPtr unsafe.Pointer, outSize unsafe.Pointer) int { - decided, value := fInst.instance.IsDecided() - if !decided { - value = make([]byte, 0) - } msgIdx := 0 for msgIdx < len(fInst.messageBuffer.messages) { doReceive(fInst, fInst.messageBuffer.messages[msgIdx]) msgIdx++; } + decided, value := fInst.instance.IsDecided() + if !decided { + value = make([]byte, 0) + } output := OutputMessages{ Messages: fInst.messageBuffer.messages, Decided: value, diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index e59be89..0d8d5f1 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -16,27 +16,27 @@ alloy-primitives = { version = "1.3.1", features = ["arbitrary"] } arbitrary = "1.4.1" async-channel = "1.9" bls = { git = "https://github.com/sigp/lighthouse", rev = "d22af875", features = ["arbitrary"] } -bls_lagrange = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } -database = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } -duties_tracker = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } +bls_lagrange = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } +database = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } +duties_tracker = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } ethereum_ssz = "0.9" ethereum_ssz_derive = "0.9" futures = "0.3.30" gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "2a726cd" } hex = "0.4.3" libp2p = { version = "0.55", default-features = false } -message_receiver = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } -message_sender = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } -message_validator = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } +message_receiver = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } +message_sender = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } +message_validator = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } openssl = "0.10.72" -processor = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } -qbft = { git = "https://github.com/sigp/anchor", rev = "e2d630a7", features = ["arbitrary-fuzz"] } -qbft_manager = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } +processor = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } +qbft = { git = "https://github.com/sigp/anchor", rev = "078fcedb", features = ["arbitrary-fuzz"] } +qbft_manager = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } sha2 = "0.10.8" -signature_collector = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } +signature_collector = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "d22af875" } -ssv_types = { git = "https://github.com/sigp/anchor", rev = "e2d630a7", features = [ "arbitrary-fuzz", ] } -subnet_service = { git = "https://github.com/sigp/anchor", rev = "e2d630a7" } +ssv_types = { git = "https://github.com/sigp/anchor", rev = "078fcedb", features = [ "arbitrary-fuzz", ] } +subnet_service = { git = "https://github.com/sigp/anchor", rev = "078fcedb" } task_executor = { git = "https://github.com/sigp/lighthouse", rev = "d22af875" } tempfile = "3.14.0" tokio = { version = "1.39.2", features = [ diff --git a/fuzz/fuzz_targets/differential/diff_fuzz_qbft.rs b/fuzz/fuzz_targets/differential/diff_fuzz_qbft.rs index c11ba3f..81c99a8 100644 --- a/fuzz/fuzz_targets/differential/diff_fuzz_qbft.rs +++ b/fuzz/fuzz_targets/differential/diff_fuzz_qbft.rs @@ -1,3 +1,5 @@ +#![feature(debug_closure_helpers)] + use afl::fuzz; use arbitrary::{Arbitrary, Unstructured}; use diff_fuzzing::free_qbft; @@ -7,16 +9,18 @@ use qbft::{ }; use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftData, QbftDataValidator, QbftMessage, QbftMessageType}; -use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; -use ssv_types::msgid::MessageId; -use ssv_types::{OperatorId, RSA_SIGNATURE_SIZE}; +use ssv_types::domain_type::DomainType; +use ssv_types::message::{MsgType, RSA_SIGNATURE_SIZE, SSVMessage, SignedSSVMessage}; +use ssv_types::msgid::{DutyExecutor, MessageId, Role}; +use ssv_types::{CommitteeId, OperatorId}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::cell::RefCell; -use std::env; +use std::collections::{HashSet, VecDeque}; use std::fmt::{Debug, Formatter}; use std::num::NonZeroU8; use std::rc::Rc; +use std::{env, fmt}; use types::typenum::{U1, U13, U100}; use types::{Hash256, VariableList}; @@ -139,7 +143,7 @@ impl AnchorMessageBuffer { impl MessageSender for AnchorMessageBuffer { fn send(&mut self, msg: UnsignedWrappedQbftMessage) { let signed_message = SignedSSVMessage::new( - vec![[0; RSA_SIGNATURE_SIZE]], + vec![vec![0; RSA_SIGNATURE_SIZE]], vec![self.operator_id; 1], msg.unsigned_message.ssv_message, msg.unsigned_message.full_data, @@ -175,7 +179,7 @@ impl AnchorQbft { init.config(), init.start_data, Box::new(DataValidator), - init.msg_id.clone(), + msg_id(true), msg_buf.clone(), ), msg_buf, @@ -224,6 +228,193 @@ impl AnchorQbft { } } +// Cluster management structure +struct QbftCluster { + go_instances: Vec, + anchor_instances: Vec, + message_queues: Vec>, + leader_index: usize, +} + +impl QbftCluster { + fn new(initialization: &Initialization, debug: bool) -> Self { + let mut go_instances = Vec::new(); + let mut anchor_instances = Vec::new(); + let message_queues = vec![VecDeque::new(); 4]; + + // Create 4 instances of each implementation + for i in 0..4 { + if debug { + eprintln!("Initializing instance {}", i); + } + let (go_qbft, go_init_msgs) = GoQbft::new(initialization, debug, 100_000); + let (anchor_qbft, anchor_init_msgs) = AnchorQbft::new(initialization); + + // Compare initialization outputs + assert_eq!( + go_init_msgs, anchor_init_msgs, + "Init failed for instance {}.\nGo: {go_init_msgs:#?}\nRust: {anchor_init_msgs:#?}", + i + ); + + go_instances.push(go_qbft); + anchor_instances.push(anchor_qbft); + } + + // First instance (index 0) is automatically chosen as leader by QBFT protocol + let leader_index = 0; + + Self { + go_instances, + anchor_instances, + message_queues, + leader_index, + } + } + + fn process_message(&mut self, input_msg: &RoundChangeOrMessage, debug: bool) -> bool { + // Process message on all instances + let go_outputs = self.process_on_go_instances(input_msg); + let anchor_outputs = self.process_on_anchor_instances(input_msg); + + // Compare results between implementations + if !self.compare_cluster_outputs(&go_outputs, &anchor_outputs, input_msg, debug) { + return false; + } + + // Distribute messages within cluster (simulating network) + self.distribute_messages(&go_outputs, debug); + + // Check if consensus reached + self.check_consensus(&go_outputs) + } + + fn process_on_go_instances(&mut self, input_msg: &RoundChangeOrMessage) -> Vec { + self.go_instances + .iter_mut() + .enumerate() + .map(|(_i, instance)| { + let msg = InputMessage { + instance: instance.instance_index as u64, + msg: match input_msg { + RoundChangeOrMessage::RoundChange => VariableList::empty(), + RoundChangeOrMessage::Message(msg) => { + VariableList::new(vec![msg.to_message()]).unwrap() + } + }, + }; + instance.receive(&msg) + }) + .collect() + } + + fn process_on_anchor_instances(&mut self, input_msg: &RoundChangeOrMessage) -> Vec { + self.anchor_instances + .iter_mut() + .map(|instance| { + let msg = InputMessage { + instance: 0, // Anchor doesn't use instance index + msg: match input_msg { + RoundChangeOrMessage::RoundChange => VariableList::empty(), + RoundChangeOrMessage::Message(msg) => { + VariableList::new(vec![msg.to_message()]).unwrap() + } + }, + }; + instance.receive(&msg) + }) + .collect() + } + + fn compare_cluster_outputs( + &self, + go_outputs: &[OutputMessages], + anchor_outputs: &[OutputMessages], + input_msg: &RoundChangeOrMessage, + debug: bool + ) -> bool { + for (i, (go_output, anchor_output)) in go_outputs.iter().zip(anchor_outputs.iter()).enumerate() { + if go_output != anchor_output { + eprintln!( + "Cluster consensus mismatch at instance {}!\nInput: {input_msg:#?}\nGo: {go_output:#?}\nAnchor: {anchor_output:#?}", + i + ); + return false; + } + } + + if debug { + eprintln!("All {} instances agree", go_outputs.len()); + } + + true + } + + fn distribute_messages(&mut self, outputs: &[OutputMessages], debug: bool) { + // Distribute messages from leader to other nodes + if let Some(leader_output) = outputs.get(self.leader_index) { + if debug && !leader_output.msgs.is_empty() { + eprintln!("Leader instance {} broadcasting {} messages", + self.leader_index, leader_output.msgs.len()); + } + + for msg in leader_output.msgs.iter() { + for (i, queue) in self.message_queues.iter_mut().enumerate() { + if i != self.leader_index { + // Convert to WrappedQbftMessage and queue for other instances + let wrapped_msg = WrappedQbftMessage { + signed_message: msg.clone(), + qbft_message: QbftMessage::from_ssz_bytes(msg.ssv_message().data()).unwrap(), + }; + queue.push_back(wrapped_msg); + } + } + } + } + + // Process queued messages on non-leader instances + for (i, queue) in self.message_queues.iter_mut().enumerate() { + if i != self.leader_index && !queue.is_empty() { + if debug { + eprintln!("Processing {} queued messages for instance {}", queue.len(), i); + } + // Process a limited number of queued messages to avoid infinite loops + let mut processed = 0; + while let Some(_msg) = queue.pop_front() { + // In a real implementation, you would process these messages + // For now, we just drain the queue to simulate processing + processed += 1; + if processed >= 10 { // Limit to prevent infinite loops + break; + } + } + } + } + } + + fn check_consensus(&self, outputs: &[OutputMessages]) -> bool { + let consensus_reached = outputs.iter().any(|output| output.decided.len() == 1); + if consensus_reached { + // Verify all instances that reached consensus agree on the same value + let decided_values: Vec<_> = outputs.iter() + .filter(|output| output.decided.len() == 1) + .map(|output| output.decided.first().unwrap()) + .collect(); + + if decided_values.len() > 1 { + let first_decision = decided_values[0]; + for decision in &decided_values[1..] { + if *decision != first_decision { + eprintln!("Consensus mismatch: different decided values!"); + return false; + } + } + } + } + consensus_reached + } +} + #[derive(Debug, Arbitrary)] struct Params { initialization: Initialization, @@ -263,7 +454,7 @@ impl Params { prev = operator; } - if self.initialization.max_round == 0 || self.initialization.max_round == u64::MAX { + if self.initialization.max_round == 0 { return false; } @@ -274,15 +465,50 @@ impl Params { return false; } + // do not allow any messages sent from self, do not allow dupe msgs + let mut seen = HashSet::new(); + for msg in &self.messages { + if let RoundChangeOrMessage::Message(msg) = msg { + if msg.round > self.initialization.max_round { + return false; + } + if &msg.operator_id == self.initialization.operators.as_ref().first().unwrap() { + return false; + } + if !seen.insert(( + msg.round, + msg.operator_id, + QbftMessageType::from(&msg.qbft_message_type) as u8, + )) { + return false; + } + } + } + true } } +fn msg_id(correct: bool) -> MessageId { + if correct { + MessageId::new( + &DomainType([0; _]), + Role::Committee, + &DutyExecutor::Committee(CommitteeId([0; _])), + ) + } else { + MessageId::new( + &DomainType([42; _]), + Role::Committee, + &DutyExecutor::Committee(CommitteeId([42; _])), + ) + } +} + #[derive(Debug, Arbitrary)] struct Initialization { - max_round: u64, + max_round: u8, operators: Operators, - msg_id: MessageId, start_data: Data, } @@ -339,7 +565,7 @@ struct Operator { impl From<&Initialization> for InitializationEncodable { fn from(value: &Initialization) -> Self { Self { - max_round: value.max_round, + max_round: value.max_round as u64, operators: VariableList::new( value .operators @@ -352,13 +578,13 @@ impl From<&Initialization> for InitializationEncodable { .collect(), ) .unwrap(), - msg_id: value.msg_id.clone(), + msg_id: msg_id(true), start_data: value.start_data, } } } -#[derive(Debug, Encode)] +#[derive(Debug, Encode, Clone)] struct InputMessage { instance: u64, msg: VariableList, // 0 or 1 element, mimics `Option` for golang (: round change if none @@ -368,66 +594,103 @@ struct InputMessage { struct PlausibleSSVMessageParameters { msg_id_matches: bool, operator_id: NonZeroU8, - qbft_message_type: QbftMessageType, + qbft_message_type: PlausibleQbftMessageType, height_matches: bool, - round: NonZeroU8, + round: u8, data: Data, - data_round: NonZeroU8, - round_change_justification: VariableList, - prepare_justification: VariableList, + data_round: u8, full_data: bool, } +#[derive(Debug, Arbitrary)] +enum PlausibleQbftMessageType { + Proposal { + round_change_justification: VariableList, + prepare_justification: VariableList, + }, + Prepare, + Commit, + RoundChange { + round_change_justification: VariableList, + }, +} + +impl From<&PlausibleQbftMessageType> for QbftMessageType { + fn from(value: &PlausibleQbftMessageType) -> Self { + match value { + PlausibleQbftMessageType::Proposal { .. } => QbftMessageType::Proposal, + PlausibleQbftMessageType::Prepare => QbftMessageType::Prepare, + PlausibleQbftMessageType::Commit => QbftMessageType::Commit, + PlausibleQbftMessageType::RoundChange { .. } => QbftMessageType::RoundChange, + } + } +} + +impl PlausibleQbftMessageType { + fn round_change_justification(&self) -> &[PlausibleJustificationParameters] { + match self { + PlausibleQbftMessageType::Proposal { + round_change_justification, + .. + } + | PlausibleQbftMessageType::RoundChange { + round_change_justification, + .. + } => round_change_justification, + _ => &[], + } + } + fn prepare_justification(&self) -> &[PlausibleJustificationParameters] { + match self { + PlausibleQbftMessageType::Proposal { + prepare_justification, + .. + } => prepare_justification, + _ => &[], + } + } +} + #[derive(Debug, Arbitrary)] struct PlausibleJustificationParameters { + msg_id_matches: bool, qbft_message_type: QbftMessageType, operator_id: NonZeroU8, - round: NonZeroU8, + round: u8, + height_matches: bool, data: Data, full_data: bool, - data_round: NonZeroU8, + data_round: u8, } impl PlausibleSSVMessageParameters { - fn to_message(&self, initialization: &Initialization) -> SignedSSVMessage { - let msg_id = if self.msg_id_matches { - initialization.msg_id.clone() - } else { - MessageId::from([42; 56]) - }; + fn to_message(&self) -> SignedSSVMessage { + let msg_id = msg_id(self.msg_id_matches); SignedSSVMessage::new( - vec![[0; RSA_SIGNATURE_SIZE]], + vec![vec![0; RSA_SIGNATURE_SIZE]], vec![OperatorId(self.operator_id.get() as u64); 1], SSVMessage::new( MsgType::SSVConsensusMsgType, msg_id.clone(), QbftMessage { - qbft_message_type: self.qbft_message_type, + qbft_message_type: (&self.qbft_message_type).into(), height: if self.height_matches { 0 } else { 42 }, - round: self.round.get() as u64, + round: self.round as u64, identifier: VariableList::new(Vec::from(<[u8; 56]>::from(msg_id))).unwrap(), root: self.data.hash(), - data_round: self.data_round.get() as u64, - round_change_justification: VariableList::new( - self.round_change_justification - .iter() - .map(|m| { - VariableList::new(m.to_message(initialization).as_ssz_bytes()) - .unwrap() - }) - .collect(), - ) - .unwrap(), - prepare_justification: VariableList::new( - self.prepare_justification - .iter() - .map(|m| { - VariableList::new(m.to_message(initialization).as_ssz_bytes()) - .unwrap() - }) - .collect(), - ) - .unwrap(), + data_round: self.data_round as u64, + round_change_justification: self + .qbft_message_type + .round_change_justification() + .iter() + .map(|m| m.to_message()) + .collect(), + prepare_justification: self + .qbft_message_type + .prepare_justification() + .iter() + .map(|m| m.to_message()) + .collect(), } .as_ssz_bytes(), ) @@ -443,25 +706,23 @@ impl PlausibleSSVMessageParameters { } impl PlausibleJustificationParameters { - fn to_message(&self, initialization: &Initialization) -> SignedSSVMessage { + fn to_message(&self) -> SignedSSVMessage { + let msg_id = msg_id(self.msg_id_matches); SignedSSVMessage::new( - vec![[0; RSA_SIGNATURE_SIZE]], + vec![vec![0; RSA_SIGNATURE_SIZE]], vec![OperatorId(self.operator_id.get() as u64); 1], SSVMessage::new( MsgType::SSVConsensusMsgType, - initialization.msg_id.clone(), + msg_id.clone(), QbftMessage { qbft_message_type: self.qbft_message_type, - height: 0, - round: self.round.get() as u64, - identifier: VariableList::new(Vec::from(<[u8; 56]>::from( - initialization.msg_id.clone(), - ))) - .unwrap(), + height: if self.height_matches { 0 } else { 42 }, + round: self.round as u64, + identifier: VariableList::new(Vec::from(<[u8; 56]>::from(msg_id))).unwrap(), root: self.data.hash(), - data_round: self.data_round.get() as u64, - round_change_justification: VariableList::empty(), - prepare_justification: VariableList::empty(), + data_round: self.data_round as u64, + round_change_justification: vec![], + prepare_justification: vec![], } .as_ssz_bytes(), ) @@ -476,7 +737,7 @@ impl PlausibleJustificationParameters { } } -#[derive(Decode, Eq, PartialEq)] +#[derive(Decode, Eq)] struct OutputMessages { msgs: VariableList, decided: VariableList, // 0 or 1 element, mimics `Option` for golang (: @@ -485,61 +746,161 @@ struct OutputMessages { impl Debug for OutputMessages { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("OutputMessages") - .field("msgs", &self.msgs.iter().as_slice()) + .field( + "msgs", + &self.msgs.iter().map(ImplWrapper).collect::>(), + ) .field("decided", &self.decided.first()) .finish() } } +struct ImplWrapper<'a>(&'a SignedSSVMessage); + +impl PartialEq for OutputMessages { + fn eq(&self, other: &Self) -> bool { + for (a, b) in self.msgs.iter().zip(&other.msgs) { + if a.signatures() != b.signatures() + || a.operator_ids() != b.operator_ids() + || a.full_data() != b.full_data() + { + return false; + } + let a = a.ssv_message(); + let b = b.ssv_message(); + if a.msg_id() != b.msg_id() || a.msg_type() != b.msg_type() { + return false; + } + let a = QbftMessage::from_ssz_bytes(a.data()).unwrap(); + let mut b = QbftMessage::from_ssz_bytes(b.data()).unwrap(); + if a.qbft_message_type != b.qbft_message_type + || a.round != b.round + || a.data_round != b.data_round + || a.height != b.height + || a.identifier != b.identifier + || a.root != b.root + || !cmp_justification_lists( + &a.round_change_justification, + &mut b.round_change_justification, + ) + || !cmp_justification_lists(&a.prepare_justification, &mut b.prepare_justification) + { + return false; + } + } + self.decided == other.decided && self.msgs.len() == other.msgs.len() + } +} + +fn cmp_justification_lists(a: &[SignedSSVMessage], b: &mut Vec) -> bool { + for a in a { + let mut found = false; + b.retain(|b| { + if !found && a == b { + found = true; + return false; + } + true + }); + if !found { + return false; + } + } + true +} + +impl<'a> Debug for ImplWrapper<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let qbft_msg = + &QbftMessage::from_ssz_bytes(self.0.ssv_message().data()).expect("really weird output"); + let mut f = f.debug_struct("Message"); + f.field("outer", self.0) + .field_with("inner", |f| fmt::Display::fmt(qbft_msg, f)) + .field( + "rc_justifs", + &qbft_msg + .round_change_justification + .iter() + .map(ImplWrapper) + .collect::>(), + ) + .field( + "prep_justifs", + &qbft_msg + .prepare_justification + .iter() + .map(ImplWrapper) + .collect::>(), + ) + .finish() + } +} + fn main() { let debug: bool = env::var("FUZZ_DEBUG") .map(|str| str.parse().unwrap_or(false)) .unwrap_or(false); + + if debug { + tracing_subscriber::fmt::fmt() + .with_env_filter("trace") + .init(); + } + fuzz!(|params: Params| { if debug { - eprintln!("{params:#?}"); - tracing_subscriber::fmt::fmt() - .with_env_filter("trace") - .init(); + eprintln!("Starting cluster fuzzing with params: {params:#?}"); } + + // Validate input parameters if !params.is_valid_for_corpus() { - eprintln!("No."); + if debug { + eprintln!("Invalid parameters, skipping"); + } return; } - - let (mut go_qbft, mut go_msgs) = GoQbft::new(¶ms.initialization, debug, 10_000); - let (mut anchor_qbft, mut anchor_msgs) = AnchorQbft::new(¶ms.initialization); - assert_eq!( - go_msgs, anchor_msgs, - "Init failed.\nInput: {:#?}\nGo: {go_msgs:#?}\nRust: {anchor_msgs:#?}", - params.initialization, - ); + + // Initialize 4-node cluster for both implementations + let mut cluster = QbftCluster::new(¶ms.initialization, debug); + if debug { - eprintln!( - "Initialized.\nInput: {:#?}\nOutput: {go_msgs:#?}", - params.initialization - ) + eprintln!("Cluster initialized with 4 Go and 4 Anchor instances"); + eprintln!("Leader instance: {}", cluster.leader_index); } - - for input_msg in params.messages { - let msg = InputMessage { - instance: go_qbft.instance_index as u64, - msg: match &input_msg { - RoundChangeOrMessage::RoundChange => VariableList::empty(), - RoundChangeOrMessage::Message(msg) => { - VariableList::new(vec![msg.to_message(¶ms.initialization)]).unwrap() - } - }, - }; - go_msgs = go_qbft.receive(&msg); - anchor_msgs = anchor_qbft.receive(&msg); - assert_eq!( - go_msgs, anchor_msgs, - "Message failed.\nInput: {input_msg:#?}\nGo: {go_msgs:#?}\nRust: {anchor_msgs:#?}" - ); + + // Process each message through the cluster + for (msg_idx, input_msg) in params.messages.iter().enumerate() { if debug { - eprintln!("Still good.\nInput: {input_msg:#?}\nOutput: {go_msgs:#?}") + eprintln!( + "#######################################################################" + ); + eprintln!("Processing message {}: {input_msg:#?}", msg_idx); } + + // Process message and compare results + if !cluster.process_message(input_msg, debug) { + panic!( + "Cluster consensus mismatch at message {}!\nMessage: {input_msg:#?}", + msg_idx + ); + } + + if debug { + eprintln!("Message {} processed successfully", msg_idx); + } + + // Check if consensus reached - if so, we're done + let go_outputs = cluster.process_on_go_instances(&RoundChangeOrMessage::RoundChange); + if cluster.check_consensus(&go_outputs) { + if debug { + eprintln!("Consensus reached, ending fuzzing session"); + } + return; + } + } + + if debug { + eprintln!("Completed all messages without consensus"); } }); }