Skip to content

Commit

Permalink
feat: handle connection to multiple upstream peers
Browse files Browse the repository at this point in the history
Signed-off-by: Arnaud Bailly <[email protected]>
  • Loading branch information
abailly committed Feb 5, 2025
1 parent e69bcd8 commit 76b760c
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 50 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ AMARU_DEV_LOG=warn cargo run --release -- daemon \

> [!TIP]
> Replace `--peer-address` with your Cardano node peer address. It can be either
> a local or remote node (i.e. any existing node relay).
> a local or remote node (i.e. any existing node relay), and you can even add multiple peers.
### Monitoring

Expand Down
25 changes: 15 additions & 10 deletions crates/amaru/src/bin/amaru/cmd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crate::{config::NetworkName, metrics::track_system_metrics};
use amaru::sync::Config;
use amaru_consensus::consensus::nonce;
use clap::{builder::TypedValueParser as _, Parser};
use clap::{builder::TypedValueParser as _, ArgAction, Parser};
use miette::IntoDiagnostic;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use pallas_network::facades::PeerClient;
Expand All @@ -26,9 +26,12 @@ use tracing::{debug, error};

#[derive(Debug, Parser)]
pub struct Args {
/// Upstream peer address to synchronize from.
#[arg(long)]
peer_address: String,
/// Upstream peer addresses to synchronize from.
///
/// This option can be specified multiple times to connect to multiple peers.
/// At least one peer address must be specified.
#[arg(long, action = ArgAction::Append, required = true)]
peer_address: Vec<String>,

/// The target network to choose from.
#[arg(
Expand Down Expand Up @@ -57,13 +60,15 @@ pub async fn run(args: Args, metrics: Option<SdkMeterProvider>) -> miette::Resul

let metrics = metrics.map(track_system_metrics);

let client = Arc::new(Mutex::new(
PeerClient::connect(config.upstream_peer.clone(), config.network_magic as u64)
let mut clients: Vec<(String, Arc<Mutex<PeerClient>>)> = vec![];
for peer in &config.upstream_peers {
let client = PeerClient::connect(peer.clone(), config.network_magic as u64)
.await
.into_diagnostic()?,
));
.into_diagnostic()?;
clients.push((peer.clone(), Arc::new(Mutex::new(client))));
}

let sync = amaru::sync::bootstrap(config, &client)?;
let sync = amaru::sync::bootstrap(config, clients)?;

let exit = crate::exit::hook_exit_token();

Expand Down Expand Up @@ -105,7 +110,7 @@ fn parse_args(args: Args) -> miette::Result<Config> {
Ok(Config {
ledger_dir: args.ledger_dir,
chain_dir: args.chain_dir,
upstream_peer: args.peer_address,
upstream_peers: args.peer_address,
network_magic: args.network.to_network_magic(),
nonces,
})
Expand Down
55 changes: 37 additions & 18 deletions crates/amaru/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@

use amaru_consensus::consensus;
use amaru_consensus::consensus::store::rocksdb::RocksDBStore;

use amaru_consensus::consensus::{
chain_selection::{ChainSelector, ChainSelectorBuilder},
header::{point_hash, ConwayHeader},
store::ChainStore,
};
use amaru_ouroboros::protocol::peer::{Peer, PeerSession};
use amaru_ouroboros::protocol::Point;
use amaru_ouroboros::protocol::{Point, PullEvent};
use amaru_stores::rocksdb::RocksDB;
use gasket::messaging::tokio::funnel_ports;
use gasket::messaging::OutputPort;
use gasket::runtime::Tether;
use pallas_crypto::hash::Hash;
use pallas_network::facades::PeerClient;
Expand All @@ -39,7 +40,7 @@ pub type BlockHash = pallas_crypto::hash::Hash<32>;
pub struct Config {
pub ledger_dir: PathBuf,
pub chain_dir: PathBuf,
pub upstream_peer: String,
pub upstream_peers: Vec<String>,
pub network_magic: u32,
pub nonces: HashMap<Epoch, Hash<32>>,
}
Expand All @@ -62,49 +63,67 @@ fn define_gasket_policy() -> gasket::runtime::Policy {
}
}

pub fn bootstrap(config: Config, client: &Arc<Mutex<PeerClient>>) -> miette::Result<Vec<Tether>> {
pub fn bootstrap(
config: Config,
clients: Vec<(String, Arc<Mutex<PeerClient>>)>,
) -> miette::Result<Vec<Tether>> {
// FIXME: Take from config / command args
let store = RocksDB::new(&config.ledger_dir)
.unwrap_or_else(|e| panic!("unable to open ledger store: {e:?}"));
let (mut ledger, tip) = amaru_ledger::Stage::new(store);

let peer_session = PeerSession {
peer: Peer::new(&config.upstream_peer),
peer_client: client.clone(),
};

let mut pull = pull::Stage::new(peer_session.clone(), vec![tip.clone()]);
let peer_sessions: Vec<PeerSession> = clients
.iter()
.map(|(peer_name, client)| PeerSession {
peer: Peer::new(peer_name),
peer_client: client.clone(),
})
.collect();

let mut pulls = peer_sessions
.iter()
.map(|session| pull::Stage::new(session.clone(), vec![tip.clone()]))
.collect::<Vec<_>>();
let chain_store = RocksDBStore::new(config.chain_dir.clone())?;
let chain_selector = make_chain_selector(tip, &chain_store, &[&peer_session]);
let chain_selector = make_chain_selector(tip, &chain_store, &peer_sessions);
let mut consensus = consensus::Stage::new(
peer_session,
peer_sessions,
ledger.state.clone(),
Arc::new(Mutex::new(chain_store)),
chain_selector,
config.nonces,
);

let (to_header_validation, from_pull) = gasket::messaging::tokio::mpsc_channel(50);
let (to_ledger, from_header_validation) = gasket::messaging::tokio::mpsc_channel(50);

pull.downstream.connect(to_header_validation);
consensus.upstream.connect(from_pull);
let outputs: Vec<&mut OutputPort<PullEvent>> = pulls
.iter_mut()
.map(|p| &mut p.downstream)
.collect::<Vec<_>>();
funnel_ports(outputs, &mut consensus.upstream, 50);
consensus.downstream.connect(to_ledger);
ledger.upstream.connect(from_header_validation);

let policy = define_gasket_policy();

let pull = gasket::runtime::spawn_stage(pull, policy.clone());
let mut pulls = pulls
.into_iter()
.map(|p| gasket::runtime::spawn_stage(p, policy.clone()))
.collect::<Vec<_>>();

let header_validation = gasket::runtime::spawn_stage(consensus, policy.clone());
let ledger = gasket::runtime::spawn_stage(ledger, policy.clone());

Ok(vec![pull, header_validation, ledger])
pulls.push(header_validation);
pulls.push(ledger);

Ok(pulls)
}

fn make_chain_selector(
tip: Point,
chain_store: &impl ChainStore<ConwayHeader>,
peers: &[&PeerSession],
peers: &Vec<PeerSession>,
) -> Arc<Mutex<ChainSelector<ConwayHeader>>> {
let mut builder = ChainSelectorBuilder::new();

Expand Down
2 changes: 1 addition & 1 deletion crates/amaru/src/sync/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Stage {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}

#[instrument(level = Level::DEBUG, skip(self), fields(intersection = self.intersection.last().unwrap().slot_or_default()))]
#[instrument(level = Level::DEBUG, skip(self), fields(peer = ?self.peer_session.peer.name, intersection = self.intersection.last().unwrap().slot_or_default()))]
pub async fn find_intersection(&self) -> Result<(), WorkerError> {
let mut peer_client = self.peer_session.peer_client.lock().await;
let client = (*peer_client).chainsync();
Expand Down
28 changes: 21 additions & 7 deletions crates/consensus/src/consensus/chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ pub enum ChainSelection<H: Header> {
NoChange,

/// The current best chain has switched to given fork starting at
/// given hash.
SwitchToFork(Point, Vec<H>),
/// given hash, from given Peer.
///
/// FIXME: The peer should not be needed here, as the fork should be
/// comprised of known blocks. It is only needed to download the blocks
/// we don't currently store.
SwitchToFork(Peer, Point, Vec<H>),
}

/// Builder pattern for `ChainSelector`.
Expand Down Expand Up @@ -169,16 +173,22 @@ where
let (best_peer, best_tip) = self.find_best_chain().unwrap();

let result = if best_tip.parent().unwrap() == self.tip.hash() {
info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot());
ChainSelection::NewTip(header.clone())
} else if best_tip.block_height() > self.tip.block_height() {
let fragment = self.peers_chains.get(&best_peer).unwrap();
ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone())
info!(target: "amaru::consensus::chain_selection::switch_to_fork", peer = ?best_peer,
hash = ?best_tip.hash().to_string(), slot = ?best_tip.slot());
ChainSelection::SwitchToFork(
best_peer,
fragment.anchor.point(),
fragment.headers.clone(),
)
} else {
ChainSelection::NoChange
};

if result != ChainSelection::NoChange {
info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot());
self.tip = header;
}

Expand Down Expand Up @@ -210,7 +220,11 @@ where
let fragment = self.peers_chains.get(&best_peer).unwrap();
// TODO: do not always switch to anchor if there's a better intersection
// with current chain
ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone())
ChainSelection::SwitchToFork(
best_peer,
fragment.anchor.point(),
fragment.headers.clone(),
)
};

self.tip = best_tip.clone();
Expand Down Expand Up @@ -328,7 +342,7 @@ mod tests {
.map(|header| chain_selector.roll_forward(&bob, *header))
.last();

assert_eq!(SwitchToFork(Point::Origin, chain2), result.unwrap());
assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result.unwrap());
}

#[test]
Expand Down Expand Up @@ -432,6 +446,6 @@ mod tests {
let rollback_point = &chain1[3];
let result = chain_selector.rollback(&alice, rollback_point.hash());

assert_eq!(SwitchToFork(Point::Origin, chain2), result);
assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result);
}
}
34 changes: 22 additions & 12 deletions crates/consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod store;
#[derive(Stage)]
#[stage(name = "consensus", unit = "PullEvent", worker = "Worker")]
pub struct Stage {
peer_session: PeerSession,
peer_sessions: HashMap<Peer, PeerSession>,
chain_selector: Arc<Mutex<ChainSelector<ConwayHeader>>>,
ledger: Arc<Mutex<dyn LedgerState>>,
store: Arc<Mutex<dyn ChainStore<ConwayHeader>>>,
Expand All @@ -62,14 +62,18 @@ pub struct Stage {

impl Stage {
pub fn new(
peer_session: PeerSession,
peer_sessions: Vec<PeerSession>,
ledger: Arc<Mutex<dyn LedgerState>>,
store: Arc<Mutex<dyn ChainStore<ConwayHeader>>>,
chain_selector: Arc<Mutex<ChainSelector<ConwayHeader>>>,
epoch_to_nonce: HashMap<Epoch, Hash<32>>,
) -> Self {
let peer_sessions = peer_sessions
.into_iter()
.map(|p| (p.peer.clone(), p))
.collect::<HashMap<_, _>>();
Self {
peer_session,
peer_sessions,
chain_selector,
ledger,
store,
Expand All @@ -86,11 +90,16 @@ impl Stage {
self.validation_tip.set(tip.slot_or_default() as i64);
}

async fn forward_block(&mut self, header: &dyn Header) -> Result<(), WorkerError> {
async fn forward_block(&mut self, peer: &Peer, header: &dyn Header) -> Result<(), WorkerError> {
let point = header.point();
let block = {
let mut peer_session = self.peer_session.lock().await;
let client = (*peer_session).blockfetch();
// FIXME: should not crash if the peer is not found
let peer_session = self
.peer_sessions
.get(peer)
.expect("Unknown peer, bailing out");
let mut session = peer_session.peer_client.lock().await;
let client = (*session).blockfetch();
client.fetch_single(point.clone()).await.or_restart()?
};

Expand All @@ -102,6 +111,7 @@ impl Stage {

async fn switch_to_fork(
&mut self,
peer: &Peer,
rollback_point: &Point,
fork: Vec<ConwayHeader>,
) -> Result<(), WorkerError> {
Expand All @@ -111,7 +121,7 @@ impl Stage {
.or_panic()?;

for header in fork {
self.forward_block(&header).await?;
self.forward_block(peer, &header).await?;
}

Ok(())
Expand Down Expand Up @@ -152,16 +162,16 @@ impl Stage {

match result {
chain_selection::ChainSelection::NewTip(hdr) => {
self.forward_block(&hdr).await?;
self.forward_block(peer, &hdr).await?;

self.block_count.inc(1);
self.track_validation_tip(point);
}
chain_selection::ChainSelection::RollbackTo(_) => {
panic!("RollbackTo should never happen on a RollForward")
}
chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => {
self.switch_to_fork(&rollback_point, fork).await?;
chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => {
self.switch_to_fork(&peer, &rollback_point, fork).await?;
}
chain_selection::ChainSelection::NoChange => (),
}
Expand Down Expand Up @@ -189,8 +199,8 @@ impl Stage {
self.track_validation_tip(rollback);
}
chain_selection::ChainSelection::NoChange => (),
chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => {
self.switch_to_fork(&rollback_point, fork).await?
chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => {
self.switch_to_fork(&peer, &rollback_point, fork).await?
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ouroboros/src/protocol/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::Mutex;
/// A single peer in the network, with a unique identifier.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct Peer {
name: String,
pub name: String,
}

impl Peer {
Expand Down

0 comments on commit 76b760c

Please sign in to comment.