Skip to content

Commit

Permalink
removing unecessary channels in struct
Browse files Browse the repository at this point in the history
  • Loading branch information
a-moreira committed Dec 19, 2023
1 parent ea4398b commit f0217ce
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 16 additions & 18 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use async_std::{
net::{TcpListener, TcpStream},
};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -78,15 +77,15 @@ pub struct ElectrumServer<Blockchain: BlockchainInterface> {
/// watch-only wallet, but it is adapted to the electrum protocol.
pub address_cache: Arc<RwLock<AddressCache<KvDatabase>>>,
/// The TCP listener is used to accept new connections to our server.
pub tcp_listener: Arc<TcpListener>,
// pub tcp_listener: TcpListener,
/// The clients are the clients connected to our server, we keep track of them
/// using a unique id.
pub clients: HashMap<ClientId, Arc<Client>>,
/// The message_receiver receive messages and handles them.
pub message_receiver: UnboundedReceiver<Message>,
// pub message_receiver: UnboundedReceiver<Message>,
/// The message_transmitter is used to send requests from clients or notifications
/// like new or dropped clients
pub message_transmitter: UnboundedSender<Message>,
// pub message_transmitter: UnboundedSender<Message>,
/// The client_addresses is used to keep track of the addresses of each client.
/// We keep the script_hash and which client has it, so we can notify the
/// clients when a new transaction is received.
Expand All @@ -95,23 +94,20 @@ pub struct ElectrumServer<Blockchain: BlockchainInterface> {

impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
pub async fn new(
address: &'static str,
address_cache: Arc<RwLock<AddressCache<KvDatabase>>>,
chain: Arc<Blockchain>,
) -> Result<ElectrumServer<Blockchain>, Box<dyn std::error::Error>> {
let listener = Arc::new(TcpListener::bind(address).await?);
let (tx, rx) = unbounded_channel();
// let (::tx, rx) = unbounded_channel::<Message>();
let unconfirmed = address_cache.read().await.find_unconfirmed().unwrap();
for tx in unconfirmed {
chain.broadcast(&tx).expect("Invalid chain");
}
Ok(ElectrumServer {
chain,
address_cache,
tcp_listener: listener,
clients: HashMap::new(),
message_receiver: rx,
message_transmitter: tx,
// message_receiver: rx,
// message_transmitter: tx,
client_addresses: HashMap::new(),
})
}
Expand Down Expand Up @@ -357,7 +353,12 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
}

pub async fn main_loop(mut self) -> Result<(), crate::error::Error> {
pub async fn main_loop(mut self, address: &str) -> Result<(), crate::error::Error> {
let (tx, mut rx) = unbounded_channel::<Message>();
let tcp_listener = TcpListener::bind(address).await?;

tokio::spawn(client_accept_loop(tcp_listener, tx));

let blocks = Channel::new();
let blocks = Arc::new(blocks);

Expand All @@ -367,12 +368,9 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
for (block, height) in blocks.recv() {
self.handle_block(block, height).await;
}
let request = async_std::future::timeout(
std::time::Duration::from_secs(1),
self.message_receiver.recv(),
)
.await
.unwrap();
let request = async_std::future::timeout(std::time::Duration::from_secs(1), rx.recv())
.await
.unwrap();

if let Some(message) = request {
self.handle_message(message).await?;
Expand Down Expand Up @@ -508,7 +506,7 @@ async fn client_broker_loop(

/// Listens to new TCP connections in a loop
pub async fn client_accept_loop(
listener: Arc<TcpListener>,
listener: TcpListener,
message_transmitter: UnboundedSender<Message>,
) {
let mut id_count = 0;
Expand Down
1 change: 1 addition & 0 deletions crates/floresta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ floresta-electrum = { path = "../floresta-electrum", optional = true, version =
rustreexo = "0.1.0"
miniscript = "10.0.0"
async-std = "1.12.0"
tokio = { version = "1.35.0", features = ["full"] }
bitcoin = { version = "0.29", features = [
"serde",
"no-std",
Expand Down
2 changes: 1 addition & 1 deletion crates/floresta/examples/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! This will validate all blocks from genesis to the current tip, so it will take a while
//! to sync.
use async_std::sync::RwLock;
use bitcoin::BlockHash;
use floresta::chain::{pruned_utreexo::BlockchainInterface, ChainState, KvChainStore, Network};
use floresta::wire::mempool::Mempool;
Expand All @@ -14,6 +13,7 @@ use floresta::wire::node_context::RunningNode;
use floresta_wire::node_interface::NodeMethods;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;

const DATA_DIR: &str = "./data";

Expand Down
20 changes: 8 additions & 12 deletions florestad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use floresta_chain::{
};
use floresta_common::constants::DIR_NAME;
use floresta_compact_filters::{FilterBackendBuilder, KvFiltersStore};
use floresta_electrum::electrum_protocol::{client_accept_loop, ElectrumServer};
use floresta_electrum::electrum_protocol::ElectrumServer;
use floresta_watch_only::{kv_database::KvDatabase, AddressCache, AddressCacheDatabase};
use floresta_wire::{mempool::Mempool, node::UtreexoNode};
use log::{debug, error, info};
Expand Down Expand Up @@ -268,22 +268,18 @@ fn run_with_ctx(ctx: Ctx) {
);

// Electrum
let electrum_server = block_on(ElectrumServer::new(
"0.0.0.0:50001",
wallet,
blockchain_state,
))
.expect("Could not create an Electrum Server");
let electrum_server = block_on(ElectrumServer::new(wallet, blockchain_state))
.expect("Could not create an Electrum Server");

// Spawn all services

// Electrum loop to accept new TCP clients
task::spawn(client_accept_loop(
electrum_server.tcp_listener.clone(),
electrum_server.message_transmitter.clone(),
));
// task::spawn(client_accept_loop(
// electrum_server.tcp_listener.clone(),
// electrum_server.message_transmitter.clone(),
// ));
// Electrum main loop
task::spawn(electrum_server.main_loop());
task::spawn(electrum_server.main_loop("0.0.0.0:50001"));
info!("Server running on: 0.0.0.0:50001");

let _kill_signal = kill_signal.clone();
Expand Down

0 comments on commit f0217ce

Please sign in to comment.