From 636c61ffdea726457dddf9ae8d9786a3d0f81130 Mon Sep 17 00:00:00 2001 From: Daiki Ueno Date: Sun, 26 Nov 2023 11:13:07 +0900 Subject: [PATCH] event-broker: Turn listening task into a separate async function Signed-off-by: Daiki Ueno --- event-broker/src/main.rs | 57 ++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/event-broker/src/main.rs b/event-broker/src/main.rs index 4dec61d..5b09c2b 100644 --- a/event-broker/src/main.rs +++ b/event-broker/src/main.rs @@ -114,41 +114,41 @@ impl Publisher { Ok(StdUnixListener::bind(&self.socket_path)?) } - async fn publish(&self, receiver: Receiver) -> Result<()> { + async fn listen(&self) -> Result<()> { let std_listener = self.get_std_listener()?; std_listener.set_nonblocking(true)?; let listener = UnixListener::from_std(std_listener)?; - let subscriptions = self.subscriptions.clone(); - tokio::spawn(async move { - while let Ok((stream, _sock_addr)) = listener.accept().await { - let subscriber_fd = stream.as_raw_fd(); + while let Ok((stream, _sock_addr)) = listener.accept().await { + let subscriber_fd = stream.as_raw_fd(); - debug!(socket = subscriber_fd, "subscriber connected"); + debug!(socket = subscriber_fd, "subscriber connected"); - let (de, ser) = stream.into_split(); + let (de, ser) = stream.into_split(); - let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); - let de = FramedRead::new(de, LengthDelimitedCodec::new()); + let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); + let de = FramedRead::new(de, LengthDelimitedCodec::new()); - let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::::default()); - let mut de = - SymmetricallyFramed::new(de, SymmetricalCbor::>::default()); + let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::::default()); + let mut de = + SymmetricallyFramed::new(de, SymmetricalCbor::>::default()); - // Populate the scopes - if let Some(scopes) = de.try_next().await.unwrap() { - subscriptions.write().unwrap().insert( - subscriber_fd, - Subscription { - stream: ser, - scopes, - errored: Default::default(), - }, - ); - } + // Populate the scopes + if let Some(scopes) = de.try_next().await.unwrap() { + self.subscriptions.write().unwrap().insert( + subscriber_fd, + Subscription { + stream: ser, + scopes, + errored: Default::default(), + }, + ); } - }); + } + Ok(()) + } + async fn publish(&self, receiver: Receiver) -> Result<()> { let mut stream = ReceiverStream::new(receiver); while let Some(group) = stream.next().await { let mut subscriptions = self.subscriptions.write().unwrap(); @@ -171,6 +171,9 @@ impl Publisher { // Remove errored subscriptions subscriptions.retain(|_, v| !v.errored); + if subscriptions.is_empty() { + break; + } } Ok(()) @@ -190,5 +193,9 @@ async fn main() -> anyhow::Result<()> { let publisher = Publisher::new(&config.socket_path); let (tx, rx) = mpsc::channel::(10); - try_join!(reader.read(tx), publisher.publish(rx),).map(|_| ()) + try_join!( + reader.read(tx), + publisher.listen(), + publisher.publish(rx), + ).map(|_| ()) }