From 1499cc6b780a9e89bfb49132d34e880017b87aa4 Mon Sep 17 00:00:00 2001 From: ybbh Date: Wed, 29 Nov 2023 16:18:48 +0800 Subject: [PATCH] scupt-net --- src/task.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/task.rs b/src/task.rs index 67a2df1..307ede1 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,6 +1,8 @@ use std::future::Future; +use std::time::Duration; use scupt_util::res::Res; +use tokio::time::sleep; use tokio::{select, task}; use tokio::task::JoinHandle; use tracing::trace; @@ -44,6 +46,21 @@ pub fn spawn_task(cancel_notifier: Notifier, _name: &str, future: F) -> Res( + cancel_notifier: Notifier, + duration: Duration, + _name: &str, + future: F +) -> Res>> + where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + Ok(task::spawn(async move { + __select_local_till_done_or_timeout(cancel_notifier, duration, future).await + })) +} #[cfg(task_name)] pub fn spawn_task(cancel_notifier: Notifier, name: &str, future: F) -> Res>> @@ -60,6 +77,20 @@ pub fn spawn_task(cancel_notifier: Notifier, name: &str, future: F) -> Res(cancel_notifier: Notifier, duration: Duration, name: &str, future: F) -> Res>> + where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + let r = task::Builder::default().name(name).spawn(async move { + __select_local_till_done_or_timeout(cancel_notifier, duration, future).await + }); + match r { + Ok(f) => Ok(f), + Err(e) => Err(scupt_util::error_type::ET::FatalError(e.to_string())) + } +} async fn __select_local_till_done(notify: Notifier, future: F) -> Option where F: Future + 'static, @@ -82,6 +113,35 @@ async fn __select_local_till_done(notify: Notifier, future: F) -> Option(notify: Notifier, duration:Duration, future: F) -> Result + where + F: Future + 'static, + F::Output: 'static, +{ + let future = async move { + let r = select! { + _ = notify.notified() => { + trace ! ("local task stop"); + Err(TaskFailed::Cancel) + } + r = future => { + trace ! ("local task end"); + Ok(r) + } + _ = sleep(duration) => { + Err(TaskFailed::Timeout) + } + }; + r + }; + let opt = future.await; + opt +} async fn __select_till_done(notify: Notifier, future: F) -> Option where