From 077309c1f89d249c3054607013e59e4e606ced30 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 28 Jan 2025 23:11:34 +0530 Subject: [PATCH] fmt changes --- roles/jd-client/src/lib/downstream.rs | 9 +++- roles/jd-client/src/lib/job_declarator/mod.rs | 8 +++- .../lib/job_declarator/setup_connection.rs | 8 +++- roles/jd-client/src/lib/mod.rs | 7 ++-- roles/jd-client/src/lib/status.rs | 33 ++++++++------- .../src/lib/template_receiver/mod.rs | 8 +++- .../lib/template_receiver/setup_connection.rs | 3 +- roles/jd-server/src/lib/job_declarator/mod.rs | 3 +- roles/jd-server/src/lib/mod.rs | 13 ++++-- roles/jd-server/src/lib/status.rs | 4 +- .../mining-proxy/src/lib/downstream_mining.rs | 6 ++- roles/mining-proxy/src/lib/upstream_mining.rs | 5 ++- roles/pool/src/lib/error.rs | 14 ++++--- roles/pool/src/lib/mod.rs | 4 +- roles/pool/src/lib/status.rs | 21 ++++++---- roles/pool/src/lib/template_receiver/mod.rs | 5 ++- .../lib/template_receiver/setup_connection.rs | 3 +- .../src/noise_connection_tokio.rs | 33 +++++++++------ .../src/plain_connection_tokio.rs | 10 ++++- .../mining-device-sv1/src/client.rs | 13 +++--- roles/test-utils/mining-device/src/lib/mod.rs | 42 +++++++++++++------ roles/tests-integration/lib/sniffer.rs | 10 ++++- .../src/lib/downstream_sv1/diff_management.rs | 8 ++-- .../src/lib/downstream_sv1/downstream.rs | 5 ++- roles/translator/src/lib/error.rs | 20 ++++++--- roles/translator/src/lib/mod.rs | 5 ++- roles/translator/src/lib/proxy/bridge.rs | 14 ++++--- roles/translator/src/lib/status.rs | 31 ++++++++------ .../src/lib/upstream_sv2/upstream.rs | 4 +- 29 files changed, 228 insertions(+), 121 deletions(-) diff --git a/roles/jd-client/src/lib/downstream.rs b/roles/jd-client/src/lib/downstream.rs index 27fc88fa5..dc4beb124 100644 --- a/roles/jd-client/src/lib/downstream.rs +++ b/roles/jd-client/src/lib/downstream.rs @@ -708,7 +708,14 @@ pub async fn listen_for_downstream_mining( jd, ); - let mut incoming: StdFrame = node.receiver.subscribe().recv().await.unwrap().try_into().unwrap(); + let mut incoming: StdFrame = node + .receiver + .subscribe() + .recv() + .await + .unwrap() + .try_into() + .unwrap(); let message_type = incoming.get_header().unwrap().msg_type(); let payload = incoming.payload(); let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None; diff --git a/roles/jd-client/src/lib/job_declarator/mod.rs b/roles/jd-client/src/lib/job_declarator/mod.rs index 18a4e192f..edba23289 100644 --- a/roles/jd-client/src/lib/job_declarator/mod.rs +++ b/roles/jd-client/src/lib/job_declarator/mod.rs @@ -290,7 +290,13 @@ impl JobDeclarator { tokio::task::spawn(async move { let receiver = self_mutex.safe_lock(|d| d.receiver.clone()).unwrap(); loop { - let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap(); + let mut incoming: StdFrame = receiver + .subscribe() + .recv() + .await + .unwrap() + .try_into() + .unwrap(); let message_type = incoming.get_header().unwrap().msg_type(); let payload = incoming.payload(); let next_message_to_send = diff --git a/roles/jd-client/src/lib/job_declarator/setup_connection.rs b/roles/jd-client/src/lib/job_declarator/setup_connection.rs index db11befb6..1fec45b10 100644 --- a/roles/jd-client/src/lib/job_declarator/setup_connection.rs +++ b/roles/jd-client/src/lib/job_declarator/setup_connection.rs @@ -54,7 +54,13 @@ impl SetupConnectionHandler { sender.send(sv2_frame).map_err(|_| ())?; - let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap(); + let mut incoming: StdFrame = receiver + .subscribe() + .recv() + .await + .unwrap() + .try_into() + .unwrap(); let message_type = incoming.get_header().unwrap().msg_type(); let payload = incoming.payload(); diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 18ff60ec2..a228b73ae 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -331,10 +331,9 @@ impl JobDeclaratorClient { { Ok(c) => c, Err(e) => { - let _ = tx_status - .send(status::Status { - state: status::State::UpstreamShutdown(e), - }); + let _ = tx_status.send(status::Status { + state: status::State::UpstreamShutdown(e), + }); return; } }; diff --git a/roles/jd-client/src/lib/status.rs b/roles/jd-client/src/lib/status.rs index 5f73b9338..e96edf657 100644 --- a/roles/jd-client/src/lib/status.rs +++ b/roles/jd-client/src/lib/status.rs @@ -15,22 +15,25 @@ pub enum Sender { #[derive(Debug)] pub enum ErrorS { AsyncError(async_channel::SendError>), - TokioError(tokio::sync::mpsc::error::SendError>) + TokioError(tokio::sync::mpsc::error::SendError>), } impl Sender { - pub async fn send( - &self, - status: Status<'static>, - ) -> Result<(), ErrorS> { + pub async fn send(&self, status: Status<'static>) -> Result<(), ErrorS> { match self { Self::Downstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), - Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), + Self::DownstreamListener(inner) => { + inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)) + } Self::Upstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), - Self::TemplateReceiver(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), + Self::TemplateReceiver(inner) => { + inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)) + } Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), - Self::TemplateReceiverTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), - Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)) + Self::TemplateReceiverTokio(inner) => { + inner.send(status).map_err(|e| ErrorS::TokioError(e)) + } + Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), } } } @@ -42,9 +45,9 @@ impl Clone for Sender { Self::DownstreamListener(inner) => Self::DownstreamListener(inner.clone()), Self::Upstream(inner) => Self::Upstream(inner.clone()), Self::TemplateReceiver(inner) => Self::TemplateReceiver(inner.clone()), - Self::DownstreamTokio( inner) => Self::DownstreamTokio(inner.clone()), - Self::TemplateReceiverTokio( inner) => Self::TemplateReceiverTokio(inner.clone()), - Self::UpstreamTokio( inner) => Self::UpstreamTokio(inner.clone()) + Self::DownstreamTokio(inner) => Self::DownstreamTokio(inner.clone()), + Self::TemplateReceiverTokio(inner) => Self::TemplateReceiverTokio(inner.clone()), + Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()), } } } @@ -95,19 +98,19 @@ async fn send_status( }) .await .unwrap_or(()); - }, + } Sender::DownstreamTokio(tx) => { tx.send(Status { state: State::Healthy(e.to_string()), }) .unwrap_or(()); - }, + } Sender::TemplateReceiverTokio(tx) => { tx.send(Status { state: State::UpstreamShutdown(e), }) .unwrap_or(()); - }, + } Sender::UpstreamTokio(tx) => { tx.send(Status { state: State::UpstreamShutdown(e), diff --git a/roles/jd-client/src/lib/template_receiver/mod.rs b/roles/jd-client/src/lib/template_receiver/mod.rs index 17373c8d6..8a028aef4 100644 --- a/roles/jd-client/src/lib/template_receiver/mod.rs +++ b/roles/jd-client/src/lib/template_receiver/mod.rs @@ -190,7 +190,8 @@ impl TemplateRx { .clone() .safe_lock(|s| s.receiver.clone()) .unwrap(); - let received = handle_result!(tx_status.clone(), receiver.subscribe().recv().await); + let received = + handle_result!(tx_status.clone(), receiver.subscribe().recv().await); let mut frame: StdFrame = handle_result!(tx_status.clone(), received.try_into()); let message_type = frame.get_header().unwrap().msg_type(); @@ -312,7 +313,10 @@ impl TemplateRx { .unwrap(); } - async fn on_new_solution(self_: Arc>,mut rx: tokio::sync::mpsc::Receiver>) { + async fn on_new_solution( + self_: Arc>, + mut rx: tokio::sync::mpsc::Receiver>, + ) { while let Some(solution) = rx.recv().await { if !self_ .safe_lock(|s| s.test_only_do_not_send_solution_to_tp) diff --git a/roles/jd-client/src/lib/template_receiver/setup_connection.rs b/roles/jd-client/src/lib/template_receiver/setup_connection.rs index 68ba35d76..318c22482 100644 --- a/roles/jd-client/src/lib/template_receiver/setup_connection.rs +++ b/roles/jd-client/src/lib/template_receiver/setup_connection.rs @@ -46,7 +46,8 @@ impl SetupConnectionHandler { let sv2_frame = sv2_frame.into(); sender.send(sv2_frame).map_err(|_| ())?; - let mut incoming: StdFrame = receiver.subscribe() + let mut incoming: StdFrame = receiver + .subscribe() .recv() .await .expect("Connection to TP closed!") diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 853d27264..b2abf8b17 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -160,8 +160,7 @@ impl JobDeclaratorDownstream { .unwrap(); let sender_add_txs_to_mempool = add_txs_to_mempool.sender_add_txs_to_mempool; let add_txs_to_mempool_inner = add_txs_to_mempool.add_txs_to_mempool_inner; - let _ = sender_add_txs_to_mempool - .send(add_txs_to_mempool_inner); + let _ = sender_add_txs_to_mempool.send(add_txs_to_mempool_inner); // the trasnactions sent to the mempool can be freed let _ = self_mutex.safe_lock(|a| { a.add_txs_to_mempool.add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { diff --git a/roles/jd-server/src/lib/mod.rs b/roles/jd-server/src/lib/mod.rs index b73fc03ad..bedbcbe1d 100644 --- a/roles/jd-server/src/lib/mod.rs +++ b/roles/jd-server/src/lib/mod.rs @@ -45,7 +45,10 @@ impl JobDeclaratorServer { // broadcast can be used, as JDSMempool is clonable. // let (new_block_sender, new_block_receiver): (Sender, Receiver) = // bounded(10); - let (new_block_sender, _): (tokio::sync::broadcast::Sender, tokio::sync::broadcast::Receiver) = tokio::sync::broadcast::channel(10); + let (new_block_sender, _): ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Receiver, + ) = tokio::sync::broadcast::channel(10); let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new( url.clone(), username, @@ -56,7 +59,7 @@ impl JobDeclaratorServer { let mempool_cloned_ = mempool.clone(); // mpsc can be used. // let (status_tx, status_rx) = unbounded(); - let (status_tx,mut status_rx) = tokio::sync::mpsc::unbounded_channel(); + let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel(); let sender = status::Sender::DownstreamTokio(status_tx.clone()); let mut last_empty_mempool_warning = std::time::Instant::now().sub(std::time::Duration::from_secs(60)); @@ -130,7 +133,8 @@ impl JobDeclaratorServer { let mempool_cloned = mempool.clone(); // mpsc should work here // let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); - let (sender_add_txs_to_mempool,mut receiver_add_txs_to_mempool) = tokio::sync::mpsc::unbounded_channel(); + let (sender_add_txs_to_mempool, mut receiver_add_txs_to_mempool) = + tokio::sync::mpsc::unbounded_channel(); task::spawn(async move { JobDeclarator::start( cloned, @@ -143,7 +147,8 @@ impl JobDeclaratorServer { }); task::spawn(async move { loop { - if let Some(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { + if let Some(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await + { let mempool_cloned = mempool.clone(); task::spawn(async move { match mempool::JDsMempool::add_tx_data_to_mempool( diff --git a/roles/jd-server/src/lib/status.rs b/roles/jd-server/src/lib/status.rs index 37d303cb9..4f3e26d61 100644 --- a/roles/jd-server/src/lib/status.rs +++ b/roles/jd-server/src/lib/status.rs @@ -91,7 +91,7 @@ async fn send_status( }) .await .unwrap_or(()); - }, + } Sender::DownstreamTokio(tx) => match e { JdsError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => { tx.send(Status { @@ -156,7 +156,7 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error } JdsError::NoLastDeclaredJob => { send_status(sender, e, error_handling::ErrorBranch::Continue).await - }, + } JdsError::ChannelRecvTokio(_) => { send_status(sender, e, error_handling::ErrorBranch::Break).await } diff --git a/roles/mining-proxy/src/lib/downstream_mining.rs b/roles/mining-proxy/src/lib/downstream_mining.rs index 6eede6a20..483d902cd 100644 --- a/roles/mining-proxy/src/lib/downstream_mining.rs +++ b/roles/mining-proxy/src/lib/downstream_mining.rs @@ -143,7 +143,11 @@ impl DownstreamMiningNode { .open_channel_for_down_hom_up_extended(channel_id, group_id); } - pub fn new(receiver: tokio::sync::broadcast::Sender, sender: tokio::sync::broadcast::Sender, id: u32) -> Self { + pub fn new( + receiver: tokio::sync::broadcast::Sender, + sender: tokio::sync::broadcast::Sender, + id: u32, + ) -> Self { Self { receiver, sender, diff --git a/roles/mining-proxy/src/lib/upstream_mining.rs b/roles/mining-proxy/src/lib/upstream_mining.rs index 583b6c515..3f5ece278 100644 --- a/roles/mining-proxy/src/lib/upstream_mining.rs +++ b/roles/mining-proxy/src/lib/upstream_mining.rs @@ -127,7 +127,10 @@ struct UpstreamMiningConnection { } impl UpstreamMiningConnection { - async fn send(&mut self, sv2_frame: StdFrame) -> Result<(), tokio::sync::broadcast::error::SendError> { + async fn send( + &mut self, + sv2_frame: StdFrame, + ) -> Result<(), tokio::sync::broadcast::error::SendError> { info!("SEND"); let either_frame = sv2_frame.into(); match self.sender.send(either_frame) { diff --git a/roles/pool/src/lib/error.rs b/roles/pool/src/lib/error.rs index 0b8147a3a..13c09b602 100644 --- a/roles/pool/src/lib/error.rs +++ b/roles/pool/src/lib/error.rs @@ -21,7 +21,7 @@ pub enum PoolError { Custom(String), Sv2ProtocolError((u32, Mining<'static>)), TokioChannelRecv(Box), - TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError) + TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError), } impl std::fmt::Display for PoolError { @@ -41,9 +41,9 @@ impl std::fmt::Display for PoolError { Custom(ref e) => write!(f, "Custom SV2 error: `{:?}`", e), Sv2ProtocolError(ref e) => { write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e) - }, + } TokioChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e), - TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e) + TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e), } } } @@ -98,13 +98,17 @@ impl<'a, T: 'static + std::marker::Send + Debug> From From> for PoolError { +impl<'a, T: 'static + std::marker::Send + Debug> From> + for PoolError +{ fn from(e: tokio::sync::mpsc::error::SendError) -> PoolError { PoolError::TokioChannelRecv(Box::new(e)) } } -impl<'a, T: 'static + std::marker::Send + Debug> From> for PoolError { +impl<'a, T: 'static + std::marker::Send + Debug> From> + for PoolError +{ fn from(e: tokio::sync::broadcast::error::SendError) -> PoolError { PoolError::TokioChannelRecv(Box::new(e)) } diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index 93e4e7e70..016c158a2 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -28,7 +28,7 @@ impl PoolSv2 { // producers are clonable so no issue. but its unbounded. // tokio also provide unbounded mpsc. // let (status_tx, status_rx) = unbounded(); - let (status_tx,mut status_rx) = tokio::sync::mpsc::unbounded_channel(); + let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel(); // r_new_t consumer is sent in pool::start, s_new_t is sent in templateRx::connect // sender or producer I dont give a damn about. even the r_new_t is passed in only // start then to on_new_template, so mpsc makes sense here as well. @@ -39,7 +39,7 @@ impl PoolSv2 { // sent to on_new_prevhash, so mpsc also works here. // let (s_prev_hash, r_prev_hash) = bounded(10); let (s_prev_hash, r_prev_hash) = tokio::sync::mpsc::channel(10); - // s_solution is sent to pool (no one give a damn about clonable), r_solution is sent + // s_solution is sent to pool (no one give a damn about clonable), r_solution is sent // to templateRx and then to on_new_solution, so mpsc works. let (s_solution, r_solution) = tokio::sync::mpsc::channel(10); // This is spicy, as the r_message_recv_signal is cloning at few of the places, so, we can diff --git a/roles/pool/src/lib/status.rs b/roles/pool/src/lib/status.rs index 7a000816a..e9fbda1a1 100644 --- a/roles/pool/src/lib/status.rs +++ b/roles/pool/src/lib/status.rs @@ -13,14 +13,13 @@ pub enum Sender { DownstreamTokio(tokio::sync::mpsc::UnboundedSender), DownstreamListenerTokio(tokio::sync::mpsc::UnboundedSender), UpstreamTokio(tokio::sync::mpsc::UnboundedSender), - } #[derive(Debug)] pub enum Error { AsyncChannel(async_channel::SendError), TokioChannel(tokio::sync::mpsc::error::SendError), - TokioChannelUnbounded(tokio::sync::mpsc::error::SendError) + TokioChannelUnbounded(tokio::sync::mpsc::error::SendError), } impl From> for Error { @@ -50,11 +49,15 @@ impl Sender { pub async fn send(&self, status: Status) -> Result<(), Error> { match self { Self::Downstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)), - Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)), + Self::DownstreamListener(inner) => { + inner.send(status).await.map_err(|e| Error::AsyncChannel(e)) + } Self::Upstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)), - Self::DownstreamListenerTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)), + Self::DownstreamListenerTokio(inner) => { + inner.send(status).map_err(|e| Error::TokioChannel(e)) + } Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)), - Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)) + Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)), } } } @@ -67,7 +70,7 @@ impl Clone for Sender { Self::Upstream(inner) => Self::Upstream(inner.clone()), Self::DownstreamTokio(inner) => Self::DownstreamTokio(inner.clone()), Self::DownstreamListenerTokio(inner) => Self::DownstreamListenerTokio(inner.clone()), - Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()) + Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()), } } } @@ -134,7 +137,7 @@ async fn send_status( }) .await .unwrap_or(()); - }, + } Sender::DownstreamTokio(tx) => match e { PoolError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => { tx.send(Status { @@ -210,10 +213,10 @@ pub async fn handle_error(sender: &Sender, e: PoolError) -> error_handling::Erro } PoolError::Sv2ProtocolError(_) => { send_status(sender, e, error_handling::ErrorBranch::Break).await - }, + } PoolError::TokioChannelRecv(_) => { send_status(sender, e, error_handling::ErrorBranch::Continue).await - }, + } PoolError::TokioBroadcastChannelRecv(_) => { send_status(sender, e, error_handling::ErrorBranch::Continue).await } diff --git a/roles/pool/src/lib/template_receiver/mod.rs b/roles/pool/src/lib/template_receiver/mod.rs index 0e457477d..ce8dca40e 100644 --- a/roles/pool/src/lib/template_receiver/mod.rs +++ b/roles/pool/src/lib/template_receiver/mod.rs @@ -164,7 +164,10 @@ impl TemplateRx { Ok(()) } - async fn on_new_solution(self_: Arc>,mut rx: tokio::sync::mpsc::Receiver>) { + async fn on_new_solution( + self_: Arc>, + mut rx: tokio::sync::mpsc::Receiver>, + ) { let status_tx = self_.safe_lock(|s| s.status_tx.clone()).unwrap(); while let Some(solution) = rx.recv().await { info!("Sending Solution to TP: {:?}", &solution); diff --git a/roles/pool/src/lib/template_receiver/setup_connection.rs b/roles/pool/src/lib/template_receiver/setup_connection.rs index 8a383bc86..0e27ff36b 100644 --- a/roles/pool/src/lib/template_receiver/setup_connection.rs +++ b/roles/pool/src/lib/template_receiver/setup_connection.rs @@ -47,7 +47,8 @@ impl SetupConnectionHandler { let sv2_frame = sv2_frame.into(); sender.send(sv2_frame)?; - let mut incoming: StdFrame = receiver.subscribe() + let mut incoming: StdFrame = receiver + .subscribe() .recv() .await? .try_into() diff --git a/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs b/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs index 29c7e3ab4..93f036896 100644 --- a/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs +++ b/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs @@ -1,6 +1,8 @@ use crate::Error; use binary_sv2::{Deserialize, Serialize}; -use const_sv2::{INITIATOR_EXPECTED_HANDSHAKE_MESSAGE_SIZE, RESPONDER_EXPECTED_HANDSHAKE_MESSAGE_SIZE}; +use const_sv2::{ + INITIATOR_EXPECTED_HANDSHAKE_MESSAGE_SIZE, RESPONDER_EXPECTED_HANDSHAKE_MESSAGE_SIZE, +}; use futures::lock::Mutex; use std::{fmt::Debug, sync::Arc, time::Duration}; use tokio::{ @@ -9,13 +11,12 @@ use tokio::{ task::{self, AbortHandle}, }; -use std::{ - convert::TryInto, - sync::atomic::AtomicBool, -}; +use std::{convert::TryInto, sync::atomic::AtomicBool}; use binary_sv2::GetSize; -use codec_sv2::{HandShakeFrame, HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardNoiseDecoder}; +use codec_sv2::{ + HandShakeFrame, HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardNoiseDecoder, +}; use tracing::{debug, error}; @@ -42,7 +43,10 @@ impl crate::SetState for Connection { impl Connection { #[allow(clippy::new_ret_no_self)] // Debug added for some trait requirement - pub async fn new<'a, Message: Serialize + Deserialize<'a> + GetSize + Send + 'static + Clone + Debug>( + pub async fn new< + 'a, + Message: Serialize + Deserialize<'a> + GetSize + Send + 'static + Clone + Debug, + >( stream: TcpStream, role: HandshakeRole, ) -> Result< @@ -66,7 +70,7 @@ impl Connection { tokio::sync::broadcast::Sender>, tokio::sync::broadcast::Receiver>, ) = tokio::sync::broadcast::channel(10); // TODO caller should provide this param - + // let (sender_outgoing, receiver_outgoing): ( // Sender>, // Receiver>, @@ -87,7 +91,7 @@ impl Connection { // RECEIVE AND PARSE INCOMING MESSAGES FROM TCP STREAM let recv_task = task::spawn(async move { let mut decoder = StandardNoiseDecoder::::new(); - + loop { let writable = decoder.writable(); match reader.read_exact(writable).await { @@ -241,8 +245,6 @@ pub async fn connect( Ok((stream, role)) } - - async fn initialize_as_downstream_tokio< 'a, // Debug added for this unwrap, remove later @@ -284,7 +286,11 @@ async fn initialize_as_downstream_tokio< // Addition of Debug, for some trait requirement // clone is again a broadcast requirement -async fn initialize_as_upstream_tokio<'a, Message: Serialize + Deserialize<'a> + GetSize + Debug + Clone, T: crate::SetState>( +async fn initialize_as_upstream_tokio< + 'a, + Message: Serialize + Deserialize<'a> + GetSize + Debug + Clone, + T: crate::SetState, +>( self_: Arc>, role: HandshakeRole, sender_outgoing: tokio::sync::broadcast::Sender>, @@ -295,7 +301,8 @@ async fn initialize_as_upstream_tokio<'a, Message: Serialize + Deserialize<'a> + // Receive and deserialize first handshake message let first_message: HandShakeFrame = receiver_incoming .recv() - .await.unwrap() + .await + .unwrap() .try_into() .map_err(|_| Error::HandshakeRemoteInvalidMessage)?; let first_message: [u8; RESPONDER_EXPECTED_HANDSHAKE_MESSAGE_SIZE] = first_message diff --git a/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs b/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs index 71c9d77be..9e883d112 100644 --- a/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs +++ b/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs @@ -22,7 +22,10 @@ impl PlainConnection { /// * `strict` - true - will disconnect a connection that sends a message that can't be /// translated, false - will ignore messages that can't be translated #[allow(clippy::new_ret_no_self)] - pub async fn new<'a, Message: Serialize + Deserialize<'a> + GetSize + Send + 'static + Clone>( + pub async fn new< + 'a, + Message: Serialize + Deserialize<'a> + GetSize + Send + 'static + Clone, + >( stream: TcpStream, ) -> ( tokio::sync::broadcast::Sender>, @@ -119,7 +122,10 @@ impl PlainConnection { Err(e) => { // Just fail and force to reinitilize everything let _ = writer.shutdown().await; - error!("Failed to read from stream - terminating connection, {:?}",e); + error!( + "Failed to read from stream - terminating connection, {:?}", + e + ); task::yield_now().await; break; } diff --git a/roles/test-utils/mining-device-sv1/src/client.rs b/roles/test-utils/mining-device-sv1/src/client.rs index f680e04a1..4ae08032f 100644 --- a/roles/test-utils/mining-device-sv1/src/client.rs +++ b/roles/test-utils/mining-device-sv1/src/client.rs @@ -1,10 +1,8 @@ -use async_std::net::TcpStream; -use std::{convert::TryInto, net::SocketAddr, ops::Div}; -use async_std::{io::BufReader, prelude::*, task}; +use async_std::{io::BufReader, net::TcpStream, prelude::*, task}; use num_bigint::BigUint; use num_traits::FromPrimitive; use roles_logic_sv2::utils::Mutex; -use std::{sync::Arc, time}; +use std::{convert::TryInto, net::SocketAddr, ops::Div, sync::Arc, time}; use tracing::{error, info, warn}; use stratum_common::bitcoin::util::uint::Uint256; @@ -84,7 +82,7 @@ impl Client { // results are formated into a "mining.submit" messages that is then sent to the // Upstream via `sender_outgoing` // mpsc can be used - let (sender_share,mut receiver_share) = tokio::sync::mpsc::channel(10); + let (sender_share, mut receiver_share) = tokio::sync::mpsc::channel(10); // Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0. let miner = Arc::new(Mutex::new(Miner::new(0))); @@ -208,7 +206,10 @@ impl Client { sender_outgoing_clone.send(message).await.unwrap(); } }); - let mut recv_incoming = client.safe_lock(|c| c.sender_incoming.clone()).unwrap().subscribe(); + let mut recv_incoming = client + .safe_lock(|c| c.sender_incoming.clone()) + .unwrap() + .subscribe(); loop { match client.clone().safe_lock(|c| c.status).unwrap() { diff --git a/roles/test-utils/mining-device/src/lib/mod.rs b/roles/test-utils/mining-device/src/lib/mod.rs index e51149959..5ff088f39 100644 --- a/roles/test-utils/mining-device/src/lib/mod.rs +++ b/roles/test-utils/mining-device/src/lib/mod.rs @@ -73,10 +73,14 @@ pub async fn connect( info!("Pool tcp connection established at {}", address); let address = socket.peer_addr().unwrap(); let initiator = Initiator::new(pub_key.map(|e| e.0)); - let (receiver, sender, _, _): (tokio::sync::broadcast::Sender, tokio::sync::broadcast::Sender, _, _) = - Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator)) - .await - .unwrap(); + let (receiver, sender, _, _): ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Sender, + _, + _, + ) = Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator)) + .await + .unwrap(); info!("Pool noise connection established at {}", address); Device::start( receiver, @@ -143,7 +147,13 @@ impl SetupConnectionHandler { sender.send(sv2_frame).unwrap(); info!("Setup connection sent to {}", address); - let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap(); + let mut incoming: StdFrame = receiver + .subscribe() + .recv() + .await + .unwrap() + .try_into() + .unwrap(); let message_type = incoming.get_header().unwrap().msg_type(); let payload = incoming.payload(); ParseUpstreamCommonMessages::handle_message_common( @@ -250,7 +260,8 @@ impl Device { let miner = Arc::new(Mutex::new(Miner::new(handicap))); // mpsc can be used. // let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded(); - let (notify_changes_to_mining_thread, update_miners) = tokio::sync::mpsc::unbounded_channel(); + let (notify_changes_to_mining_thread, update_miners) = + tokio::sync::mpsc::unbounded_channel(); let self_ = Self { channel_opened: false, receiver: receiver.clone(), @@ -286,7 +297,13 @@ impl Device { }); loop { - let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap(); + let mut incoming: StdFrame = receiver + .subscribe() + .recv() + .await + .unwrap() + .try_into() + .unwrap(); let message_type = incoming.get_header().unwrap().msg_type(); let payload = incoming.payload(); let next = Device::handle_message_mining( @@ -304,10 +321,7 @@ impl Device { || message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH || message_type == const_sv2::MESSAGE_TYPE_SET_TARGET) { - notify_changes_to_mining_thread - .sender - .send(()) - .unwrap(); + notify_changes_to_mining_thread.sender.send(()).unwrap(); notify_changes_to_mining_thread.should_send = false; }; match next { @@ -698,7 +712,11 @@ fn start_mining_threads( }); } -fn mine(mut miner: Miner, share_send: tokio::sync::mpsc::UnboundedSender<(u32, u32, u32, u32)>, kill: Arc) { +fn mine( + mut miner: Miner, + share_send: tokio::sync::mpsc::UnboundedSender<(u32, u32, u32, u32)>, + kill: Arc, +) { if miner.handicap != 0 { loop { if kill.load(Ordering::Relaxed) { diff --git a/roles/tests-integration/lib/sniffer.rs b/roles/tests-integration/lib/sniffer.rs index ed9464d4b..182c4315d 100644 --- a/roles/tests-integration/lib/sniffer.rs +++ b/roles/tests-integration/lib/sniffer.rs @@ -166,7 +166,10 @@ impl Sniffer { async fn create_downstream( stream: TcpStream, - ) -> Option<(tokio::sync::broadcast::Sender, tokio::sync::broadcast::Sender)> { + ) -> Option<( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Sender, + )> { let pub_key = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72" .to_string() .parse::() @@ -195,7 +198,10 @@ impl Sniffer { async fn create_upstream( stream: TcpStream, - ) -> Option<(tokio::sync::broadcast::Sender, tokio::sync::broadcast::Sender)> { + ) -> Option<( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Sender, + )> { let initiator = Initiator::without_pk().expect("This fn call can not fail"); if let Ok((receiver_from_server, sender_to_server, _, _)) = Connection::new::<'static, AnyMessage<'static>>( diff --git a/roles/translator/src/lib/downstream_sv1/diff_management.rs b/roles/translator/src/lib/downstream_sv1/diff_management.rs index fb64e040c..3f366ed52 100644 --- a/roles/translator/src/lib/downstream_sv1/diff_management.rs +++ b/roles/translator/src/lib/downstream_sv1/diff_management.rs @@ -348,14 +348,14 @@ impl Downstream { // } // let calculated_share_per_min = count as f32 / (elapsed.as_secs_f32() / 60.0); -// // This is the error margin for a confidence of 99.99...% given the expect number of shares -// // per minute TODO the review the math under it +// // This is the error margin for a confidence of 99.99...% given the expect number of +// shares // per minute TODO the review the math under it // let error_margin = get_error(expected_shares_per_minute); // let error = (calculated_share_per_min - expected_shares_per_minute as f32).abs(); // assert!( // error <= error_margin as f32, -// "Calculated shares per minute are outside the 99.99...% confidence interval. Error: {:?}, Error margin: {:?}, {:?}", error, error_margin,calculated_share_per_min -// ); +// "Calculated shares per minute are outside the 99.99...% confidence interval. Error: +// {:?}, Error margin: {:?}, {:?}", error, error_margin,calculated_share_per_min ); // } // fn get_error(lambda: f64) -> f64 { diff --git a/roles/translator/src/lib/downstream_sv1/downstream.rs b/roles/translator/src/lib/downstream_sv1/downstream.rs index c558aa1ef..66f6c73ef 100644 --- a/roles/translator/src/lib/downstream_sv1/downstream.rs +++ b/roles/translator/src/lib/downstream_sv1/downstream.rs @@ -149,7 +149,10 @@ impl Downstream { // receiving messages with a future (either TCP recv or Receiver<_>) we use the // futures::select! macro to merge the receiving end of a task channels into a single loop // within the task - let (tx_shutdown, _): (tokio::sync::broadcast::Sender, tokio::sync::broadcast::Receiver) = tokio::sync::broadcast::channel(3); + let (tx_shutdown, _): ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Receiver, + ) = tokio::sync::broadcast::channel(3); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_reader = tx_status.clone(); diff --git a/roles/translator/src/lib/error.rs b/roles/translator/src/lib/error.rs index 9fb354df2..74286c026 100644 --- a/roles/translator/src/lib/error.rs +++ b/roles/translator/src/lib/error.rs @@ -32,7 +32,9 @@ pub enum ChannelSendError<'a> { ), NewExtendedMiningJobTokio(tokio::sync::broadcast::error::SendError>), ExtranonceTokio(tokio::sync::mpsc::error::SendError<(ExtendedExtranonce, u32)>), - SetNewPrevHashTokio(tokio::sync::broadcast::error::SendError>), + SetNewPrevHashTokio( + tokio::sync::broadcast::error::SendError>, + ), V1MessageTokio(tokio::sync::mpsc::error::SendError), } @@ -219,10 +221,15 @@ impl<'a> From From>> +impl<'a> + From>> for Error<'a> { - fn from(e: tokio::sync::broadcast::error::SendError>) -> Self { + fn from( + e: tokio::sync::broadcast::error::SendError< + roles_logic_sv2::mining_sv2::SetNewPrevHash<'a>, + >, + ) -> Self { Error::ChannelErrorSender(ChannelSendError::SetNewPrevHashTokio(e)) } } @@ -263,9 +270,12 @@ impl<'a> From>> for Error<'a> } } - impl<'a> From>> for Error<'a> { - fn from(e: tokio::sync::broadcast::error::SendError>) -> Self { + fn from( + e: tokio::sync::broadcast::error::SendError< + roles_logic_sv2::mining_sv2::NewExtendedMiningJob<'a>, + >, + ) -> Self { Error::ChannelErrorSender(ChannelSendError::NewExtendedMiningJobTokio(e)) } } diff --git a/roles/translator/src/lib/mod.rs b/roles/translator/src/lib/mod.rs index 34d468245..1032f89a2 100644 --- a/roles/translator/src/lib/mod.rs +++ b/roles/translator/src/lib/mod.rs @@ -49,7 +49,7 @@ impl TranslatorSv2 { pub async fn start(self) { // mpsc can be used as single consumer. // let (tx_status, rx_status) = unbounded(); - let (tx_status,mut rx_status) = tokio::sync::mpsc::unbounded_channel(); + let (tx_status, mut rx_status) = tokio::sync::mpsc::unbounded_channel(); let target = Arc::new(Mutex::new(vec![0; 32])); @@ -152,7 +152,8 @@ impl TranslatorSv2 { ) { // Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream` // (Sender>, Receiver>) - // Producer I dont give much damn about, consumer are getting cloned,so broadcast should be used. + // Producer I dont give much damn about, consumer are getting cloned,so broadcast should be + // used. let (tx_sv2_submit_shares_ext, _) = tokio::sync::broadcast::channel(10); // `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to diff --git a/roles/translator/src/lib/proxy/bridge.rs b/roles/translator/src/lib/proxy/bridge.rs index 602a51be9..cc7389824 100644 --- a/roles/translator/src/lib/proxy/bridge.rs +++ b/roles/translator/src/lib/proxy/bridge.rs @@ -392,8 +392,10 @@ impl Bridge { let handle_new_prev_hash = tokio::task::spawn(async move { loop { // Receive `SetNewPrevHash` from `Upstream` - let sv2_set_new_prev_hash: SetNewPrevHash = - handle_result!(tx_status, tx_sv2_set_new_prev_hash.clone().subscribe().recv().await); + let sv2_set_new_prev_hash: SetNewPrevHash = handle_result!( + tx_status, + tx_sv2_set_new_prev_hash.clone().subscribe().recv().await + ); debug!( "handle_new_prev_hash job_id: {:?}", &sv2_set_new_prev_hash.job_id @@ -560,8 +562,8 @@ pub struct OpenSv1Downstream { // let (tx_sv1_notify, rx_sv1_notify) = broadcast::channel(1); // let (tx_status, _rx_status) = bounded(1); // let upstream_target = vec![ -// 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -// 0, 0, 0, 0, 0, 0, 0, +// 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +// 0, 0, 0, 0, 0, 0, 0, 0, // ]; // let interface = BridgeInterface { // tx_sv1_submit, @@ -642,8 +644,8 @@ pub struct OpenSv1Downstream { // channel_id, // job_id: 0, // prev_hash: [ -// 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, -// 3, 3, 3, 3, 3, 3, 3, +// 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, +// 3, 3, 3, 3, 3, 3, 3, 3, // ] // .into(), // min_ntime: 989898, diff --git a/roles/translator/src/lib/status.rs b/roles/translator/src/lib/status.rs index dcb3e6b01..f12fd7b01 100644 --- a/roles/translator/src/lib/status.rs +++ b/roles/translator/src/lib/status.rs @@ -17,7 +17,7 @@ pub enum Sender { #[derive(Debug)] pub enum ErrorS { AsyncError(async_channel::SendError>), - TokioError(tokio::sync::mpsc::error::SendError>) + TokioError(tokio::sync::mpsc::error::SendError>), } impl Sender { @@ -29,21 +29,26 @@ impl Sender { } } - pub async fn send( - &self, - status: Status<'static>, - ) -> Result<(), ErrorS> { + pub async fn send(&self, status: Status<'static>) -> Result<(), ErrorS> { match self { Self::Downstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), - Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), + Self::DownstreamListener(inner) => { + inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)) + } Self::Bridge(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), Self::Upstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), - Self::TemplateReceiver(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)), + Self::TemplateReceiver(inner) => { + inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)) + } Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), Self::BridgeTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), - Self::DownstreamListenerTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), + Self::DownstreamListenerTokio(inner) => { + inner.send(status).map_err(|e| ErrorS::TokioError(e)) + } Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), - Self::TemplateReceiverTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)), + Self::TemplateReceiverTokio(inner) => { + inner.send(status).map_err(|e| ErrorS::TokioError(e)) + } } } } @@ -128,7 +133,7 @@ async fn send_status( }) .await .unwrap_or(()); - }, + } Sender::UpstreamTokio(tx) => match e { Error::ChannelErrorReceiver(_) => { tx.send(Status { @@ -148,7 +153,7 @@ async fn send_status( state: State::BridgeShutdown(e), }) .unwrap_or(()); - }, + } Sender::DownstreamTokio(tx) => { tx.send(Status { state: State::Healthy(e.to_string()), @@ -160,13 +165,13 @@ async fn send_status( state: State::DownstreamShutdown(e), }) .unwrap_or(()); - }, + } Sender::TemplateReceiverTokio(tx) => { tx.send(Status { state: State::UpstreamShutdown(e), }) .unwrap_or(()); - }, + } } outcome } diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index 68341334e..72c12c9e7 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -153,7 +153,7 @@ impl Upstream { ); // Channel to send and receive messages to the SV2 Upstream role - let (receiver, sender,_,_) = Connection::new(socket, HandshakeRole::Initiator(initiator)) + let (receiver, sender, _, _) = Connection::new(socket, HandshakeRole::Initiator(initiator)) .await .unwrap(); // Initialize `UpstreamConnection` with channel for SV2 Upstream role communication and @@ -479,7 +479,7 @@ impl Upstream { pub fn handle_submit(self_: Arc>) -> ProxyResult<'static, ()> { let task_collector = self_.safe_lock(|s| s.task_collector.clone()).unwrap(); let clone = self_.clone(); - let (tx_frame,mut receiver, tx_status) = clone + let (tx_frame, mut receiver, tx_status) = clone .safe_lock(|s| { ( s.connection.sender.clone(),