Skip to content

Commit

Permalink
feat!: Address agnostic p2p (#5176)
Browse files Browse the repository at this point in the history
* refactor: Move address from `PeerId` to `Peer`

Signed-off-by: Dmitry Murzin <[email protected]>

* feat!: Gossip peer addresses

Signed-off-by: Dmitry Murzin <[email protected]>

* Review fixes

Signed-off-by: Dmitry Murzin <[email protected]>

* Review fixes: limit `gossip_peers` to peers from topology

Signed-off-by: Dmitry Murzin <[email protected]>

* Review fixes: use full `public_address` (instead of `external_port`)

Signed-off-by: Dmitry Murzin <[email protected]>

* Store address from each peer independently and use majority rule

Signed-off-by: Dmitry Murzin <[email protected]>

* Fix clippy warnings

Signed-off-by: Dmitry Murzin <[email protected]>

* Review fixes

Signed-off-by: Dmitry Murzin <[email protected]>

---------

Signed-off-by: Dmitry Murzin <[email protected]>
  • Loading branch information
dima74 authored Nov 1, 2024
1 parent 007d2e3 commit f5e5d20
Show file tree
Hide file tree
Showing 49 changed files with 819 additions and 625 deletions.
17 changes: 7 additions & 10 deletions crates/iroha/tests/extra_functional/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::iter::once;
use assert_matches::assert_matches;
use eyre::Result;
use futures_util::{stream::FuturesUnordered, StreamExt};
use iroha::data_model::{
isi::{Register, Unregister},
peer::Peer,
};
use iroha::data_model::isi::{Register, Unregister};
use iroha_config_base::toml::WriteExt;
use iroha_test_network::*;
use rand::{prelude::IteratorRandom, seq::SliceRandom, thread_rng};
Expand All @@ -31,12 +28,12 @@ async fn register_new_peer() -> Result<()> {
network
.config()
// only one random peer
.write(["sumeragi", "trusted_peers"], [network.peer().id()]),
.write(["sumeragi", "trusted_peers"], [network.peer().peer()]),
None,
)
.await;

let register = Register::peer(Peer::new(peer.id()));
let register = Register::peer(peer.peer_id());
let client = network.client();
spawn_blocking(move || client.submit_blocking(register)).await??;

Expand All @@ -62,7 +59,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> {

// Unregister a peer: committed with f = `faults` then `status.peers` decrements
let client = randomized_peers.choose(&mut thread_rng()).unwrap().client();
let unregister_peer = Unregister::peer(removed_peer.id());
let unregister_peer = Unregister::peer(removed_peer.peer_id());
spawn_blocking(move || client.submit_blocking(unregister_peer)).await??;
timeout(
network.sync_timeout(),
Expand All @@ -81,7 +78,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> {
assert_eq!(status.peers, 0);

// Re-register the peer: committed with f = `faults` - 1 then `status.peers` increments
let register_peer = Register::peer(Peer::new(removed_peer.id()));
let register_peer = Register::peer(removed_peer.peer_id());
let client = randomized_peers
.iter()
.choose(&mut thread_rng())
Expand Down Expand Up @@ -112,13 +109,13 @@ async fn assert_peers_status(
status.peers,
expected_peers,
"unexpected peers for {}",
peer.id()
peer.peer_id()
);
assert_eq!(
status.blocks,
expected_blocks,
"expected blocks for {}",
peer.id()
peer.peer_id()
);
})
.collect::<FuturesUnordered<_>>()
Expand Down
11 changes: 3 additions & 8 deletions crates/iroha/tests/extra_functional/offline_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ use eyre::{OptionExt, Result};
use futures_util::stream::{FuturesUnordered, StreamExt};
use iroha::{
crypto::KeyPair,
data_model::{
peer::{Peer as DataModelPeer, PeerId},
prelude::*,
},
data_model::{peer::PeerId, prelude::*},
};
use iroha_primitives::addr::socket_addr;
use iroha_test_network::*;
use iroha_test_samples::ALICE_ID;
use tokio::task::spawn_blocking;
Expand Down Expand Up @@ -66,11 +62,10 @@ async fn register_offline_peer() -> Result<()> {
let network = NetworkBuilder::new().with_peers(N_PEERS).start().await?;
check_status(&network, N_PEERS as u64 - 1).await;

let address = socket_addr!(128.0.0.2:8085);
let key_pair = KeyPair::random();
let public_key = key_pair.public_key().clone();
let peer_id = PeerId::new(address, public_key);
let register_peer = Register::peer(DataModelPeer::new(peer_id));
let peer_id = PeerId::new(public_key);
let register_peer = Register::peer(peer_id);

// Wait for some time to allow peers to connect
let client = network.client();
Expand Down
4 changes: 2 additions & 2 deletions crates/iroha/tests/extra_functional/unregister_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ async fn network_stable_after_add_and_after_remove_peer() -> Result<()> {
network.ensure_blocks(3).await?;
// and a new peer is registered
let new_peer = NetworkPeer::generate();
let new_peer_id = new_peer.id();
let new_peer_id = new_peer.peer_id();
let new_peer_client = new_peer.client();
network.add_peer(&new_peer);
new_peer.start(network.config(), None).await;
{
let client = client.clone();
let id = new_peer_id.clone();
spawn_blocking(move || client.submit_blocking(Register::peer(Peer::new(id)))).await??;
spawn_blocking(move || client.submit_blocking(Register::peer(id))).await??;
}
network.ensure_blocks(4).await?;
// Then the new peer should already have the mint result.
Expand Down
26 changes: 5 additions & 21 deletions crates/iroha_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use erased_serde::Serialize;
use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt};
use eyre::{eyre, Error, Result, WrapErr};
use iroha::{client::Client, config::Config, data_model::prelude::*};
use iroha_primitives::{addr::SocketAddr, json::Json};
use iroha_primitives::json::Json;
use thiserror::Error;

/// Re-usable clap `--metadata <PATH>` (`-m`) argument.
Expand Down Expand Up @@ -1033,9 +1033,6 @@ mod peer {
/// Register subcommand of peer
#[derive(clap::Args, Debug)]
pub struct Register {
/// P2P address of the peer e.g. `127.0.0.1:1337`
#[arg(short, long)]
pub address: SocketAddr,
/// Public key of the peer
#[arg(short, long)]
pub key: PublicKey,
Expand All @@ -1045,23 +1042,15 @@ mod peer {

impl RunArgs for Register {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Self {
address,
key,
metadata,
} = self;
let register_peer =
iroha::data_model::isi::Register::peer(Peer::new(PeerId::new(address, key)));
let Self { key, metadata } = self;
let register_peer = iroha::data_model::isi::Register::peer(key.into());
submit([register_peer], metadata.load()?, context).wrap_err("Failed to register peer")
}
}

/// Unregister subcommand of peer
#[derive(clap::Args, Debug)]
pub struct Unregister {
/// P2P address of the peer e.g. `127.0.0.1:1337`
#[arg(short, long)]
pub address: SocketAddr,
/// Public key of the peer
#[arg(short, long)]
pub key: PublicKey,
Expand All @@ -1071,13 +1060,8 @@ mod peer {

impl RunArgs for Unregister {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Self {
address,
key,
metadata,
} = self;
let unregister_peer =
iroha::data_model::isi::Unregister::peer(PeerId::new(address, key));
let Self { key, metadata } = self;
let unregister_peer = iroha::data_model::isi::Unregister::peer(key.into());
submit([unregister_peer], metadata.load()?, context)
.wrap_err("Failed to unregister peer")
}
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_config/iroha_test_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ private_key = "802620282ED9F3CF92811C3818DBC4AE594ED59DC1A2F78E4241E31924E101D6B

[network]
address = "127.0.0.1:1337"
public_address = "127.0.0.1:1337"

[genesis]
public_key = "ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4"
Expand Down
31 changes: 19 additions & 12 deletions crates/iroha_config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use std::{
use error_stack::{Result, ResultExt};
use iroha_config_base::{read::ConfigReader, toml::TomlSource, util::Bytes, WithOrigin};
use iroha_crypto::{KeyPair, PublicKey};
use iroha_data_model::{peer::PeerId, ChainId};
use iroha_data_model::{
peer::{Peer, PeerId},
ChainId, Identifiable,
};
use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec};
use url::Url;
pub use user::{DevTelemetry, Logger, Snapshot};
Expand Down Expand Up @@ -66,14 +69,15 @@ impl Root {
pub struct Common {
pub chain: ChainId,
pub key_pair: KeyPair,
pub peer: PeerId,
pub peer: Peer,
}

/// Network options
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct Network {
pub address: WithOrigin<SocketAddr>,
pub public_address: WithOrigin<SocketAddr>,
pub idle_timeout: Duration,
}

Expand Down Expand Up @@ -125,15 +129,18 @@ pub struct Sumeragi {
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct TrustedPeers {
pub myself: PeerId,
pub others: UniqueVec<PeerId>,
pub myself: Peer,
pub others: UniqueVec<Peer>,
}

impl TrustedPeers {
/// Returns a list of trusted peers which is guaranteed to have at
/// least one element - the id of the peer itself.
pub fn into_non_empty_vec(self) -> UniqueVec<PeerId> {
std::iter::once(self.myself).chain(self.others).collect()
std::iter::once(self.myself)
.chain(self.others)
.map(|peer| peer.id().clone())
.collect()
}

/// Tells whether a trusted peers list has some other peers except for the peer itself
Expand Down Expand Up @@ -197,8 +204,8 @@ mod tests {

use super::*;

fn dummy_id(port: u16) -> PeerId {
PeerId::new(
fn dummy_peer(port: u16) -> Peer {
Peer::new(
socket_addr!(127.0.0.1:port),
KeyPair::random().into_parts().0,
)
Expand All @@ -207,7 +214,7 @@ mod tests {
#[test]
fn no_trusted_peers() {
let value = TrustedPeers {
myself: dummy_id(80),
myself: dummy_peer(80),
others: unique_vec![],
};
assert!(!value.contains_other_trusted_peers());
Expand All @@ -216,17 +223,17 @@ mod tests {
#[test]
fn one_trusted_peer() {
let value = TrustedPeers {
myself: dummy_id(80),
others: unique_vec![dummy_id(81)],
myself: dummy_peer(80),
others: unique_vec![dummy_peer(81)],
};
assert!(value.contains_other_trusted_peers());
}

#[test]
fn many_trusted_peers() {
let value = TrustedPeers {
myself: dummy_id(80),
others: unique_vec![dummy_id(1), dummy_id(2), dummy_id(3), dummy_id(4),],
myself: dummy_peer(80),
others: unique_vec![dummy_peer(1), dummy_peer(2), dummy_peer(3), dummy_peer(4),],
};
assert!(value.contains_other_trusted_peers());
}
Expand Down
24 changes: 15 additions & 9 deletions crates/iroha_config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use iroha_config_base::{
ReadConfig, WithOrigin,
};
use iroha_crypto::{PrivateKey, PublicKey};
use iroha_data_model::{peer::PeerId, ChainId};
use iroha_data_model::{peer::Peer, ChainId};
use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec};
use serde::Deserialize;
use url::Url;
Expand Down Expand Up @@ -116,24 +116,24 @@ impl Root {
let (torii, live_query_store) = self.torii.parse();
let telemetry = self.telemetry.map(actual::Telemetry::from);

let peer_id = key_pair.as_ref().map(|key_pair| {
PeerId::new(
let peer = key_pair.as_ref().map(|key_pair| {
Peer::new(
network.address.value().clone(),
key_pair.public_key().clone(),
)
});

let sumeragi = peer_id
let sumeragi = peer
.as_ref()
.map(|id| self.sumeragi.parse_and_push_self(id.clone()));
.map(|peer| self.sumeragi.parse_and_push_self(peer.clone()));

emitter.into_result()?;

let key_pair = key_pair.unwrap();
let peer = actual::Common {
chain: self.chain.0,
key_pair,
peer: peer_id.unwrap(),
peer: peer.unwrap(),
};

Ok(actual::Root {
Expand Down Expand Up @@ -226,7 +226,7 @@ pub struct Sumeragi {
}

#[derive(Debug, Deserialize)]
pub struct TrustedPeers(UniqueVec<PeerId>);
pub struct TrustedPeers(UniqueVec<Peer>);

impl FromEnvStr for TrustedPeers {
type Error = json5::Error;
Expand All @@ -246,7 +246,7 @@ impl Default for TrustedPeers {
}

impl Sumeragi {
fn parse_and_push_self(self, self_id: PeerId) -> actual::Sumeragi {
fn parse_and_push_self(self, self_id: Peer) -> actual::Sumeragi {
let Self {
trusted_peers,
debug: SumeragiDebug { force_soft_fork },
Expand All @@ -270,9 +270,13 @@ pub struct SumeragiDebug {

#[derive(Debug, Clone, ReadConfig)]
pub struct Network {
/// Peer-to-peer address
/// Peer-to-peer address (internal, will be used only to bind to it).
#[config(env = "P2P_ADDRESS")]
pub address: WithOrigin<SocketAddr>,
/// Peer-to-peer address (external, as seen by other peers).
/// Will be gossiped to connected peers so that they can gossip it to other peers.
#[config(env = "P2P_PUBLIC_ADDRESS")]
pub public_address: WithOrigin<SocketAddr>,
#[config(default = "defaults::network::BLOCK_GOSSIP_SIZE")]
pub block_gossip_size: NonZeroU32,
#[config(default = "defaults::network::BLOCK_GOSSIP_PERIOD.into()")]
Expand All @@ -296,6 +300,7 @@ impl Network {
) {
let Self {
address,
public_address,
block_gossip_size,
block_gossip_period_ms: block_gossip_period,
transaction_gossip_size,
Expand All @@ -306,6 +311,7 @@ impl Network {
(
actual::Network {
address,
public_address,
idle_timeout: idle_timeout.get(),
},
actual::BlockSync {
Expand Down
Loading

0 comments on commit f5e5d20

Please sign in to comment.