diff --git a/Cargo.toml b/Cargo.toml index 52841ab..78e3f7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,12 @@ async-backtrace = "0.2.6" lazy_static = "1.4.0" scc = "2.0.18" uuid = { version = "1.6.1", features = ["v4"] } +scopeguard = { version = "1.2.0" } + +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["tokio", "server"] } + +http-body-util = "0.1" + +http = "1" diff --git a/src/client.rs b/src/client.rs index 046e7f7..0c79cdb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,6 +17,7 @@ use crate::es_option::ESConnectOption; use crate::handle_event::HandleEvent; use crate::node::Node; use crate::notifier::Notifier; +use crate::task_trace; #[derive(Clone)] pub struct Client { @@ -44,19 +45,27 @@ impl Client { self.inner.run(local); } + #[async_backtrace::framed] pub async fn is_connected(&self) -> bool { + let _t = task_trace!(); self.inner.is_connected().await } + #[async_backtrace::framed] pub async fn connect(&self, opt: OptClientConnect) -> Res<()> { + let _t = task_trace!(); self.inner.connect(opt).await } + #[async_backtrace::framed] pub async fn send(&self, message: Message) -> Res<()> { + let _t = task_trace!(); self.inner.send(message).await } + #[async_backtrace::framed] pub async fn recv(&self) -> Res> { + let _t = task_trace!(); self.inner.recv().await } @@ -114,12 +123,16 @@ impl ClientInner { self.node.run_local(local); } + #[async_backtrace::framed] pub async fn is_connected(&self) -> bool { + let _t = task_trace!(); let g = self.opt_endpoint.lock().await; g.is_some() } + #[async_backtrace::framed] pub async fn connect(&self, opt: OptClientConnect) -> Res<()> { + let _t = task_trace!(); let mut opt_ep = None; let mut n = opt.retry_max; while opt.retry_max == 0 || n > 0 { @@ -147,7 +160,9 @@ impl ClientInner { Ok(()) } + #[async_backtrace::framed] pub async fn send(&self, message: Message) -> Res<()> { + let _t = task_trace!(); let guard = self.opt_endpoint.lock().await; if let Some(e) = &(*guard) { e.send(message).await?; @@ -157,7 +172,9 @@ impl ClientInner { } } + #[async_backtrace::framed] pub async fn recv(&self) -> Res> { + let _t = task_trace!(); let guard = self.opt_endpoint.lock().await; if let Some(e) = &(*guard) { let m = e.recv().await?; diff --git a/src/debug.rs b/src/debug.rs new file mode 100644 index 0000000..56ce695 --- /dev/null +++ b/src/debug.rs @@ -0,0 +1,104 @@ +// Example hyper http server +// https://github.com/hyperium/hyper/blob/master/examples/echo.rs + +use std::net::SocketAddr; + +use bytes::Bytes; +use http::{Method, StatusCode}; +use http_body_util::Full; +use hyper::{Request, Response}; +use hyper::body::Incoming; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper_util::rt::TokioIo; +use lazy_static::lazy_static; +use scc::{HashIndex, HashSet}; +use scupt_util::error_type::ET; +use scupt_util::res::Res; +use tokio::net::TcpListener; + +use crate::dump_task_trace; + +type HandleURL = fn(String) -> Res; + +lazy_static!( + static ref HANDLE_URL : HashIndex = HashIndex::new(); + static ref SERVER: HashSet = HashSet::new(); +); + + +pub fn register_debug_url(url: String, h: HandleURL) { + let _ = HANDLE_URL.insert(url, h); +} + +async fn handle_request(req: Request) -> Result>, hyper::Error> { + let mut response = Response::new(Full::default()); + match req.method() { + &Method::GET => { + let path = req.uri().path(); + match path { + "/task" => { + let dump = dump_task_trace!(); + *response.body_mut() = Full::from(dump); + } + _ => { + let opt = HANDLE_URL.get(&path.to_string()); + match opt { + Some(e) => { + let h = e.get().clone(); + let s = h(path.to_string()).unwrap_or_else(|e| { e.to_string() }); + *response.body_mut() = Full::from(s); + } + None => { + *response.status_mut() = StatusCode::NOT_FOUND; + } + } + } + } + } + _ => { + *response.status_mut() = StatusCode::NOT_FOUND; + } + } + Ok(response) +} + +pub async fn debug_server_serve(addr: SocketAddr) -> Result<(), Box> { + let port = addr.port(); + let r = SERVER.insert(port); + if r.is_err() { + return Err(Box::new(ET::ExistingSuchElement)); + } + + // Bind to the port and listen for incoming TCP connections + let listener = TcpListener::bind(addr).await?; + println!("Listening on http://{}", addr); + loop { + // When an incoming TCP connection is received grab a TCP stream for + // client<->server communication. + // + // Note, this is a .await point, this loop will loop forever but is not a busy loop. The + // .await point allows the Tokio runtime to pull the task off of the thread until the task + // has work to do. In this case, a connection arrives on the port we are listening on and + // the task is woken up, at which point the task is then put back on a thread, and is + // driven forward by the runtime, eventually yielding a TCP stream. + let (tcp, _) = listener.accept().await?; + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(tcp); + + // Spin up a new task in Tokio so we can continue to listen for new TCP connection on the + // current task without waiting for the processing of the HTTP1 connection we just received + // to finish + tokio::task::spawn(async move { + // Handle the connection from the client using HTTP1 and pass any + // HTTP requests received on that connection to the `hello` function + if let Err(err) = http1::Builder::new() + .serve_connection(io, service_fn(handle_request)) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } +} \ No newline at end of file diff --git a/src/endpoint_async_impl.rs b/src/endpoint_async_impl.rs index 143f6ce..f69deb2 100644 --- a/src/endpoint_async_impl.rs +++ b/src/endpoint_async_impl.rs @@ -9,6 +9,7 @@ use tokio::net::TcpStream; use crate::endpoint_async::EndpointAsync; use crate::endpoint_inner::_Endpoint; use crate::opt_ep::OptEP; +use crate::task_trace; #[derive(Clone)] pub struct EndpointAsyncImpl { @@ -22,15 +23,22 @@ impl EndpointAsync for EndpointAsyncImpl { self._remote_address() } + #[async_backtrace::framed] async fn send(&self, m: Message) -> Res<()> { + let _t = task_trace!(); self._send(m).await } + #[async_backtrace::framed] async fn recv(&self) -> Res> { + let _t = task_trace!(); + self._recv().await } + #[async_backtrace::framed] async fn close(&self) -> Res<()> { + let _t = task_trace!(); self._close().await } } @@ -42,11 +50,15 @@ impl EndpointAsyncImpl { } } + #[async_backtrace::framed] async fn _send(&self, m: Message) -> Res<()> { + let _t = task_trace!(); self._ep.send(m).await } + #[async_backtrace::framed] async fn _recv(&self) -> Res> { + let _t = task_trace!(); self._ep.recv::().await } @@ -54,7 +66,9 @@ impl EndpointAsyncImpl { self._ep.remote_address() } + #[async_backtrace::framed] async fn _close(&self) -> Res<()> { + let _t = task_trace!(); self._ep.close().await } } diff --git a/src/endpoint_inner.rs b/src/endpoint_inner.rs index dcdb513..e85c7c2 100644 --- a/src/endpoint_inner.rs +++ b/src/endpoint_inner.rs @@ -18,8 +18,8 @@ use tokio::sync::Mutex; use tokio_util::codec::Framed; use tracing::{Instrument, trace_span}; +use crate::{parse_dtm_message, task_trace}; use crate::framed_codec::FramedCodec; -use crate::parse_dtm_message; pub struct _Endpoint { sender: Mutex, BytesMut>>, @@ -52,7 +52,9 @@ impl _Endpoint { } // send message + #[async_backtrace::framed] pub async fn send(&self, m: Message) -> Res<()> { + let _t = task_trace!(); if self.enable_dtm_test { return Ok(()); } @@ -67,7 +69,10 @@ impl _Endpoint { } // receive a message + #[async_backtrace::framed] pub async fn recv(&self) -> Res> { + let _t = task_trace!(); + let mut stream = self.receiver.lock().instrument(trace_span!("lock")).await; let opt = stream.next().await; let r = match opt { @@ -91,7 +96,9 @@ impl _Endpoint { } } + #[async_backtrace::framed] pub async fn close(&self) -> Res<()> { + let _t = task_trace!(); let r1 = { let mut sink = self.sender.lock().await; sink.close().await diff --git a/src/endpoint_sync_impl.rs b/src/endpoint_sync_impl.rs index eb682cb..4ec51a4 100644 --- a/src/endpoint_sync_impl.rs +++ b/src/endpoint_sync_impl.rs @@ -15,6 +15,7 @@ use crate::endpoint_async::EndpointAsync; use crate::endpoint_sync::EndpointSync; use crate::notifier::Notifier; use crate::task::spawn_local_task; +use crate::task_trace; pub struct EndpointSyncImpl { endpoint: Arc>, @@ -69,7 +70,9 @@ impl EndpointSyncImpl { } } + #[async_backtrace::framed] async fn handle_send(&self) -> Res<()> { + let _t = task_trace!(); let mut opt = self.s_receiver.lock().await; let mut opt_receiver = None; std::mem::swap(&mut opt_receiver, &mut opt); @@ -86,7 +89,9 @@ impl EndpointSyncImpl { Ok(()) } + #[async_backtrace::framed] async fn handle_receive(&self) -> Res<()> { + let _t = task_trace!(); let mut opt = self.r_invoke_receiver.lock().await; let mut opt_receiver = None; std::mem::swap(&mut opt_receiver, &mut opt); diff --git a/src/event_channel.rs b/src/event_channel.rs index 23c0b55..cc763b6 100644 --- a/src/event_channel.rs +++ b/src/event_channel.rs @@ -5,6 +5,7 @@ use tokio::sync::mpsc; use tracing::trace; use crate::event::NetEvent; +use crate::task_trace; type SyncMutex = std::sync::Mutex; @@ -34,7 +35,9 @@ impl EventReceiver { } } + #[async_backtrace::framed] pub async fn recv(&mut self) -> Res> { + let _t = task_trace!(); let opt = self.inner.recv().await; trace!("{} receive event", self._name); match opt { diff --git a/src/event_sink_impl.rs b/src/event_sink_impl.rs index ad87d74..a66bfde 100644 --- a/src/event_sink_impl.rs +++ b/src/event_sink_impl.rs @@ -21,6 +21,7 @@ use crate::message_receiver_endpoint::MessageReceiverEndpoint; use crate::message_sender_async::{SenderAsync, SenderRRAsync}; use crate::message_sender_sync::SenderSync; use crate::opt_send::OptSend; +use crate::task_trace; pub struct EventSenderImpl { name: String, @@ -40,10 +41,12 @@ impl EventSenderImpl { &self.name } + #[async_backtrace::framed] async fn recv_result( &self, receiver: AsyncReceiver>, ) -> Res<()> { + let _ = task_trace!(); let ret = receiver.await.map_err(|e| { ET::RecvError(e.to_string()) })?; @@ -60,10 +63,12 @@ impl EventSenderImpl { ret } + #[async_backtrace::framed] async fn recv_result_async_ep( &self, receiver: AsyncReceiver>>>>, ) -> Res>>> { + let _ = task_trace!(); let ret = receiver.await.map_err(|e| { ET::RecvError(e.to_string()) })?; @@ -80,7 +85,9 @@ impl EventSenderImpl { ret } + #[async_backtrace::framed] async fn serve_async(&self, addr: SocketAddr, no_wait: bool) -> Res<()> { + let _ = task_trace!(); trace!("async serve {} {}", self.channel_name(), addr.to_string()); if no_wait { let event = NetEvent::NetListen(addr, ResultSenderType::SendNone); @@ -108,12 +115,14 @@ impl EventSenderImpl { Ok(()) } + #[async_backtrace::framed] async fn connect_async( &self, node_id: NID, address: SocketAddr, no_wait: bool, read_endpoint: bool, ) -> Res>>> { + let _ = task_trace!(); trace!("channel name {}, send connect to {}", self.name, node_id); if no_wait && !read_endpoint { let event = NetEvent::NetConnect { @@ -214,6 +223,7 @@ impl EventSenderImpl { } } + #[async_backtrace::framed] pub async fn stop_async(&self, no_wait: bool) -> Res<()> { if no_wait { let event = NetEvent::Stop(ResultSenderType::SendNone); @@ -268,15 +278,20 @@ impl< > EventSinkAsync for EventSenderImpl< M > { + #[async_backtrace::framed] async fn stop(&self, opt: ESStopOpt) -> Res<()> { + let _ = task_trace!(); self.stop_async(opt.no_wait()).await } + #[async_backtrace::framed] async fn serve(&self, addr: SocketAddr, opt: ESServeOpt) -> Res<()> { self.serve_async(addr, opt.no_wait()).await } + #[async_backtrace::framed] async fn connect(&self, node_id: NID, address: SocketAddr, opt: ESConnectOpt) -> Res>>> { + let _t = task_trace!(); self.connect_async(node_id, address, opt.no_wait(), opt.return_endpoint()).await } } @@ -326,7 +341,9 @@ impl< impl< M: MsgTrait + 'static, > SenderRRAsync for EventSenderImpl { + #[async_backtrace::framed] async fn send(&self, message: Message, _opt: OptSend) -> Res>> { + let _t = task_trace!(); let opt = self.send_async(message, _opt.is_enable_no_wait(), true).await?; match opt { Some(recv) => { Ok(recv) } diff --git a/src/io_service.rs b/src/io_service.rs index f6c1e6d..ce4642a 100644 --- a/src/io_service.rs +++ b/src/io_service.rs @@ -5,6 +5,7 @@ use scupt_util::node_id::NID; use scupt_util::res::Res; use tokio::runtime::Runtime; use tokio::task::LocalSet; +use crate::debug::debug_server_serve; use crate::event_sink_async::EventSinkAsync; use crate::event_sink_sync::EventSinkSync; @@ -19,6 +20,7 @@ use crate::message_sender_sync::SenderSync; use crate::net_handler::NetHandler; use crate::node::Node; use crate::notifier::Notifier; +use crate::task::spawn_local_task; type ServiceNode = Node< M, @@ -27,6 +29,7 @@ type ServiceNode = Node< pub struct IOService { node_id: NID, + port_debug:Option, node: ServiceNode, receiver_async: Vec>>, receiver_sync: Vec>>, @@ -37,6 +40,7 @@ pub struct IOServiceOpt { pub num_message_receiver: u32, pub testing: bool, pub sync_service: bool, + pub port_debug: Option, } impl IOService { @@ -75,6 +79,7 @@ impl IOService { opt: IOServiceOpt, stop_notify: Notifier, ) -> Res { + let handler = NetHandler::::new(node_id.clone(), name.clone(), opt.sync_service, @@ -91,6 +96,7 @@ impl IOService { let s = Self { node_id, + port_debug: opt.port_debug, node, receiver_async, receiver_sync, @@ -103,6 +109,20 @@ impl IOService { Some(ls) => { ls } None => { LocalSet::new() } }; + match &self.port_debug { + None => {} + Some(p) => { + let port = p.clone(); + let f = debug_server_serve(([0, 0, 0, 0], port.clone()).into()); + let n = self.node.stop_notify(); + ls.spawn_local(async move { + spawn_local_task(n, + "debug service", + f + ) + }); + } + } self.node.block_run(Some(ls), runtime); } diff --git a/src/lib.rs b/src/lib.rs index 31efe5b..1373dd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,5 +41,8 @@ mod endpoint_sync; mod endpoint_sync_impl; mod endpoint_inner; mod message_receiver_channel_sync; +pub mod debug; + +mod test_debug_server; diff --git a/src/message_receiver_endpoint.rs b/src/message_receiver_endpoint.rs index 4970bd3..034c5af 100644 --- a/src/message_receiver_endpoint.rs +++ b/src/message_receiver_endpoint.rs @@ -7,6 +7,7 @@ use scupt_util::res::Res; use crate::endpoint_async::EndpointAsync; use crate::message_receiver_async::ReceiverResp; +use crate::task_trace; pub struct MessageReceiverEndpoint { ep: Arc>, @@ -24,7 +25,9 @@ impl MessageReceiverEndpoint { #[async_trait] impl ReceiverResp for MessageReceiverEndpoint { + #[async_backtrace::framed] async fn receive(&self) -> Res> { + let _ = task_trace!(); self.ep.recv().await } } \ No newline at end of file diff --git a/src/net_handler.rs b/src/net_handler.rs index da00b19..38b05b7 100644 --- a/src/net_handler.rs +++ b/src/net_handler.rs @@ -24,6 +24,7 @@ use crate::message_receiver_channel_async::MessageReceiverChannelAsync; use crate::message_receiver_channel_sync::MessageReceiverChannelSync; use crate::notifier::Notifier; use crate::task::spawn_local_task; +use crate::task_trace; pub type NodeSender = EventSenderImpl; @@ -49,7 +50,7 @@ impl HandleEvent for NetHandler { async fn on_accepted(&self, endpoint: Arc>) -> Res<()> { trace!("{} accept connection {}", self.name, endpoint.remote_address().to_string()); let inner = self.inner.clone(); - spawn_local_task(inner.stop_notify.clone(), "handle_message, ", async move { + spawn_local_task(inner.stop_notify.clone(), "connection_handle_message", async move { let _r = inner.process_message(endpoint).await; })?; Ok(()) @@ -161,7 +162,9 @@ impl InnerNetHandler { self.message_sync_ch_receiver.clone() } + #[async_backtrace::framed] async fn receiver_message(&self, message: Message, ep: Arc>) -> Res<()> { + let _t = task_trace!(); let mut hasher = DefaultHasher::new(); message.hash(&mut hasher); let hash = hasher.finish(); @@ -188,17 +191,19 @@ impl InnerNetHandler { } } - + #[async_backtrace::framed] async fn stop(&self) { + let _t = task_trace!(); let mut guard = self.message_async_ch_sender.lock().await; guard.clear(); } - + #[async_backtrace::framed] async fn process_message( &self, ep: Arc>, ) -> Res<()> { + let _t = task_trace!(); let r = self.loop_handle_message(&ep) .instrument(trace_span!("loop handle message")).await; match r { @@ -219,10 +224,12 @@ impl InnerNetHandler { Ok(()) } + #[async_backtrace::framed] async fn loop_handle_message( &self, ep: &Arc>, ) -> Res<()> { + let _t = task_trace!(); let mut r = Ok(()); while r.is_ok() { r = self.handle_next_message(ep).await; @@ -242,10 +249,12 @@ impl InnerNetHandler { r } + #[async_backtrace::framed] async fn handle_next_message( &self, ep: &Arc>, ) -> Res<()> { + let _t = task_trace!(); let m = ep.recv().await?; self.receiver_message(m, ep.clone()).await?; Ok(()) diff --git a/src/node.rs b/src/node.rs index 74e96d9..f683698 100644 --- a/src/node.rs +++ b/src/node.rs @@ -27,6 +27,7 @@ use crate::node_context::NodeContext; use crate::notifier::Notifier; use crate::opt_ep::OptEP; use crate::task::spawn_local_task; +use crate::task_trace; #[derive(Clone)] pub struct Node< @@ -162,6 +163,7 @@ Node< handle: Arc, enable_testing: bool, ) { + let _t = task_trace!(); trace!("node {}, run main loop, {}", name, node.name()); let mut receiver = channel; @@ -203,6 +205,7 @@ Node< handle: Arc, enable_testing: bool, ) -> Res<()> { + let _t = task_trace!(); match event { NetEvent::NetConnect { node_id, @@ -278,6 +281,7 @@ Node< Res>>> >, ) -> Res<()> { + let _t = task_trace!(); let _m = message.clone(); let ep_result = node.get_endpoint(node_id).await; let ep_result = match ep_result { @@ -302,6 +306,7 @@ Node< handle: Arc, enable_testing: bool, ) -> Res<()> { + let _t = task_trace!(); let notify = node.stop_notify(); let task_name = format!("main loop {}", name); let main_loop = async move { @@ -330,6 +335,7 @@ Node< >, enable_testing: bool, ) { + let _t = task_trace!(); let node_name = node.name().clone(); let notify = node.stop_notify(); // future process message @@ -350,6 +356,7 @@ Node< ).unwrap(); } + #[async_backtrace::framed] async fn task_handle_connected( node: Arc>, return_endpoint: bool, @@ -362,6 +369,7 @@ Node< >, enable_testing: bool, ) { + let _t = task_trace!(); trace!("{} task handle connect to {} {}", node.name(), node_id, address.to_string()); let r_connect = TcpStream::connect(address).await; trace!("{} task handle connect done, to {} {} ", node.name(), node_id, address.to_string()); @@ -416,6 +424,7 @@ Node< >, enable_testing: bool, ) -> Res<()> { + let _t = task_trace!(); let node_id = node.node_id(); let h = handle.clone(); let notify = node.stop_notify(); @@ -454,6 +463,7 @@ Node< Ok(()) } + #[async_backtrace::framed] async fn after_accept_connection( node: Arc>, listener: TcpListener, @@ -462,6 +472,7 @@ Node< addr: SocketAddr, enable_testing: bool, ) -> Res<()> { + let _t = task_trace!(); trace!("accept new {}", addr.to_string()); let ep = Arc::new(EndpointAsyncImpl::new( socket, @@ -522,12 +533,14 @@ Node< Ok(()) } + #[async_backtrace::framed] async fn accept_new_connection( node: Arc>, listener: TcpListener, handle: Arc, enable_testing: bool, ) -> Res<()> { + let _t = task_trace!(); let r = listener.accept().await; let (socket, addr) = res_io(r)?; Self::after_accept_connection( @@ -540,11 +553,12 @@ Node< ).await } - + #[async_backtrace::framed] fn handle_opt_send_result( opt_ep_sync: Option, opt_ep_async: Option, opt_sender: ResultSenderType) { + let _t = task_trace!(); match opt_sender { ResultSenderType::SendNone => {} ResultSenderType::Sync(s) => { @@ -574,6 +588,7 @@ Node< } } + #[async_backtrace::framed] fn handle_result_endpoint( node: &NodeContext, return_endpoint: bool, @@ -583,6 +598,7 @@ Node< Res>>> >) -> (Option>>>>, Option>>>>) { + let _t = task_trace!(); match opt_result_sender { ResultSenderType::SendNone => { (None, None) diff --git a/src/node_context.rs b/src/node_context.rs index ae2ead5..729d499 100644 --- a/src/node_context.rs +++ b/src/node_context.rs @@ -15,6 +15,7 @@ use crate::event::{NetEvent, ResultSenderType}; use crate::event_channel::EventChannel; use crate::net_handler::NodeSender; use crate::notifier::Notifier; +use crate::task_trace; pub type EventChannelMap = HashMap>>; @@ -68,23 +69,30 @@ impl NodeContext { self.stop_notify.clone() } + #[async_backtrace::framed] pub async fn stop_and_notify(&self) { + let _t = task_trace!(); let ok = self.stop_notify.task_notify_all(); if ok { self.stop().instrument(trace_span!("stop {}", self.node_id)).await; } } + #[async_backtrace::framed] pub async fn get_endpoint(&self, node_id: NID) -> Res>> { + let _t = task_trace!(); let c = self.mutex_ctx.lock().await; c.get_endpoint(node_id) } + #[async_backtrace::framed] pub async fn add_endpoint(&self, node_id: NID, endpoint: Arc>) -> Res<()> { + let _t = task_trace!(); let mut c = self.mutex_ctx.lock().await; c.add_endpoint(node_id, endpoint) } + pub fn new_event_channel(&self, name: String) -> Res>> { let (n, s) = self.new_event_sender(name)?; Ok(Arc::new(NodeSender::new(n, s))) @@ -120,7 +128,9 @@ impl NodeContext { self.enable_testing } + #[async_backtrace::framed] pub async fn stop(&self) { + let _t = task_trace!(); let mut map = self.channel_set.lock().unwrap(); for (_, v) in map.iter() { self.close_one_channel(v.clone()).instrument(trace_span!("close channel ")).await; diff --git a/src/opt_send.rs b/src/opt_send.rs index 58aa160..6246eb2 100644 --- a/src/opt_send.rs +++ b/src/opt_send.rs @@ -1,3 +1,4 @@ + pub struct OptSend { no_wait: bool, } diff --git a/src/task.rs b/src/task.rs index 6c6f667..6d8310f 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,9 +1,11 @@ +use std::collections::VecDeque; use std::future::Future; use std::sync::{Arc, Mutex}; use std::time::Duration; + +use async_backtrace::Location as BtLoc; use lazy_static::lazy_static; use scc::HashIndex; - use scupt_util::res::Res; use tokio::{select, task, task_local}; use tokio::task::JoinHandle; @@ -22,54 +24,33 @@ pub type TaskID = u128; task_local! { static TASK_ID: TaskID; } + pub struct TaskContext { name: String, notifier:Notifier, local_task:bool, id:u128, - location:Mutex, + backtrace: Mutex>, } pub struct Trace { } - impl Trace { - pub fn new() -> Self { - Self::enter(); + pub fn new(location: BtLoc) -> Self { + Self::enter(location); Self { } } -} -impl Drop for Trace { - fn drop(&mut self) { - Trace::exit() - } -} -#[macro_export] -macro_rules! task_trace { - ( - $id:expr - ) => { - { - Trace::new($id) - } - }; -} - -pub fn this_task_id() -> TaskID { - TASK_ID.get() -} -impl Trace { - fn enter() { + fn enter(location: BtLoc) { let _id = this_task_id(); let opt = TaskContext::get(_id); match opt { Some(_t) => { - + _t.enter(location); } _ => {} } @@ -80,13 +61,76 @@ impl Trace { let opt = TaskContext::get(_id); match opt { Some(_t) => { - + _t.exit(); } _ => {} } } + + pub fn backtrace() -> String { + let _id = this_task_id(); + let opt = TaskContext::get(_id); + match opt { + Some(_t) => { + _t.backtrace() + } + _ => { + "".to_string() + } + } + } + + pub fn dump_task_trace() -> String { + let mut ret = String::new(); + let guard = scc::ebr::Guard::new(); + for (_id, task) in TASK_CONTEXT.iter(&guard) { + let s = format!("name:{},\t id: {},\t trace {}", task.name(), _id, task.backtrace()); + ret.push_str(s.as_str()); + } + ret + } } -pub fn new_task_id() -> TaskID { + +impl Drop for Trace { + fn drop(&mut self) { + Trace::exit() + } +} + +#[macro_export] +macro_rules! task_trace { + () => { + { + let s = async_backtrace::location!(); + crate::task::Trace::new(s) + } + }; +} + +#[macro_export] +macro_rules! dump_task_trace { + () => { + { + crate::task::Trace::dump_task_trace() + } + }; +} + +#[macro_export] +macro_rules! task_backtrace { + () => { + { + crate::task::Trace::backtrace() + } + }; +} + +fn this_task_id() -> TaskID { + TASK_ID.get() +} + + +fn new_task_id() -> TaskID { Uuid::new_v4().as_u128() } @@ -97,7 +141,7 @@ impl TaskContext { notifier, local_task, id, - location: Default::default(), + backtrace: Default::default(), }; let ret = Arc::new(r); let id = ret.id(); @@ -132,16 +176,30 @@ impl TaskContext { self.notifier.clone() } - pub fn enter(&self) { - let s = async_backtrace::location!().to_string(); - let mut location = self.location.lock().unwrap(); - *location = s; + pub fn enter(&self, l: BtLoc) { + let mut location = self.backtrace.lock().unwrap(); + location.push_back(l); } pub fn exit(&self) { - let s = async_backtrace::location!().to_string(); - let mut location = self.location.lock().unwrap(); - *location = s; + let mut location = self.backtrace.lock().unwrap(); + let _ = location.pop_back(); + } + + pub fn backtrace(&self) -> String { + let deque = self.backtrace.lock().unwrap(); + let mut s = String::new(); + s.push_str("backtrace:\n"); + for (n, l) in deque.iter().enumerate() { + s.push_str(" "); + for _ in 0..n { + s.push_str("--"); + } + s.push_str("->"); + s.push_str(l.to_string().as_str()); + s.push_str("\n"); + } + s } } @@ -155,6 +213,7 @@ pub fn spawn_local_task(cancel_notifier: Notifier, _name: &str, future: F) -> F::Output: 'static, { let id = new_task_id(); + let _ = TaskContext::new_context(id, _name.to_string(), false, cancel_notifier.clone()); Ok(task::spawn_local(TASK_ID.scope(id, async move { let r = __select_local_till_done(cancel_notifier, future).await; let _ = TaskContext::remove_context(id); diff --git a/src/test_debug_server.rs b/src/test_debug_server.rs new file mode 100644 index 0000000..d0376ad --- /dev/null +++ b/src/test_debug_server.rs @@ -0,0 +1,30 @@ + +#[cfg(test)] +mod test { + use std::net::SocketAddr; + use std::time::Duration; + use tokio::runtime::Runtime; + use tokio::task::LocalSet; + + use crate::debug::debug_server_serve; + use crate::notifier::Notifier; + use crate::task::spawn_local_task_timeout; + + #[test] + fn test_server() { + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + let runtime = Runtime::new().unwrap(); + let local = LocalSet::new(); + local.spawn_local(async move { + spawn_local_task_timeout( + Notifier::new(), + Duration::from_secs(1), + "", + async move { + debug_server_serve(addr).await + }, + ) + }); + let _ = runtime.block_on(local); + } +} \ No newline at end of file diff --git a/tests/test_io_service.rs b/tests/test_io_service.rs index 9f02fbd..770686c 100644 --- a/tests/test_io_service.rs +++ b/tests/test_io_service.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::sync::mpsc::Sender; + use std::time::Duration; use bincode::{Decode, Encode}; @@ -29,6 +30,7 @@ use scupt_net::message_sender_async::SenderAsync; use scupt_net::message_sender_sync::SenderSync; use scupt_net::notifier::Notifier; use scupt_net::opt_send::OptSend; +use scupt_net::task::spawn_local_task; #[derive( Clone, @@ -85,6 +87,7 @@ fn test_service_async( num_message_receiver, testing: false, sync_service: false, + port_debug: Some(3001), }; let s = IOService::::new_async_service(k.clone(), name, opt, Notifier::new())?; services.push(Arc::new(s)); @@ -104,9 +107,12 @@ fn test_service_async( let thd_start_serve = std::thread::spawn(move || { let runtime = Builder::new_current_thread().enable_all().build().unwrap(); let ls = LocalSet::new(); - ls.spawn_local(async move { + let f = async move { let r = serve_async(sink, addr).await; assert!(r.is_ok()); + }; + ls.spawn_local(async move { + spawn_local_task(Notifier::new(), "", f) }); runtime.block_on(ls); trace!("{} serve thread exit", node_id) @@ -138,9 +144,12 @@ fn test_service_async( let thd_recv = std::thread::spawn(move || { let runtime = Builder::new_current_thread().enable_all().build().unwrap(); let ls = LocalSet::new(); - ls.spawn_local(async move { + let f = async move { let r = receive_async(node_id, r, stop_sender).await; assert!(r.is_ok()); + }; + ls.spawn_local(async { + let _ = spawn_local_task(Notifier::new(), "", f); }); runtime.block_on(ls); trace!("{} {} receive thread exit", node_id, i); @@ -164,7 +173,7 @@ fn test_service_async( let thd = std::thread::spawn(move || { let runtime = Builder::new_current_thread().enable_all().build().unwrap(); let ls = LocalSet::new(); - ls.spawn_local(async move { + let f = async move { let r = connect_async(node_id.clone(), sink.clone(), addrs.clone()).await; trace!("before wait, connect done {}, {:?}", node_id, r); let _ = b.wait().await; @@ -172,6 +181,11 @@ fn test_service_async( assert!(r.is_ok()); let r = send_async(node_id.clone(), 10, sender.clone(), addrs.clone()).await; assert!(r.is_ok()); + }; + ls.spawn_local(async move { + spawn_local_task( + Notifier::new(), "", f, + ) }); runtime.block_on(ls); trace!("{} {} sender thread exit", node_id, i); @@ -186,7 +200,7 @@ fn test_service_async( let stop_thd = std::thread::spawn(|| { let runtime = Builder::new_current_thread().enable_all().build().unwrap(); let ls = LocalSet::new(); - ls.spawn_local(async move { + let f = async move { let mut node_id_set = ids; let mut receiver = stop_receiver; while !node_id_set.is_empty() { @@ -206,6 +220,10 @@ fn test_service_async( assert!(r.is_ok()); } trace!("stop thread exit"); + }; + + ls.spawn_local(async move { + spawn_local_task(Notifier::new(), "wait stop", f) }); runtime.block_on(ls); }); @@ -243,6 +261,7 @@ fn test_service_sync( num_message_receiver, testing: false, sync_service: true, + port_debug: None, }; let s = IOService::::new_sync_service(k.clone(), name, opt, Notifier::new())?; services.push(Arc::new(s)); @@ -357,6 +376,7 @@ fn test_service_sync( Ok(()) } + async fn serve_async( sink: Arc>, addr: SocketAddr,