Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle connection to multiple upstream peers #85

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ cargo run --release -- import --snapshot-dir snapshots

```console
cargo run --release -- import-chain-db \
--peer 127.0.0.1:3000 --starting-point 69638382.5da6ba37a4a07df015c4ea92c880e3600d7f098b97e73816f8df04bbb5fad3b7
--peer-address 127.0.0.1:3000 --starting-point 69638382.5da6ba37a4a07df015c4ea92c880e3600d7f098b97e73816f8df04bbb5fad3b7
```

> [!WARNING]
Expand All @@ -75,7 +75,7 @@ AMARU_DEV_LOG=warn cargo run --release -- daemon \

> [!TIP]
> Replace `--peer-address` with your Cardano node peer address. It can be either
> a local or remote node (i.e. any existing node relay).
> a local or remote node (i.e. any existing node relay), and you can even add multiple peers.

### Monitoring

Expand Down
25 changes: 15 additions & 10 deletions crates/amaru/src/bin/amaru/cmd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crate::{config::NetworkName, metrics::track_system_metrics};
use amaru::sync::Config;
use amaru_consensus::consensus::nonce;
use clap::{builder::TypedValueParser as _, Parser};
use clap::{builder::TypedValueParser as _, ArgAction, Parser};
use miette::IntoDiagnostic;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use pallas_network::facades::PeerClient;
Expand All @@ -26,9 +26,12 @@ use tracing::{debug, error};

#[derive(Debug, Parser)]
pub struct Args {
/// Upstream peer address to synchronize from.
#[arg(long)]
peer_address: String,
/// Upstream peer addresses to synchronize from.
///
/// This option can be specified multiple times to connect to multiple peers.
/// At least one peer address must be specified.
#[arg(long, action = ArgAction::Append, required = true)]
peer_address: Vec<String>,

/// The target network to choose from.
#[arg(
Expand Down Expand Up @@ -57,13 +60,15 @@ pub async fn run(args: Args, metrics: Option<SdkMeterProvider>) -> miette::Resul

let metrics = metrics.map(track_system_metrics);

let client = Arc::new(Mutex::new(
PeerClient::connect(config.upstream_peer.clone(), config.network_magic as u64)
let mut clients: Vec<(String, Arc<Mutex<PeerClient>>)> = vec![];
for peer in &config.upstream_peers {
let client = PeerClient::connect(peer.clone(), config.network_magic as u64)
.await
.into_diagnostic()?,
));
.into_diagnostic()?;
clients.push((peer.clone(), Arc::new(Mutex::new(client))));
}

let sync = amaru::sync::bootstrap(config, &client)?;
let sync = amaru::sync::bootstrap(config, clients)?;

let exit = crate::exit::hook_exit_token();

Expand Down Expand Up @@ -105,7 +110,7 @@ fn parse_args(args: Args) -> miette::Result<Config> {
Ok(Config {
ledger_dir: args.ledger_dir,
chain_dir: args.chain_dir,
upstream_peer: args.peer_address,
upstream_peers: args.peer_address,
network_magic: args.network.to_network_magic(),
nonces,
})
Expand Down
55 changes: 37 additions & 18 deletions crates/amaru/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@

use amaru_consensus::consensus;
use amaru_consensus::consensus::store::rocksdb::RocksDBStore;

use amaru_consensus::consensus::{
chain_selection::{ChainSelector, ChainSelectorBuilder},
header::{point_hash, ConwayHeader},
store::ChainStore,
};
use amaru_ouroboros::protocol::peer::{Peer, PeerSession};
use amaru_ouroboros::protocol::Point;
use amaru_ouroboros::protocol::{Point, PullEvent};
use amaru_stores::rocksdb::RocksDB;
use gasket::messaging::tokio::funnel_ports;
use gasket::messaging::OutputPort;
use gasket::runtime::Tether;
use pallas_crypto::hash::Hash;
use pallas_network::facades::PeerClient;
Expand All @@ -39,7 +40,7 @@ pub type BlockHash = pallas_crypto::hash::Hash<32>;
pub struct Config {
pub ledger_dir: PathBuf,
pub chain_dir: PathBuf,
pub upstream_peer: String,
pub upstream_peers: Vec<String>,
pub network_magic: u32,
pub nonces: HashMap<Epoch, Hash<32>>,
}
Expand All @@ -62,49 +63,67 @@ fn define_gasket_policy() -> gasket::runtime::Policy {
}
}

pub fn bootstrap(config: Config, client: &Arc<Mutex<PeerClient>>) -> miette::Result<Vec<Tether>> {
pub fn bootstrap(
config: Config,
clients: Vec<(String, Arc<Mutex<PeerClient>>)>,
) -> miette::Result<Vec<Tether>> {
// FIXME: Take from config / command args
let store = RocksDB::new(&config.ledger_dir)
.unwrap_or_else(|e| panic!("unable to open ledger store: {e:?}"));
let (mut ledger, tip) = amaru_ledger::Stage::new(store);

let peer_session = PeerSession {
peer: Peer::new(&config.upstream_peer),
peer_client: client.clone(),
};

let mut pull = pull::Stage::new(peer_session.clone(), vec![tip.clone()]);
let peer_sessions: Vec<PeerSession> = clients
.iter()
.map(|(peer_name, client)| PeerSession {
peer: Peer::new(peer_name),
peer_client: client.clone(),
})
.collect();

let mut pulls = peer_sessions
.iter()
.map(|session| pull::Stage::new(session.clone(), vec![tip.clone()]))
.collect::<Vec<_>>();
Comment on lines +75 to +86
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add some error handling to this peer session setup.

At the moment, if anything goes wrong during peer session creation, it'll be about as graceful as a kangaroo in roller skates.

Consider adding error handling:

-    let peer_sessions: Vec<PeerSession> = clients
-        .iter()
-        .map(|(peer_name, client)| PeerSession {
-            peer: Peer::new(peer_name),
-            peer_client: client.clone(),
-        })
-        .collect();
+    let peer_sessions: miette::Result<Vec<PeerSession>> = clients
+        .iter()
+        .map(|(peer_name, client)| {
+            if peer_name.is_empty() {
+                return Err(miette!("Empty peer name not allowed"));
+            }
+            Ok(PeerSession {
+                peer: Peer::new(peer_name),
+                peer_client: client.clone(),
+            })
+        })
+        .collect();
+    let peer_sessions = peer_sessions?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let peer_sessions: Vec<PeerSession> = clients
.iter()
.map(|(peer_name, client)| PeerSession {
peer: Peer::new(peer_name),
peer_client: client.clone(),
})
.collect();
let mut pulls = peer_sessions
.iter()
.map(|session| pull::Stage::new(session.clone(), vec![tip.clone()]))
.collect::<Vec<_>>();
let peer_sessions: miette::Result<Vec<PeerSession>> = clients
.iter()
.map(|(peer_name, client)| {
if peer_name.is_empty() {
return Err(miette!("Empty peer name not allowed"));
}
Ok(PeerSession {
peer: Peer::new(peer_name),
peer_client: client.clone(),
})
})
.collect();
let peer_sessions = peer_sessions?;
let mut pulls = peer_sessions
.iter()
.map(|session| pull::Stage::new(session.clone(), vec![tip.clone()]))
.collect::<Vec<_>>();

let chain_store = RocksDBStore::new(config.chain_dir.clone())?;
let chain_selector = make_chain_selector(tip, &chain_store, &[&peer_session]);
let chain_selector = make_chain_selector(tip, &chain_store, &peer_sessions);
let mut consensus = consensus::Stage::new(
peer_session,
peer_sessions,
ledger.state.clone(),
Arc::new(Mutex::new(chain_store)),
chain_selector,
config.nonces,
);

let (to_header_validation, from_pull) = gasket::messaging::tokio::mpsc_channel(50);
let (to_ledger, from_header_validation) = gasket::messaging::tokio::mpsc_channel(50);

pull.downstream.connect(to_header_validation);
consensus.upstream.connect(from_pull);
let outputs: Vec<&mut OutputPort<PullEvent>> = pulls
.iter_mut()
.map(|p| &mut p.downstream)
.collect::<Vec<_>>();
funnel_ports(outputs, &mut consensus.upstream, 50);
consensus.downstream.connect(to_ledger);
ledger.upstream.connect(from_header_validation);

let policy = define_gasket_policy();

let pull = gasket::runtime::spawn_stage(pull, policy.clone());
let mut pulls = pulls
.into_iter()
.map(|p| gasket::runtime::spawn_stage(p, policy.clone()))
.collect::<Vec<_>>();

let header_validation = gasket::runtime::spawn_stage(consensus, policy.clone());
let ledger = gasket::runtime::spawn_stage(ledger, policy.clone());

Ok(vec![pull, header_validation, ledger])
pulls.push(header_validation);
pulls.push(ledger);

Ok(pulls)
}

fn make_chain_selector(
tip: Point,
chain_store: &impl ChainStore<ConwayHeader>,
peers: &[&PeerSession],
peers: &Vec<PeerSession>,
) -> Arc<Mutex<ChainSelector<ConwayHeader>>> {
let mut builder = ChainSelectorBuilder::new();

Expand Down
2 changes: 1 addition & 1 deletion crates/amaru/src/sync/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Stage {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}

#[instrument(level = Level::DEBUG, skip(self), fields(intersection = self.intersection.last().unwrap().slot_or_default()))]
#[instrument(level = Level::DEBUG, skip(self), fields(peer = ?self.peer_session.peer.name, intersection = self.intersection.last().unwrap().slot_or_default()))]
pub async fn find_intersection(&self) -> Result<(), WorkerError> {
let mut peer_client = self.peer_session.peer_client.lock().await;
let client = (*peer_client).chainsync();
Expand Down
28 changes: 21 additions & 7 deletions crates/consensus/src/consensus/chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ pub enum ChainSelection<H: Header> {
NoChange,

/// The current best chain has switched to given fork starting at
/// given hash.
SwitchToFork(Point, Vec<H>),
/// given hash, from given Peer.
///
/// FIXME: The peer should not be needed here, as the fork should be
/// comprised of known blocks. It is only needed to download the blocks
/// we don't currently store.
SwitchToFork(Peer, Point, Vec<H>),
Comment on lines 96 to +102
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Consider decoupling block storage from peer information.

The FIXME raises a fair dinkum point! We might want to separate the concerns of block storage and peer management. Perhaps introduce a block cache or local storage service?

Let's check if there's any existing block storage implementation:


🏁 Script executed:

#!/bin/bash
# Search for block storage related code
rg -i "block.*store|store.*block" --type rust

Length of output: 172

Decoupling Peer Information from Block Storage

After a friendly little spelunking through our codebase, the search turned up a solitary reference to block storage—specifically in the ledger rewards (see the count_blocks function). In contrast, our consensus module’s SwitchToFork(Peer, Point, Vec<H>) variant still couples peer information right into the chain switching process. This confirms the architectural concern raised by the FIXME comment: the Peer parameter is being used as a crutch for downloading blocks, rather than relying on a dedicated block storage or caching mechanism.

Recommendations:

  • Refactor the SwitchToFork variant: Remove the Peer dependency if possible, and decouple block storage concerns from peer management.
  • Consider a dedicated block storage mechanism: Explore implementing or reusing a block cache or local storage service, in line with the isolated block storage practices found elsewhere (e.g. within ledger rewards).

Cheers, and let’s give this a proper tidy-up!

}

/// Builder pattern for `ChainSelector`.
Expand Down Expand Up @@ -169,16 +173,22 @@ where
let (best_peer, best_tip) = self.find_best_chain().unwrap();

let result = if best_tip.parent().unwrap() == self.tip.hash() {
info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot());
ChainSelection::NewTip(header.clone())
} else if best_tip.block_height() > self.tip.block_height() {
let fragment = self.peers_chains.get(&best_peer).unwrap();
ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone())
info!(target: "amaru::consensus::chain_selection::switch_to_fork", peer = ?best_peer,
hash = ?best_tip.hash().to_string(), slot = ?best_tip.slot());
ChainSelection::SwitchToFork(
best_peer,
fragment.anchor.point(),
fragment.headers.clone(),
)
} else {
ChainSelection::NoChange
};

if result != ChainSelection::NoChange {
info!(target: "amaru::consensus::chain_selection::new_tip", hash = ?header.hash().to_string(), slot = ?header.slot());
self.tip = header;
}

Expand Down Expand Up @@ -210,7 +220,11 @@ where
let fragment = self.peers_chains.get(&best_peer).unwrap();
// TODO: do not always switch to anchor if there's a better intersection
// with current chain
ChainSelection::SwitchToFork(fragment.anchor.point(), fragment.headers.clone())
ChainSelection::SwitchToFork(
best_peer,
fragment.anchor.point(),
fragment.headers.clone(),
)
};

self.tip = best_tip.clone();
Expand Down Expand Up @@ -328,7 +342,7 @@ mod tests {
.map(|header| chain_selector.roll_forward(&bob, *header))
.last();

assert_eq!(SwitchToFork(Point::Origin, chain2), result.unwrap());
assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result.unwrap());
}

#[test]
Expand Down Expand Up @@ -432,6 +446,6 @@ mod tests {
let rollback_point = &chain1[3];
let result = chain_selector.rollback(&alice, rollback_point.hash());

assert_eq!(SwitchToFork(Point::Origin, chain2), result);
assert_eq!(SwitchToFork(bob, Point::Origin, chain2), result);
}
}
13 changes: 9 additions & 4 deletions crates/consensus/src/consensus/header_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use pallas_primitives::conway::Epoch;
use std::collections::HashMap;
use tracing::{instrument, warn};

use super::header::ConwayHeader;
use super::header::{ConwayHeader, Header};

#[instrument(skip_all)]
pub fn assert_header<'a>(
Expand All @@ -38,8 +38,13 @@ pub fn assert_header<'a>(
FixedDecimal::from(5u64) / FixedDecimal::from(100u64);
let c = (FixedDecimal::from(1u64) - active_slots_coeff).ln();
let block_validator = BlockValidator::new(header, cbor, ledger, epoch_nonce, &c);
block_validator.validate().or_panic()?;
block_validator
.validate()
.map_err(|e| {
warn!("fail to validate header {}: {:?}", header.hash(), e);
Copy link
Contributor

@KtorZ KtorZ Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
warn!("fail to validate header {}: {:?}", header.hash(), e);
warn!(header.hash = ?header.hash(), validation.error = ?e, "failed_to_validate");

A "best-practice" I've noted from using the tracing library (but which apply to telemetry in general) is to leverage fields as much as possible, and avoid embedding fields/payloads in the log message. So said differently, embrace structured logs/traces ! This is what allows us to have nice JSON logs out-of-the-box 😛

Also by convention for now, I've always used snake_cased names for all traces & spans; really more out of paranoia than out of any legit reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agreed, that's why I voluntarily wrote this temporary kludge as a textual log entry so that we spot it more easily and change it to something consistent with the rest.

In general, I think we should only ever need instrument!; any other kind of log is probably unneeded, or is begging for a function to instrument :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only ever need instrument!

That's a good rule of thumb regarding how we slice the code, indeed.

})
.or(Ok(())) // FIXME: Remove this once we have a proper error handling
Comment on lines +41 to +46
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Crikey! Let's make this error handling proper sturdy.

While moving from panic to warning is a step in the right direction (like trading Marmite for Vegemite), we could make it even better. The current FIXME suggests this is temporary, but we should consider a more robust approach.

Consider this more comprehensive error handling:

-        block_validator
-            .validate()
-            .map_err(|e| {
-                warn!("fail to validate header {}: {:?}", header.hash(), e);
-            })
-            .or(Ok(())) // FIXME: Remove this once we have a proper error handling
+        block_validator.validate().map_err(|e| {
+            warn!("Header validation failed for {}: {:?}", header.hash(), e);
+            WorkerError::from(miette!("Header validation error: {}", e))
+        })

This way, we:

  1. Maintain proper error propagation
  2. Keep the warning for observability
  3. Convert the error into a proper WorkerError
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
block_validator
.validate()
.map_err(|e| {
warn!("fail to validate header {}: {:?}", header.hash(), e);
})
.or(Ok(())) // FIXME: Remove this once we have a proper error handling
block_validator.validate().map_err(|e| {
warn!("Header validation failed for {}: {:?}", header.hash(), e);
WorkerError::from(miette!("Header validation error: {}", e))
})

} else {
Ok(())
}

Ok(())
}
34 changes: 22 additions & 12 deletions crates/consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod store;
#[derive(Stage)]
#[stage(name = "consensus", unit = "PullEvent", worker = "Worker")]
pub struct Stage {
peer_session: PeerSession,
peer_sessions: HashMap<Peer, PeerSession>,
chain_selector: Arc<Mutex<ChainSelector<ConwayHeader>>>,
ledger: Arc<Mutex<dyn LedgerState>>,
store: Arc<Mutex<dyn ChainStore<ConwayHeader>>>,
Expand All @@ -62,14 +62,18 @@ pub struct Stage {

impl Stage {
pub fn new(
peer_session: PeerSession,
peer_sessions: Vec<PeerSession>,
ledger: Arc<Mutex<dyn LedgerState>>,
store: Arc<Mutex<dyn ChainStore<ConwayHeader>>>,
chain_selector: Arc<Mutex<ChainSelector<ConwayHeader>>>,
epoch_to_nonce: HashMap<Epoch, Hash<32>>,
) -> Self {
let peer_sessions = peer_sessions
.into_iter()
.map(|p| (p.peer.clone(), p))
.collect::<HashMap<_, _>>();
Self {
peer_session,
peer_sessions,
chain_selector,
ledger,
store,
Expand All @@ -86,11 +90,16 @@ impl Stage {
self.validation_tip.set(tip.slot_or_default() as i64);
}

async fn forward_block(&mut self, header: &dyn Header) -> Result<(), WorkerError> {
async fn forward_block(&mut self, peer: &Peer, header: &dyn Header) -> Result<(), WorkerError> {
let point = header.point();
let block = {
let mut peer_session = self.peer_session.lock().await;
let client = (*peer_session).blockfetch();
// FIXME: should not crash if the peer is not found
let peer_session = self
.peer_sessions
.get(peer)
.expect("Unknown peer, bailing out");
abailly marked this conversation as resolved.
Show resolved Hide resolved
let mut session = peer_session.peer_client.lock().await;
let client = (*session).blockfetch();
client.fetch_single(point.clone()).await.or_restart()?
};

Expand All @@ -102,6 +111,7 @@ impl Stage {

async fn switch_to_fork(
&mut self,
peer: &Peer,
rollback_point: &Point,
fork: Vec<ConwayHeader>,
) -> Result<(), WorkerError> {
Expand All @@ -111,7 +121,7 @@ impl Stage {
.or_panic()?;

for header in fork {
self.forward_block(&header).await?;
self.forward_block(peer, &header).await?;
}

Ok(())
Expand Down Expand Up @@ -152,16 +162,16 @@ impl Stage {

match result {
chain_selection::ChainSelection::NewTip(hdr) => {
self.forward_block(&hdr).await?;
self.forward_block(peer, &hdr).await?;

self.block_count.inc(1);
self.track_validation_tip(point);
}
chain_selection::ChainSelection::RollbackTo(_) => {
panic!("RollbackTo should never happen on a RollForward")
}
chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => {
self.switch_to_fork(&rollback_point, fork).await?;
chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => {
self.switch_to_fork(&peer, &rollback_point, fork).await?;
}
chain_selection::ChainSelection::NoChange => (),
}
Expand Down Expand Up @@ -189,8 +199,8 @@ impl Stage {
self.track_validation_tip(rollback);
}
chain_selection::ChainSelection::NoChange => (),
chain_selection::ChainSelection::SwitchToFork(rollback_point, fork) => {
self.switch_to_fork(&rollback_point, fork).await?
chain_selection::ChainSelection::SwitchToFork(peer, rollback_point, fork) => {
self.switch_to_fork(&peer, &rollback_point, fork).await?
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ouroboros/src/protocol/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::Mutex;
/// A single peer in the network, with a unique identifier.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct Peer {
name: String,
pub name: String,
}

impl Peer {
Expand Down
Loading