Skip to content

Commit

Permalink
Move the node context to another file (#56)
Browse files Browse the repository at this point in the history
The node.rs file is over 1k lines, it would be better if we can reduce
this a little. This commit moves all the node context data out of
node.rs and puts the constants inside NodeContext.
This is also useful, because we can fine-tune those constants by Context
  • Loading branch information
Davidson-Souza authored Jun 15, 2023
1 parent f13dfd5 commit cf25472
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 76 deletions.
1 change: 1 addition & 0 deletions src/blockchain/p2p_blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod address_man;
pub mod mempool;
pub mod node;
pub mod node_context;
pub mod node_interface;
pub mod peer;
pub mod stream_reader;
130 changes: 54 additions & 76 deletions src/blockchain/p2p_blockchain/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::{
address_man::{AddressMan, LocalAddress},
mempool::Mempool,
node_context::{IBDNode, NodeContext, RunningNode},
node_interface::{NodeInterface, NodeResponse, PeerInfo, UserRequest},
peer::{Peer, PeerMessages, Version},
};
Expand Down Expand Up @@ -38,34 +39,11 @@ use std::{
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

/// Max number of simultaneous connections we initiates we are willing to hold
const MAX_OUTGOING_PEERS: usize = 10;
/// We ask for peers every ASK_FOR_PEERS_INTERVAL seconds
const ASK_FOR_PEERS_INTERVAL: u64 = 60; // One minute
/// Save our database of peers every PEER_DB_DUMP_INTERVAL seconds
const PEER_DB_DUMP_INTERVAL: u64 = 60 * 5; // 5 minutes
/// Attempt to open a new connection (if needed) every TRY_NEW_CONNECTION seconds
const TRY_NEW_CONNECTION: u64 = 30; // 30 seconds
/// If ASSUME_STALE seconds passed since our last tip update, treat it as stale
const ASSUME_STALE: u64 = 30 * 60; // 30 minutes
/// While on IBD, if we've been without blocks for this long, ask for headers again
const IBD_REQUEST_BLOCKS_AGAIN: u64 = 10; // 10 seconds
/// How often we broadcast transactions
const BROADCAST_DELAY: u64 = 30; // 30 seconds
/// Wait up to this many seconds for a peer to respond to a request
const PEER_REQUEST_TIMEOUT: u64 = 30 * 60; // 30 minutes FIXME: This is too long
/// Max number of simultaneous inflight requests we allow
const MAX_INFLIGHT_REQUESTS: usize = 1_000;
/// Interval at which we open new feeler connections
const FEELER_INTERVAL: u64 = 60 * 5; // 5 minutes
/// Interval at which we rearrange our addresses
const ADDRESS_REARRANGE_INTERVAL: u64 = 60 * 60; // 1 hour
/// How long we ban a peer for
const BAN_TIME: u64 = 60 * 60 * 24;
#[derive(Debug)]
pub enum NodeNotification {
FromPeer(u32, PeerMessages),
}

#[derive(Debug, Clone, PartialEq, Hash)]
/// Sent from node to peers, usually to request something
pub enum NodeRequest {
Expand Down Expand Up @@ -96,7 +74,6 @@ enum InflightRequests {
Headers,
Blocks(BlockHash),
RescanBlock(BlockHash),
Addresses,
UserRequest(UserRequest),
Connect(u32),
}
Expand All @@ -115,7 +92,7 @@ pub struct LocalPeerView {
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum RescanStatus {
pub enum RescanStatus {
InProgress(u32),
Completed(Instant),
None,
Expand Down Expand Up @@ -147,14 +124,18 @@ pub struct NodeCommon {
/// It also holds the channels to communicate with peers and the block downloader.
/// The node is the central task that runs and handles important events, such as new blocks,
/// peer connection/disconnection, new addresses, etc.
pub struct UtreexoNode<Context>(NodeCommon, Context);
impl<T> Deref for UtreexoNode<T> {
pub struct UtreexoNode<Context: 'static + Default + NodeContext>(NodeCommon, Context);

// To reduce keystrokes while using node, assuming that we'll predominantly use the inner
// NodeCommon, we implement deref and deref mut as a passthrough of self.0, so self.something
// is an alias to self.0.something
impl<Context: 'static + Default + NodeContext> Deref for UtreexoNode<Context> {
fn deref(&self) -> &Self::Target {
&self.0
}
type Target = NodeCommon;
}
impl<T> DerefMut for UtreexoNode<T> {
impl<Context: 'static + Default + NodeContext> DerefMut for UtreexoNode<Context> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
Expand All @@ -165,18 +146,6 @@ enum PeerStatus {
Ready,
ShutingDown,
}
#[derive(Debug, Default, Clone)]
pub struct IBDNode {
blocks: HashMap<BlockHash, UtreexoBlock>,
}

#[derive(Debug, Clone)]
pub struct RunningNode {
last_rescan_request: RescanStatus,
last_feeler: Instant,
last_address_rearrange: Instant,
user_requests: Arc<NodeInterface>,
}

impl Default for RunningNode {
fn default() -> Self {
Expand All @@ -191,7 +160,7 @@ impl Default for RunningNode {
}
}

impl<T: 'static + Default> UtreexoNode<T> {
impl<T: 'static + Default + NodeContext> UtreexoNode<T> {
pub fn new(
chain: Arc<ChainState<KvChainStore>>,
mempool: Arc<RwLock<Mempool>>,
Expand Down Expand Up @@ -249,7 +218,8 @@ impl<T: 'static + Default> UtreexoNode<T> {
);
Ok(())
}
fn get_peer_info(&self, peer: &LocalPeerView) -> Option<PeerInfo> {
fn get_peer_info(&self, peer: &u32) -> Option<PeerInfo> {
let peer = self.peers.get(peer)?;
Some(PeerInfo {
address: format!("{}:{}", peer.address, peer.port),
services: peer.services.to_string(),
Expand Down Expand Up @@ -290,6 +260,7 @@ impl<T: 'static + Default> UtreexoNode<T> {
.has(ServiceFlags::NETWORK | ServiceFlags::WITNESS)
{
self.send_to_peer(peer, NodeRequest::Shutdown).await?;
self.peers.remove(&peer);
return Ok(());
}
peer_data.state = PeerStatus::Ready;
Expand Down Expand Up @@ -383,7 +354,7 @@ impl<T: 'static + Default> UtreexoNode<T> {
let mut timed_out = vec![];
for request in self.inflight.keys() {
let (_, time) = self.inflight.get(request).unwrap();
if time.elapsed() > Duration::from_secs(PEER_REQUEST_TIMEOUT) {
if time.elapsed() > Duration::from_secs(T::REQUEST_TIMEOUT) {
timed_out.push(request.clone());
}
}
Expand All @@ -397,19 +368,13 @@ impl<T: 'static + Default> UtreexoNode<T> {
InflightRequests::Blocks(block) | InflightRequests::RescanBlock(block) => {
to_request.push(block)
}
InflightRequests::Addresses => {
let locator = self.chain.get_block_locator()?;
self.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE)
.await?;
self.last_get_address_request = Instant::now();
}
InflightRequests::Headers => {
self.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
.await?;
self.last_headers_request = Instant::now();
}
InflightRequests::UserRequest(_) => {}
InflightRequests::Connect(_) => {
InflightRequests::Connect(peer) => {
self.send_to_peer(peer, NodeRequest::Shutdown).await?
}
}
Expand Down Expand Up @@ -438,6 +403,7 @@ impl<T: 'static + Default> UtreexoNode<T> {
}
let idx = if required_services.has(ServiceFlags::NODE_UTREEXO) {
if self.utreexo_peers.is_empty() {
self.state = NodeState::WaitingPeer;
return Err(BlockchainError::NoPeersAvailable);
}
let idx = rand::random::<usize>() % self.utreexo_peers.len();
Expand All @@ -447,6 +413,7 @@ impl<T: 'static + Default> UtreexoNode<T> {
.expect("node is in the interval 0..utreexo_peers.len(), but is not here?")
} else {
if self.peer_ids.is_empty() {
self.state = NodeState::WaitingPeer;
return Err(BlockchainError::NoPeersAvailable);
}
let idx = rand::random::<usize>() % self.peer_ids.len();
Expand Down Expand Up @@ -517,12 +484,9 @@ impl<T: 'static + Default> UtreexoNode<T> {
Ok(())
}
pub async fn ask_for_addresses(&mut self) -> Result<(), BlockchainError> {
let peer = self
let _ = self
.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
.await?;
self.inflight
.insert(InflightRequests::Addresses, (peer, Instant::now()));

Ok(())
}
fn save_peers(&self) -> Result<(), BlockchainError> {
Expand All @@ -545,7 +509,7 @@ impl<T: 'static + Default> UtreexoNode<T> {
}

async fn maybe_open_connection(&mut self) -> Result<(), BlockchainError> {
if self.peers.len() < MAX_OUTGOING_PEERS {
if self.peers.len() < T::MAX_OUTGOING_PEERS {
self.create_connection(false).await;
}
Ok(())
Expand Down Expand Up @@ -723,7 +687,8 @@ impl UtreexoNode<IBDNode> {
periodic_job!(
self.maybe_open_connection().await,
self.last_connection,
TRY_NEW_CONNECTION
TRY_NEW_CONNECTION,
IBDNode
);

self.last_tip_update = Instant::now();
Expand Down Expand Up @@ -752,7 +717,8 @@ impl UtreexoNode<IBDNode> {
}
// If we are downloading blocks, we need to request more if we have space.
let currently_inflight = self.0.inflight.len() + self.1.blocks.len();
if self.state == NodeState::DownloadBlocks && currently_inflight < MAX_INFLIGHT_REQUESTS
if self.state == NodeState::DownloadBlocks
&& currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS
{
let blocks = self.get_blocks_to_download()?;
if blocks.is_empty() {
Expand All @@ -769,7 +735,8 @@ impl UtreexoNode<IBDNode> {
periodic_job!(
self.maybe_request_headers().await,
self.last_headers_request,
IBD_REQUEST_BLOCKS_AGAIN
IBD_REQUEST_BLOCKS_AGAIN,
IBDNode
);
}
Ok(())
Expand Down Expand Up @@ -808,7 +775,7 @@ impl UtreexoNode<IBDNode> {
if let Some(peer) = self.peers.get(&peer).cloned() {
self.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(BAN_TIME),
AddressState::Banned(IBDNode::BAN_TIME),
);
}
error!(
Expand All @@ -830,7 +797,7 @@ impl UtreexoNode<IBDNode> {

let currently_inflight = self.inflight.len() + self.1.blocks.len();
if self.state == NodeState::DownloadBlocks
&& currently_inflight < MAX_INFLIGHT_REQUESTS
&& currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS
{
let blocks = self.get_blocks_to_download()?;
self.request_blocks(blocks).await?;
Expand Down Expand Up @@ -918,7 +885,7 @@ impl UtreexoNode<RunningNode> {
}
fn handle_get_peer_info(&self) {
let mut peers = vec![];
for peer in self.peers.values() {
for peer in self.peer_ids.iter() {
peers.push(self.get_peer_info(peer));
}
let peers = peers.into_iter().flatten().collect();
Expand Down Expand Up @@ -979,19 +946,22 @@ impl UtreexoNode<RunningNode> {
periodic_job!(
self.save_peers(),
self.last_peer_db_dump,
PEER_DB_DUMP_INTERVAL
PEER_DB_DUMP_INTERVAL,
RunningNode
);
// Rework our address database
periodic_job!(
self.address_man.rearrange_buckets(),
self.1.last_address_rearrange,
ADDRESS_REARRANGE_INTERVAL
ADDRESS_REARRANGE_INTERVAL,
RunningNode
);
// Perhaps we need more connections
periodic_job!(
self.maybe_open_connection().await,
self.last_connection,
TRY_NEW_CONNECTION
TRY_NEW_CONNECTION,
RunningNode
);
try_and_log!(self.check_request_timeout());
self.handle_user_request().await;
Expand All @@ -1004,25 +974,29 @@ impl UtreexoNode<RunningNode> {
periodic_job!(
self.ask_for_addresses().await,
self.last_get_address_request,
ASK_FOR_PEERS_INTERVAL
ASK_FOR_PEERS_INTERVAL,
RunningNode
);
// Check whether we are in a stale tip
periodic_job!(
self.check_for_stale_tip().await,
self.last_tip_update,
ASSUME_STALE
ASSUME_STALE,
RunningNode
);
// Open new feeler connection periodically
periodic_job!(
self.open_feeler_connection().await,
self.1.last_feeler,
FEELER_INTERVAL
FEELER_INTERVAL,
RunningNode
);
// Try broadcast transactions
periodic_job!(
self.handle_broadcast().await,
self.last_broadcast,
BROADCAST_DELAY
BROADCAST_DELAY,
RunningNode
);
try_and_log!(self.ask_block().await);
try_and_log!(self.check_for_timeout().await);
Expand All @@ -1031,7 +1005,7 @@ impl UtreexoNode<RunningNode> {
}
async fn request_rescan_block(&mut self) -> Result<(), BlockchainError> {
let tip = self.chain.get_height().unwrap();
if self.inflight.len() + 10 > MAX_INFLIGHT_REQUESTS {
if self.inflight.len() + 10 > RunningNode::MAX_INFLIGHT_REQUESTS {
return Ok(());
}
// We use a grace period to avoid looping at the end of rescan
Expand Down Expand Up @@ -1134,8 +1108,10 @@ impl UtreexoNode<RunningNode> {
{
// We didn't request this block, so we should disconnect the peer.
if let Some(peer) = self.peers.get(&peer).cloned() {
self.address_man
.update_set_state(peer.address_id as usize, AddressState::Banned(BAN_TIME));
self.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(RunningNode::BAN_TIME),
);
}
error!(
"Peer {peer} sent us block {} which we didn't request",
Expand Down Expand Up @@ -1166,8 +1142,10 @@ impl UtreexoNode<RunningNode> {
);
// Disconnect the peer and ban it.
if let Some(peer) = self.peers.get(&peer).cloned() {
self.address_man
.update_set_state(peer.address_id as usize, AddressState::Banned(BAN_TIME));
self.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(RunningNode::BAN_TIME),
);
}
self.send_to_peer(peer, NodeRequest::Shutdown).await?;

Expand Down Expand Up @@ -1261,8 +1239,8 @@ macro_rules! try_and_log {
};
}
macro_rules! periodic_job {
($what: expr, $timer: expr, $interval: ident) => {
if $timer.elapsed() > Duration::from_secs($interval) {
($what: expr, $timer: expr, $interval: ident, $context: ty) => {
if $timer.elapsed() > Duration::from_secs(<$context>::$interval) {
try_and_log!($what);
$timer = Instant::now();
}
Expand Down
Loading

0 comments on commit cf25472

Please sign in to comment.