Skip to content

Commit

Permalink
scupt-net
Browse files Browse the repository at this point in the history
  • Loading branch information
ybbh committed Nov 29, 2023
1 parent ac96bab commit 1499cc6
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::future::Future;
use std::time::Duration;

use scupt_util::res::Res;
use tokio::time::sleep;
use tokio::{select, task};
use tokio::task::JoinHandle;
use tracing::trace;
Expand Down Expand Up @@ -44,6 +46,21 @@ pub fn spawn_task<F>(cancel_notifier: Notifier, _name: &str, future: F) -> Res<J
}))
}

#[cfg(not(task_name))]
pub fn spawn_task_timeout<F>(
cancel_notifier: Notifier,
duration: Duration,
_name: &str,
future: F
) -> Res<JoinHandle<Result<F::Output, TaskFailed>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
Ok(task::spawn(async move {
__select_local_till_done_or_timeout(cancel_notifier, duration, future).await
}))
}

#[cfg(task_name)]
pub fn spawn_task<F>(cancel_notifier: Notifier, name: &str, future: F) -> Res<JoinHandle<Option<F::Output>>>
Expand All @@ -60,6 +77,20 @@ pub fn spawn_task<F>(cancel_notifier: Notifier, name: &str, future: F) -> Res<Jo
}
}

#[cfg(task_name)]
pub fn spawn_task<F>(cancel_notifier: Notifier, duration: Duration, name: &str, future: F) -> Res<JoinHandle<Result<F::Output, TaskFailed>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let r = task::Builder::default().name(name).spawn(async move {
__select_local_till_done_or_timeout(cancel_notifier, duration, future).await
});
match r {
Ok(f) => Ok(f),
Err(e) => Err(scupt_util::error_type::ET::FatalError(e.to_string()))
}
}
async fn __select_local_till_done<F>(notify: Notifier, future: F) -> Option<F::Output>
where
F: Future + 'static,
Expand All @@ -82,6 +113,35 @@ async fn __select_local_till_done<F>(notify: Notifier, future: F) -> Option<F::O
opt
}

pub enum TaskFailed {
Cancel,
Timeout,
}

async fn __select_local_till_done_or_timeout<F>(notify: Notifier, duration:Duration, future: F) -> Result<F::Output, TaskFailed>
where
F: Future + 'static,
F::Output: 'static,
{
let future = async move {
let r = select! {
_ = notify.notified() => {
trace ! ("local task stop");
Err(TaskFailed::Cancel)
}
r = future => {
trace ! ("local task end");
Ok(r)
}
_ = sleep(duration) => {
Err(TaskFailed::Timeout)
}
};
r
};
let opt = future.await;
opt
}

async fn __select_till_done<F>(notify: Notifier, future: F) -> Option<F::Output>
where
Expand Down

0 comments on commit 1499cc6

Please sign in to comment.