Skip to content

Commit

Permalink
Merge pull request #4 from ourzora/erik/dat-97-peer-limit
Browse files Browse the repository at this point in the history
[DAT-97] Implement peer limit
  • Loading branch information
erikreppel authored Mar 25, 2024
2 parents b324d13 + 507de8a commit b87e43c
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 88 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap = { version = "4.5.2", features = ["derive"] }
envconfig = "0.10.0"
colored = "2.1.0"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "macros", "sqlite", "postgres", "any"] }
rand = "0.8.5"


[profile.dev.package.sqlx-macros]
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct Config {

#[envconfig(from = "PRUNE_MINTED_PREMINTS", default = "true")]
pub prune_minted_premints: bool,

#[envconfig(from = "PEER_LIMIT", default = "1000")]
pub peer_limit: u64,
}

impl Config {
Expand Down
2 changes: 1 addition & 1 deletion src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Controller {
P2PEvent::PremintReceived(premint) => {
tracing::debug!(premint = premint.to_json().ok(), "Received premint");

self.validate_and_insert(premint).await;
let _ = self.validate_and_insert(premint).await;
}
}
}
Expand Down
194 changes: 115 additions & 79 deletions src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,94 @@
use crate::config::Config;
use crate::controller::{P2PEvent, SwarmCommand};
use crate::types::{MintpoolNodeInfo, Premint, PremintTypes};
use eyre::WrapErr;
use libp2p::core::ConnectedPoint;
use libp2p::futures::StreamExt;
use libp2p::identity::Keypair;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::Addresses;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{gossipsub, kad, noise, tcp, yamux, Multiaddr};
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{gossipsub, kad, noise, tcp, yamux, Multiaddr, PeerId};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::time::Duration;
use tokio::select;

pub fn make_swarm_controller(
id_keys: Keypair,
command_receiver: tokio::sync::mpsc::Receiver<SwarmCommand>,
event_sender: tokio::sync::mpsc::Sender<P2PEvent>,
) -> eyre::Result<SwarmController> {
let peer_id = id_keys.public().to_peer_id();
let swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|key| {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};

let mut b = kad::Behaviour::new(peer_id, MemoryStore::new(key.public().to_peer_id()));
b.set_mode(Some(kad::Mode::Server));
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(message_id_fn)
.build()
.expect("valid config");

let gs = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)
.expect("valid config");

MintpoolBehaviour {
gossipsub: gs,
kad: b,
}
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

Ok(SwarmController::new(
swarm,
"zora-premints-v2".to_string(),
command_receiver,
event_sender,
))
#[derive(NetworkBehaviour)]
pub struct MintpoolBehaviour {
gossipsub: gossipsub::Behaviour,
kad: kad::Behaviour<MemoryStore>,
}

pub struct SwarmController {
swarm: libp2p::Swarm<MintpoolBehaviour>,
topic_name: String,
command_receiver: tokio::sync::mpsc::Receiver<SwarmCommand>,
event_sender: tokio::sync::mpsc::Sender<P2PEvent>,
max_peers: u64,
local_mode: bool,
}

impl SwarmController {
pub fn new(
swarm: libp2p::Swarm<MintpoolBehaviour>,
topic_name: String,
id_keys: Keypair,
config: &Config,
command_receiver: tokio::sync::mpsc::Receiver<SwarmCommand>,
event_sender: tokio::sync::mpsc::Sender<P2PEvent>,
) -> Self {
let swarm = Self::make_swarm_controller(id_keys).expect("Invalid config for swarm");

Self {
swarm,
topic_name,
command_receiver,
event_sender,
max_peers: config.peer_limit,
local_mode: config.connect_external == false,
}
}

fn make_swarm_controller(id_keys: Keypair) -> eyre::Result<libp2p::Swarm<MintpoolBehaviour>> {
let peer_id = id_keys.public().to_peer_id();
let swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|key| {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};

let mut b =
kad::Behaviour::new(peer_id, MemoryStore::new(key.public().to_peer_id()));
b.set_mode(Some(kad::Mode::Server));
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(message_id_fn)
.build()
.expect("valid config");

let gs = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)
.expect("valid config");

MintpoolBehaviour {
gossipsub: gs,
kad: b,
}
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

Ok(swarm)
}

pub async fn run(&mut self, port: u64, listen_ip: String) -> eyre::Result<()> {
let registry_topic = gossipsub::IdentTopic::new("announce::premints");

Expand Down Expand Up @@ -117,12 +122,14 @@ impl SwarmController {
async fn handle_command(&mut self, command: SwarmCommand) {
tracing::info!("Received command: {:?}", command);
match command {
SwarmCommand::ConnectToPeer { address } => {
let addr: Multiaddr = address.parse().unwrap();
if let Err(err) = self.swarm.dial(addr) {
tracing::error!("Error dialing peer: {:?}", err);
SwarmCommand::ConnectToPeer { address } => match address.parse() {
Ok(addr) => {
self.safe_dial(addr).await;
}
}
Err(err) => {
tracing::warn!("Error parsing address: {:?}", err);
}
},
SwarmCommand::ReturnNetworkState { channel } => {
let network_state = self.make_network_state();
if channel.send(network_state).is_err() {
Expand Down Expand Up @@ -161,10 +168,16 @@ impl SwarmController {
send_back_addr,
} => {
tracing::info!("Incoming connection: {connection_id}, local_addr: {local_addr}, send_back_addr: {send_back_addr}");
self.reject_connection_if_over_max(connection_id);
}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
self.reject_connection_if_over_max(connection_id);

match endpoint {
ConnectedPoint::Dialer { address, .. } => {
let addr = address;
Expand Down Expand Up @@ -216,7 +229,7 @@ impl SwarmController {
tracing::info!("Dialing: {:?}", peer_id)
}
other => {
tracing::info!("Unhandled swarm event: {:?}", other)
tracing::debug!("Unhandled swarm event: {:?}", other)
}
}
}
Expand Down Expand Up @@ -269,7 +282,7 @@ impl SwarmController {
.parse()
.wrap_err(format!("invalid address found from announce: {}", msg))?;

self.swarm.dial(addr)?;
self.safe_dial(addr).await;
} else {
match serde_json::from_str::<PremintTypes>(&msg) {
Ok(premint) => {
Expand Down Expand Up @@ -323,27 +336,56 @@ impl SwarmController {
Ok(())
}

// Returns True if the connection was rejected
fn reject_connection_if_over_max(&mut self, connection_id: ConnectionId) -> bool {
let state = self.make_network_state();
if self.max_peers < state.network_info.num_peers() as u64 {
tracing::warn!("Max peers reached, rejecting connection",);
self.swarm.close_connection(connection_id);
return true;
}
false
}

async fn safe_dial(&mut self, address: Multiaddr) {
let state = self.make_network_state();
let peers = state.gossipsub_peers.len();
if peers as u64 >= self.max_peers {
tracing::warn!(
peers = peers,
max_peers = self.max_peers,
"Max peers reached, not connecting to peer"
);
return;
}

if state.all_external_addresses.contains(&address) && !self.local_mode {
tracing::warn!("Already connected to peer: {:?}", address);
return;
}

if let Err(err) = self.swarm.dial(address) {
tracing::error!("Error dialing peer: {:?}", err);
}
}

fn make_network_state(&mut self) -> NetworkState {
let dht_peers: Vec<_> = self
.swarm
.behaviour_mut()
.kad
.kbuckets()
.flat_map(|x| {
x.iter()
.map(|x| format!("{:?}", x.node.value))
.collect::<Vec<_>>()
})
.flat_map(|x| x.iter().map(|x| x.node.value.clone()).collect::<Vec<_>>())
.collect();

let my_id = self.swarm.local_peer_id().to_string();
let my_id = *self.swarm.local_peer_id();

let gossipsub_peers = self
.swarm
.behaviour_mut()
.gossipsub
.all_mesh_peers()
.map(|p| p.to_string())
.cloned()
.collect::<Vec<_>>();

NetworkState {
Expand All @@ -356,17 +398,11 @@ impl SwarmController {
}
}

#[derive(NetworkBehaviour)]
pub struct MintpoolBehaviour {
gossipsub: gossipsub::Behaviour,
kad: kad::Behaviour<MemoryStore>,
}

#[derive(Debug)]
pub struct NetworkState {
pub local_peer_id: String,
pub local_peer_id: PeerId,
pub network_info: NetworkInfo,
pub dht_peers: Vec<String>,
pub gossipsub_peers: Vec<String>,
pub dht_peers: Vec<Addresses>,
pub gossipsub_peers: Vec<PeerId>,
pub all_external_addresses: Vec<Multiaddr>,
}
4 changes: 2 additions & 2 deletions src/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::Config;
use crate::controller::{Controller, ControllerInterface};
use crate::p2p::make_swarm_controller;
use crate::p2p::SwarmController;
use crate::storage::PremintStorage;
use libp2p::identity;

Expand All @@ -18,7 +18,7 @@ pub async fn start_swarm_and_controller(config: &Config) -> eyre::Result<Control

let store = PremintStorage::new(config).await;

let mut swarm_controller = make_swarm_controller(id_keys, swrm_recv, event_send)?;
let mut swarm_controller = SwarmController::new(id_keys, config, swrm_recv, event_send);
let mut controller = Controller::new(swrm_cmd_send, event_recv, ext_cmd_recv, store);
let controller_interface = ControllerInterface::new(ext_cmd_send);

Expand Down
2 changes: 2 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ mod test {
db_url: None, // in-memory for testing
persist_state: false,
prune_minted_premints: false,
peer_limit: 1000,
};

let store = PremintStorage::new(&config).await;
Expand All @@ -139,6 +140,7 @@ mod test {
db_url: None, // in-memory for testing
persist_state: false,
prune_minted_premints: false,
peer_limit: 1000,
};

let store = PremintStorage::new(&config).await;
Expand Down
Loading

0 comments on commit b87e43c

Please sign in to comment.