Skip to content

Commit

Permalink
move actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Abroskin committed May 13, 2019
1 parent 9607c0b commit a748905
Show file tree
Hide file tree
Showing 13 changed files with 508 additions and 469 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This crate try to implement stellar node on rust. Still in very ealy stage.
We already complete handsnake process with remote peer. So you can monitor all messages from remote peer or explore the stellar network jumping between known addresses.

## Requirements
PostgreSQL
SQLite
libsodium-dev

## Quick start
Expand All @@ -20,5 +20,5 @@ diesel migration run
```
4. Run astrocore:
```
RIKER_CONF="riker.toml" RUST_LOG="astrocore=info" RUSTFLAGS=-Awarnings cargo run
RUST_LOG="astrocore=info" RUSTFLAGS=-Awarnings cargo run
```
File renamed without changes.
96 changes: 96 additions & 0 deletions src/actors/flood_gate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use super::{
address_peer_to_actor, debug, message_abbr, riker::actors::*, xdr, AstroProtocol,
FloodGate,
};
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Clone, Debug)]
pub(crate) struct FloodGateActor {
state: FloodGate,
}

impl FloodGateActor {
pub fn new() -> BoxActor<AstroProtocol> {
let actor = FloodGateActor {
state: FloodGate::new(),
};

Box::new(actor)
}

pub fn props() -> BoxActorProd<AstroProtocol> {
Props::new(Box::new(FloodGateActor::new))
}

/// Send message to anyone you haven't gotten it from
pub fn broadcast(
&mut self,
ctx: &Context<AstroProtocol>,
message: xdr::StellarMessage,
force: bool,
peers: HashSet<String>,
) {
if self.state.m_shutting_down {
return;
};

let index = message_abbr(&message);

// no one has sent us this message
if self.state.flood_map.get(&index).is_none() || force {
self.state
.add_record(&message, "self".to_string(), unix_time());
};

let record = match self.state.flood_map.get_mut(&index) {
Some(rec) => rec,
None => return,
};

let previous_sent = record.m_peers_told.clone();
for peer in peers {
let name = format!("/user/peer-{}", address_peer_to_actor(peer.clone()));
ctx.select(&name)
.unwrap()
.tell(AstroProtocol::SendPeerMessageCmd(message.to_owned()), None);
record.m_peers_told.push(peer.to_owned());
}

debug!(
"[Overlay][FloodGate] broadcast told {}",
record.m_peers_told.len() - previous_sent.len()
);
}
}

impl Actor for FloodGateActor {
type Msg = AstroProtocol;

fn receive(
&mut self,
ctx: &Context<Self::Msg>,
msg: Self::Msg,
_sender: Option<ActorRef<Self::Msg>>,
) {
debug!("FLOOD_GATE RECEIVE: {:?}", msg);
match msg {
AstroProtocol::AddRecordFloodGateCmd(message, address, seq_ledger) => {
self.state.add_record(&message, address, seq_ledger);
}
AstroProtocol::BroadcastFloodGateCmd(message, force, peers) => {
self.broadcast(ctx, message, force, peers);
}
AstroProtocol::ClearFloodGateCmd(seq_ledger) => self.state.clear_below(seq_ledger),
_ => unreachable!(),
}
}
}

// stub for current ledger value
fn unix_time() -> u32 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as u32
}
33 changes: 33 additions & 0 deletions src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#![allow(dead_code, unused_must_use)]

mod flood_gate;
mod overlay_manager;
mod peer;

pub(crate) use crate::{
astro_protocol::AstroProtocol,
config::CONFIG,
overlay::{message_abbr, FloodGate, OverlayManager, Peer, PeerInterface},
xdr,
};
pub(crate) use log::{debug, info};
pub(crate) use riker;

use self::flood_gate::FloodGateActor;
pub(crate) use self::overlay_manager::OverlayManagerActor;
use self::peer::PeerActor;

fn address_peer_to_actor(address: String) -> String {
address.replace(".", "-").replace(":", "-")
}

pub(crate) fn start() {
use riker::actors::*;
use riker_default::DefaultModel;

let model: DefaultModel<AstroProtocol> = DefaultModel::new();
let sys = ActorSystem::new(&model).unwrap();
let props = OverlayManagerActor::props();

sys.actor_of(props, "overlay_manager").unwrap();
}
216 changes: 216 additions & 0 deletions src/actors/overlay_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
use super::{
address_peer_to_actor, info, riker::actors::*, xdr, AstroProtocol, FloodGateActor,
OverlayManager, Peer, PeerActor, CONFIG,
};
use std::net::TcpListener;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Clone, Debug)]
pub(crate) struct OverlayManagerActor {
state: OverlayManager,
}

impl OverlayManagerActor {
pub fn new() -> BoxActor<AstroProtocol> {
let actor = OverlayManagerActor {
state: OverlayManager::new(),
};

Box::new(actor)
}

pub fn props() -> BoxActorProd<AstroProtocol> {
Props::new(Box::new(OverlayManagerActor::new))
}

/// Run FloodGate
pub fn run_flood_gate(&mut self, ctx: &Context<AstroProtocol>) {
ctx.system
.actor_of(FloodGateActor::props(), "flood_gate")
.unwrap();
}

/// Run Listener actor for checking incoming connections
pub fn run_listener_actor(&mut self, ctx: &Context<AstroProtocol>) {
ctx.system
.actor_of(OverlayListenerActor::props(), "overlay_connection_listener")
.unwrap();
}

/// Run sheduler for checking minimal count of connections with peers
pub fn run_periodic_checker(&mut self, ctx: &Context<AstroProtocol>) {
let delay = Duration::from_millis(1000);
ctx.schedule_once(
delay,
ctx.myself(),
None,
AstroProtocol::CheckOverlayMinConnectionsCmd,
);
}

/// Check minimal connections
pub fn check_min_connections(&mut self, ctx: &Context<AstroProtocol>) {
if self.state.reached_limit_of_authenticated_peers() {
return;
}

let limit = self.state.peers_to_authenticated_limit() as usize;
let taked_peers: Vec<_> = self
.state
.known_peer_adresses()
.iter()
.cloned()
.take(limit)
.collect();

for peer in taked_peers {
self.state.move_peer_to_pending_list(peer.to_owned());
self.handle_new_initiated_peer(ctx, peer);
}

self.run_periodic_checker(ctx);
}

pub fn handle_new_incoming_peer(&mut self, ctx: &Context<AstroProtocol>, peer: Peer) {
if !self.state.reached_limit_of_authenticated_peers() {
let name = format!("peer-{}", address_peer_to_actor(peer.peer_addr()));
ctx.system
.actor_of(PeerActor::incoming_peer_props(peer), &name);
}
}

pub fn handle_new_initiated_peer(&mut self, ctx: &Context<AstroProtocol>, address: String) {
let name = format!("peer-{}", address_peer_to_actor(address.clone()));
ctx.system
.actor_of(PeerActor::initiated_peer_props(address), &name);
}

pub fn handle_incoming_message(
&mut self,
ctx: &Context<AstroProtocol>,
address: String,
message: xdr::StellarMessage,
) {
match message {
xdr::StellarMessage::Peers(ref set_of_peers) => {
self.state.add_known_peers(set_of_peers);
}
xdr::StellarMessage::Transaction(_) | xdr::StellarMessage::Envelope(_) => {
let flood_gate = ctx.select("/user/flood_gate").unwrap();
flood_gate.tell(
AstroProtocol::AddRecordFloodGateCmd(message.to_owned(), address, unix_time()),
None,
);
flood_gate.tell(
AstroProtocol::BroadcastFloodGateCmd(
message.to_owned(),
false,
self.state.authenticated_peers().clone(),
),
None,
);
flood_gate.tell(AstroProtocol::ClearFloodGateCmd(unix_time()), None);
}
_ => (),
}
}
}

impl Actor for OverlayManagerActor {
type Msg = AstroProtocol;

fn pre_start(&mut self, _ctx: &Context<Self::Msg>) {
self.state = OverlayManager::new();
self.state.populate_known_peers_from_db();
}

fn receive(
&mut self,
ctx: &Context<Self::Msg>,
msg: Self::Msg,
sender: Option<ActorRef<Self::Msg>>,
) {
match msg {
AstroProtocol::CheckOverlayMinConnectionsCmd => self.check_min_connections(ctx),
AstroProtocol::HandleOverlayIncomingPeerCmd(peer) => {
self.handle_new_incoming_peer(ctx, peer)
}
AstroProtocol::ReceivedPeerMessageCmd(address, message) => {
self.handle_incoming_message(ctx, address, message)
}
AstroProtocol::AuthPeerOkCmd(address) => {
self.state.move_peer_to_authenticated_list(address)
}
AstroProtocol::FailedPeerCmd(address) => {
self.state.move_peer_to_failed_list(address);
ctx.system.stop(&sender.unwrap());
}
_ => unreachable!(),
}
}

fn post_start(&mut self, ctx: &Context<Self::Msg>) {
self.run_listener_actor(ctx);
self.run_periodic_checker(ctx);
self.run_flood_gate(ctx);
}
}

pub(crate) struct OverlayListenerActor;

impl OverlayListenerActor {
pub fn new() -> BoxActor<AstroProtocol> {
Box::new(OverlayListenerActor)
}

fn props() -> BoxActorProd<AstroProtocol> {
Props::new(Box::new(OverlayListenerActor::new))
}
}

impl Actor for OverlayListenerActor {
type Msg = AstroProtocol;

fn receive(
&mut self,
_ctx: &Context<Self::Msg>,
_msg: Self::Msg,
_sender: Option<ActorRef<Self::Msg>>,
) {
unreachable!();
}

fn post_start(&mut self, ctx: &Context<Self::Msg>) {
let listener = TcpListener::bind(CONFIG.local_node().address()).expect(
"[Overlay][Listener] Unable to listen local address to handle incoming connections",
);

info!(
"[Overlay][Listener] start to listen incoming connections on {:?}",
CONFIG.local_node().address()
);

for stream in listener.incoming() {
match stream {
Ok(stream) => {
let peer = Peer::new(stream, CONFIG.local_node().address());
ctx.myself()
.parent()
.tell(AstroProtocol::HandleOverlayIncomingPeerCmd(peer), None)
}
Err(e) => {
info!("[Overlay][Listener] CONNECTION FAILED, cause: {:?}", e);
}
}
}
unreachable!();
}
}

// stub for current ledger value
fn unix_time() -> u32 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as u32
}
Loading

0 comments on commit a748905

Please sign in to comment.