
Example Request - network threads - listeners and connection handlers #95

Closed · rabarar opened this issue Dec 16, 2024 · 27 comments · Fixed by #96

rabarar commented Dec 16, 2024

Can you write an example that shows how one would shut down a listening thread for a network connection, as well as all of the spawned connection-handler threads?

Finomnis (Owner) commented Dec 16, 2024

I'm unsure what you mean; are you talking about async tasks? Tokio does not spawn a thread per connection.
Please write a minimal example of the kind of threads you mean; then I can help you further.

rabarar (Author) commented Dec 16, 2024

Let me clarify (I'm fairly new to Rust): when I spawn an asynchronous function, first for the listener and then for each connection, how do I use your crate to gracefully shut down all of them? Here's the code, for example, that spawns each, respectively:

let producer = tokio::spawn(async move {
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let acceptor = acceptor.clone();
                let tx = tx.clone();
                let thread_clone = notify_producer_clone.clone();
                tokio::spawn(async move {
                    let stream = acceptor.accept(stream).unwrap();
                    println!("accepting connection...");
                    handle_connection(stream, tx, thread_clone).await;
                });
            }
            Err(e) => eprintln!("Connection failed: {}", e),
        }
    }
});

Finomnis (Owner) commented Dec 16, 2024

I see what you mean now.

It might be worth looking into the difference between threads and tasks in tokio; using those two words interchangeably causes a bunch of confusion. I assume all your 'threads' here are actually 'tasks'.

So what you do here is spawn one task per connection via tokio::spawn. The task then idles in handle_connection().await.
handle_connection() returns a future, which is awaitable. Moreover, this future is also cancellable. There are several ways you can cancel a future/task, with the most common ones being tokio::select! or the cancel_on_shutdown() extension built into this crate.
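
For illustration, here's a minimal sketch of both styles (untested; handle_connection() is a placeholder for the per-connection work):

use tokio_graceful_shutdown::{errors::CancelledByShutdown, FutureExt, SubsystemHandle};

// Placeholder for the actual per-connection work.
async fn handle_connection() { /* read from / write to the socket here */ }

async fn connection_task(subsys: SubsystemHandle) {
    // Style 1: tokio::select! races the work against the shutdown signal.
    tokio::select! {
        _ = handle_connection() => {},
        _ = subsys.on_shutdown_requested() => {},
    }

    // Style 2: this crate's cancel_on_shutdown() wraps the future.
    if let Err(CancelledByShutdown) = handle_connection().cancel_on_shutdown(&subsys).await {
        // Shutdown was requested while the connection was still being handled.
    }
}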

That said, it is not recommended to spawn one tokio_graceful_shutdown-subsystem per connection. It has too much overhead.

Instead, I recommend using this crate for the general plumbing of your app, and then handing over to a more light-weight mechanism for the connections themselves, like tokio_util::task::TaskTracker.
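
The TaskTracker API is tiny; a rough, untested sketch:

use tokio_util::task::TaskTracker;

async fn example() {
    let tracker = TaskTracker::new();

    // Spawn one lightweight task per accepted connection.
    tracker.spawn(async { /* handle one connection */ });

    // On shutdown: refuse new tasks, then wait for the in-flight ones to finish.
    tracker.close();
    tracker.wait().await;
}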

A good example of how this can work together to create a high-performance webserver can be seen in this crate's hyper example.

Let me know if you have any further questions regarding this topic.

rabarar (Author) commented Dec 16, 2024 via email

Finomnis (Owner) commented Dec 16, 2024

After reading the hyper example, let me know if that cleared it up for you or if I need to add further examples.

rabarar (Author) commented Dec 16, 2024 via email

Finomnis (Owner) commented:

Line 59 is an empty line for me :) Are you sure that number is correct?


rabarar (Author) commented Dec 16, 2024 via email

Finomnis (Owner) commented:

Anything else, or should I close this issue once the example is merged?

rabarar (Author) commented Dec 16, 2024 via email

rabarar (Author) commented Dec 17, 2024 via email

rabarar (Author) commented Dec 17, 2024 via email

Finomnis (Owner) commented Dec 17, 2024

Like this?

// Imports assumed for this sketch (they were not part of the original comment).
// TLS goes through tokio_native_tls here, since tokio's TcpStream needs an async acceptor.
use std::{fs::File, io::Read, sync::Arc};

use miette::{Context, IntoDiagnostic, Result};
use tokio::net::TcpListener;
use tokio_graceful_shutdown::{errors::CancelledByShutdown, FutureExt, SubsystemHandle};
use tokio_native_tls::{native_tls, TlsAcceptor};
use tokio_util::task::TaskTracker;

async fn connection_handler(
    subsys: SubsystemHandle,
    listener: TcpListener,
    connection_tracker: TaskTracker,
) -> Result<()> {
    // Load the identity from a PKCS12 file
    let mut file = File::open("robert.p12").unwrap();
    let mut identity = vec![];
    file.read_to_end(&mut identity).unwrap();
    let identity = native_tls::Identity::from_pkcs12(&identity, "atakatak").unwrap();
    let acceptor = TlsAcceptor::from(native_tls::TlsAcceptor::builder(identity).build().unwrap());
    let acceptor = Arc::new(acceptor);

    loop {
        let connection = match listener.accept().cancel_on_shutdown(&subsys).await {
            Ok(connection) => connection,
            Err(CancelledByShutdown) => break,
        };
        let (tcp, addr) = connection
            .into_diagnostic()
            .context("Error while waiting for connection")?;

        // Spawn handler on connection tracker to give the parent subsystem
        // the chance to wait for the shutdown to finish
        connection_tracker.spawn({
            let cancellation_token = subsys.create_cancellation_token();
            let acceptor = acceptor.clone();
            async move {
                tracing::info!("Connected to {} ...", addr);
                // With tokio_native_tls, the TLS handshake is awaited.
                let mut tcp = acceptor.accept(tcp).await.unwrap();
                let result = tokio::select! {
                    e = echo_connection(&mut tcp) => e,
                    _ = cancellation_token.cancelled() => {
                        tracing::info!("Shutting down {} ...", addr);
                        echo_connection_shutdown(&mut tcp).await
                    },
                };

                if let Err(err) = result {
                    tracing::warn!("Error serving connection: {:?}", err);
                } else {
                    tracing::info!("Connection to {} closed.", addr);
                }
            }
        });
    }

    Ok(())
}

Didn't test it, though. Just from the general idea, that's how it should work.
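
For completeness, a sketch (also untested; the name listener_subsystem and the bind address are made up) of how the parent could drain the tracker afterwards:

async fn listener_subsystem(subsys: SubsystemHandle) -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:12345")
        .await
        .into_diagnostic()
        .context("Error binding listener")?;
    let connection_tracker = TaskTracker::new();

    let result = connection_handler(subsys, listener, connection_tracker.clone()).await;

    // No new connections will be spawned; wait for in-flight handlers to finish.
    connection_tracker.close();
    connection_tracker.wait().await;

    result
}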

rabarar (Author) commented Dec 17, 2024 via email

Finomnis (Owner) commented:

Please respond here on GitHub and not via email; I've already modified my response to include that. Also, it's really hard to follow your syntax highlighting like that.

Finomnis (Owner) commented:

But it honestly feels like I'm starting to do your programming task for you ... do you have actual conceptual problems with the code? What exactly confuses you? Are you stuck somewhere? If so, where exactly?

rabarar (Author) commented Dec 17, 2024 via email

Finomnis (Owner) commented:

Feel absolutely free to ask if you hit any real roadblocks, though.

rabarar (Author) commented Dec 18, 2024 via email

Finomnis (Owner) commented:

I'm unsure what you mean... For one, I guess you are talking about tasks again, not threads?

Further, please provide more context for your code. Is FunctionA already a subsystem? If so, why do you need another task?

tokio::select! does not spawn anything; it runs multiple async branches concurrently on the current task, cancelling all others once one of them completes.
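
A tiny self-contained demo (untested) of that behavior:

use tokio::{sync::mpsc, time::{sleep, Duration}};

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = mpsc::channel::<String>(8);

    // Both branches run concurrently on *this* task; nothing new is spawned.
    // When the sleep finishes first, the still-pending recv() future is dropped.
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => println!("timeout"),
        msg = rx.recv() => println!("got {msg:?}"),
    }
}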

rabarar (Author) commented Dec 18, 2024 via email

Finomnis (Owner) commented:

Not sure what to send you, because this is basically what the example I merged already contains... What about it is confusing you?

Waiting for a queue item is a future that can be cancelled, as described earlier:

There are several ways you can cancel a future/task, with the most common ones being tokio::select or the one built-into this crate, future::cancel_on_shutdown().

Several examples demonstrate this, including the new example.
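
A small untested sketch of what that looks like inside a subsystem (rx is an mpsc receiver):

// Wait for a queue item, aborting the wait when shutdown is requested.
let item = match rx.recv().cancel_on_shutdown(&subsys).await {
    Ok(Some(item)) => item,                    // got a queue item
    Ok(None) => return Ok(()),                 // channel closed
    Err(CancelledByShutdown) => return Ok(()), // shutdown requested
};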

I kind of understand what you are trying to achieve; I just don't understand the problems you are having. Please provide some code that demonstrates your problem.

rabarar (Author) commented Dec 19, 2024

Okay, I think I get it… but I am getting a timeout on shutdown:

2024-12-19T04:17:09.527552Z ERROR tokio_graceful_shutdown::toplevel: Shutdown timed out!
2024-12-19T04:17:09.527656Z  WARN tokio_graceful_shutdown::runner: Subsystem cancelled: '/Subsys2'
Error: Diagnostic { message: "shutdown timed out", code: "graceful_shutdown::timeout" }
NOTE: If you're looking for the fancy error reports, install miette with the `fancy` feature, or write your own and hook it up with miette::set_hook().

Here’s the code I wrote to demonstrate comms between two subsystems using tokio channels:

use miette::Result;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

struct Subsystem1 {
    tx: mpsc::Sender<String>,
    rx: mpsc::Receiver<String>,
}

impl Subsystem1 {
    async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
        tracing::info!("Subsystem1 started.");
        if let Err(e) = self.tx.send("hi there from ss1".to_string()).await {
            tracing::info!(">>> Subsystem 1 failed to sent to Subsystem 2: {e}");
        } else {
            tracing::info!(">>> Subsystem 1 sent to Subsystem 2");
        }

        let tx2 = self.tx.clone();
        loop {
            tokio::select! {
                _ = subsys.on_shutdown_requested() => {
                    break;
                }
                Some(message) = self.rx.recv() => {
                    tracing::info!("Subsystem 1 received: {}", message);
                    let response = format!("{}", message);
                    // Send a response back to subsystem 2
                    match tx2.send(response.clone()).await {
                        Ok(()) => {
                            tracing::info!("subsystem 1 sent response...");
                        },
                        Err(_e) => {
                            tracing::error!("subsystem 1 failed to send response...");
                        },
                    }

                    tracing::info!("Subsystem 1 sent: {}", response);
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                }
            }
        }
        subsys.on_shutdown_requested().await;
        tracing::info!("Shutting down Subsystem1 ...");
        sleep(Duration::from_millis(500)).await;
        tracing::info!("Subsystem1 stopped.");
        Ok(())
    }
}

struct Subsystem2 {
    tx: mpsc::Sender<String>,
    rx: mpsc::Receiver<String>,
}

impl Subsystem2 {
    async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
        tracing::info!("Subsystem1 started.");
        if let Err(e) = self.tx.send("hi there from ss2".to_string()).await {
            tracing::info!(">>> Subsystem 2 failed to sent to Subsystem 1: {e}");
        } else {
            tracing::info!(">>> Subsystem 2 sent to Subsystem 1");
        }

        let tx2 = self.tx.clone();
        loop {
            tokio::select! {
                _ = subsys.on_shutdown_requested() => {
                    break;
                }
                Some(message) = self.rx.recv() => {
                    tracing::info!("Subsystem 2 received: {}", message);
                    let response = format!("{}", message);
                    // Send a response back to subsystem 1
                    match tx2.send(response.clone()).await {
                        Ok(()) => {
                            tracing::info!("subsystem 2 sent response...");
                        },
                        Err(_e) => {
                            tracing::error!("subsystem 2 failed to send response...");
                        },
                    }
                    tracing::info!("Subsystem 2 sent: {}", response);
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                }
            }
        }
        subsys.on_shutdown_requested().await;
        tracing::info!("Shutting down Subsystem2 ...");
        sleep(Duration::from_millis(500)).await;
        tracing::info!("Subsystem2 stopped.");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Init logging
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Create channels for communication
    let (tx1, rx1) = mpsc::channel::<String>(100);
    let (tx2, rx2) = mpsc::channel::<String>(100);

    let subsys1 = Subsystem1 { tx: tx2, rx: rx1 };
    let subsys2 = Subsystem2 { tx: tx1, rx: rx2 };

    // Setup and execute subsystem tree
    Toplevel::new(|s| async move {
        s.start(SubsystemBuilder::new("Subsys1", |a| subsys1.run(a)));
        s.start(SubsystemBuilder::new("Subsys2", |a| subsys2.run(a)));
    })
    .catch_signals()
    .handle_shutdown_requests(Duration::from_millis(1000))
    .await
    .map_err(Into::into)
}

Finomnis (Owner) commented Dec 19, 2024

You're almost there.

  • You still have an arbitrary wait of 500 ms on every shutdown. This is of course valid for demonstration purposes, but I would remove it at some point ;)
  • The subsys.on_shutdown_requested().await; during shutdown is pointless, as you already await that in the select statement.
  • There's no point in cloning self.tx; you can use it directly.
  • The biggest and actual error: on_shutdown_requested() does not interrupt the tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;.
    Also, there will always be a message available in self.rx.recv(), so on_shutdown_requested() never gets checked.

Your select statement runs self.rx.recv() and subsys.on_shutdown_requested() in parallel. Once self.rx.recv() succeeds, the subsys.on_shutdown_requested() gets cancelled and the code after recv gets executed.

If that's what you want, great; but I assume you want the sleep(from_secs(1)) to be cancelled as well.

The easiest solution is to split the init/shutdown/cancellation code and the worker code into two separate functions, and cancel the entire worker code on shutdown, like so:

use miette::Result;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

struct Subsystem1 {
    tx: mpsc::Sender<String>,
    rx: mpsc::Receiver<String>,
}

impl Subsystem1 {
    async fn communicate(&mut self) -> Result<()> {
        while let Some(message) = self.rx.recv().await {
            tracing::info!("Subsystem 1 received: {}", message);
            let response = format!("{}", message);
            // Send a response back to subsystem 2
            match self.tx.send(response.clone()).await {
                Ok(()) => {
                    tracing::info!("subsystem 1 sent response...");
                }
                Err(_e) => {
                    tracing::error!("subsystem 1 failed to send response...");
                }
            }

            tracing::info!("Subsystem 1 sent: {}", response);
            sleep(Duration::from_secs(1)).await;
        }

        Ok(())
    }

    async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
        tracing::info!("Subsystem1 started.");
        if let Err(e) = self.tx.send("hi there from ss1".to_string()).await {
            tracing::info!(">>> Subsystem 1 failed to sent to Subsystem 2: {e}");
        } else {
            tracing::info!(">>> Subsystem 1 sent to Subsystem 2");
        }

        tokio::select! {
            _ = subsys.on_shutdown_requested() => {
                tracing::info!("Subsystem1 received a shutdown request ...");
            }
            err = self.communicate() => {
                err?;
                tracing::info!("Subsystem1 receive channel got closed ...");
            }
        }

        tracing::info!("Subsystem1 stopped.");
        Ok(())
    }
}

struct Subsystem2 {
    tx: mpsc::Sender<String>,
    rx: mpsc::Receiver<String>,
}

impl Subsystem2 {
    async fn communicate(&mut self) -> Result<()> {
        while let Some(message) = self.rx.recv().await {
            tracing::info!("Subsystem 2 received: {}", message);
            let response = format!("{}", message);
            // Send a response back to subsystem 1
            match self.tx.send(response.clone()).await {
                Ok(()) => {
                    tracing::info!("subsystem 2 sent response...");
                }
                Err(_e) => {
                    tracing::error!("subsystem 2 failed to send response...");
                }
            }

            tracing::info!("Subsystem 2 sent: {}", response);
            sleep(Duration::from_secs(1)).await;
        }

        Ok(())
    }

    async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
        tracing::info!("Subsystem2 started.");
        if let Err(e) = self.tx.send("hi there from ss2".to_string()).await {
            tracing::info!(">>> Subsystem 2 failed to sent to Subsystem 1: {e}");
        } else {
            tracing::info!(">>> Subsystem 2 sent to Subsystem 1");
        }

        tokio::select! {
            _ = subsys.on_shutdown_requested() => {
                tracing::info!("Subsystem2 received a shutdown request ...");
            }
            err = self.communicate() => {
                err?;
                tracing::info!("Subsystem2 receive channel got closed ...");
            }
        }

        tracing::info!("Subsystem2 stopped.");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Init logging
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Create channels for communication
    let (tx1, rx1) = mpsc::channel::<String>(100);
    let (tx2, rx2) = mpsc::channel::<String>(100);

    let subsys1 = Subsystem1 { tx: tx2, rx: rx1 };
    let subsys2 = Subsystem2 { tx: tx1, rx: rx2 };

    // Setup and execute subsystem tree
    Toplevel::new(|s| async move {
        s.start(SubsystemBuilder::new("Subsys1", |a| subsys1.run(a)));
        s.start(SubsystemBuilder::new("Subsys2", |a| subsys2.run(a)));
    })
    .catch_signals()
    .handle_shutdown_requests(Duration::from_millis(1000))
    .await
    .map_err(Into::into)
}
Example output:

2024-12-19T06:30:41.703599Z  INFO rust_playground: Subsystem1 started.
2024-12-19T06:30:41.703629Z  INFO rust_playground: Subsystem2 started.
2024-12-19T06:30:41.703773Z  INFO rust_playground: >>> Subsystem 1 sent to Subsystem 2
2024-12-19T06:30:41.703907Z  INFO rust_playground: >>> Subsystem 2 sent to Subsystem 1
2024-12-19T06:30:41.704029Z  INFO rust_playground: Subsystem 1 received: hi there from ss2
2024-12-19T06:30:41.704155Z  INFO rust_playground: Subsystem 2 received: hi there from ss1
2024-12-19T06:30:41.704268Z  INFO rust_playground: subsystem 1 sent response...
2024-12-19T06:30:41.704383Z  INFO rust_playground: subsystem 2 sent response...
2024-12-19T06:30:41.704495Z  INFO rust_playground: Subsystem 1 sent: hi there from ss2
2024-12-19T06:30:41.704599Z  INFO rust_playground: Subsystem 2 sent: hi there from ss1
2024-12-19T06:30:42.711124Z  INFO rust_playground: Subsystem 1 received: hi there from ss1
2024-12-19T06:30:42.711131Z  INFO rust_playground: Subsystem 2 received: hi there from ss2
2024-12-19T06:30:42.711317Z  INFO rust_playground: subsystem 1 sent response...
2024-12-19T06:30:42.711448Z  INFO rust_playground: subsystem 2 sent response...
2024-12-19T06:30:42.711573Z  INFO rust_playground: Subsystem 1 sent: hi there from ss1
2024-12-19T06:30:42.711681Z  INFO rust_playground: Subsystem 2 sent: hi there from ss2
2024-12-19T06:30:42.985749Z DEBUG tokio_graceful_shutdown::signal_handling: Received CTRL_C.
2024-12-19T06:30:42.985949Z  INFO rust_playground: Subsystem2 received a shutdown request ...
2024-12-19T06:30:42.985950Z  INFO rust_playground: Subsystem1 received a shutdown request ...
2024-12-19T06:30:42.985950Z  INFO tokio_graceful_shutdown::toplevel: Shutting down ...
2024-12-19T06:30:42.986087Z  INFO rust_playground: Subsystem2 stopped.
2024-12-19T06:30:42.986200Z  INFO rust_playground: Subsystem1 stopped.
2024-12-19T06:30:42.986565Z  INFO tokio_graceful_shutdown::toplevel: Shutdown finished.

In case you are wondering how my project is set up:

[package]
name = "rust-playground"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
miette = { version = "7.4.0", features = ["fancy"] }
tokio = { version = "1.42.0", features = ["full"] }
tokio-graceful-shutdown = "0.15.2"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

Finomnis (Owner) commented Dec 19, 2024

In case you only receive this via email again: I edited the previous message.

rabarar (Author) commented Dec 19, 2024

Thank you. That makes complete sense; it was the separation of the run function from the worker that was throwing me off when laying out the select.
