From ff16cfc5436fbb792bce12ec0b78fa2f6b80354a Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Sun, 2 Feb 2025 09:53:03 +0100 Subject: [PATCH] feat: handle connection to multiple upstream peers Signed-off-by: Arnaud Bailly --- crates/amaru/src/bin/amaru/cmd/daemon.rs | 25 ++++--- crates/amaru/src/sync/mod.rs | 75 ++++++++++++------- crates/amaru/src/sync/pull.rs | 2 +- .../src/consensus/chain_selection.rs | 28 +++++-- crates/consensus/src/consensus/mod.rs | 34 ++++++--- crates/ouroboros/src/protocol/peer.rs | 2 +- 6 files changed, 109 insertions(+), 57 deletions(-) diff --git a/crates/amaru/src/bin/amaru/cmd/daemon.rs b/crates/amaru/src/bin/amaru/cmd/daemon.rs index fa715b8..bb6d52a 100644 --- a/crates/amaru/src/bin/amaru/cmd/daemon.rs +++ b/crates/amaru/src/bin/amaru/cmd/daemon.rs @@ -15,7 +15,7 @@ use crate::{config::NetworkName, metrics::track_system_metrics}; use amaru::sync::Config; use amaru_consensus::consensus::nonce; -use clap::{builder::TypedValueParser as _, Parser}; +use clap::{builder::TypedValueParser as _, ArgAction, Parser}; use miette::IntoDiagnostic; use opentelemetry_sdk::metrics::SdkMeterProvider; use pallas_network::facades::PeerClient; @@ -26,9 +26,12 @@ use tracing::{debug, error}; #[derive(Debug, Parser)] pub struct Args { - /// Upstream peer address to synchronize from. - #[arg(long)] - peer_address: String, + /// Upstream peer addresses to synchronize from. + /// + /// This option can be specified multiple times to connect to multiple peers. + /// At least one peer address must be specified. + #[arg(long, action = ArgAction::Append, required = true)] + peer_address: Vec, /// The target network to choose from. #[arg( @@ -57,13 +60,15 @@ pub async fn run(args: Args, metrics: Option) -> miette::Resul let metrics = metrics.map(track_system_metrics); - let client = Arc::new(Mutex::new( - PeerClient::connect(config.upstream_peer.clone(), config.network_magic as u64) + let mut clients: Vec<(String, Arc>)> = vec![]; + for peer in &config.upstream_peers { + let client = PeerClient::connect(peer.clone(), config.network_magic as u64) .await - .into_diagnostic()?, - )); + .into_diagnostic()?; + clients.push((peer.clone(), Arc::new(Mutex::new(client)))); + } - let sync = amaru::sync::bootstrap(config, &client)?; + let sync = amaru::sync::bootstrap(config, clients)?; let exit = crate::exit::hook_exit_token(); @@ -105,7 +110,7 @@ fn parse_args(args: Args) -> miette::Result { Ok(Config { ledger_dir: args.ledger_dir, chain_database_path: args.chain_database_path, - upstream_peer: args.peer_address, + upstream_peers: args.peer_address, network_magic: args.network.to_network_magic(), nonces, }) diff --git a/crates/amaru/src/sync/mod.rs b/crates/amaru/src/sync/mod.rs index 553b16f..4c2b153 100644 --- a/crates/amaru/src/sync/mod.rs +++ b/crates/amaru/src/sync/mod.rs @@ -12,18 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use amaru_consensus::consensus; -use amaru_consensus::consensus::store::rocksdb::RocksDBStore; - -use amaru_consensus::consensus::{ - chain_selection::{ChainSelector, ChainSelectorBuilder}, - header::{point_hash, ConwayHeader}, - store::ChainStore, +use amaru_consensus::{ + consensus, + consensus::{ + chain_selection::{ChainSelector, ChainSelectorBuilder}, + header::{point_hash, ConwayHeader}, + store::{rocksdb::RocksDBStore, ChainStore}, + }, +}; +use amaru_ouroboros::protocol::{ + peer::{Peer, PeerSession}, + Point, PullEvent, }; -use amaru_ouroboros::protocol::peer::{Peer, PeerSession}; -use amaru_ouroboros::protocol::Point; use amaru_stores::rocksdb::RocksDB; -use gasket::runtime::Tether; +use gasket::{ + messaging::{tokio::funnel_ports, OutputPort}, + runtime::Tether, +}; use pallas_crypto::hash::Hash; use pallas_network::facades::PeerClient; use pallas_primitives::conway::Epoch; @@ -39,7 +44,7 @@ pub type BlockHash = pallas_crypto::hash::Hash<32>; pub struct Config { pub ledger_dir: PathBuf, pub chain_database_path: PathBuf, - pub upstream_peer: String, + pub upstream_peers: Vec, pub network_magic: u32, pub nonces: HashMap>, } @@ -62,49 +67,67 @@ fn define_gasket_policy() -> gasket::runtime::Policy { } } -pub fn bootstrap(config: Config, client: &Arc>) -> miette::Result> { +pub fn bootstrap( + config: Config, + clients: Vec<(String, Arc>)>, +) -> miette::Result> { // FIXME: Take from config / command args let store = RocksDB::new(&config.ledger_dir) .unwrap_or_else(|e| panic!("unable to open ledger store: {e:?}")); let (mut ledger, tip) = amaru_ledger::Stage::new(store); - let peer_session = PeerSession { - peer: Peer::new(&config.upstream_peer), - peer_client: client.clone(), - }; - - let mut pull = pull::Stage::new(peer_session.clone(), vec![tip.clone()]); + let peer_sessions: Vec = clients + .iter() + .map(|(peer_name, client)| PeerSession { + peer: Peer::new(peer_name), + peer_client: client.clone(), + }) + .collect(); + + let mut pulls = peer_sessions + .iter() + .map(|session| pull::Stage::new(session.clone(), vec![tip.clone()])) + .collect::>(); let chain_store = RocksDBStore::new(config.chain_database_path.clone())?; - let chain_selector = make_chain_selector(tip, &chain_store, &[&peer_session]); + let chain_selector = make_chain_selector(tip, &chain_store, &peer_sessions); let mut consensus = consensus::Stage::new( - peer_session, + peer_sessions, ledger.state.clone(), Arc::new(Mutex::new(chain_store)), chain_selector, config.nonces, ); - let (to_header_validation, from_pull) = gasket::messaging::tokio::mpsc_channel(50); let (to_ledger, from_header_validation) = gasket::messaging::tokio::mpsc_channel(50); - pull.downstream.connect(to_header_validation); - consensus.upstream.connect(from_pull); + let outputs: Vec<&mut OutputPort> = pulls + .iter_mut() + .map(|p| &mut p.downstream) + .collect::>(); + funnel_ports(outputs, &mut consensus.upstream, 50); consensus.downstream.connect(to_ledger); ledger.upstream.connect(from_header_validation); let policy = define_gasket_policy(); - let pull = gasket::runtime::spawn_stage(pull, policy.clone()); + let mut pulls = pulls + .into_iter() + .map(|p| gasket::runtime::spawn_stage(p, policy.clone())) + .collect::>(); + let header_validation = gasket::runtime::spawn_stage(consensus, policy.clone()); let ledger = gasket::runtime::spawn_stage(ledger, policy.clone()); - Ok(vec![pull, header_validation, ledger]) + pulls.push(header_validation); + pulls.push(ledger); + + Ok(pulls) } fn make_chain_selector( tip: Point, chain_store: &impl ChainStore, - peers: &[&PeerSession], + peers: &Vec, ) -> Arc>> { let mut builder = ChainSelectorBuilder::new(); diff --git a/crates/amaru/src/sync/pull.rs b/crates/amaru/src/sync/pull.rs index b128702..551993c 100644 --- a/crates/amaru/src/sync/pull.rs +++ b/crates/amaru/src/sync/pull.rs @@ -66,7 +66,7 @@ impl Stage { self.chain_tip.set(tip.0.slot_or_default() as i64); } - #[instrument(skip(self), fields(intersection = self.intersection.last().unwrap().slot_or_default()))] + #[instrument(skip(self), fields(peer = ?self.peer_session.peer.name, intersection = self.intersection.last().unwrap().slot_or_default()))] pub async fn find_intersection(&self) -> Result<(), WorkerError> { let mut peer_client = self.peer_session.peer_client.lock().await; let client = (*peer_client).chainsync(); diff --git a/crates/consensus/src/consensus/chain_selection.rs b/crates/consensus/src/consensus/chain_selection.rs index 034ba64..aa9150e 100644 --- a/crates/consensus/src/consensus/chain_selection.rs +++ b/crates/consensus/src/consensus/chain_selection.rs @@ -94,8 +94,12 @@ pub enum ChainSelection { NoChange, /// The current best chain has switched to given fork starting at - /// given hash. - SwitchToFork(Point, Vec), + /// given hash, from given Peer. + /// + /// FIXME: The peer should not be needed here, as the fork should be + /// comprised of known blocks. It is only needed to download the blocks + /// we don't currently store. + SwitchToFork(Peer, Point, Vec), } /// Builder pattern for `ChainSelector`. @@ -169,16 +173,22 @@ where let (best_peer, best_tip) = self.find_best_chain().unwrap(); let result = if best_tip.parent().unwrap() == self.tip.hash() { + info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot()); ChainSelection::NewTip(header.clone()) } else if best_tip.block_height() > self.tip.block_height() { let fragment = self.peers_chains.get(&best_peer).unwrap(); - ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone()) + info!(target: "amaru::consensus::chain_selection::switch_to_fork", peer = ?best_peer, + hash = ?best_tip.hash().to_string(), slot = ?best_tip.slot()); + ChainSelection::SwitchToFork( + best_peer, + fragment.anchor.point(), + fragment.headers.clone(), + ) } else { ChainSelection::NoChange }; if result != ChainSelection::NoChange { - info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot()); self.tip = header; } @@ -210,7 +220,11 @@ where let fragment = self.peers_chains.get(&best_peer).unwrap(); // TODO: do not always switch to anchor if there's a better intersection // with current chain - ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone()) + ChainSelection::SwitchToFork( + best_peer, + fragment.anchor.point(), + fragment.headers.clone(), + ) }; self.tip = best_tip.clone(); @@ -328,7 +342,7 @@ mod tests { .map(|header| chain_selector.roll_forward(&bob, *header)) .last(); - assert_eq!(SwitchToFork(Point::Origin, chain2), result.unwrap()); + assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result.unwrap()); } #[test] @@ -432,6 +446,6 @@ mod tests { let rollback_point = &chain1[3]; let result = chain_selector.rollback(&alice, rollback_point.hash()); - assert_eq!(SwitchToFork(Point::Origin, chain2), result); + assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result); } } diff --git a/crates/consensus/src/consensus/mod.rs b/crates/consensus/src/consensus/mod.rs index 46e89ab..f26ede1 100644 --- a/crates/consensus/src/consensus/mod.rs +++ b/crates/consensus/src/consensus/mod.rs @@ -41,7 +41,7 @@ pub mod store; #[derive(Stage)] #[stage(name = "consensus", unit = "PullEvent", worker = "Worker")] pub struct Stage { - peer_session: PeerSession, + peer_sessions: HashMap, chain_selector: Arc>>, ledger: Arc>, store: Arc>>, @@ -62,14 +62,18 @@ pub struct Stage { impl Stage { pub fn new( - peer_session: PeerSession, + peer_sessions: Vec, ledger: Arc>, store: Arc>>, chain_selector: Arc>>, epoch_to_nonce: HashMap>, ) -> Self { + let peer_sessions = peer_sessions + .into_iter() + .map(|p| (p.peer.clone(), p)) + .collect::>(); Self { - peer_session, + peer_sessions, chain_selector, ledger, store, @@ -86,11 +90,16 @@ impl Stage { self.validation_tip.set(tip.slot_or_default() as i64); } - async fn forward_block(&mut self, header: &dyn Header) -> Result<(), WorkerError> { + async fn forward_block(&mut self, peer: &Peer, header: &dyn Header) -> Result<(), WorkerError> { let point = header.point(); let block = { - let mut peer_session = self.peer_session.lock().await; - let client = (*peer_session).blockfetch(); + // FIXME: should not crash if the peer is not found + let peer_session = self + .peer_sessions + .get(peer) + .expect("Unknown peer, bailing out"); + let mut session = peer_session.peer_client.lock().await; + let client = (*session).blockfetch(); client.fetch_single(point.clone()).await.or_restart()? }; @@ -102,6 +111,7 @@ impl Stage { async fn switch_to_fork( &mut self, + peer: &Peer, rollback_point: &Point, fork: Vec, ) -> Result<(), WorkerError> { @@ -111,7 +121,7 @@ impl Stage { .or_panic()?; for header in fork { - self.forward_block(&header).await?; + self.forward_block(peer, &header).await?; } Ok(()) @@ -152,7 +162,7 @@ impl Stage { match result { chain_selection::ChainSelection::NewTip(hdr) => { - self.forward_block(&hdr).await?; + self.forward_block(peer, &hdr).await?; self.block_count.inc(1); self.track_validation_tip(point); @@ -160,8 +170,8 @@ impl Stage { chain_selection::ChainSelection::RollbackTo(_) => { panic!("RollbackTo should never happen on a RollForward") } - chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => { - self.switch_to_fork(&rollback_point, fork).await?; + chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => { + self.switch_to_fork(&peer, &rollback_point, fork).await?; } chain_selection::ChainSelection::NoChange => (), } @@ -189,8 +199,8 @@ impl Stage { self.track_validation_tip(rollback); } chain_selection::ChainSelection::NoChange => (), - chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => { - self.switch_to_fork(&rollback_point, fork).await? + chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => { + self.switch_to_fork(&peer, &rollback_point, fork).await? } } diff --git a/crates/ouroboros/src/protocol/peer.rs b/crates/ouroboros/src/protocol/peer.rs index 0176b4d..e1dfb6b 100644 --- a/crates/ouroboros/src/protocol/peer.rs +++ b/crates/ouroboros/src/protocol/peer.rs @@ -20,7 +20,7 @@ use tokio::sync::Mutex; /// A single peer in the network, with a unique identifier. #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct Peer { - name: String, + pub name: String, } impl Peer {