Skip to content

Commit

Permalink
Feature: AsyncRuntime::MpscUnbounded to abstract mpsc channels
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 8, 2024
1 parent b51cb8d commit 0a8a5da
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 38 deletions.
22 changes: 13 additions & 9 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ use futures::StreamExt;
use futures::TryFutureExt;
use maplit::btreeset;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::async_runtime::TryRecvError;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::balancer::Balancer;
Expand Down Expand Up @@ -86,8 +87,11 @@ use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
use crate::storage::RaftLogStorage;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::async_runtime::MpscUnboundedReceiver;
use crate::type_config::TypeConfigExt;
use crate::ChangeMembers;
use crate::Instant;
Expand Down Expand Up @@ -164,15 +168,15 @@ where
pub(crate) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

#[allow(dead_code)]
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,
pub(crate) tx_api: MpscUnboundedSenderOf<C, RaftMsg<C>>,
pub(crate) rx_api: MpscUnboundedReceiverOf<C, RaftMsg<C>>,

/// A Sender to send callback by other components to [`RaftCore`], when an action is finished,
/// such as flushing log to disk, or applying log entries to state machine.
pub(crate) tx_notify: mpsc::UnboundedSender<Notify<C>>,
pub(crate) tx_notify: MpscUnboundedSenderOf<C, Notify<C>>,

/// A Receiver to receive callback from other components.
pub(crate) rx_notify: mpsc::UnboundedReceiver<Notify<C>>,
pub(crate) rx_notify: MpscUnboundedReceiverOf<C, Notify<C>>,

pub(crate) tx_metrics: watch::Sender<RaftMetrics<C>>,
pub(crate) tx_data_metrics: watch::Sender<RaftDataMetrics<C>>,
Expand Down Expand Up @@ -931,11 +935,11 @@ where
let msg = match res {
Ok(msg) => msg,
Err(e) => match e {
mpsc::error::TryRecvError::Empty => {
TryRecvError::Empty => {
tracing::debug!("all RaftMsg are processed, wait for more");
return Ok(i + 1);
}
mpsc::error::TryRecvError::Disconnected => {
TryRecvError::Disconnected => {
tracing::debug!("rx_api is disconnected, quit");
return Err(Fatal::Stopped);
}
Expand Down Expand Up @@ -966,11 +970,11 @@ where
let notify = match res {
Ok(msg) => msg,
Err(e) => match e {
mpsc::error::TryRecvError::Empty => {
TryRecvError::Empty => {
tracing::debug!("all Notify are processed, wait for more");
return Ok(i + 1);
}
mpsc::error::TryRecvError::Disconnected => {
TryRecvError::Disconnected => {
tracing::error!("rx_notify is disconnected, quit");
return Err(Fatal::Stopped);
}
Expand Down
13 changes: 8 additions & 5 deletions openraft/src/core/sm/handle.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
//! State machine control handle
use tokio::sync::mpsc;

use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::MpscUnboundedWeakSender;
use crate::async_runtime::SendError;
use crate::core::sm;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;
use crate::Snapshot;
Expand All @@ -12,7 +15,7 @@ use crate::Snapshot;
pub(crate) struct Handle<C>
where C: RaftTypeConfig
{
pub(in crate::core::sm) cmd_tx: mpsc::UnboundedSender<sm::Command<C>>,
pub(in crate::core::sm) cmd_tx: MpscUnboundedSenderOf<C, sm::Command<C>>,

#[allow(dead_code)]
pub(in crate::core::sm) join_handle: JoinHandleOf<C, ()>,
Expand All @@ -21,7 +24,7 @@ where C: RaftTypeConfig
impl<C> Handle<C>
where C: RaftTypeConfig
{
pub(crate) fn send(&mut self, cmd: sm::Command<C>) -> Result<(), mpsc::error::SendError<sm::Command<C>>> {
pub(crate) fn send(&mut self, cmd: sm::Command<C>) -> Result<(), SendError<sm::Command<C>>> {
tracing::debug!("sending command to state machine worker: {:?}", cmd);
self.cmd_tx.send(cmd)
}
Expand All @@ -43,7 +46,7 @@ where C: RaftTypeConfig
/// It is weak because the [`Worker`] watches the close event of this channel for shutdown.
///
/// [`Worker`]: sm::worker::Worker
cmd_tx: mpsc::WeakUnboundedSender<sm::Command<C>>,
cmd_tx: MpscUnboundedWeakSenderOf<C, sm::Command<C>>,
}

impl<C> SnapshotReader<C>
Expand Down
16 changes: 9 additions & 7 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use tokio::sync::mpsc;

use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::core::notify::Notify;
use crate::core::raft_msg::ResultSender;
Expand All @@ -18,6 +18,8 @@ use crate::storage::RaftLogReaderExt;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftLogId;
use crate::RaftLogReader;
Expand All @@ -41,10 +43,10 @@ where
log_reader: LR,

/// Raed command from RaftCore to execute.
cmd_rx: mpsc::UnboundedReceiver<Command<C>>,
cmd_rx: MpscUnboundedReceiverOf<C, Command<C>>,

/// Send back the result of the command to RaftCore.
resp_tx: mpsc::UnboundedSender<Notify<C>>,
resp_tx: MpscUnboundedSenderOf<C, Notify<C>>,
}

impl<C, SM, LR> Worker<C, SM, LR>
Expand All @@ -54,8 +56,8 @@ where
LR: RaftLogReader<C>,
{
/// Spawn a new state machine worker, return a controlling handle.
pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: mpsc::UnboundedSender<Notify<C>>) -> Handle<C> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: MpscUnboundedSenderOf<C, Notify<C>>) -> Handle<C> {
let (cmd_tx, cmd_rx) = C::mpsc_unbounded();

let worker = Worker {
state_machine,
Expand Down Expand Up @@ -193,7 +195,7 @@ where
/// as applying a log entry,
/// - or it must be able to acquire a lock that prevents any write operations.
#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: mpsc::UnboundedSender<Notify<C>>) {
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf<C, Notify<C>>) {
// TODO: need to be abortable?
// use futures::future::abortable;
// let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await });
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use std::sync::Mutex;
use std::time::Duration;

use futures::future::Either;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::async_runtime::MpscUnboundedSender;
use crate::core::notify::Notify;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;

Expand All @@ -24,7 +25,7 @@ where C: RaftTypeConfig
{
interval: Duration,

tx: mpsc::UnboundedSender<Notify<C>>,
tx: MpscUnboundedSenderOf<C, Notify<C>>,

/// Emit event or not
enabled: Arc<AtomicBool>,
Expand Down Expand Up @@ -53,7 +54,7 @@ where C: RaftTypeConfig
impl<C> Tick<C>
where C: RaftTypeConfig
{
pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender<Notify<C>>, enabled: bool) -> TickHandle<C> {
pub(crate) fn spawn(interval: Duration, tx: MpscUnboundedSenderOf<C, Notify<C>>, enabled: bool) -> TickHandle<C> {
let enabled = Arc::new(AtomicBool::from(enabled));
let this = Self {
interval,
Expand Down Expand Up @@ -180,7 +181,7 @@ mod tests {
#[cfg(not(feature = "singlethreaded"))]
#[tokio::test]
async fn test_shutdown() -> anyhow::Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, mut rx) = TickUTConfig::mpsc_unbounded();
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

TickUTConfig::sleep(Duration::from_millis(500)).await;
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ pub use message::InstallSnapshotResponse;
pub use message::SnapshotResponse;
pub use message::VoteRequest;
pub use message::VoteResponse;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::trace_span;
use tracing::Instrument;
use tracing::Level;

use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::config::Config;
use crate::config::RuntimeConfig;
Expand Down Expand Up @@ -239,8 +239,8 @@ where C: RaftTypeConfig
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_api, rx_api) = C::mpsc_unbounded();
let (tx_notify, rx_notify) = C::mpsc_unbounded();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default());
let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default());
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::Level;

use crate::async_runtime::MpscUnboundedSender;
use crate::config::RuntimeConfig;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
Expand All @@ -17,6 +17,7 @@ use crate::error::RaftError;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::AsyncRuntime;
Expand All @@ -34,7 +35,7 @@ where C: RaftTypeConfig
pub(in crate::raft) config: Arc<Config>,
pub(in crate::raft) runtime_config: Arc<RuntimeConfig>,
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) tx_api: MpscUnboundedSenderOf<C, RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C>>,
pub(in crate::raft) rx_data_metrics: watch::Receiver<RaftDataMetrics<C>>,
pub(in crate::raft) rx_server_metrics: watch::Receiver<RaftServerMetrics<C>>,
Expand Down
21 changes: 13 additions & 8 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use request::Replicate;
use response::ReplicationResult;
pub(crate) use response::Response;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing_futures::Instrument;

use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::MpscUnboundedWeakSender;
use crate::config::Config;
use crate::core::notify::Notify;
use crate::core::sm::handle::SnapshotReader;
Expand Down Expand Up @@ -51,6 +53,9 @@ use crate::storage::Snapshot;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::RaftLogId;
Expand All @@ -68,7 +73,7 @@ where C: RaftTypeConfig
pub(crate) join_handle: JoinHandleOf<C, Result<(), ReplicationClosed>>,

/// The channel used for communicating with the replication task.
pub(crate) tx_repl: mpsc::UnboundedSender<Replicate<C>>,
pub(crate) tx_repl: MpscUnboundedSenderOf<C, Replicate<C>>,
}

/// A task responsible for sending replication events to a target follower in the Raft cluster.
Expand All @@ -90,17 +95,17 @@ where

/// A channel for sending events to the RaftCore.
#[allow(clippy::type_complexity)]
tx_raft_core: mpsc::UnboundedSender<Notify<C>>,
tx_raft_core: MpscUnboundedSenderOf<C, Notify<C>>,

/// A channel for receiving events from the RaftCore and snapshot transmitting task.
rx_event: mpsc::UnboundedReceiver<Replicate<C>>,
rx_event: MpscUnboundedReceiverOf<C, Replicate<C>>,

/// A weak reference to the Sender for the separate sending-snapshot task to send callback.
///
/// Because 1) ReplicationCore replies on the `close` event to shutdown.
/// 2) ReplicationCore holds this tx; It is made a weak so that when
/// RaftCore drops the only non-weak tx, the Receiver `rx_repl` will be closed.
weak_tx_event: mpsc::WeakUnboundedSender<Replicate<C>>,
weak_tx_event: MpscUnboundedWeakSenderOf<C, Replicate<C>>,

/// The `RaftNetwork` interface for replicating logs and heartbeat.
network: N::Network,
Expand Down Expand Up @@ -164,7 +169,7 @@ where
snapshot_network: N::Network,
log_reader: LS::LogReader,
snapshot_reader: SnapshotReader<C>,
tx_raft_core: mpsc::UnboundedSender<Notify<C>>,
tx_raft_core: MpscUnboundedSenderOf<C, Notify<C>>,
span: tracing::Span,
) -> ReplicationHandle<C> {
tracing::debug!(
Expand All @@ -176,7 +181,7 @@ where
);

// other component to ReplicationStream
let (tx_event, rx_event) = mpsc::unbounded_channel();
let (tx_event, rx_event) = C::mpsc_unbounded();

let this = Self {
target,
Expand Down Expand Up @@ -757,7 +762,7 @@ where
snapshot: Snapshot<C>,
option: RPCOption,
cancel: oneshot::Receiver<()>,
weak_tx: mpsc::WeakUnboundedSender<Replicate<C>>,
weak_tx: MpscUnboundedWeakSenderOf<C, Replicate<C>>,
) {
let meta = snapshot.meta.clone();

Expand Down
9 changes: 9 additions & 0 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod util;
use std::fmt::Debug;

pub use async_runtime::AsyncRuntime;
pub use async_runtime::MpscUnbounded;
pub use async_runtime::OneshotSender;
pub use util::TypeConfigExt;

Expand Down Expand Up @@ -93,6 +94,7 @@ pub trait RaftTypeConfig:
///
/// [`type-alias`]: crate::docs::feature_flags#feature-flag-type-alias
pub mod alias {
use crate::async_runtime::MpscUnbounded;
use crate::raft::responder::Responder;
use crate::type_config::AsyncRuntime;
use crate::RaftTypeConfig;
Expand All @@ -118,6 +120,13 @@ pub mod alias {
pub type OneshotSenderOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotSender<T>;
pub type OneshotReceiverErrorOf<C> = <Rt<C> as AsyncRuntime>::OneshotReceiverError;
pub type OneshotReceiverOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotReceiver<T>;
pub type MpscUnboundedOf<C> = <Rt<C> as AsyncRuntime>::MpscUnbounded;

type Mpsc<C> = MpscUnboundedOf<C>;

pub type MpscUnboundedSenderOf<C, T> = <Mpsc<C> as MpscUnbounded>::Sender<T>;
pub type MpscUnboundedReceiverOf<C, T> = <Mpsc<C> as MpscUnbounded>::Receiver<T>;
pub type MpscUnboundedWeakSenderOf<C, T> = <Mpsc<C> as MpscUnbounded>::WeakSender<T>;

// Usually used types
pub type LogIdOf<C> = crate::LogId<NodeIdOf<C>>;
Expand Down
Loading

0 comments on commit 0a8a5da

Please sign in to comment.