Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync primitives to AsyncRuntime trait #1013

Open
6 of 7 tasks
drmingdrmer opened this issue Feb 18, 2024 · 12 comments
Open
6 of 7 tasks

Add sync primitives to AsyncRuntime trait #1013

drmingdrmer opened this issue Feb 18, 2024 · 12 comments
Labels
A-runtime Area: async-runtime

Comments

@drmingdrmer
Copy link
Member

drmingdrmer commented Feb 18, 2024

Yes. Tokio primitives do work in other runtimes, but they are not optimally-supported.

For example, Tokio has its own counters for cooperative multitasking (which, obviously, only make sense and are active if running within Tokio), other runtimes have their own algorithm for how not to block the runtime for too long. The same is true for resource monitoring support (e.g., via tracing in Tokio).

Another issue is that Tokio sync primitives allocate memory at unexpected points. This is OK if you are fine with your program going broken on OOM, but we don't have this luxury :-). We need to handle OOMs properly.

Originally posted by @schreter in #1010 (review)

TODO:

Copy link

👋 Thanks for opening this issue!

Get help or engage by:

  • /help : to print help messages.
  • /assignme : to assign this issue to you.

@SteveLauC
Copy link
Collaborator

SteveLauC commented Jul 26, 2024

I would like to share the things I found while drafting the support for Monoio:)

To make Openraft implementation (not tests) runtime-agnostic (#249)

If we merge #1204, we still need these from Tokio:

  1. tokio::select!

    I am not aware of any way to be generic over a macro, but it is fine as tokio::select!() can be used with other runtimes.

  2. tokio::sync::Mutex

    We can have this in trait AsyncRuntime, and can replace it with the one configured in AsyncRuntime once we add it. Considering it can be used on other runtimes, it is not a blocker.

    Though not every runtime has their native Mutex implementation, for such runtimes, they may still need to use the Tokio Mutex when implementing AsyncRuntime.

  3. Tokio I/O traits

    $ cd openraft/openraft
    $ rg "io::Async"
    src/network/v2/adapt_v1.rs
    25:    C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
    
    src/network/snapshot_transport.rs
    10:use tokio::io::AsyncReadExt;
    11:use tokio::io::AsyncSeekExt;
    12:use tokio::io::AsyncWriteExt;
    98:where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin
    318:    C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
    
    src/raft/mod.rs
    445:        C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,

    These traits are needed for RaftNetwork interface v1 and they are really blockers, we cannot directly remove these things and force user to migrate to the v2 interface, so I suggest adding a new feature network-v1 and gating the above stuff with it, enabling this feature will pull tokio in.

Add Monoio support for Openraft - Primitive support

  • Oneshot: local-sync has a oneshot impl, which is extracted from Tokio
  • Unbounded MPSC: local-sync does have it provided, but it does not contain a weak sender, which is needed by Openraft
  • Watch: This is not implemented in local-sync
  • RwLock/Mutex(If we are gonna add them to AsyncRuntime): not implemented

I implemented a draft version of Monoio support there, since the reason/situation listed above, for primitives like MPSC and Watch, I simply use the ones provided by Tokio.

Primitive support of various runtimes

rt\primitive Oneshot unbounded MPSC Watch RwLock Mutex
Tokio Yes Yes Yes Yes Yes
async-std No No/there is a MPMC No Yes Yes
futures Yes Yes, but no weak sender No No Yes
smol No No/there is a MPMC No Yes Yes
Monoio Yes, from Tokio Yes, but no weak sender No No No
Glommio No No No Yes No
Compio No No No No No

This is a rough summary, I cannot really speak for runtimes other than Monoio given that I didn't try to implement them.

I would say Tokio's ecosystem is huge, other runtimes are relatively immature and thus primarily concentrating on core I/O part. For Openraft, we should definitely add more generic needed primitives to AsyncRuntime due to the reason @schreter said, but we should consider the API compatibility when deciding interfaces and accept that the Tokio's primitives will be used as fallback for other runtimes in the distant future.

@drmingdrmer
Copy link
Member Author

It's a great road map! Thank you!

About Mutex:

I think some of the Mutex usage can be replaced with std::sync::Mutex. And it's easy to build a Mutex with oneshot primitives. Mutex in Openraft is not used on hot path. It's alright to mock a Mutex instead of requiring user to implement it.

About AsyncRead|Write|Seek:

As we discussed before, there will be a feature flag to control if Openraft adds a default Tokio runtime to RaftTypeConfig.

The Chunked codec can be enabled just by this feature flag: Only when using the default tokio runtime chunked codec is provided.

@schreter
Copy link
Collaborator

Tokio's primitives will be used as fallback

Yes, this is fairly easily possible. Fortunately, tokio synchronization primitives work with basically any runtime, so the runtime implementation for a particular runtime can use stuff from tokio until it's implemented in it. We also still use some primitives from tokio (for openraft, only Watch is the relevant one) in our runtime, because not everything is implemented yet.

However, what does NOT work from tokio are any things which require timing or I/O. But, fortunately, we have abstracted timing and the I/O is user's responsibility, so no issue for openraft.

Regarding select!, there is futures::select! macro, which unfortunately didn't make it to std::future. This macro is similar to tokio::select!, but not 1:1 compatible. However, it's probably easy to rewrite the usage with futures. Other stuff, for example, poll_fn made it to std already.

The Chunked codec can be enabled just by this feature flag

+1 :-), then we don't have any remaining dependency left.

@drmingdrmer
Copy link
Member Author

The only select! Openraft used is to get notified by either of two channels(rx_shutdown can be moved out):

https://github.com/datafuselabs/openraft/blob/ec42d9a1772dc48769f3f43218f43f3340ac9d2d/openraft/src/core/raft_core.rs#L913-L944

Thus select! can be replaced with futures select(a,b).

@SteveLauC
Copy link
Collaborator

And it's easy to build a Mutex with oneshot primitives.

This is neat!


As we discussed before, there will be a feature flag to control if Openraft adds a default Tokio runtime to RaftTypeConfig.

The Chunked codec can be enabled just by this feature flag: Only when using the default tokio runtime chunked codec is provided.

Get it:)


Regarding select!, there is futures::select! macro, which unfortunately didn't make it to std::future

Yeah, this is unfortunate:<

Thus select! can be replaced with futures select(a,b).

Great, tokio usage -=1 😁

@ariesdevil
Copy link
Contributor

Thus select! can be replaced with futures select(a,b).

Noticed that select! use the biased; to ensure the order. Does futures.select(a, b) also ensure the order?

@drmingdrmer
Copy link
Member Author

Thus select! can be replaced with futures select(a,b).

Noticed that select! use the biased; to ensure the order. Does futures.select(a, b) also ensure the order?

The biased keyword is used to prioritize the rx_shutdown channel over the other two channels. In practice, when using select(), you should first check rx_shutdown. If it's not ready, then proceed to use select(a, b) to handle the other two event channels.

@schreter
Copy link
Collaborator

futures::select function won't work, as it consumes the two futures and requires Unpin.

There is select_biased! macro in futures, which does the needful.

@drmingdrmer
Copy link
Member Author

futures::select function won't work, as it consumes the two futures and requires Unpin.

There is select_biased! macro in futures, which does the needful.

It looks like just pin them then it is working with the following approach:

use std::pin::pin;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<()>();

    let fu = rx.recv();
    let sleep = tokio::time::sleep(std::time::Duration::from_secs(1));

    let fpin = pin!(fu);
    let spin = pin!(sleep);

    futures::future::select(fpin, spin).await;
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=d3aafad649aa77365457da34f0e33fb9

@schreter
Copy link
Collaborator

futures::future::select(fpin, spin).await;

But why do you want to avoid using select_biased! macro? That is basically drop-in replacement for the tokio one, which allows you to easily handle first or second future completing first.

use std::pin::pin;
use futures::future::FutureExt;

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<()>();

    let sleep = tokio::time::sleep(std::time::Duration::from_secs(1));

    loop {
        futures::select_biased!(
            _ = rx.recv().fuse() => {
                println!("recv completed");
                break;
            }
            _ = sleep.fuse() => {
                println!("sleep completed");
                break;
            }
        );
    }
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9937a5b1845e2b636995fd9e3efcf411

@drmingdrmer
Copy link
Member Author

futures::future::select(fpin, spin).await;

But why do you want to avoid using select_biased! macro? That is basically drop-in replacement for the tokio one, which allows you to easily handle first or second future completing first.

use std::pin::pin;
use futures::future::FutureExt;

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<()>();

    let sleep = tokio::time::sleep(std::time::Duration::from_secs(1));

    loop {
        futures::select_biased!(
            _ = rx.recv().fuse() => {
                println!("recv completed");
                break;
            }
            _ = sleep.fuse() => {
                println!("sleep completed");
                break;
            }
        );
    }
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9937a5b1845e2b636995fd9e3efcf411

Right, select_biased Is better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-runtime Area: async-runtime
Projects
None yet
Development

No branches or pull requests

4 participants