style: replace 'thread' with 'task'
replaces 'thread' with 'task' in struct names, variable names, comments,
and docs.

This follows up on:
Neptune-Crypto#165

quote:

  re threads/tasks/g in neptune-core overview:

  It is incorrect nomenclature to call these threads. They have never been
  threads. Rather they are tokio spawned tasks running on tokio's threadpool.
  They may run on the same or different operating system thread from the parent
  task, at tokio's discretion. Unfortunately there are still some comments and
  variables in the code that refer to tasks as threads, which may perpetuate
  the misconception.

This is now corrected.
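To make the quoted distinction concrete, here is a minimal sketch (illustrative, not from the repo) contrasting a tokio task with a dedicated OS thread; the task is a lightweight future that tokio's runtime schedules onto whichever worker thread it chooses:

```rust
// Sketch only; assumes the `tokio` crate with the "full" feature set.
#[tokio::main]
async fn main() {
    // A tokio task: a future handed to the runtime's threadpool. It may
    // run on the same or a different OS thread than its parent task.
    let task = tokio::spawn(async {
        println!("task on worker thread {:?}", std::thread::current().id());
    });
    task.await.unwrap();

    // An OS thread: created and scheduled by the kernel, with its own stack.
    let thread = std::thread::spawn(|| {
        println!("dedicated OS thread {:?}", std::thread::current().id());
    });
    thread.join().unwrap();
}
```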
dan-da committed Jul 18, 2024
1 parent 7e64b1c commit 8d302dc
Showing 17 changed files with 314 additions and 324 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -106,7 +106,7 @@ To use tokio-console with neptune-core:

## Local Integration Test Strategy

- This repository contains unit tests, but multi-threaded programs are notoriously hard to test. And the unit tests usually only cover narrow parts of the code within a single thread. When you are making changes to the code, you can run through the following checks
+ This repository contains unit tests, but async programs are notoriously hard to test. And the unit tests usually only cover narrow parts of the code within a single async task. When you are making changes to the code, you can run through the following checks
1. `cargo b` to verify that it builds without warnings
2. `cargo t` to verify that all unit tests work
3. `run-multiple-instances.sh` to spin up three nodes that are connected through `localhost`. Instance `I0` and `I2` should be mining and all three clients should be converging on the same blocks. You can read the hashes of the blocks in the log output and verify that they all store the same blocks.
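As a point of reference for the README paragraph above, this is roughly what a narrow single-task unit test looks like under tokio's test runtime; a hedged sketch, with `next_block_height` as a hypothetical stand-in rather than a function from this repo:

```rust
#[cfg(test)]
mod tests {
    // Hypothetical async function under test; not from neptune-core.
    async fn next_block_height(current: u64) -> u64 {
        current + 1
    }

    // `#[tokio::test]` runs the test body as a single task on a fresh
    // tokio runtime: the narrow, single-task scope described above.
    #[tokio::test]
    async fn increments_height() {
        assert_eq!(next_block_height(41).await, 42);
    }
}
```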
10 changes: 5 additions & 5 deletions docs/src/neptune-core/events.md
@@ -4,8 +4,8 @@ neptune-core can be seen as an event-driven program. Below is a list of all the

## Events

- | Description | Direct Thread Messages | Indirect Thread Messages | Spawned Network Messages |
- | :-------------------------------------------------------------------------------- | :------------------------- | :------------------------------------------------------------------------------ | :----------------------------- |
- | New block found locally | FromMinerToMain::NewBlock | MainToPeerThread::BlockFromMiner <br /> PeerMessage::Block | PeerMessage::Block |
- | New block received from peer <br /> Got: PeerMessage::Block | PeerThreadToMain::NewBlock | ToMiner::NewBlock <br /> <span style="color:red">MainToPeerThread::Block</span> | PeerMessage::BlockNotification |
- | Block notification received from peer <br /> Got: PeerMessage::BlockNotification | MainToMiner::NewBlock | <span style="color:red">MainToPeerThread::Block</span> | PeerMessage::BlockNotification |
+ | Description | Direct Task Messages | Indirect Task Messages | Spawned Network Messages |
+ | :-------------------------------------------------------------------------------- | :------------------------- | :---------------------------------------------------------------------------- | :----------------------------- |
+ | New block found locally | FromMinerToMain::NewBlock | MainToPeerTask::BlockFromMiner <br /> PeerMessage::Block | PeerMessage::Block |
+ | New block received from peer <br /> Got: PeerMessage::Block | PeerTaskToMain::NewBlock | ToMiner::NewBlock <br /> <span style="color:red">MainToPeerTask::Block</span> | PeerMessage::BlockNotification |
+ | Block notification received from peer <br /> Got: PeerMessage::BlockNotification | MainToMiner::NewBlock | <span style="color:red">MainToPeerTask::Block</span> | PeerMessage::BlockNotification |
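For orientation, the table's message types are variants of channel enums in `crate::models::channel` (see the import in the connect_to_peers.rs diff below). A minimal sketch, with illustrative enum bodies, of how such channels are typically wired: main owns one broadcast channel toward the peer tasks, and the peer tasks share one mpsc channel back to main:

```rust
use tokio::sync::{broadcast, mpsc};

// Illustrative stand-ins; the real enums in crate::models::channel
// carry block payloads and many more variants.
#[derive(Clone, Debug)]
enum MainToPeerTask { Block }

#[derive(Debug)]
enum PeerTaskToMain { NewBlock }

#[tokio::main]
async fn main() {
    // Main owns one broadcast sender; every peer task subscribes to it.
    let (main_tx, mut peer_rx) = broadcast::channel::<MainToPeerTask>(16);
    // Peer tasks share one mpsc channel pointing back at main.
    let (peer_tx, mut main_rx) = mpsc::channel::<PeerTaskToMain>(16);

    // A peer task that has just received PeerMessage::Block from the
    // network reports the new block to main (row 2 of the table) ...
    tokio::spawn(async move {
        peer_tx.send(PeerTaskToMain::NewBlock).await.unwrap();
    });

    // ... and main relays it to the other peer tasks as
    // MainToPeerTask::Block.
    if let Some(PeerTaskToMain::NewBlock) = main_rx.recv().await {
        main_tx.send(MainToPeerTask::Block).unwrap();
    }
    assert!(matches!(peer_rx.recv().await, Ok(MainToPeerTask::Block)));
}
```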
10 changes: 5 additions & 5 deletions src/bin/dashboard_src/history_screen.rs
@@ -102,7 +102,7 @@ pub struct HistoryScreen {
in_focus: bool,
data: BalanceUpdateArc,
server: Arc<RPCClient>,
- poll_thread: Option<JoinHandleArc>,
+ poll_task: Option<JoinHandleArc>,
escalatable_event: DashboardEventArc,
events: Events,
}
@@ -117,7 +117,7 @@ impl HistoryScreen {
in_focus: false,
data: data.clone(),
server: rpc_server,
- poll_thread: None,
+ poll_task: None,
escalatable_event: Arc::new(std::sync::Mutex::new(None)),
events: data.into(),
}
@@ -204,15 +204,15 @@ impl Screen for HistoryScreen {
let server_arc = self.server.clone();
let data_arc = self.data.clone();
let escalatable_event_arc = self.escalatable_event.clone();
- self.poll_thread = Some(Arc::new(Mutex::new(tokio::spawn(async move {
+ self.poll_task = Some(Arc::new(Mutex::new(tokio::spawn(async move {
HistoryScreen::run_polling_loop(server_arc, data_arc, escalatable_event_arc).await;
}))));
}

fn deactivate(&mut self) {
self.active = false;
- if let Some(thread_handle) = &self.poll_thread {
- (*thread_handle.lock().unwrap()).abort();
+ if let Some(task_handle) = &self.poll_task {
+ (*task_handle.lock().unwrap()).abort();
}
}

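The pattern in this and the next two screens is identical: spawn the polling loop as a tokio task on activation, keep its `JoinHandle`, and abort the task on deactivation. A minimal standalone sketch of that lifecycle, with `PollScreen` as an illustrative stand-in for the dashboard screens:

```rust
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;

// Illustrative stand-in; not a type from the repo.
struct PollScreen {
    poll_task: Option<Arc<Mutex<JoinHandle<()>>>>,
}

impl PollScreen {
    fn activate(&mut self) {
        // Spawn the polling loop and hold on to its JoinHandle.
        self.poll_task = Some(Arc::new(Mutex::new(tokio::spawn(async {
            loop {
                // The real screens poll the RPC server here.
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        }))));
    }

    fn deactivate(&mut self) {
        // abort() cancels the task at its next await point, so the
        // loop never needs to check a shutdown flag.
        if let Some(task_handle) = &self.poll_task {
            task_handle.lock().unwrap().abort();
        }
    }
}
```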
10 changes: 5 additions & 5 deletions src/bin/dashboard_src/overview_screen.rs
@@ -152,7 +152,7 @@ pub struct OverviewScreen {
in_focus: bool,
data: Arc<std::sync::Mutex<OverviewData>>,
server: Arc<RPCClient>,
- poll_thread: Option<Arc<Mutex<JoinHandle<()>>>>,
+ poll_task: Option<Arc<Mutex<JoinHandle<()>>>>,
escalatable_event: Arc<std::sync::Mutex<Option<DashboardEvent>>>,
}

@@ -172,7 +172,7 @@ impl OverviewScreen {
listen_addr_for_peers,
))),
server: rpc_server,
- poll_thread: None,
+ poll_task: None,
escalatable_event: Arc::new(std::sync::Mutex::new(None)),
}
}
@@ -279,15 +279,15 @@ impl Screen for OverviewScreen {
let server_arc = self.server.clone();
let data_arc = self.data.clone();
let escalatable_event_arc = self.escalatable_event.clone();
- self.poll_thread = Some(Arc::new(Mutex::new(tokio::spawn(async move {
+ self.poll_task = Some(Arc::new(Mutex::new(tokio::spawn(async move {
OverviewScreen::run_polling_loop(server_arc, data_arc, escalatable_event_arc).await;
}))));
}

fn deactivate(&mut self) {
self.active = false;
- if let Some(thread_handle) = &self.poll_thread {
- (*thread_handle.lock().unwrap()).abort();
+ if let Some(task_handle) = &self.poll_task {
+ (*task_handle.lock().unwrap()).abort();
}
}

10 changes: 5 additions & 5 deletions src/bin/dashboard_src/peers_screen.rs
@@ -25,7 +25,7 @@ pub struct PeersScreen {
in_focus: bool,
data: Arc<std::sync::Mutex<Vec<PeerInfo>>>,
server: Arc<RPCClient>,
- poll_thread: Option<Arc<Mutex<JoinHandle<()>>>>,
+ poll_task: Option<Arc<Mutex<JoinHandle<()>>>>,
escalatable_event: Arc<std::sync::Mutex<Option<DashboardEvent>>>,
}

@@ -38,7 +38,7 @@ impl PeersScreen {
in_focus: false,
data: Arc::new(Mutex::new(vec![])),
server: rpc_server,
- poll_thread: None,
+ poll_task: None,
escalatable_event: Arc::new(std::sync::Mutex::new(None)),
}
}
@@ -113,15 +113,15 @@ impl Screen for PeersScreen {
let server_arc = self.server.clone();
let data_arc = self.data.clone();
let escalatable_event_arc = self.escalatable_event.clone();
- self.poll_thread = Some(Arc::new(Mutex::new(tokio::spawn(async move {
+ self.poll_task = Some(Arc::new(Mutex::new(tokio::spawn(async move {
PeersScreen::run_polling_loop(server_arc, data_arc, escalatable_event_arc).await;
}))));
}

fn deactivate(&mut self) {
self.active = false;
- if let Some(thread_handle) = &self.poll_thread {
- (*thread_handle.lock().unwrap()).abort();
+ if let Some(task_handle) = &self.poll_task {
+ (*task_handle.lock().unwrap()).abort();
}
}

54 changes: 27 additions & 27 deletions src/connect_to_peers.rs
@@ -14,7 +14,7 @@ use tracing::{debug, error, info, warn};

use crate::{
models::{
- channel::{MainToPeerThread, PeerThreadToMain},
+ channel::{MainToPeerTask, PeerTaskToMain},
peer::{
ConnectionRefusedReason, ConnectionStatus, HandshakeData, PeerMessage, PeerStanding,
},
@@ -134,24 +134,24 @@ pub async fn answer_peer_wrapper<S>(
stream: S,
state_lock: GlobalStateLock,
peer_address: std::net::SocketAddr,
- main_to_peer_thread_rx: broadcast::Receiver<MainToPeerThread>,
- peer_thread_to_main_tx: mpsc::Sender<PeerThreadToMain>,
+ main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
+ peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
own_handshake_data: HandshakeData,
) -> Result<()>
where
S: AsyncRead + AsyncWrite + std::fmt::Debug + std::marker::Unpin,
{
let state_lock_clone = state_lock.clone();
- let peer_thread_to_main_tx_clone = peer_thread_to_main_tx.clone();
+ let peer_task_to_main_tx_clone = peer_task_to_main_tx.clone();
let mut inner_ret: anyhow::Result<()> = Ok(());

let panic_result = std::panic::AssertUnwindSafe(async {
inner_ret = answer_peer(
stream,
state_lock_clone,
peer_address,
- main_to_peer_thread_rx,
- peer_thread_to_main_tx,
+ main_to_peer_task_rx,
+ peer_task_to_main_tx,
own_handshake_data,
)
.await;
@@ -162,11 +162,11 @@ where
match panic_result {
Ok(_) => (),
Err(_err) => {
error!("Peer thread (incoming) for {peer_address} panicked. Invoking close connection callback");
error!("Peer task (incoming) for {peer_address} panicked. Invoking close connection callback");
let _ret = close_peer_connected_callback(
state_lock.clone(),
peer_address,
- &peer_thread_to_main_tx_clone,
+ &peer_task_to_main_tx_clone,
)
.await;
}
@@ -179,8 +179,8 @@ async fn answer_peer<S>(
stream: S,
state: GlobalStateLock,
peer_address: std::net::SocketAddr,
- main_to_peer_thread_rx: broadcast::Receiver<MainToPeerThread>,
- peer_thread_to_main_tx: mpsc::Sender<PeerThreadToMain>,
+ main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
+ peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
own_handshake_data: HandshakeData,
) -> Result<()>
where
@@ -249,7 +249,7 @@ where
info!("Connection accepted from {}", peer_address);
let peer_distance = 1; // All incoming connections have distance 1
let peer_loop_handler = PeerLoopHandler::new(
- peer_thread_to_main_tx,
+ peer_task_to_main_tx,
state,
peer_address,
peer_handshake_data,
@@ -258,24 +258,24 @@
);

peer_loop_handler
- .run_wrapper(peer, main_to_peer_thread_rx)
+ .run_wrapper(peer, main_to_peer_task_rx)
.await?;

Ok(())
}

/// Perform handshake and establish connection to a new peer while handling any panics in the peer
- /// thread gracefully.
+ /// task gracefully.
pub async fn call_peer_wrapper(
peer_address: std::net::SocketAddr,
state: GlobalStateLock,
- main_to_peer_thread_rx: broadcast::Receiver<MainToPeerThread>,
- peer_thread_to_main_tx: mpsc::Sender<PeerThreadToMain>,
+ main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
+ peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
own_handshake_data: HandshakeData,
distance: u8,
) {
let state_clone = state.clone();
- let peer_thread_to_main_tx_clone = peer_thread_to_main_tx.clone();
+ let peer_task_to_main_tx_clone = peer_task_to_main_tx.clone();
let panic_result = std::panic::AssertUnwindSafe(async {
debug!("Attempting to initiate connection");
match tokio::net::TcpStream::connect(peer_address).await {
@@ -287,8 +287,8 @@ pub async fn call_peer_wrapper(
stream,
state,
peer_address,
- main_to_peer_thread_rx,
- peer_thread_to_main_tx,
+ main_to_peer_task_rx,
+ peer_task_to_main_tx,
&own_handshake_data,
distance,
)
@@ -308,11 +308,11 @@ pub async fn call_peer_wrapper(
match panic_result {
Ok(_) => (),
Err(_) => {
error!("Peer thread (outgoing) for {peer_address} panicked. Invoking close connection callback");
error!("Peer task (outgoing) for {peer_address} panicked. Invoking close connection callback");
let _ret = close_peer_connected_callback(
state_clone,
peer_address,
- &peer_thread_to_main_tx_clone,
+ &peer_task_to_main_tx_clone,
)
.await;
}
Expand All @@ -323,8 +323,8 @@ async fn call_peer<S>(
stream: S,
state: GlobalStateLock,
peer_address: std::net::SocketAddr,
- main_to_peer_thread_rx: broadcast::Receiver<MainToPeerThread>,
- peer_thread_to_main_tx: mpsc::Sender<PeerThreadToMain>,
+ main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
+ peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
own_handshake: &HandshakeData,
peer_distance: u8,
) -> Result<()>
@@ -404,30 +404,30 @@ where
}

let peer_loop_handler = PeerLoopHandler::new(
- peer_thread_to_main_tx,
+ peer_task_to_main_tx,
state,
peer_address,
other_handshake,
false,
peer_distance,
);
peer_loop_handler
- .run_wrapper(peer, main_to_peer_thread_rx)
+ .run_wrapper(peer, main_to_peer_task_rx)
.await?;

Ok(())
}

/// Remove peer from state. This function must be called every time
/// a peer is disconnected. Whether this happens through a panic
- /// in the peer thread or through a regular disconnect.
+ /// in the peer task or through a regular disconnect.
///
/// Locking:
/// * acquires `global_state_lock` for write
pub async fn close_peer_connected_callback(
global_state_lock: GlobalStateLock,
peer_address: SocketAddr,
- to_main_tx: &mpsc::Sender<PeerThreadToMain>,
+ to_main_tx: &mpsc::Sender<PeerTaskToMain>,
) -> Result<()> {
let mut global_state_mut = global_state_lock.lock_guard_mut().await;
// Store any new peer-standing to database
@@ -449,7 +449,7 @@ pub async fn close_peer_connected_callback(

// This message is used to determine if we are to exit synchronization mode
to_main_tx
- .send(PeerThreadToMain::RemovePeerMaxBlockHeight(peer_address))
+ .send(PeerTaskToMain::RemovePeerMaxBlockHeight(peer_address))
.await?;

Ok(())
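The two wrapper functions in this file share one containment pattern: run the fallible peer loop inside `std::panic::AssertUnwindSafe`, catch any unwind, and fall through to `close_peer_connected_callback` so a panicking peer task cannot leave stale state behind. A minimal sketch of that shape, assuming the `futures` crate's `FutureExt::catch_unwind`; `run_peer_loop`, `cleanup`, and the address are illustrative stand-ins:

```rust
use futures::FutureExt; // provides catch_unwind() on futures

// Stand-in for the handshake plus peer message loop.
async fn run_peer_loop() {
    // May panic somewhere deep inside in the worst case.
}

// Stand-in for close_peer_connected_callback.
async fn cleanup(peer: &str) {
    println!("closing connection state for {peer}");
}

#[tokio::main]
async fn main() {
    let peer = "203.0.113.7:9798"; // illustrative address
    let panic_result = std::panic::AssertUnwindSafe(run_peer_loop())
        .catch_unwind()
        .await;
    if panic_result.is_err() {
        // The panic unwound out of the task instead of tearing down
        // the process; restore peer-state invariants before returning.
        cleanup(peer).await;
    }
}
```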
