Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jstzd): define Task trait and type JoinHandle #555

Closed
wants to merge 4 commits into from

Conversation

huancheng-trili
Copy link
Collaborator

@huancheng-trili huancheng-trili commented Sep 10, 2024

Context

Define the Task trait and implement the future wrapper type JoinHandle. Task represents any task to be run with backends, e.g. docker engine, shell, jstz node.

JSTZ-53

Description

Changes in Cargo.toml will be updated accordingly after #553 is merged.

Manually testing the PR

cargo test

$ cd $(git rev-parse --show-toplevel)/crates/jstzd && cargo test
...

running 6 tests
test task::joinhandle::test_joinhandle::wait_err ... ok
test task::joinhandle::test_joinhandle::drop_test ... ok
test task::joinhandle::test_joinhandle::abort_test ... ok
test task::joinhandle::test_joinhandle::wait_ok ... ok
test task::joinhandle::test_joinhandle::future_implementation ... ok
test task::joinhandle::test_joinhandle::signal_send ... ok

test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 1.04s

   Doc-tests jstzd

running 1 test
test crates/jstzd/src/task/joinhandle.rs - task::joinhandle::JoinHandle::new (line 77) ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.37s


pub struct JoinHandle {
task: OnceCell<tokioJoinHandle<()>>,
allowed_signal: SignalKind,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you've misunderstood the purpose of signals here. A signal is used to notify the task of some external action (commonly terminating).

But instead of abruptly aborting the Tokio task (which doesn't leave any room for graceful termination / cleaning up of resources), its the implementator of the Task trait responsibility to ensure that when sent a termination signal, run_task returns.

The intended use would be that the signal would be queued in the mpsc::Sender, with the implementator using the mpsc::Recieve to handle signals (including termination).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should have an abort on a configurable timeout?

}

pub struct JoinHandle {
task: OnceCell<tokioJoinHandle<()>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would Option work instead? I don't see anywhere that we're relying on OnceCell::set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The idea was that this task should only be consumed once, e.g. accessing it after .wait is called should lead to an error, but it's not really a big deal at this moment. This is now changed to Option.

return match self.task.take() {
Some(handle) => {
let result = handle.await;
if result.is_err() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest using an if let here to avoid the unwrap_err

Comment on lines 121 to 126
let waker = cx.waker().clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs_f32(0.3)).await;
waker.wake();
});
Poll::Pending
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new thread to wakeup the waker, recommend using the .poll method of the handle (it also implements Future). Something like the following should work:

  let pin = Pin::new(handle);
  match pin.poll(cx) {
      Poll::Ready(_) => Poll::Ready(())
      Poll::Pending => Poll::Pending,
  }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, calling poll here requires a mutable reference to handle and that makes things quite complicated here. I made a few attempts but still could not make it work.

}

impl Future for JoinHandle {
type Output = ();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we return an anyhow::Result<()>? Since .await is essentially .wait?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is now updated.

/// );
/// # });
/// ```
pub fn new(task: tokioJoinHandle<()>, signal: SignalKind) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really intended to be part of the API, recommend guarding with #[cfg(test)]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need this initialiser since all Task implementations will also involve creating new JoinHandle instances (in method spawn_task.) Having this initialiser makes it easier and prevents implementations from messing up internal members.

Comment on lines 134 to 144
// On drop, send shutdown signal and wait for the task to terminate
impl Drop for JoinHandle {
fn drop(&mut self) {
if !self.dropped {
let mut this = JoinHandle::default();
std::mem::swap(&mut this, self);
this.dropped = true;
tokio::spawn(async move { this.async_drop().await });
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is rather similar to the AsyncDropper wrapper from async_dropper_simple. I'm more leaning towards using the derive feature from async_drop instead of the simple version in which case we could use #[derive(AsyncDrop)]. What do you think? cc @ryutamago @zcabter

Copy link
Collaborator

@zcabter zcabter Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, was leaning towards async_dropper_simple since it removed the Default requirement as it gets more difficult to define default for large structs and there is a tendency to simply derive non-meaningful defaults makes the code harder to use but it adds invasive boilerplate.

If we use derive, our structs are rather simple (✅ ) and so we just have to be careful to define a meaningful defaults where we can.

tldr; derive is fine with me, define meaningful defaults where possible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the derive feature requires us to implement Eq and PartialEq, which is going to be even messier. Had a discussion with @johnyob and the cleaner solution would be to write a macro that implements the drop for us

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we don't need to implement drop if the tasks don't panic. Instead, have the jstzd orchestrator registers shutdown/clean up hooks for each task it registers. The orchestrator could control shutdown of tasks based on the state of other tasks. If orchestrator itself panics, it calls shutdown in its async dropper which isolates the async dropper boilerplate. This makes the shutdowns more explicit rather than implicitly cascading, its easier to test and easier to read. wdyt?

@johnyob

));
}
if let Some(handle) = self.task.get() {
handle.abort();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above re: signals

impl Drop for JoinHandle {
fn drop(&mut self) {
if !self.dropped {
let mut this = JoinHandle::default();
Copy link
Collaborator

@ryutamago ryutamago Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q:
shouldn't we set self.dropped = true before and after the swap to prevent double drop?

if !self.dropped {
   self.dropped = true
   let mut this = ...
   std::mem::swap(&mut this, self);
   self.dropped = true;
   tokio::spwan(

let mut this = JoinHandle::default();
std::mem::swap(&mut this, self);
this.dropped = true;
tokio::spawn(async move { this.async_drop().await });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the main thread exits before the async_drop completes, the drop will not get executed. the async-dropper-simple library uses the scope_and_block function instead to block the current thread until the task ends. perhaps it is a better option ? cc @johnyob @zcabter

Copy link
Collaborator

@zcabter zcabter Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think calling tokio::spawn here causes race conditions. Drop is synchronous so we should only allow synchronous calls. I suggest something like tokio::block_in_place?

@huancheng-trili
Copy link
Collaborator Author

huancheng-trili commented Sep 12, 2024

I've managed to fix everything. This implementation is a bit annoying, but I think we need it to be like this. Here is the explanation.

We want to enable the following things:

  • Users should be able to wait for the task and retrieve the result in the end.
  • Users should be able to implement signal handlers inside tasks and send signals to tasks to gracefully shutdown tasks.
  • There must be some mechanism for aborting tasks forcefully.
  • When a handle is being dropped, the running task should be cancelled.

To achieve these, some new members are added to the JoinHandle struct. Now it has the following members:

  • handle: the tokio task handle of the actually running task in a wrapper struct.
  • signal_tx: the channel sender that allows users to send signals to the running task. Its receiver counterpart should be injected somewhere in the task defined by users.
  • execution_result: a mutex that keeps the result of the running task.

Signalling is now achieved through channels. Task creators are supposed to somehow include the receiver in the task definition (and implement signal handling) and pass the sender to the handle. Calling .signal basically calls the underlying signal sender to send the given signals.

One main challenge here is moving handle instances. A possible use case of the handles is as follows:

let handle = JoinHandle::new(...);

// Do something and send signals to the task
tokio::spawn(async move {
    ...
    handle.signal(...);
});

// Abort the task should anything fatal happen
tokio::spawn(async move {
    ...
    handle.abort().await;
});

let result = handle.wait().await;

handle then needs to be moved, unless copies are passed into the futures. This means the handle needs to implement Clone and each copy should hold a reference to the running task handle. On the other hand, we want to cancel the underlying task when a handle instance is being destroyed but not when the copies are destroyed. This means this cancellation needs to be implemented for the underlying handle struct, which is why there is a new wrapper struct JoinHandleWrapper that wraps around the actual tokio handles. The main point for this wrapper is that it terminates the running task in its Drop implementation. Therefore, when the last copy of our JoinHandle struct is destroyed, the underlying task will surely be cancelled but not prematurely when other copies of the handle are dropped.

Also moved JoinHandle into its own file since it's way too long.

Copy link

codecov bot commented Sep 12, 2024

Codecov Report

Attention: Patch coverage is 80.95238% with 44 lines in your changes missing coverage. Please review.

Project coverage is 35.39%. Comparing base (045203c) to head (570f1ff).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
crates/jstzd/src/task/joinhandle.rs 80.95% 26 Missing and 18 partials ⚠️
Files with missing lines Coverage Δ
crates/jstzd/src/task/mod.rs 75.83% <ø> (ø)
crates/jstzd/src/task/joinhandle.rs 80.95% <80.95%> (ø)

... and 14 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 045203c...570f1ff. Read the comment docs.

@huancheng-trili huancheng-trili deleted the huan/jstzd-task-trait branch September 13, 2024 14:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants