Skip to content

Commit

Permalink
scupt-net
Browse files Browse the repository at this point in the history
ybbh committed Mar 28, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 2bf0f20 commit 01341e5
Showing 18 changed files with 393 additions and 47 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -30,4 +30,12 @@ async-backtrace = "0.2.6"
lazy_static = "1.4.0"
scc = "2.0.18"
uuid = { version = "1.6.1", features = ["v4"] }
scopeguard = { version = "1.2.0" }


hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1.3", features = ["tokio", "server"] }

http-body-util = "0.1"

http = "1"
17 changes: 17 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ use crate::es_option::ESConnectOption;
use crate::handle_event::HandleEvent;
use crate::node::Node;
use crate::notifier::Notifier;
use crate::task_trace;

#[derive(Clone)]
pub struct Client<M: MsgTrait + 'static> {
@@ -44,19 +45,27 @@ impl<M: MsgTrait + 'static> Client<M> {
self.inner.run(local);
}

#[async_backtrace::framed]
pub async fn is_connected(&self) -> bool {
let _t = task_trace!();
self.inner.is_connected().await
}

#[async_backtrace::framed]
pub async fn connect(&self, opt: OptClientConnect) -> Res<()> {
let _t = task_trace!();
self.inner.connect(opt).await
}

#[async_backtrace::framed]
pub async fn send(&self, message: Message<M>) -> Res<()> {
let _t = task_trace!();
self.inner.send(message).await
}

#[async_backtrace::framed]
pub async fn recv(&self) -> Res<Message<M>> {
let _t = task_trace!();
self.inner.recv().await
}

@@ -114,12 +123,16 @@ impl<M: MsgTrait + 'static> ClientInner<M> {
self.node.run_local(local);
}

#[async_backtrace::framed]
pub async fn is_connected(&self) -> bool {
let _t = task_trace!();
let g = self.opt_endpoint.lock().await;
g.is_some()
}

#[async_backtrace::framed]
pub async fn connect(&self, opt: OptClientConnect) -> Res<()> {
let _t = task_trace!();
let mut opt_ep = None;
let mut n = opt.retry_max;
while opt.retry_max == 0 || n > 0 {
@@ -147,7 +160,9 @@ impl<M: MsgTrait + 'static> ClientInner<M> {
Ok(())
}

#[async_backtrace::framed]
pub async fn send(&self, message: Message<M>) -> Res<()> {
let _t = task_trace!();
let guard = self.opt_endpoint.lock().await;
if let Some(e) = &(*guard) {
e.send(message).await?;
@@ -157,7 +172,9 @@ impl<M: MsgTrait + 'static> ClientInner<M> {
}
}

#[async_backtrace::framed]
pub async fn recv(&self) -> Res<Message<M>> {
let _t = task_trace!();
let guard = self.opt_endpoint.lock().await;
if let Some(e) = &(*guard) {
let m = e.recv().await?;
104 changes: 104 additions & 0 deletions src/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Example hyper http server
// https://github.com/hyperium/hyper/blob/master/examples/echo.rs

use std::net::SocketAddr;

use bytes::Bytes;
use http::{Method, StatusCode};
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use lazy_static::lazy_static;
use scc::{HashIndex, HashSet};
use scupt_util::error_type::ET;
use scupt_util::res::Res;
use tokio::net::TcpListener;

use crate::dump_task_trace;

type HandleURL = fn(String) -> Res<String>;

lazy_static!(
static ref HANDLE_URL : HashIndex<String, HandleURL> = HashIndex::new();
static ref SERVER: HashSet<u16> = HashSet::new();
);


pub fn register_debug_url(url: String, h: HandleURL) {
let _ = HANDLE_URL.insert(url, h);
}

async fn handle_request(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
let mut response = Response::new(Full::default());
match req.method() {
&Method::GET => {
let path = req.uri().path();
match path {
"/task" => {
let dump = dump_task_trace!();
*response.body_mut() = Full::from(dump);
}
_ => {
let opt = HANDLE_URL.get(&path.to_string());
match opt {
Some(e) => {
let h = e.get().clone();
let s = h(path.to_string()).unwrap_or_else(|e| { e.to_string() });
*response.body_mut() = Full::from(s);
}
None => {
*response.status_mut() = StatusCode::NOT_FOUND;
}
}
}
}
}
_ => {
*response.status_mut() = StatusCode::NOT_FOUND;
}
}
Ok(response)
}

pub async fn debug_server_serve(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let port = addr.port();
let r = SERVER.insert(port);
if r.is_err() {
return Err(Box::new(ET::ExistingSuchElement));
}

// Bind to the port and listen for incoming TCP connections
let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);
loop {
// When an incoming TCP connection is received grab a TCP stream for
// client<->server communication.
//
// Note, this is a .await point, this loop will loop forever but is not a busy loop. The
// .await point allows the Tokio runtime to pull the task off of the thread until the task
// has work to do. In this case, a connection arrives on the port we are listening on and
// the task is woken up, at which point the task is then put back on a thread, and is
// driven forward by the runtime, eventually yielding a TCP stream.
let (tcp, _) = listener.accept().await?;
// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(tcp);

// Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
// current task without waiting for the processing of the HTTP1 connection we just received
// to finish
tokio::task::spawn(async move {
// Handle the connection from the client using HTTP1 and pass any
// HTTP requests received on that connection to the `hello` function
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(handle_request))
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
}
14 changes: 14 additions & 0 deletions src/endpoint_async_impl.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ use tokio::net::TcpStream;
use crate::endpoint_async::EndpointAsync;
use crate::endpoint_inner::_Endpoint;
use crate::opt_ep::OptEP;
use crate::task_trace;

#[derive(Clone)]
pub struct EndpointAsyncImpl {
@@ -22,15 +23,22 @@ impl<M: MsgTrait + 'static> EndpointAsync<M> for EndpointAsyncImpl {
self._remote_address()
}

#[async_backtrace::framed]
async fn send(&self, m: Message<M>) -> Res<()> {
let _t = task_trace!();
self._send(m).await
}

#[async_backtrace::framed]
async fn recv(&self) -> Res<Message<M>> {
let _t = task_trace!();

self._recv().await
}

#[async_backtrace::framed]
async fn close(&self) -> Res<()> {
let _t = task_trace!();
self._close().await
}
}
@@ -42,19 +50,25 @@ impl EndpointAsyncImpl {
}
}

#[async_backtrace::framed]
async fn _send<M: MsgTrait + 'static>(&self, m: Message<M>) -> Res<()> {
let _t = task_trace!();
self._ep.send(m).await
}

#[async_backtrace::framed]
async fn _recv<M: MsgTrait + 'static>(&self) -> Res<Message<M>> {
let _t = task_trace!();
self._ep.recv::<M>().await
}

fn _remote_address(&self) -> SocketAddr {
self._ep.remote_address()
}

#[async_backtrace::framed]
async fn _close(&self) -> Res<()> {
let _t = task_trace!();
self._ep.close().await
}
}
9 changes: 8 additions & 1 deletion src/endpoint_inner.rs
Original file line number Diff line number Diff line change
@@ -18,8 +18,8 @@ use tokio::sync::Mutex;
use tokio_util::codec::Framed;
use tracing::{Instrument, trace_span};

use crate::{parse_dtm_message, task_trace};
use crate::framed_codec::FramedCodec;
use crate::parse_dtm_message;

pub struct _Endpoint {
sender: Mutex<SplitSink<Framed<TcpStream, FramedCodec>, BytesMut>>,
@@ -52,7 +52,9 @@ impl _Endpoint {
}

// send message
#[async_backtrace::framed]
pub async fn send<M: MsgTrait + 'static>(&self, m: Message<M>) -> Res<()> {
let _t = task_trace!();
if self.enable_dtm_test {
return Ok(());
}
@@ -67,7 +69,10 @@ impl _Endpoint {
}

// receive a message
#[async_backtrace::framed]
pub async fn recv<M: MsgTrait + 'static>(&self) -> Res<Message<M>> {
let _t = task_trace!();

let mut stream = self.receiver.lock().instrument(trace_span!("lock")).await;
let opt = stream.next().await;
let r = match opt {
@@ -91,7 +96,9 @@ impl _Endpoint {
}
}

#[async_backtrace::framed]
pub async fn close(&self) -> Res<()> {
let _t = task_trace!();
let r1 = {
let mut sink = self.sender.lock().await;
sink.close().await
5 changes: 5 additions & 0 deletions src/endpoint_sync_impl.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use crate::endpoint_async::EndpointAsync;
use crate::endpoint_sync::EndpointSync;
use crate::notifier::Notifier;
use crate::task::spawn_local_task;
use crate::task_trace;

pub struct EndpointSyncImpl<M: MsgTrait + 'static> {
endpoint: Arc<dyn EndpointAsync<M>>,
@@ -69,7 +70,9 @@ impl<M: MsgTrait + 'static> EndpointSyncImpl<M> {
}
}

#[async_backtrace::framed]
async fn handle_send(&self) -> Res<()> {
let _t = task_trace!();
let mut opt = self.s_receiver.lock().await;
let mut opt_receiver = None;
std::mem::swap(&mut opt_receiver, &mut opt);
@@ -86,7 +89,9 @@ impl<M: MsgTrait + 'static> EndpointSyncImpl<M> {
Ok(())
}

#[async_backtrace::framed]
async fn handle_receive(&self) -> Res<()> {
let _t = task_trace!();
let mut opt = self.r_invoke_receiver.lock().await;
let mut opt_receiver = None;
std::mem::swap(&mut opt_receiver, &mut opt);
3 changes: 3 additions & 0 deletions src/event_channel.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ use tokio::sync::mpsc;
use tracing::trace;

use crate::event::NetEvent;
use crate::task_trace;

type SyncMutex<T> = std::sync::Mutex<T>;

@@ -34,7 +35,9 @@ impl<M: MsgTrait + 'static> EventReceiver<M> {
}
}

#[async_backtrace::framed]
pub async fn recv(&mut self) -> Res<NetEvent<M>> {
let _t = task_trace!();
let opt = self.inner.recv().await;
trace!("{} receive event", self._name);
match opt {
Loading

0 comments on commit 01341e5

Please sign in to comment.