Skip to content

Commit

Permalink
scupt-net
Browse files Browse the repository at this point in the history
  • Loading branch information
ybbh committed Jan 8, 2024
1 parent 3f51c11 commit ef7d5dc
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 14 deletions.
11 changes: 7 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ struct Handler {
}

impl <M:MsgTrait +'static> Client<M> {
pub fn new(node_id:NID, name:String, addr:String, notifier:Notifier) -> Res<Self> {
pub fn new(node_id:NID, name:String, addr:String, opt_client: OptClient, notifier:Notifier) -> Res<Self> {
Ok(Self {
inner:Arc::new(ClientInner::new(node_id, name, addr, notifier)?)
inner:Arc::new(ClientInner::new(node_id, name, addr, opt_client, notifier)?)
})
}

Expand Down Expand Up @@ -76,6 +76,9 @@ impl Handler {
}
}

pub struct OptClient {
pub enable_testing:bool,
}
pub struct OptClientConnect {
pub retry_max:u64,
pub retry_wait_ms:u64
Expand All @@ -97,11 +100,11 @@ impl Default for OptClientConnect {
}

impl <M:MsgTrait +'static> ClientInner<M> {
pub fn new(node_id:NID, name:String, addr:String, notifier: Notifier) -> Res<Self> {
pub fn new(node_id:NID, name:String, addr:String, opt:OptClient, notifier: Notifier) -> Res<Self> {
let r = Self {
nid: node_id.clone(),
addr,
node: Node::new(node_id, name, Handler::new(), notifier)?,
node: Node::new(node_id, name, Handler::new(), opt.enable_testing, notifier)?,
opt_endpoint: Default::default(),
};
Ok(r)
Expand Down
4 changes: 3 additions & 1 deletion src/io_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub struct IOService<M: MsgTrait> {


pub struct IOServiceOpt {
pub num_message_receiver: u32
pub num_message_receiver: u32,
pub testing:bool
}

impl<M: MsgTrait> IOService<M> {
Expand All @@ -45,6 +46,7 @@ impl<M: MsgTrait> IOService<M> {
node_id.clone(),
name,
handler.clone(),
opt.testing,
stop_notify)?;

let s = Self {
Expand Down
33 changes: 26 additions & 7 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ Node<
node_id: NID,
name: String,
handle: H,
testing:bool,
stop_notify: Notifier,
) -> Res<Self> {
let node_context = NodeContext::new(node_id.clone(), name, stop_notify);


let node_context = NodeContext::new(node_id.clone(), name, testing, stop_notify);
let node = Self {
_node_id: node_id,
handle: Arc::new(handle),
Expand Down Expand Up @@ -115,6 +114,7 @@ Node<
trace!("run local once {}", name);
let h = self.handle.clone();
let n = self.node_context.clone();
let enable_testing = n.enable_testing();
let c = self.node_context.default_event_channel().receiver().unwrap();
trace!("main loop {}", n.name());
let task_name = format!("{}_main_loop", n.name());
Expand All @@ -125,6 +125,7 @@ Node<
n,
c,
h,
enable_testing
).instrument(trace_span!("main loop")).await;
};

Expand All @@ -139,6 +140,7 @@ Node<
node: Arc<NodeContext<M>>,
channel: EventReceiver<M>,
handle: Arc<H>,
enable_testing:bool
) {
trace!("node {}, run main loop, {}", name, node.name());
let mut receiver = channel;
Expand All @@ -150,7 +152,7 @@ Node<
Ok(event) => {
let h = handle.clone();
let _r = Self::handle_event(
node.clone(), event, h)
node.clone(), event, h, enable_testing)
.instrument(trace_span!("handle_event")).await;
match _r {
Ok(_) => {}
Expand Down Expand Up @@ -179,6 +181,7 @@ Node<
node: Arc<NodeContext<M>>,
event: NetEvent<M>,
handle: Arc<H>,
enable_testing:bool
) -> Res<()> {
match event {
NetEvent::NetConnect {
Expand All @@ -196,6 +199,7 @@ Node<
handle,
opt_sender,
return_endpoint,
enable_testing
);
trace!("node {}: handle event:connect {} done", id, node_id);
}
Expand All @@ -207,6 +211,7 @@ Node<
address,
handle,
opt_s,
enable_testing
);
trace!("node {}: handle event: listen {} done", id, address.to_string());
}
Expand Down Expand Up @@ -256,6 +261,7 @@ Node<
node.clone(),
ch.receiver().unwrap(),
handle.clone(),
enable_testing
).await?;
}
}
Expand Down Expand Up @@ -284,6 +290,7 @@ Node<
node: Arc<NodeContext<M>>,
channel: EventReceiver<M>,
handle: Arc<H>,
enable_testing:bool
) -> Res<()> {
let notify = node.stop_notify();
let task_name = format!("main loop {}", name);
Expand All @@ -293,6 +300,7 @@ Node<
node,
channel,
handle,
enable_testing
).await;
};
spawn_local_task(notify, task_name.as_str(), main_loop)?;
Expand All @@ -307,6 +315,7 @@ Node<
handle: Arc<H>,
opt_sender: Option<ResultSender>,
return_endpoint: bool,
enable_testing:bool
) {
let node_name = node.name().clone();
let notify = node.stop_notify();
Expand All @@ -318,6 +327,7 @@ Node<
node, node_id,
address, handle, opt_sender,
return_endpoint,
enable_testing
).await;
trace!("on connected done {}", task_name2);
};
Expand All @@ -335,6 +345,7 @@ Node<
handle: Arc<H>,
opt_sender: Option<ResultSender>,
return_endpoint: bool,
enable_testing:bool
) {
trace!("{} task handle connect to {} {}", node.name(), node_id, address.to_string());
let r_connect = TcpStream::connect(address).await;
Expand All @@ -346,7 +357,8 @@ Node<
let r_addr = s.peer_addr();
match res_io(r_addr) {
Ok(addr) => {
let ep = Endpoint::new(s, addr, OptEP::default());
let opt = OptEP::new().enable_dtm_test(enable_testing);
let ep = Endpoint::new(s, addr, opt);
{
let r = node.add_endpoint(node_id, ep.clone()).await;
match r {
Expand Down Expand Up @@ -391,6 +403,7 @@ Node<
address: SocketAddr,
handle: Arc<H>,
opt_sender: Option<ResultSender>,
enable_testing:bool
) -> Res<()> {
let node_id = node.node_id();
let h = handle.clone();
Expand All @@ -413,7 +426,9 @@ Node<
match Self::accept_new_connection(
node,
listener,
h.clone()).await {
h.clone(),
enable_testing
).await {
Ok(()) => {}
Err(e) => {
h.on_error(e.clone()).await;
Expand All @@ -434,12 +449,13 @@ Node<
handle: Arc<H>,
socket: TcpStream,
addr: SocketAddr,
enable_testing:bool
) -> Res<()> {
trace!("accept new {}", addr.to_string());
let ep = Endpoint::new(
socket,
addr,
OptEP::default().enable_dtm_test(true)
OptEP::default().enable_dtm_test(enable_testing)
);
let on_accepted = {
let h = handle.clone();
Expand All @@ -466,6 +482,7 @@ Node<
n,
listener,
h.clone(),
enable_testing
).await {
Err(e) => {
match e {
Expand Down Expand Up @@ -498,6 +515,7 @@ Node<
node: Arc<NodeContext<M>>,
listener: TcpListener,
handle: Arc<H>,
enable_testing:bool
) -> Res<()> {
let r = listener.accept().await;
let (socket, addr) = res_io(r)?;
Expand All @@ -507,6 +525,7 @@ Node<
handle,
socket,
addr,
enable_testing
).await
}

Expand Down
8 changes: 7 additions & 1 deletion src/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ pub struct NodeContext<M: MsgTrait + 'static> {
mutex_ctx: Mutex<_NodeContext>,
channel_set: Arc<SyncMutex<EventChannelMap<M>>>,
default_channel: Arc<EventChannel<M>>,
enable_testing:bool
}


impl<M: MsgTrait + 'static> NodeContext<M> {
pub fn new(node_id: NID, name: String, stop_notify: Notifier) -> Self {
pub fn new(node_id: NID, name: String, testing:bool, stop_notify: Notifier) -> Self {
let mut map = HashMap::new();
let channel_name = format!("{}_default", name);
let default_channel = Arc::new(Self::create_event_channel(channel_name));
Expand All @@ -52,6 +53,7 @@ impl<M: MsgTrait + 'static> NodeContext<M> {
mutex_ctx: Mutex::new(_NodeContext::new(name)),
channel_set: Arc::new(SyncMutex::new(map)),
default_channel,
enable_testing: testing,
}
}

Expand Down Expand Up @@ -114,6 +116,10 @@ impl<M: MsgTrait + 'static> NodeContext<M> {
self.default_channel.clone()
}

pub fn enable_testing(&self) -> bool {
self.enable_testing
}

pub async fn stop(&self) {
let mut map = self.channel_set.lock().unwrap();
for (_, v) in map.iter() {
Expand Down
3 changes: 2 additions & 1 deletion tests/test_io_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ fn test_service(
for (k, _) in &id2address {
let name = format!("service_{}", k);
let opt = IOServiceOpt {
num_message_receiver
num_message_receiver,
testing: false,
};
let s = IOService::<TestMsg>::new(k.clone(), name, opt, Notifier::new())?;
services.push(Arc::new(s));
Expand Down

0 comments on commit ef7d5dc

Please sign in to comment.