Skip to content

Commit

Permalink
scupt-net
Browse files Browse the repository at this point in the history
  • Loading branch information
ybbh committed Nov 28, 2023
1 parent a6e9050 commit e78cdd6
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 153 deletions.
2 changes: 0 additions & 2 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version = "0.0.1"
edition = "2021"

[build]
rustflags = ["--cfg", "tokio_unstable"]


[dependencies]
scupt-util = { git = "https://github.com/scuptio/scupt-util.git" }
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ pub mod client;
pub mod message_receiver_endpoint;
mod message_sender_endpoint;
mod parse_dtm_message;
pub mod task;


7 changes: 4 additions & 3 deletions src/net_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::event_sink_impl::EventSenderImpl;
use crate::handle_event::HandleEvent;
use crate::message_channel::MessageChSender;
use crate::message_receiver_channel::MessageReceiverChannel;
use crate::notifier::{Notifier, spawn_cancelable_task};
use crate::notifier::Notifier;
use crate::task::spawn_local_task;

pub type NodeSender<MsgTrait> = EventSenderImpl<MsgTrait>;

Expand All @@ -42,9 +43,9 @@ impl<M: MsgTrait> HandleEvent for NetHandler<M> {
async fn on_accepted(&self, endpoint: Endpoint) -> Res<()> {
trace!("{} accept connection {}", self.name, endpoint.remote_address().to_string());
let inner = self.inner.clone();
spawn_cancelable_task(inner.stop_notify.clone(), "handle_message, ", async move {
spawn_local_task(inner.stop_notify.clone(), "handle_message, ", async move {
let _r = inner.process_message(endpoint).await;
});
})?;
Ok(())
}

Expand Down
52 changes: 26 additions & 26 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use scupt_util::res_of::res_io;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Runtime;
use tokio::task::LocalSet;
use tracing::{error, trace, trace_span, Instrument};
use tracing::{error, Instrument, trace, trace_span};

use crate::endpoint::Endpoint;
use crate::event::{EventResult, NetEvent, ResultSender};
Expand All @@ -19,8 +19,9 @@ use crate::handle_event::HandleEvent;
use crate::message_sender::{Sender, SenderRR};
use crate::net_handler::NodeSender;
use crate::node_context::NodeContext;
use crate::notifier::{Notifier, spawn_cancelable_task, spawn_cancelable_task_local_set};
use crate::notifier::Notifier;
use crate::opt_ep::OptEP;
use crate::task::spawn_local_task;

#[derive(Clone)]
pub struct Node<
Expand Down Expand Up @@ -118,19 +119,18 @@ Node<
trace!("main loop {}", n.name());
let task_name = format!("{}_main_loop", n.name());
let notify = n.stop_notify();
spawn_cancelable_task_local_set(
&local_set,
notify,
task_name.as_str(),
async move {
Self::run_main_loop(
name,
n,
c,
h,
).instrument(trace_span!("main loop")).await;
},
);
let f = async move {
Self::run_main_loop(
name,
n,
c,
h,
).instrument(trace_span!("main loop")).await;
};

local_set.spawn_local(async move {
spawn_local_task(notify, task_name.as_str(), f)
});
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -222,11 +222,11 @@ Node<
}
NetEvent::Stop(opt_s) => {
let stop_notify = node.stop_notify();
spawn_cancelable_task(stop_notify, "stop and notify", async move {
let _ = spawn_local_task(stop_notify, "stop and notify", async move {
node.stop_and_notify().await;
handle.on_stop().await;
Self::handle_opt_send_result(EventResult::ErrorType(ET::OK), opt_s);
});
})?;
return Err(ET::EOF);
}
NetEvent::NewEventChannel(ch) => {
Expand Down Expand Up @@ -269,7 +269,7 @@ Node<
handle,
).await;
};
spawn_cancelable_task(notify, task_name.as_str(), main_loop);
spawn_local_task(notify, task_name.as_str(), main_loop)?;
Ok(())
}

Expand All @@ -295,11 +295,11 @@ Node<
).await;
trace!("on connected done {}", task_name2);
};
spawn_cancelable_task(
spawn_local_task(
notify,
task_name.as_str(),
on_connected,
);
).unwrap();
}

async fn task_handle_connected(
Expand Down Expand Up @@ -394,11 +394,11 @@ Node<
}
};
};
spawn_cancelable_task(
spawn_local_task(
notify,
format!("first accept {}", node_id).as_str(),
future_accept_first,
);
)?;
Ok(())
}

Expand Down Expand Up @@ -455,16 +455,16 @@ Node<
}
}
};
spawn_cancelable_task(
spawn_local_task(
node.stop_notify(),
format!("accept connect {}", node.name()).as_str(),
on_accepted,
);
spawn_cancelable_task(
)?;
spawn_local_task(
node.stop_notify(),
format!("accept new connect {}", node.name()).as_str(),
future_accept_new_connection,
);
)?;
Ok(())
}

Expand Down
124 changes: 3 additions & 121 deletions src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::future::Future;
use std::io::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::Ordering::SeqCst;
use std::thread;
use std::time::Duration;
use scupt_util::error_type::ET;
use scupt_util::res::Res;

use tokio::select;
use tokio::sync::Notify;
use tokio::task;
use tokio::task::{JoinHandle, LocalSet};
use tokio::time::sleep;
use tracing::trace;

Expand Down Expand Up @@ -66,13 +60,10 @@ impl Notifier {
trace!("notify waiter {}", self.name);
let ret = self.inner.notify_all();
let inner = self.inner.clone();
let r = task::Builder::default().spawn(async move {

let _r = task::spawn(async move {
inner.repeat_notify_until_no_waiting().await;
});
match r {
Ok(_j) => {}
Err(e) => { trace!("notify error {:?}", e); }
}
ret
}

Expand Down Expand Up @@ -141,113 +132,4 @@ impl NotifyInner {
self.stop_notifier.notify_waiters();
}
}
}

pub async fn select_till_done<F>(notify: Notifier, future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let f = async move {
let _ = select! {
_ = notify.notified() => {
trace ! ("task stop");
}
_r = future => {
trace ! ("task end");
}
};
};
f.await;
}


pub async fn select_local_till_done<F>(notify: Notifier, future: F)
where
F: Future + 'static,
F::Output: 'static,
{
let future = async move {
let _ = select! {
_ = notify.notified() => {
trace ! ("local task stop");
}
_r = future => {
trace ! ("local task end");
}
};
};
future.await;
}

pub fn spawn_cancelable_task_mt<F>(stop_notify: Notifier, name: &str, future: F) -> Result<JoinHandle<()>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
task::Builder::default().name(name).spawn(async move {
select_till_done(stop_notify, future).await
})
}

pub fn spawn_cancelable_task_local_set<F>(
task_set: &LocalSet,
stop_notify: Notifier,
name: &str,
future: F)
where
F: Future + 'static,
F::Output: 'static,
{
let n = String::from(name);
task_set.spawn_local(async move {
spawn_cancelable_task(stop_notify, n.as_str(), future)
});
}


pub fn spawn_cancelable_task<F>(stop_notify: Notifier, name: &str, future: F)
where
F: Future + 'static,
F::Output: 'static,
{
let _ = task::Builder::default().name(name).spawn_local(async move {
select_local_till_done(stop_notify, future).await
}).unwrap();
}


pub async fn __select_local_till_done<F>(notify: Notifier, future: F) -> Option<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let future = async move {
let r = select! {
_ = notify.notified() => {
trace ! ("local task stop");
None
}
r = future => {
trace ! ("local task end");
Some(r)
}
};
r
};
let opt = future.await;
opt
}
pub fn spawn_task<F>(cancel_notifier: Notifier, name: &str, future: F) -> Res<JoinHandle<Option<F::Output>>>
where
F: Future + 'static,
F::Output: 'static,
{
let r = task::Builder::default().name(name).spawn_local(async move {
__select_local_till_done(cancel_notifier, future).await
});
match r {
Ok(f) => Ok(f),
Err(e) => Err(ET::FatalError(e.to_string()))
}
}
}
Loading

0 comments on commit e78cdd6

Please sign in to comment.