Skip to content

Commit

Permalink
small refactor + comments (#105)
Browse files Browse the repository at this point in the history
* start small refactor + comments

* rename Peer to Client

* clippy + fmt

* fix .gitignore to correctly ignore any /data/ directories

* use info and error macros
  • Loading branch information
a-moreira authored Dec 15, 2023
1 parent 55d75a0 commit 51ee084
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ perf.*
*.svg
config.toml
/builds
/data
data/
186 changes: 103 additions & 83 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,71 @@ use bitcoin::{
};
use bitcoin::{Transaction, TxOut};

use log::{info, log, trace, Level};
use log::{error, info, trace};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Type alias for u32 representing a ClientId
type ClientId = u32;

/// A client connected to the server
#[derive(Debug, Clone, Default)]
pub struct Peer {
#[derive(Debug, Clone)]
pub struct Client {
client_id: ClientId,
_addresses: HashSet<Script>,
stream: Option<Arc<TcpStream>>,
stream: Arc<TcpStream>,
}

impl Peer {
impl Client {
/// Send a message to the client, should be a serialized JSON
pub async fn write(&self, data: &[u8]) -> Result<(), std::io::Error> {
if let Some(stream) = &self.stream {
let mut stream = &**stream;
let _ = stream.write(data).await;
let _ = stream.write('\n'.to_string().as_bytes()).await;
}
let mut stream = self.stream.as_ref();
let _ = stream.write(data).await;
let _ = stream.write('\n'.to_string().as_bytes()).await;

Ok(())
}
/// Create a new peer from a stream
pub fn new(stream: Arc<TcpStream>) -> Self {
Peer {
/// Create a new client from a stream
pub fn new(client_id: ClientId, stream: Arc<TcpStream>) -> Self {
Client {
client_id,
_addresses: HashSet::new(),
stream: Some(stream),
stream,
}
}
}

pub enum Message {
/// A new client just connected to the server
NewClient((ClientId, Arc<Client>)),
/// Some client just sent a message
Message((ClientId, String)),
/// A client just disconnected
Disconnect(ClientId),
}

pub struct ElectrumServer<Blockchain: BlockchainInterface> {
/// The blockchain backend we are using. This will be used to query
/// blockchain information and broadcast transactions.
pub chain: Arc<Blockchain>,
/// The address cache is used to store addresses and transactions, like a
/// watch-only wallet, but it is adapted to the electrum protocol.
pub address_cache: Arc<RwLock<AddressCache<KvDatabase>>>,
/// The listener is used to accept new connections to our server.
pub listener: Option<Arc<TcpListener>>,
/// The peers are the clients connected to our server, we keep track of them
/// The TCP listener is used to accept new connections to our server.
pub tcp_listener: Arc<TcpListener>,
/// The clients are the clients connected to our server, we keep track of them
/// using a unique id.
pub peers: HashMap<u32, Arc<Peer>>,
/// The peer_accept channel is used to send peer related message to the main
/// thread.
pub peer_accept: Receiver<Message>,
/// The notify_tx channel is used to send notifications to the main thread.
pub notify_tx: Sender<Message>,
/// The peer_addresses is used to keep track of the addresses of each peer.
/// We keep the script_hash and which peer has it, so we can notify the
/// peers when a new transaction is received.
pub peer_addresses: HashMap<sha256::Hash, Arc<Peer>>,
}
pub enum Message {
/// A new peer just connected to the server
NewPeer((u32, Arc<Peer>)),
/// Some peer just sent a message
Message((u32, String)),
/// A peer just disconnected
Disconnect(u32),
pub clients: HashMap<ClientId, Arc<Client>>,
/// The message_receiver receive messages and handles them.
pub message_receiver: Receiver<Message>,
/// The message_transmitter is used to send requests from clients or notifications
/// like new or dropped clients
pub message_transmitter: Sender<Message>,
/// The client_addresses is used to keep track of the addresses of each client.
/// We keep the script_hash and which client has it, so we can notify the
/// clients when a new transaction is received.
pub client_addresses: HashMap<sha256::Hash, Arc<Client>>,
}

impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
Expand All @@ -99,18 +104,19 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
Ok(ElectrumServer {
chain,
address_cache,
listener: Some(listener),
peers: HashMap::new(),
peer_accept: rx,
notify_tx: tx,
peer_addresses: HashMap::new(),
tcp_listener: listener,
clients: HashMap::new(),
message_receiver: rx,
message_transmitter: tx,
client_addresses: HashMap::new(),
})
}
/// Handle a request from a peer. All methods are defined in the electrum

/// Handle a request from a client. All methods are defined in the electrum
/// protocol.
pub async fn handle_peer_request(
pub async fn handle_client_request(
&mut self,
peer: Arc<Peer>,
client: Arc<Client>,
request: Request,
) -> Result<Value, super::error::Error> {
// Methods are in alphabetical order
Expand Down Expand Up @@ -245,7 +251,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
"blockchain.scripthash.subscribe" => {
let hash = get_arg!(request, sha256::Hash, 0);
self.peer_addresses.insert(hash, peer);
self.client_addresses.insert(hash, client);

let history = self.address_cache.read().await.get_address_history(&hash);

Expand All @@ -257,7 +263,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
"blockchain.scripthash.unsubscribe" => {
let address = get_arg!(request, sha256::Hash, 0);
self.peer_addresses.remove(&address);
self.client_addresses.remove(&address);
json_rpc_res!(request, true)
}
"blockchain.transaction.broadcast" => {
Expand Down Expand Up @@ -335,7 +341,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
);
json_rpc_res!(request, res)
}
// TODO: Return peers?

"server.peers.subscribe" => json_rpc_res!(request, []),
"server.ping" => json_rpc_res!(request, null),
"server.version" => json_rpc_res!(
Expand All @@ -359,7 +365,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
let request = async_std::future::timeout(
std::time::Duration::from_secs(1),
self.peer_accept.recv(),
self.message_receiver.recv(),
)
.await;

Expand All @@ -383,12 +389,12 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
lock.bump_height(height);
}
if self.chain.get_height().unwrap() == height {
for peer in &mut self.peers.values() {
let res = peer
for client in &mut self.clients.values() {
let res = client
.write(serde_json::to_string(&result).unwrap().as_bytes())
.await;
if res.is_err() {
info!("Could not write to peer {:?}", peer);
info!("Could not write to client {:?}", client);
}
}
}
Expand All @@ -400,28 +406,29 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {

self.wallet_notify(&transactions).await;
}

/// Handles each kind of Message
async fn handle_message(&mut self, message: Message) -> Result<(), crate::error::Error> {
match message {
Message::NewPeer((id, stream)) => {
self.peers.insert(id, stream);
Message::NewClient((id, client)) => {
self.clients.insert(id, client);
}
Message::Message((peer, msg)) => {

Message::Message((client, msg)) => {
trace!("Message: {msg}");
if let Ok(req) = serde_json::from_str::<Request>(msg.as_str()) {
let peer = self.peers.get(&peer);
if peer.is_none() {
log!(
Level::Error,
"Peer sent a message but is not listed as peer"
);
let client = self.clients.get(&client);
if client.is_none() {
error!("Client sent a message but is not listed as client");
return Ok(());
}
let peer = peer.unwrap().to_owned();
let client = client.unwrap().to_owned();
let id = req.id.to_owned();
let res = self.handle_peer_request(peer.clone(), req).await;
let res = self.handle_client_request(client.clone(), req).await;

if let Ok(res) = res {
peer.write(serde_json::to_string(&res).unwrap().as_bytes())
client
.write(serde_json::to_string(&res).unwrap().as_bytes())
.await?;
} else {
let res = json!({
Expand All @@ -433,22 +440,25 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
},
"id": id
});
peer.write(serde_json::to_string(&res).unwrap().as_bytes())
client
.write(serde_json::to_string(&res).unwrap().as_bytes())
.await?;
}
}
}

Message::Disconnect(id) => {
self.peers.remove(&id);
self.clients.remove(&id);
}
}

Ok(())
}

async fn wallet_notify(&self, transactions: &[(Transaction, TxOut)]) {
for (_, out) in transactions {
let hash = get_spk_hash(&out.script_pubkey);
if let Some(peer) = self.peer_addresses.get(&hash) {
if let Some(client) = self.client_addresses.get(&hash) {
let history = self.address_cache.read().await.get_address_history(&hash);

let status_hash = get_status(history);
Expand All @@ -457,54 +467,64 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
"method": "blockchain.scripthash.subscribe",
"params": [hash, status_hash]
});
if let Err(err) = peer
if let Err(err) = client
.write(serde_json::to_string(&notify).unwrap().as_bytes())
.await
{
log!(Level::Error, "{err}");
error!("{err}");
}
}
}
}
}
/// Each peer get one reading loop
async fn peer_loop(
stream: Arc<TcpStream>,
id: u32,
notify_channel: Sender<Message>,

/// Each client gets one loop to deal with their requests
async fn client_broker_loop(
client: Arc<Client>,
message_transmitter: Sender<Message>,
) -> Result<(), std::io::Error> {
let mut _stream = &*stream;
let mut _stream = &*client.stream;
let mut lines = BufReader::new(_stream).lines();

while let Some(Ok(line)) = lines.next().await {
notify_channel
.send(Message::Message((id, line)))
message_transmitter
.send(Message::Message((client.client_id, line)))
.await
.expect("Main loop is broken");
}
log!(Level::Info, "Lost a peer");
notify_channel
.send(Message::Disconnect(id))

info!("Lost client with ID: {}", client.client_id);

message_transmitter
.send(Message::Disconnect(client.client_id))
.await
.expect("Main loop is broken");

Ok(())
}

pub async fn accept_loop(listener: Arc<TcpListener>, notify_channel: Sender<Message>) {
/// Listens to new TCP connections in a loop
pub async fn client_accept_loop(listener: Arc<TcpListener>, message_transmitter: Sender<Message>) {
let mut id_count = 0;
loop {
if let Ok((stream, _addr)) = listener.accept().await {
info!("New client connection");
let stream = Arc::new(stream);
async_std::task::spawn(peer_loop(stream.clone(), id_count, notify_channel.clone()));
let peer = Arc::new(Peer::new(stream));
notify_channel
.send(Message::NewPeer((id_count, peer)))
let client = Arc::new(Client::new(id_count, stream));
async_std::task::spawn(client_broker_loop(
client.clone(),
message_transmitter.clone(),
));

message_transmitter
.send(Message::NewClient((client.client_id, client)))
.await
.expect("Main loop is broken");
id_count += 1;
}
}
}

/// As per electrum documentation:
/// ### To calculate the status of a script hash (or address):
///
Expand Down
2 changes: 1 addition & 1 deletion florestad/src/json_rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Rpc for RpcImpl {
}
}
match txout {
Some(txout) => Ok(json!({"txout": txout})),
Some(txout) => Ok(json!({ "txout": txout })),
None => Ok(json!({})),
}
}
Expand Down
11 changes: 4 additions & 7 deletions florestad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use floresta_chain::{
};
use floresta_common::constants::DIR_NAME;
use floresta_compact_filters::{FilterBackendBuilder, KvFiltersStore};
use floresta_electrum::electrum_protocol::{accept_loop, ElectrumServer};
use floresta_electrum::electrum_protocol::{client_accept_loop, ElectrumServer};
use floresta_watch_only::{kv_database::KvDatabase, AddressCache, AddressCacheDatabase};
use floresta_wire::{mempool::Mempool, node::UtreexoNode};
use log::{debug, error, info};
Expand Down Expand Up @@ -280,12 +280,9 @@ fn run_with_ctx(ctx: Ctx) {
// Spawn all services

// Electrum accept loop
task::spawn(accept_loop(
electrum_server
.listener
.clone()
.expect("Listener can't be none by this far"),
electrum_server.notify_tx.clone(),
task::spawn(client_accept_loop(
electrum_server.tcp_listener.clone(),
electrum_server.message_transmitter.clone(),
));
// Electrum main loop
task::spawn(electrum_server.main_loop());
Expand Down

0 comments on commit 51ee084

Please sign in to comment.