Skip to content

Commit

Permalink
scupt-net
Browse files Browse the repository at this point in the history
ybbh committed Nov 26, 2023
1 parent de7e5d9 commit 316173f
Showing 6 changed files with 24 additions and 21 deletions.
9 changes: 5 additions & 4 deletions src/event_sink_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::net::SocketAddr;
use std::sync::Arc;

use async_trait::async_trait;
use scupt_util::error_type::ET;
@@ -12,7 +13,7 @@ use tracing::{error, trace};
use crate::endpoint::Endpoint;
use crate::event::{EventResult, NetEvent, ResultReceiver};
use crate::event_sink::{ESConnectOpt, ESServeOpt, ESStopOpt, EventSink};
use crate::message_receiver::ReceiverOneshot;
use crate::message_receiver::ReceiverResp;
use crate::message_sender::{Sender, SenderRR};
use crate::message_receiver_endpoint::MessageReceiverEndpoint;

@@ -91,7 +92,7 @@ impl<M: MsgTrait> EventSenderImpl<M> {
msg: Message<M>,
no_wait: bool,
read_resp: bool
) -> Res<Option<Box<dyn ReceiverOneshot<M>>>> {
) -> Res<Option<Arc<dyn ReceiverResp<M>>>> {
trace!("channel name {} send message {:?}", self.name, msg);
if no_wait && !read_resp {
let event = NetEvent::NetSend(msg, None);
@@ -105,7 +106,7 @@ impl<M: MsgTrait> EventSenderImpl<M> {
let opt_ep = self.event_result_endpoint(event_result)?;
match opt_ep {
Some(ep) => {
Ok(Some(Box::new(MessageReceiverEndpoint::new(ep))))
Ok(Some(Arc::new(MessageReceiverEndpoint::new(ep))))
}
None => {
Ok(None)
@@ -226,7 +227,7 @@ impl<
impl<
M: MsgTrait + 'static,
> SenderRR<M> for EventSenderImpl<M> {
async fn send(&self, message: Message<M>, _opt: OptSend) -> Res<Box<dyn ReceiverOneshot<M>>> {
async fn send(&self, message: Message<M>, _opt: OptSend) -> Res<Arc<dyn ReceiverResp<M>>> {
let opt = self.async_send(message, _opt.is_enable_no_wait(), true).await?;
match opt {
Some(recv) => { Ok(recv) }
9 changes: 5 additions & 4 deletions src/message_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use async_trait::async_trait;
use scupt_util::message::{Message, MsgTrait};
use scupt_util::res::Res;
use crate::message_sender::SenderOneshot;
use crate::message_sender::SenderResp;

#[async_trait]
pub trait Receiver<
@@ -12,16 +13,16 @@ pub trait Receiver<


#[async_trait]
pub trait ReceiverOneshot<
pub trait ReceiverResp<
M: MsgTrait + 'static,
>: Sync + Send {
async fn receive(self) -> Res<Message<M>>;
async fn receive(&self) -> Res<Message<M>>;
}


#[async_trait]
pub trait ReceiverRR<
M: MsgTrait + 'static,
>: Sync + Send {
async fn receive(&self) -> Res<(Message<M>, Box<dyn SenderOneshot<M>>)>;
async fn receive(&self) -> Res<(Message<M>, Arc<dyn SenderResp<M>>)>;
}
6 changes: 3 additions & 3 deletions src/message_receiver_channel.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use crate::endpoint::Endpoint;

use crate::message_channel::MessageChReceiver;
use crate::message_receiver::{Receiver, ReceiverRR};
use crate::message_sender::SenderOneshot;
use crate::message_sender::SenderResp;
use crate::message_sender_endpoint::MessageSenderEndpoint;

pub struct MessageReceiverChannel<M: MsgTrait + 'static> {
@@ -49,13 +49,13 @@ impl<M: MsgTrait + 'static> Receiver<M> for MessageReceiverChannel<M> {

#[async_trait]
impl<M: MsgTrait + 'static> ReceiverRR<M> for MessageReceiverChannel<M> {
async fn receive(&self) -> Res<(Message<M>, Box<dyn SenderOneshot<M>>)> {
async fn receive(&self) -> Res<(Message<M>, Arc<dyn SenderResp<M>>)> {
let mut guard = self.receiver.lock().await;
let opt = guard.recv().await;
match opt {
Some((message, ep)) => {
Ok((message,
Box::new(MessageSenderEndpoint::new(ep)))) }
Arc::new(MessageSenderEndpoint::new(ep)))) }
None => {
// the sender is closed
Err(ET::EOF)
6 changes: 3 additions & 3 deletions src/message_receiver_endpoint.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use async_trait::async_trait;
use scupt_util::message::{Message, MsgTrait};
use scupt_util::res::Res;
use crate::endpoint::Endpoint;
use crate::message_receiver::ReceiverOneshot;
use crate::message_receiver::ReceiverResp;

pub struct MessageReceiverEndpoint<M:MsgTrait + 'static> {
ep : Endpoint,
@@ -20,8 +20,8 @@ impl <M:MsgTrait + 'static> MessageReceiverEndpoint<M> {
}

#[async_trait]
impl <M:MsgTrait + 'static> ReceiverOneshot<M> for MessageReceiverEndpoint<M> {
async fn receive(self) -> Res<Message<M>> {
impl <M:MsgTrait + 'static> ReceiverResp<M> for MessageReceiverEndpoint<M> {
async fn receive(&self) -> Res<Message<M>> {
self.ep.recv().await
}
}
9 changes: 5 additions & 4 deletions src/message_sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use async_trait::async_trait;
use scupt_util::message::{Message, MsgTrait};
use scupt_util::res::Res;
use crate::message_receiver::ReceiverOneshot;
use crate::message_receiver::ReceiverResp;
use crate::opt_send::OptSend;

#[async_trait]
@@ -15,12 +16,12 @@ pub trait Sender<
pub trait SenderRR<
M: MsgTrait + 'static,
>: Sync + Send {
async fn send(&self, message: Message<M>, opt: OptSend) -> Res<Box<dyn ReceiverOneshot<M>>>;
async fn send(&self, message: Message<M>, opt: OptSend) -> Res<Arc<dyn ReceiverResp<M>>>;
}

#[async_trait]
pub trait SenderOneshot<
pub trait SenderResp<
M: MsgTrait + 'static,
>: Sync + Send {
async fn send(self, message: Message<M>) -> Res<()>;
async fn send(&self, message: Message<M>) -> Res<()>;
}
6 changes: 3 additions & 3 deletions src/message_sender_endpoint.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use async_trait::async_trait;
use scupt_util::message::{Message, MsgTrait};
use scupt_util::res::Res;
use crate::endpoint::Endpoint;
use crate::message_sender::SenderOneshot;
use crate::message_sender::SenderResp;
pub struct MessageSenderEndpoint<M:MsgTrait + 'static> {
ep : Endpoint,
_pd : PhantomData<M>
@@ -20,8 +20,8 @@ impl <M:MsgTrait + 'static> MessageSenderEndpoint<M> {
}

#[async_trait]
impl <M:MsgTrait + 'static> SenderOneshot<M> for MessageSenderEndpoint<M> {
async fn send(self, m: Message<M>) -> Res<()> {
impl <M:MsgTrait + 'static> SenderResp<M> for MessageSenderEndpoint<M> {
async fn send(&self, m: Message<M>) -> Res<()> {
self.ep.send(m).await?;
Ok(())
}

0 comments on commit 316173f

Please sign in to comment.