From 6cddba9a8a31398655fa70f9287a7faa88cd98f0 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 18 Nov 2024 16:21:04 +0200 Subject: [PATCH 01/17] Move transport layer port connections from `SystemBuilding` to `SystemExecution` --- lib/protoflow-blocks/src/system.rs | 20 +++++++++++--- lib/protoflow-core/src/system.rs | 41 +++++++++++++++++++++++------ lib/protoflow-core/src/transport.rs | 9 ++++++- 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 35de5f8c..4cb789a9 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -3,15 +3,15 @@ #![allow(dead_code)] use crate::{ - prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString}, + prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString, Vec}, types::{DelayType, Encoding}, AllBlocks, Buffer, Const, CoreBlocks, Count, Decode, DecodeJson, Delay, Drop, Encode, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout, }; use protoflow_core::{ - Block, BlockID, BlockResult, BoxedBlockType, InputPort, Message, OutputPort, PortID, - PortResult, Process, SystemBuilding, SystemExecution, + Block, BlockID, BlockResult, BoxedBlockType, InputPort, InputPortID, Message, OutputPort, + OutputPortID, PortID, PortResult, Process, SystemBuilding, SystemExecution, }; #[cfg(feature = "hash")] @@ -92,8 +92,12 @@ impl fmt::Debug for System { } impl SystemExecution for System { + fn prepare(&self) -> BlockResult<()> { + SystemExecution::prepare(&self.0) + } + fn execute(self) -> BlockResult> { - self.0.execute() + SystemExecution::execute(self.0) } } @@ -118,6 +122,14 @@ impl SystemBuilding for System { fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool { self.0.connect(source, target) } + + fn connections(&self) -> Vec<(OutputPortID, InputPortID)> { + self.0.connections() + } + + fn validate(self) -> BlockResult<()> { + self.0.validate() + } } impl AllBlocks for System {} diff --git a/lib/protoflow-core/src/system.rs b/lib/protoflow-core/src/system.rs index b8917b1d..43c19afe 100644 --- a/lib/protoflow-core/src/system.rs +++ b/lib/protoflow-core/src/system.rs @@ -1,7 +1,7 @@ // This is free and unencumbered software released into the public domain. use crate::{ - prelude::{fmt, Arc, Box, Bytes, PhantomData, Rc, String, VecDeque}, + prelude::{fmt, Arc, BTreeSet, Box, Bytes, PhantomData, Rc, String, Vec, VecDeque}, runtimes::StdRuntime, transports::MpscTransport, types::Any, @@ -57,9 +57,18 @@ pub trait SystemBuilding { /// /// Both ports must be of the same message type. fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool; + + fn connections(&self) -> Vec<(OutputPortID, InputPortID)>; + + /// Validates system for execution. + fn validate(self) -> BlockResult<()>; } pub trait SystemExecution { + /// Prepare: + /// - Calls the transport layer to connect all the output->input ports. + /// The connections are defined by `SystemBuilding.connect()`. + fn prepare(&self) -> BlockResult<()>; /// Executes the system, returning the system process. fn execute(self) -> BlockResult>; } @@ -71,6 +80,8 @@ pub struct System { /// The registered blocks in the system. pub(crate) blocks: VecDeque, + pub(crate) connections: BTreeSet<(OutputPortID, InputPortID)>, + _phantom: PhantomData, } @@ -99,6 +110,7 @@ impl System { Self { runtime: runtime.clone(), blocks: VecDeque::new(), + connections: BTreeSet::default(), _phantom: PhantomData, } } @@ -147,19 +159,18 @@ impl System { self.blocks.get(block_id.into()) } - pub fn connect(&self, source: &OutputPort, target: &InputPort) -> bool { + pub fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool { self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id)) .unwrap() } #[doc(hidden)] - pub fn connect_by_id(&self, source_id: PortID, target_id: PortID) -> PortResult { - let runtime = self.runtime.as_ref(); - let transport = runtime.transport.as_ref(); - transport.connect( + pub fn connect_by_id(&mut self, source_id: PortID, target_id: PortID) -> PortResult { + let add = self.connections.insert(( OutputPortID(source_id.into()), InputPortID(target_id.into()), - ) + )); + Ok(true) } } @@ -184,10 +195,24 @@ impl SystemBuilding for System { fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool { System::connect(self, source, target) } + + fn connections(&self) -> Vec<(OutputPortID, InputPortID)> { + self.connections.clone().into_iter().collect() + } + + fn validate(self) -> BlockResult<()> { + Ok(()) // TODO + } } impl SystemExecution for System { + fn prepare(&self) -> BlockResult<()> { + self.runtime.transport.connect_system(self)?; + Ok(()) + } + fn execute(self) -> BlockResult> { - System::execute(self) + SystemExecution::prepare(&self)?; + self.execute() } } diff --git a/lib/protoflow-core/src/transport.rs b/lib/protoflow-core/src/transport.rs index 6fefd615..facf1615 100644 --- a/lib/protoflow-core/src/transport.rs +++ b/lib/protoflow-core/src/transport.rs @@ -1,6 +1,6 @@ // This is free and unencumbered software released into the public domain. -use crate::{prelude::Bytes, InputPortID, OutputPortID, PortID, PortResult, PortState}; +use crate::{prelude::Bytes, InputPortID, OutputPortID, PortID, PortResult, PortState, System}; #[allow(unused)] pub trait Transport: AsTransport + Send + Sync { @@ -30,6 +30,13 @@ pub trait Transport: AsTransport + Send + Sync { fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()>; fn recv(&self, input: InputPortID) -> PortResult>; fn try_recv(&self, input: InputPortID) -> PortResult>; + + fn connect_system(&self, system: &System) -> PortResult<()> { + system + .connections + .iter() + .try_for_each(|&(output, input)| self.connect(output, input).map(|_| ())) + } } pub trait AsTransport { From 940e7babf95ba868ccb8ca2733a32e4cc5570003 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 18 Nov 2024 18:12:19 +0200 Subject: [PATCH 02/17] Fix tests --- lib/protoflow-blocks/src/blocks/sys/read_file.rs | 5 +++-- lib/protoflow-blocks/tests/json_roundtrip.rs | 4 ++-- lib/protoflow/tests/mpsc.rs | 6 +++--- lib/protoflow/tests/zst.rs | 12 ++++++------ 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/protoflow-blocks/src/blocks/sys/read_file.rs b/lib/protoflow-blocks/src/blocks/sys/read_file.rs index 50ec4a42..09490ed7 100644 --- a/lib/protoflow-blocks/src/blocks/sys/read_file.rs +++ b/lib/protoflow-blocks/src/blocks/sys/read_file.rs @@ -91,6 +91,7 @@ impl StdioSystem for ReadFile { #[cfg(test)] mod tests { extern crate std; + use super::ReadFile; use crate::{System, SystemBuilding, SystemExecution}; @@ -122,7 +123,7 @@ mod tests { system.connect(&path, &read_file.path); system.connect(&read_file.output, &output); - let thrd = std::thread::spawn(move || system.execute().and_then(|p| p.join()).unwrap()); + let process = system.execute().unwrap(); path.send(&temp_file.path().to_string_lossy().into()) .unwrap(); @@ -143,6 +144,6 @@ mod tests { "want EOS signal after path port is closed" ); - thrd.join().unwrap() + process.join().unwrap(); } } diff --git a/lib/protoflow-blocks/tests/json_roundtrip.rs b/lib/protoflow-blocks/tests/json_roundtrip.rs index 99e802ab..4353814f 100644 --- a/lib/protoflow-blocks/tests/json_roundtrip.rs +++ b/lib/protoflow-blocks/tests/json_roundtrip.rs @@ -19,11 +19,11 @@ fn json_roundtrip() -> Result<(), ()> { system.connect(&decode.output, &encode.input); system.connect(&encode.output, &output); - let thread = std::thread::spawn(|| system.execute().unwrap().join().unwrap()); + let process = system.execute().unwrap(); let message = output.recv().unwrap().unwrap(); - thread.join().unwrap(); + process.join().unwrap(); assert_eq!(input_bytes, message); diff --git a/lib/protoflow/tests/mpsc.rs b/lib/protoflow/tests/mpsc.rs index bbf43cb4..72419469 100644 --- a/lib/protoflow/tests/mpsc.rs +++ b/lib/protoflow/tests/mpsc.rs @@ -4,13 +4,13 @@ use protoflow::{ blocks::{Const, Drop}, runtimes::StdRuntime, transports::MpscTransport, - Runtime, System, + System, SystemExecution, }; #[test] fn execute_mpsc_transport() -> Result<(), ()> { let transport = MpscTransport::new(); - let mut runtime = StdRuntime::new(transport).unwrap(); + let runtime = StdRuntime::new(transport).unwrap(); let mut system = System::new(&runtime); let constant = system.block(Const { output: system.output(), @@ -18,7 +18,7 @@ fn execute_mpsc_transport() -> Result<(), ()> { }); let blackhole = system.block(Drop::new(system.input())); system.connect(&constant.output, &blackhole.input); - let process = runtime.execute(system).unwrap(); + let process = SystemExecution::execute(system).unwrap(); process.join().unwrap(); Ok(()) } diff --git a/lib/protoflow/tests/zst.rs b/lib/protoflow/tests/zst.rs index cd5629c7..3948f20d 100644 --- a/lib/protoflow/tests/zst.rs +++ b/lib/protoflow/tests/zst.rs @@ -1,11 +1,13 @@ // This is free and unencumbered software released into the public domain. -use protoflow::{blocks::Const, runtimes::StdRuntime, transports::MpscTransport, Runtime, System}; +use protoflow::{ + blocks::Const, runtimes::StdRuntime, transports::MpscTransport, System, SystemExecution, +}; #[test] fn const_with_numeric_zero() -> Result<(), ()> { let transport = MpscTransport::new(); - let mut runtime = StdRuntime::new(transport).unwrap(); + let runtime = StdRuntime::new(transport).unwrap(); let mut system = System::new(&runtime); let constant: Const = system.block(Const { @@ -16,11 +18,9 @@ fn const_with_numeric_zero() -> Result<(), ()> { system.connect(&constant.output, &output); - std::thread::spawn(move || { - let process = runtime.execute(system).unwrap(); - process.join().unwrap(); - }); + let process = SystemExecution::execute(system).unwrap(); assert_eq!(output.recv(), Ok(Some(0))); // not Ok(None) + process.join().unwrap(); Ok(()) } From ae1a7a31d0e07d5424c8ea05de2efa466efc5a3a Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 18 Nov 2024 18:18:07 +0200 Subject: [PATCH 03/17] Remove unused var --- lib/protoflow-core/src/system.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protoflow-core/src/system.rs b/lib/protoflow-core/src/system.rs index 43c19afe..3a66a094 100644 --- a/lib/protoflow-core/src/system.rs +++ b/lib/protoflow-core/src/system.rs @@ -166,7 +166,7 @@ impl System { #[doc(hidden)] pub fn connect_by_id(&mut self, source_id: PortID, target_id: PortID) -> PortResult { - let add = self.connections.insert(( + self.connections.insert(( OutputPortID(source_id.into()), InputPortID(target_id.into()), )); From 39b013367bbb06fbff1d222ae5f9daa85d74a91f Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Sat, 16 Nov 2024 21:46:06 +0200 Subject: [PATCH 04/17] ZMQ skeleton --- lib/protoflow-zeromq/Cargo.toml | 3 +- lib/protoflow-zeromq/src/lib.rs | 69 +++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/lib/protoflow-zeromq/Cargo.toml b/lib/protoflow-zeromq/Cargo.toml index 00fde5ab..1cc3329d 100644 --- a/lib/protoflow-zeromq/Cargo.toml +++ b/lib/protoflow-zeromq/Cargo.toml @@ -27,6 +27,7 @@ cfg_aliases.workspace = true [dependencies] protoflow-core.workspace = true tracing = { version = "0.1", default-features = false, optional = true } -#zeromq = { version = "0.4", default-features = false } +zeromq = { version = "0.4.1", default-features = false, features = ["tokio-runtime", "all-transport"] } +tokio = { version = "1.40.0", default-features = false } [dev-dependencies] diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 0f186b2c..cea14760 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -5,3 +5,72 @@ #[doc(hidden)] pub use protoflow_core::prelude; + +extern crate std; + +use protoflow_core::{prelude::Bytes, InputPortID, OutputPortID, PortResult, PortState, Transport}; + +use zeromq::{Socket, SocketRecv, SocketSend}; + +pub struct ZMQTransport { + psock: zeromq::PubSocket, + ssock: zeromq::SubSocket, + tokio: tokio::runtime::Handle, +} + +impl ZMQTransport { + pub fn new(url: &str) -> Self { + let tokio = tokio::runtime::Handle::current(); + let mut psock = zeromq::PubSocket::new(); + tokio.block_on(psock.connect(url)).expect("psock conn"); + let mut ssock = zeromq::SubSocket::new(); + tokio.block_on(ssock.connect(url)).expect("ssock conn"); + Self { + psock, + ssock, + tokio, + } + } +} + +impl Transport for ZMQTransport { + fn input_state(&self, input: InputPortID) -> PortResult { + todo!(); + } + + fn output_state(&self, output: OutputPortID) -> PortResult { + todo!(); + } + + fn open_input(&self) -> PortResult { + todo!(); + } + + fn open_output(&self) -> PortResult { + todo!(); + } + + fn close_input(&self, input: InputPortID) -> PortResult { + todo!(); + } + + fn close_output(&self, output: OutputPortID) -> PortResult { + todo!(); + } + + fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult { + todo!(); + } + + fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { + todo!(); + } + + fn recv(&self, input: InputPortID) -> PortResult> { + todo!(); + } + + fn try_recv(&self, _input: InputPortID) -> PortResult> { + todo!(); + } +} From d3a7398f0ad862beec69192a424df6aec3b478e1 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 18 Nov 2024 19:26:06 +0200 Subject: [PATCH 05/17] Add rudimentary `send` and `recv` --- lib/protoflow-zeromq/Cargo.toml | 8 ++- lib/protoflow-zeromq/src/lib.rs | 93 ++++++++++++++++++++++++++++----- 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/lib/protoflow-zeromq/Cargo.toml b/lib/protoflow-zeromq/Cargo.toml index 1cc3329d..c911369c 100644 --- a/lib/protoflow-zeromq/Cargo.toml +++ b/lib/protoflow-zeromq/Cargo.toml @@ -17,7 +17,7 @@ publish.workspace = true [features] default = ["all", "std"] all = ["tracing"] -std = ["protoflow-core/std", "tracing?/std"] #, "zeromq/default"] +std = ["protoflow-core/std", "tracing?/std"] tracing = ["protoflow-core/tracing", "dep:tracing"] unstable = ["protoflow-core/unstable"] @@ -27,7 +27,11 @@ cfg_aliases.workspace = true [dependencies] protoflow-core.workspace = true tracing = { version = "0.1", default-features = false, optional = true } -zeromq = { version = "0.4.1", default-features = false, features = ["tokio-runtime", "all-transport"] } +zeromq = { version = "0.4.1", default-features = false, features = [ + "tokio-runtime", + "all-transport", +] } tokio = { version = "1.40.0", default-features = false } +parking_lot = "0.12" [dev-dependencies] diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index cea14760..6ab44c85 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -8,27 +8,67 @@ pub use protoflow_core::prelude; extern crate std; -use protoflow_core::{prelude::Bytes, InputPortID, OutputPortID, PortResult, PortState, Transport}; +use protoflow_core::{ + prelude::{BTreeMap, Bytes}, + InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, +}; -use zeromq::{Socket, SocketRecv, SocketSend}; +use parking_lot::{Mutex, RwLock}; +use std::sync::mpsc::{Receiver, SyncSender}; +use zeromq::{util::PeerIdentity, Socket, SocketOptions, SocketRecv, SocketSend}; pub struct ZMQTransport { - psock: zeromq::PubSocket, - ssock: zeromq::SubSocket, + sock: Mutex, tokio: tokio::runtime::Handle, + outputs: BTreeMap>, + inputs: BTreeMap>, +} + +#[derive(Debug, Clone, Default)] +enum ZmqOutputPortState { + #[default] + Open, + Connected(SyncSender), + Closed, +} + +#[derive(Debug, Default)] +enum ZmqInputPortState { + #[default] + Open, + Connected(Mutex>), + Closed, +} + +#[derive(Clone, Debug)] +enum ZmqTransportEvent { + Connect, + Message(Bytes), + Disconnect, } impl ZMQTransport { pub fn new(url: &str) -> Self { let tokio = tokio::runtime::Handle::current(); - let mut psock = zeromq::PubSocket::new(); - tokio.block_on(psock.connect(url)).expect("psock conn"); - let mut ssock = zeromq::SubSocket::new(); - tokio.block_on(ssock.connect(url)).expect("ssock conn"); + + let peer_id = PeerIdentity::new(); + let mut sock_opts = SocketOptions::default(); + sock_opts.peer_identity(peer_id); + + let mut sock = zeromq::RouterSocket::with_options(sock_opts); + tokio + .block_on(sock.connect(url)) + .expect("failed to connect"); + let sock = Mutex::new(sock); + + let outputs = BTreeMap::default(); + let inputs = BTreeMap::default(); + Self { - psock, - ssock, + sock, tokio, + outputs, + inputs, } } } @@ -63,11 +103,40 @@ impl Transport for ZMQTransport { } fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { - todo!(); + let Some(output_state) = self.outputs.get(&output) else { + todo!(); + }; + + use ZmqOutputPortState::*; + match *output_state.read() { + Open => todo!(), + Closed => todo!(), + Connected(ref sender) => { + let msg = ZmqTransportEvent::Message(message); + Ok(sender.send(msg).unwrap()) + } + } } fn recv(&self, input: InputPortID) -> PortResult> { - todo!(); + let Some(input_state) = self.inputs.get(&input) else { + todo!(); + }; + + use ZmqInputPortState::*; + match *input_state.read() { + Open => todo!(), + Closed => todo!(), + Connected(ref receiver) => { + use ZmqTransportEvent::*; + let receiver = receiver.lock(); + match receiver.recv().map_err(|_| PortError::Disconnected)? { + Connect => todo!(), + Disconnect => todo!(), + Message(bytes) => Ok(Some(bytes)), + } + } + } } fn try_recv(&self, _input: InputPortID) -> PortResult> { From 7ba654a834e1c76abfec8c9e79b30df2c3fce89b Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Tue, 19 Nov 2024 07:54:05 +0200 Subject: [PATCH 06/17] wip --- lib/protoflow-zeromq/src/lib.rs | 54 +++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 6ab44c85..15d521bb 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,7 +9,7 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{BTreeMap, Bytes}, + prelude::{BTreeMap, Bytes, ToString}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; @@ -18,7 +18,8 @@ use std::sync::mpsc::{Receiver, SyncSender}; use zeromq::{util::PeerIdentity, Socket, SocketOptions, SocketRecv, SocketSend}; pub struct ZMQTransport { - sock: Mutex, + psock: Mutex, + ssock: Mutex, tokio: tokio::runtime::Handle, outputs: BTreeMap>, inputs: BTreeMap>, @@ -52,20 +53,42 @@ impl ZMQTransport { let tokio = tokio::runtime::Handle::current(); let peer_id = PeerIdentity::new(); - let mut sock_opts = SocketOptions::default(); - sock_opts.peer_identity(peer_id); - let mut sock = zeromq::RouterSocket::with_options(sock_opts); - tokio - .block_on(sock.connect(url)) - .expect("failed to connect"); - let sock = Mutex::new(sock); + let psock = { + let peer_id = peer_id.clone(); + let mut sock_opts = SocketOptions::default(); + sock_opts.peer_identity(peer_id); + + let mut psock = zeromq::PubSocket::with_options(sock_opts); + tokio + .block_on(psock.connect(url)) + .expect("failed to connect PUB"); + Mutex::new(psock) + }; + + let ssock = { + let mut sock_opts = SocketOptions::default(); + sock_opts.peer_identity(peer_id); + + let mut ssock = zeromq::SubSocket::with_options(sock_opts); + tokio + .block_on(ssock.connect(url)) + .expect("failed to connect SUB"); + Mutex::new(ssock) + }; + + // let mut sock = zeromq::RouterSocket::with_options(sock_opts); + // tokio + // .block_on(sock.connect(url)) + // .expect("failed to connect"); + // let sock = Mutex::new(sock); let outputs = BTreeMap::default(); let inputs = BTreeMap::default(); Self { - sock, + psock, + ssock, tokio, outputs, inputs, @@ -83,11 +106,18 @@ impl Transport for ZMQTransport { } fn open_input(&self) -> PortResult { - todo!(); + let id = self.inputs.len() + 1; + InputPortID::try_from(id as isize).map_err(|e| PortError::Other(e.to_string())) } fn open_output(&self) -> PortResult { - todo!(); + let id = self.inputs.len() + 1; + let id = + OutputPortID::try_from(id as isize).map_err(|e| PortError::Other(e.to_string()))?; + self.outputs + .insert(id, RwLock::new(ZmqOutputPortState::Open)) + .unwrap(); + Ok(id) } fn close_input(&self, input: InputPortID) -> PortResult { From e323f73277db22b9b62fd2ad432e8168b1257221 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 25 Nov 2024 09:49:39 +0200 Subject: [PATCH 07/17] Reset impl again --- lib/protoflow-zeromq/src/lib.rs | 50 +++------------------------------ 1 file changed, 4 insertions(+), 46 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 15d521bb..ba95ec69 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -77,12 +77,6 @@ impl ZMQTransport { Mutex::new(ssock) }; - // let mut sock = zeromq::RouterSocket::with_options(sock_opts); - // tokio - // .block_on(sock.connect(url)) - // .expect("failed to connect"); - // let sock = Mutex::new(sock); - let outputs = BTreeMap::default(); let inputs = BTreeMap::default(); @@ -106,18 +100,11 @@ impl Transport for ZMQTransport { } fn open_input(&self) -> PortResult { - let id = self.inputs.len() + 1; - InputPortID::try_from(id as isize).map_err(|e| PortError::Other(e.to_string())) + todo!(); } fn open_output(&self) -> PortResult { - let id = self.inputs.len() + 1; - let id = - OutputPortID::try_from(id as isize).map_err(|e| PortError::Other(e.to_string()))?; - self.outputs - .insert(id, RwLock::new(ZmqOutputPortState::Open)) - .unwrap(); - Ok(id) + todo!(); } fn close_input(&self, input: InputPortID) -> PortResult { @@ -133,40 +120,11 @@ impl Transport for ZMQTransport { } fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { - let Some(output_state) = self.outputs.get(&output) else { - todo!(); - }; - - use ZmqOutputPortState::*; - match *output_state.read() { - Open => todo!(), - Closed => todo!(), - Connected(ref sender) => { - let msg = ZmqTransportEvent::Message(message); - Ok(sender.send(msg).unwrap()) - } - } + todo!(); } fn recv(&self, input: InputPortID) -> PortResult> { - let Some(input_state) = self.inputs.get(&input) else { - todo!(); - }; - - use ZmqInputPortState::*; - match *input_state.read() { - Open => todo!(), - Closed => todo!(), - Connected(ref receiver) => { - use ZmqTransportEvent::*; - let receiver = receiver.lock(); - match receiver.recv().map_err(|_| PortError::Disconnected)? { - Connect => todo!(), - Disconnect => todo!(), - Message(bytes) => Ok(Some(bytes)), - } - } - } + todo!(); } fn try_recv(&self, _input: InputPortID) -> PortResult> { From 1010e2ff30cd793736dfd07ab390d756986297fa Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 25 Nov 2024 09:53:59 +0200 Subject: [PATCH 08/17] Add topics --- lib/protoflow-zeromq/src/lib.rs | 37 +++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index ba95ec69..0d56dbaa 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,18 +9,24 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{BTreeMap, Bytes, ToString}, + prelude::{BTreeMap, Bytes, String, ToString}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; use parking_lot::{Mutex, RwLock}; -use std::sync::mpsc::{Receiver, SyncSender}; +use std::{ + fmt::{self, Write}, + sync::mpsc::{Receiver, SyncSender}, + write, +}; use zeromq::{util::PeerIdentity, Socket, SocketOptions, SocketRecv, SocketSend}; pub struct ZMQTransport { + tokio: tokio::runtime::Handle, + psock: Mutex, ssock: Mutex, - tokio: tokio::runtime::Handle, + outputs: BTreeMap>, inputs: BTreeMap>, } @@ -41,11 +47,30 @@ enum ZmqInputPortState { Closed, } +type SequenceID = u64; + #[derive(Clone, Debug)] enum ZmqTransportEvent { - Connect, - Message(Bytes), - Disconnect, + Connect(OutputPortID, InputPortID), + AckConnection(OutputPortID, InputPortID), + Message(OutputPortID, InputPortID, SequenceID, Bytes), + AckMessage(OutputPortID, InputPortID, SequenceID), + CloseOutput(OutputPortID, InputPortID), + CloseInput(InputPortID), +} + +impl ZmqTransportEvent { + fn write_topic(&self, f: &mut W) -> Result<(), fmt::Error> { + use ZmqTransportEvent::*; + match self { + Connect(o, i) => write!(f, "{}:conn:{}", i, o), + AckConnection(o, i) => write!(f, "{}:ackConn:{}", i, o), + Message(o, i, seq, _payload) => write!(f, "{}:msg:{}:{}", i, o, seq), + AckMessage(o, i, seq) => write!(f, "{}:ackMsg:{}:{}", i, o, seq), + CloseOutput(o, i) => write!(f, "{}:closeOut:{}", i, o), + CloseInput(i) => write!(f, "{}:closeIn", i), + } + } } impl ZMQTransport { From 3f3f128451d1b95355c85ef785eb0fa42821e6fc Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Mon, 25 Nov 2024 11:15:47 +0200 Subject: [PATCH 09/17] Add port operation skeletons --- lib/protoflow-zeromq/Cargo.toml | 1 + lib/protoflow-zeromq/src/lib.rs | 181 ++++++++++++++++++++++++++++---- 2 files changed, 160 insertions(+), 22 deletions(-) diff --git a/lib/protoflow-zeromq/Cargo.toml b/lib/protoflow-zeromq/Cargo.toml index c911369c..e28a73e7 100644 --- a/lib/protoflow-zeromq/Cargo.toml +++ b/lib/protoflow-zeromq/Cargo.toml @@ -35,3 +35,4 @@ tokio = { version = "1.40.0", default-features = false } parking_lot = "0.12" [dev-dependencies] +futures-util = "0.3.31" diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 0d56dbaa..ec6f3fcd 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,7 +9,7 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{BTreeMap, Bytes, String, ToString}, + prelude::{BTreeMap, Bytes, ToString}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; @@ -19,23 +19,29 @@ use std::{ sync::mpsc::{Receiver, SyncSender}, write, }; -use zeromq::{util::PeerIdentity, Socket, SocketOptions, SocketRecv, SocketSend}; +use zeromq::{util::PeerIdentity, Socket, SocketOptions}; -pub struct ZMQTransport { +const DEFAULT_PUB_SOCKET: &str = "tcp://127.0.0.1:10000"; +const DEFAULT_SUB_SOCKET: &str = "tcp://127.0.0.1:10001"; + +pub struct ZmqTransport { tokio: tokio::runtime::Handle, psock: Mutex, ssock: Mutex, - outputs: BTreeMap>, - inputs: BTreeMap>, + outputs: RwLock>>, + inputs: RwLock>>, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default)] enum ZmqOutputPortState { #[default] Open, - Connected(SyncSender), + Connected( + Mutex>, + SyncSender, + ), Closed, } @@ -43,7 +49,10 @@ enum ZmqOutputPortState { enum ZmqInputPortState { #[default] Open, - Connected(Mutex>), + Connected( + Mutex>, + SyncSender, + ), Closed, } @@ -73,8 +82,14 @@ impl ZmqTransportEvent { } } -impl ZMQTransport { - pub fn new(url: &str) -> Self { +impl Default for ZmqTransport { + fn default() -> Self { + Self::new(DEFAULT_PUB_SOCKET, DEFAULT_SUB_SOCKET) + } +} + +impl ZmqTransport { + pub fn new(pub_url: &str, sub_url: &str) -> Self { let tokio = tokio::runtime::Handle::current(); let peer_id = PeerIdentity::new(); @@ -86,7 +101,7 @@ impl ZMQTransport { let mut psock = zeromq::PubSocket::with_options(sock_opts); tokio - .block_on(psock.connect(url)) + .block_on(psock.connect(pub_url)) .expect("failed to connect PUB"); Mutex::new(psock) }; @@ -97,13 +112,13 @@ impl ZMQTransport { let mut ssock = zeromq::SubSocket::with_options(sock_opts); tokio - .block_on(ssock.connect(url)) + .block_on(ssock.connect(sub_url)) .expect("failed to connect SUB"); Mutex::new(ssock) }; - let outputs = BTreeMap::default(); - let inputs = BTreeMap::default(); + let outputs = RwLock::new(BTreeMap::default()); + let inputs = RwLock::new(BTreeMap::default()); Self { psock, @@ -115,33 +130,109 @@ impl ZMQTransport { } } -impl Transport for ZMQTransport { +impl Transport for ZmqTransport { fn input_state(&self, input: InputPortID) -> PortResult { - todo!(); + use ZmqInputPortState::*; + match self.inputs.read().get(&input) { + Some(input) => match *input.read() { + Open => Ok(PortState::Open), + Connected(_, _) => Ok(PortState::Connected), + Closed => Ok(PortState::Closed), + }, + None => Err(PortError::Invalid(input.into())), + } } fn output_state(&self, output: OutputPortID) -> PortResult { - todo!(); + use ZmqOutputPortState::*; + match self.outputs.read().get(&output) { + Some(output) => match *output.read() { + Open => Ok(PortState::Open), + Connected(_, _) => Ok(PortState::Connected), + Closed => Ok(PortState::Closed), + }, + None => Err(PortError::Invalid(output.into())), + } } fn open_input(&self) -> PortResult { - todo!(); + let mut inputs = self.inputs.write(); + + let new_id = InputPortID::try_from(-(inputs.len() as isize + 1)) + .map_err(|e| PortError::Other(e.to_string()))?; + + let state = RwLock::new(ZmqInputPortState::Open); + inputs.insert(new_id, state); + + // TODO: start worker + + Ok(new_id) } fn open_output(&self) -> PortResult { - todo!(); + let mut outputs = self.outputs.write(); + + let new_id = OutputPortID::try_from(outputs.len() as isize + 1) + .map_err(|e| PortError::Other(e.to_string()))?; + + let state = RwLock::new(ZmqOutputPortState::Open); + outputs.insert(new_id, state); + + // TODO: start worker + + Ok(new_id) } fn close_input(&self, input: InputPortID) -> PortResult { - todo!(); + let inputs = self.inputs.read(); + + let Some(state) = inputs.get(&input) else { + return Err(PortError::Invalid(input.into())); + }; + + let mut state = state.write(); + + let ZmqInputPortState::Connected(_, _) = *state else { + return Err(PortError::Disconnected); + }; + + // TODO: send message to worker + + *state = ZmqInputPortState::Closed; + + Ok(true) } fn close_output(&self, output: OutputPortID) -> PortResult { - todo!(); + let outputs = self.outputs.read(); + + let Some(state) = outputs.get(&output) else { + return Err(PortError::Invalid(output.into())); + }; + + let mut state = state.write(); + + let ZmqOutputPortState::Connected(_, _) = *state else { + return Err(PortError::Disconnected); + }; + + // TODO: send message to worker + + *state = ZmqOutputPortState::Closed; + + Ok(true) } fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult { - todo!(); + let Some(output) = self.outputs.read().get(&source) else { + return Err(PortError::Invalid(source.into())); + }; + + let Some(input) = self.inputs.read().get(&target) else { + return Err(PortError::Invalid(target.into())); + }; + + Ok(true) } fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { @@ -156,3 +247,49 @@ impl Transport for ZMQTransport { todo!(); } } + +#[cfg(test)] +mod tests { + use super::*; + + use protoflow_core::System; + + use futures_util::future::TryFutureExt; + use zeromq::{PubSocket, SocketRecv, SocketSend, SubSocket}; + + fn start_zmqtransport_server(rt: &tokio::runtime::Runtime) { + // bind a *SUB* socket to the *PUB* address so that the transport can *PUB* to it + let mut pub_srv = SubSocket::new(); + rt.block_on(pub_srv.bind(DEFAULT_PUB_SOCKET)).unwrap(); + + // bind a *PUB* socket to the *SUB* address so that the transport can *SUB* to it + let mut sub_srv = PubSocket::new(); + rt.block_on(sub_srv.bind(DEFAULT_SUB_SOCKET)).unwrap(); + + // subscribe to all messages + rt.block_on(pub_srv.subscribe("")).unwrap(); + + // resend anything received on the *SUB* socket to the *PUB* socket + tokio::task::spawn(async move { + let mut pub_srv = pub_srv; + loop { + pub_srv + .recv() + .and_then(|msg| sub_srv.send(msg)) + .await + .unwrap(); + } + }); + } + + #[test] + fn implementation_matches() { + let rt = tokio::runtime::Runtime::new().unwrap(); + let _guard = rt.enter(); + + //zeromq::proxy(frontend, backend, capture) + start_zmqtransport_server(&rt); + + let _ = System::::build(|_s| { /* do nothing */ }); + } +} From d0aadded172db7bf361723d568027623ffb3100f Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Tue, 26 Nov 2024 12:12:45 +0200 Subject: [PATCH 10/17] Add port worker skeletons --- lib/protoflow-zeromq/src/lib.rs | 228 +++++++++++++++++++++++++------- 1 file changed, 182 insertions(+), 46 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index ec6f3fcd..a0e72947 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,17 +9,18 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{BTreeMap, Bytes, ToString}, + prelude::{BTreeMap, Bytes, ToString, Vec}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; use parking_lot::{Mutex, RwLock}; use std::{ fmt::{self, Write}, - sync::mpsc::{Receiver, SyncSender}, + format, + sync::mpsc::{sync_channel, Receiver, SyncSender}, write, }; -use zeromq::{util::PeerIdentity, Socket, SocketOptions}; +use zeromq::{util::PeerIdentity, Socket, SocketOptions, ZmqError}; const DEFAULT_PUB_SOCKET: &str = "tcp://127.0.0.1:10000"; const DEFAULT_SUB_SOCKET: &str = "tcp://127.0.0.1:10001"; @@ -39,25 +40,53 @@ enum ZmqOutputPortState { #[default] Open, Connected( - Mutex>, SyncSender, + Mutex>, + InputPortID, ), Closed, } -#[derive(Debug, Default)] +impl ZmqOutputPortState { + fn state(&self) -> PortState { + use ZmqOutputPortState::*; + match self { + Open => PortState::Open, + Connected(_, _, _) => PortState::Connected, + Closed => PortState::Closed, + } + } +} + +#[derive(Debug)] enum ZmqInputPortState { - #[default] - Open, + Open( + SyncSender, + Mutex>, + ), Connected( - Mutex>, SyncSender, + Mutex>, + Vec, ), Closed, } +impl ZmqInputPortState { + fn state(&self) -> PortState { + use ZmqInputPortState::*; + match self { + Open(_, _) => PortState::Open, + Connected(_, _, _) => PortState::Connected, + Closed => PortState::Closed, + } + } +} + type SequenceID = u64; +/// ZmqTransportEvent represents the data that goes over the wire, sent from an output port over +/// the network to an input port. #[derive(Clone, Debug)] enum ZmqTransportEvent { Connect(OutputPortID, InputPortID), @@ -82,6 +111,24 @@ impl ZmqTransportEvent { } } +/// ZmqOutputPortEvent represents events that we receive from the background worker of the port. +#[derive(Clone, Debug)] +enum ZmqOutputPortEvent { + Opened, + Connected(InputPortID), + Message(Bytes), + Closed, +} + +/// ZmqInputPortEvent represents events that we receive from the background worker of the port. +#[derive(Clone, Debug)] +enum ZmqInputPortEvent { + Opened, + Connected(OutputPortID), + Message(Bytes), + Closed, +} + impl Default for ZmqTransport { fn default() -> Self { Self::new(DEFAULT_PUB_SOCKET, DEFAULT_SUB_SOCKET) @@ -128,31 +175,47 @@ impl ZmqTransport { inputs, } } + + fn subscribe_for_input_port( + &self, + input: InputPortID, + ) -> Result<(SyncSender, Receiver), ZmqError> { + // TODO: only sub to relevant events + let topic = format!("{}:", input); + self.tokio.block_on(self.ssock.lock().subscribe(&topic))?; + let (from_worker_send, from_worker_recv) = sync_channel(1); + let (to_worker_send, to_worker_recv) = sync_channel(1); + + // Input worker loop: + // 1. Receive connection attempts and respond + // 2. Receive messages and forward to channel + // 3. Receive and handle disconnects + tokio::task::spawn(async { + let (output, input) = (from_worker_send, to_worker_recv); + loop { + todo!(); + } + }); + + Ok((to_worker_send, from_worker_recv)) + } } impl Transport for ZmqTransport { fn input_state(&self, input: InputPortID) -> PortResult { - use ZmqInputPortState::*; - match self.inputs.read().get(&input) { - Some(input) => match *input.read() { - Open => Ok(PortState::Open), - Connected(_, _) => Ok(PortState::Connected), - Closed => Ok(PortState::Closed), - }, - None => Err(PortError::Invalid(input.into())), - } + self.inputs + .read() + .get(&input) + .map(|port| port.read().state()) + .ok_or(PortError::Invalid(input.into())) } fn output_state(&self, output: OutputPortID) -> PortResult { - use ZmqOutputPortState::*; - match self.outputs.read().get(&output) { - Some(output) => match *output.read() { - Open => Ok(PortState::Open), - Connected(_, _) => Ok(PortState::Connected), - Closed => Ok(PortState::Closed), - }, - None => Err(PortError::Invalid(output.into())), - } + self.outputs + .read() + .get(&output) + .map(|port| port.read().state()) + .ok_or(PortError::Invalid(output.into())) } fn open_input(&self) -> PortResult { @@ -161,10 +224,12 @@ impl Transport for ZmqTransport { let new_id = InputPortID::try_from(-(inputs.len() as isize + 1)) .map_err(|e| PortError::Other(e.to_string()))?; - let state = RwLock::new(ZmqInputPortState::Open); - inputs.insert(new_id, state); + let (sender, receiver) = self + .subscribe_for_input_port(new_id) + .map_err(|e| PortError::Other(e.to_string()))?; - // TODO: start worker + let state = RwLock::new(ZmqInputPortState::Open(sender, Mutex::new(receiver))); + inputs.insert(new_id, state); Ok(new_id) } @@ -178,8 +243,6 @@ impl Transport for ZmqTransport { let state = RwLock::new(ZmqOutputPortState::Open); outputs.insert(new_id, state); - // TODO: start worker - Ok(new_id) } @@ -190,17 +253,27 @@ impl Transport for ZmqTransport { return Err(PortError::Invalid(input.into())); }; - let mut state = state.write(); + let state = state.read(); - let ZmqInputPortState::Connected(_, _) = *state else { + let ZmqInputPortState::Connected(sender, receiver, _) = &*state else { return Err(PortError::Disconnected); }; - // TODO: send message to worker - - *state = ZmqInputPortState::Closed; + sender + .send(ZmqTransportEvent::CloseInput(input)) + .map_err(|e| PortError::Other(e.to_string()))?; - Ok(true) + loop { + let msg = receiver + .lock() + .recv() + .map_err(|e| PortError::Other(e.to_string()))?; + use ZmqInputPortEvent::*; + match msg { + Closed => break Ok(true), + _ => continue, // TODO + }; + } } fn close_output(&self, output: OutputPortID) -> PortResult { @@ -210,29 +283,92 @@ impl Transport for ZmqTransport { return Err(PortError::Invalid(output.into())); }; - let mut state = state.write(); + let state = state.write(); - let ZmqOutputPortState::Connected(_, _) = *state else { + let ZmqOutputPortState::Connected(sender, receiver, input) = &*state else { return Err(PortError::Disconnected); }; - // TODO: send message to worker - - *state = ZmqOutputPortState::Closed; + sender + .send(ZmqTransportEvent::CloseOutput(output, *input)) + .map_err(|e| PortError::Other(e.to_string()))?; - Ok(true) + loop { + let msg = receiver + .lock() + .recv() + .map_err(|e| PortError::Other(e.to_string()))?; + use ZmqOutputPortEvent::*; + match msg { + Closed => break Ok(true), + _ => continue, // TODO + } + } } fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult { - let Some(output) = self.outputs.read().get(&source) else { + let outputs = self.outputs.read(); + let Some(output) = outputs.get(&source) else { return Err(PortError::Invalid(source.into())); }; - let Some(input) = self.inputs.read().get(&target) else { + let inputs = self.inputs.read(); + let Some(input) = inputs.get(&target) else { return Err(PortError::Invalid(target.into())); }; - Ok(true) + //let mut output = output.write(); + //if !output.state().is_open() { + // return Err(PortError::Invalid(source.into())); + //} + // + //let mut input = input.write(); + //if !input.state().is_open() { + // return Err(PortError::Invalid(source.into())); + //} + + // TODO: send from output, receive and respond from input + + //let (out_recv, out_send) = { + // let (from_worker_send, from_worker_recv) = sync_channel::(1); + // let (to_worker_send, to_worker_recv) = sync_channel::(1); + // + // tokio::task::spawn(async { + // let (output, input) = (from_worker_send, to_worker_recv); + // loop { + // tokio::time::sleep(Duration::from_secs(1)).await; + // } + // }); + // + // (from_worker_recv, to_worker_send) + //}; + + let (from_worker_send, from_worker_recv) = sync_channel::(1); + let (to_worker_send, to_worker_recv) = sync_channel::(1); + + // Output worker loop: + // 1. Send connection attempts + // 2. Send messages + // 2.1 Wait for ACK + // 2.2. Resend on timeout + // 3. Send disconnect events + tokio::task::spawn(async { + let (output, input) = (from_worker_send, to_worker_recv); + loop { + todo!(); + } + }); + + loop { + let msg = from_worker_recv + .recv() + .map_err(|e| PortError::Other(e.to_string()))?; + use ZmqOutputPortEvent::*; + match msg { + Connected(_) => break Ok(true), + _ => continue, // TODO + } + } } fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { From 6e2502be8778bb1baeac299ee871de53a47e5a37 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Tue, 26 Nov 2024 12:23:02 +0200 Subject: [PATCH 11/17] Add ZMQ socket workers for listening and sending --- lib/protoflow-zeromq/src/lib.rs | 455 ++++++++++++++++++++++++++------ 1 file changed, 372 insertions(+), 83 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index a0e72947..cb7ca70b 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,18 +9,20 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{BTreeMap, Bytes, ToString, Vec}, + prelude::{Arc, BTreeMap, Bytes, String, ToString, Vec}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; +use core::fmt::Error; use parking_lot::{Mutex, RwLock}; use std::{ - fmt::{self, Write}, format, sync::mpsc::{sync_channel, Receiver, SyncSender}, write, }; -use zeromq::{util::PeerIdentity, Socket, SocketOptions, ZmqError}; +use zeromq::{ + util::PeerIdentity, Socket, SocketOptions, SocketRecv, SocketSend, ZmqError, ZmqMessage, +}; const DEFAULT_PUB_SOCKET: &str = "tcp://127.0.0.1:10000"; const DEFAULT_SUB_SOCKET: &str = "tcp://127.0.0.1:10001"; @@ -28,20 +30,19 @@ const DEFAULT_SUB_SOCKET: &str = "tcp://127.0.0.1:10001"; pub struct ZmqTransport { tokio: tokio::runtime::Handle, - psock: Mutex, - ssock: Mutex, + out_queue: Arc>, + sub_queue: Arc>, - outputs: RwLock>>, - inputs: RwLock>>, + outputs: Arc>>>, + inputs: Arc>>>, } -#[derive(Debug, Default)] +#[derive(Debug, Clone)] enum ZmqOutputPortState { - #[default] - Open, + Open(Arc>), Connected( - SyncSender, - Mutex>, + Arc>, + Arc>>, InputPortID, ), Closed, @@ -51,22 +52,22 @@ impl ZmqOutputPortState { fn state(&self) -> PortState { use ZmqOutputPortState::*; match self { - Open => PortState::Open, + Open(_) => PortState::Open, Connected(_, _, _) => PortState::Connected, Closed => PortState::Closed, } } } -#[derive(Debug)] +#[derive(Debug, Clone)] enum ZmqInputPortState { Open( - SyncSender, - Mutex>, + Arc>, + Arc>>, ), Connected( - SyncSender, - Mutex>, + Arc>, + Arc>>, Vec, ), Closed, @@ -98,7 +99,7 @@ enum ZmqTransportEvent { } impl ZmqTransportEvent { - fn write_topic(&self, f: &mut W) -> Result<(), fmt::Error> { + fn write_topic(&self, f: &mut W) -> Result<(), std::io::Error> { use ZmqTransportEvent::*; match self { Connect(o, i) => write!(f, "{}:conn:{}", i, o), @@ -111,12 +112,42 @@ impl ZmqTransportEvent { } } +impl From for ZmqMessage { + fn from(value: ZmqTransportEvent) -> Self { + let mut topic = Vec::new(); + value.write_topic(&mut topic).unwrap(); + + // first frame of the message is the topic + let mut msg = ZmqMessage::from(topic.clone()); + + // second frame of the message is the payload + match value { + ZmqTransportEvent::Connect(output_port_id, input_port_id) => todo!(), + ZmqTransportEvent::AckConnection(output_port_id, input_port_id) => todo!(), + ZmqTransportEvent::Message(_, _, _, bytes) => msg.push_back(bytes), + ZmqTransportEvent::AckMessage(output_port_id, input_port_id, _) => todo!(), + ZmqTransportEvent::CloseOutput(output_port_id, input_port_id) => todo!(), + ZmqTransportEvent::CloseInput(input_port_id) => todo!(), + }; + + msg + } +} + +impl TryFrom for ZmqTransportEvent { + type Error = protoflow_core::DecodeError; + + fn try_from(value: ZmqMessage) -> Result { + todo!() + } +} + /// ZmqOutputPortEvent represents events that we receive from the background worker of the port. #[derive(Clone, Debug)] enum ZmqOutputPortEvent { Opened, Connected(InputPortID), - Message(Bytes), + Ack(SequenceID), Closed, } @@ -135,6 +166,12 @@ impl Default for ZmqTransport { } } +#[derive(Clone)] +enum ZmqSubscriptionRequest { + Subscribe(String), + Unsubscribe(String), +} + impl ZmqTransport { pub fn new(pub_url: &str, sub_url: &str) -> Self { let tokio = tokio::runtime::Handle::current(); @@ -150,7 +187,7 @@ impl ZmqTransport { tokio .block_on(psock.connect(pub_url)) .expect("failed to connect PUB"); - Mutex::new(psock) + psock }; let ssock = { @@ -161,41 +198,145 @@ impl ZmqTransport { tokio .block_on(ssock.connect(sub_url)) .expect("failed to connect SUB"); - Mutex::new(ssock) + ssock }; - let outputs = RwLock::new(BTreeMap::default()); - let inputs = RwLock::new(BTreeMap::default()); + let outputs = Arc::new(RwLock::new(BTreeMap::default())); + let inputs = Arc::new(RwLock::new(BTreeMap::default())); + + let (out_queue, out_queue_recv) = sync_channel(1); + + let out_queue = Arc::new(out_queue); - Self { - psock, - ssock, + let (sub_queue, sub_queue_recv) = tokio::sync::mpsc::channel(1); + let sub_queue = Arc::new(sub_queue); + + let transport = Self { + out_queue, + sub_queue, tokio, outputs, inputs, - } + }; + + transport.start_send_worker(psock, out_queue_recv); + transport.start_recv_worker(ssock, sub_queue_recv); + + transport + } + + fn start_send_worker(&self, psock: zeromq::PubSocket, queue: Receiver) { + let tokio = self.tokio.clone(); + let mut psock = psock; + + tokio::task::spawn(async move { + loop { + let Ok(event) = queue.recv() else { + continue; + }; + + let msg = ZmqMessage::from(event); + + tokio.block_on(psock.send(msg)).expect("zmq send worker") + } + }); + } + + fn start_recv_worker( + &self, + ssock: zeromq::SubSocket, + queue: tokio::sync::mpsc::Receiver, + ) { + let mut ssock = ssock; + let mut queue = queue; + + let outputs = self.outputs.clone(); + let inputs = self.inputs.clone(); + + tokio::task::spawn(async move { + loop { + tokio::select! { + Ok(msg) = ssock.recv() => handle_zmq_msg(msg, &outputs, &inputs).unwrap(), + Some(req) = queue.recv() => { + use ZmqSubscriptionRequest::*; + match req { + Subscribe(topic) => ssock.subscribe(&topic).await.expect("zmq recv worker subscribe"), + Unsubscribe(topic) => ssock.unsubscribe(&topic).await.expect("zmq recv worker subscribe"), + }; + } + }; + } + }); } fn subscribe_for_input_port( &self, input: InputPortID, - ) -> Result<(SyncSender, Receiver), ZmqError> { + ) -> Result< + ( + Arc>, + Arc>>, + ), + ZmqError, + > { // TODO: only sub to relevant events let topic = format!("{}:", input); - self.tokio.block_on(self.ssock.lock().subscribe(&topic))?; + self.tokio + .block_on( + self.sub_queue + .send(ZmqSubscriptionRequest::Subscribe(topic)), + ) + .unwrap(); + let (from_worker_send, from_worker_recv) = sync_channel(1); let (to_worker_send, to_worker_recv) = sync_channel(1); + let to_worker_send = Arc::new(to_worker_send); + let from_worker_recv = Arc::new(Mutex::new(from_worker_recv)); + // Input worker loop: // 1. Receive connection attempts and respond // 2. Receive messages and forward to channel // 3. Receive and handle disconnects - tokio::task::spawn(async { - let (output, input) = (from_worker_send, to_worker_recv); - loop { - todo!(); - } - }); + { + let inputs = self.inputs.clone(); + + let to_worker_send = to_worker_send.clone(); + let from_worker_recv = from_worker_recv.clone(); + + tokio::task::spawn(async move { + let (output, input) = (from_worker_send, to_worker_recv); + + loop { + use ZmqTransportEvent::*; + let event = input.recv().expect("input worker recv"); + match event { + // Connection attempt + Connect(output_port_id, input_port_id) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + + let mut input = input.write(); + *input = ZmqInputPortState::Connected( + to_worker_send.clone(), + from_worker_recv.clone(), + Vec::new(), + ); + } + + // Message from output port + Message(output_port_id, input_port_id, _, bytes) => todo!(), + // Output port reports being closed + CloseInput(input_port_id) => todo!(), + + // ignore output port type events: + AckConnection(_, _) | AckMessage(_, _, _) | CloseOutput(_, _) => continue, + }; + } + }); + } Ok((to_worker_send, from_worker_recv)) } @@ -219,19 +360,25 @@ impl Transport for ZmqTransport { } fn open_input(&self) -> PortResult { - let mut inputs = self.inputs.write(); + let inputs = self.inputs.write(); let new_id = InputPortID::try_from(-(inputs.len() as isize + 1)) .map_err(|e| PortError::Other(e.to_string()))?; - let (sender, receiver) = self + let (_, receiver) = self .subscribe_for_input_port(new_id) .map_err(|e| PortError::Other(e.to_string()))?; - let state = RwLock::new(ZmqInputPortState::Open(sender, Mutex::new(receiver))); - inputs.insert(new_id, state); - - Ok(new_id) + loop { + let msg = receiver + .lock() + .recv() + .map_err(|e| PortError::Other(e.to_string()))?; + match msg { + ZmqInputPortEvent::Opened => break Ok(new_id), + _ => continue, // TODO + } + } } fn open_output(&self) -> PortResult { @@ -240,7 +387,10 @@ impl Transport for ZmqTransport { let new_id = OutputPortID::try_from(outputs.len() as isize + 1) .map_err(|e| PortError::Other(e.to_string()))?; - let state = RwLock::new(ZmqOutputPortState::Open); + let (sender, _receiver) = sync_channel(1); + let sender = Arc::new(sender); + + let state = RwLock::new(ZmqOutputPortState::Open(sender)); outputs.insert(new_id, state); Ok(new_id) @@ -308,59 +458,90 @@ impl Transport for ZmqTransport { fn connect(&self, source: OutputPortID, target: InputPortID) -> PortResult { let outputs = self.outputs.read(); - let Some(output) = outputs.get(&source) else { + if outputs + .get(&source) + .is_some_and(|state| !state.read().state().is_open()) + { return Err(PortError::Invalid(source.into())); - }; - - let inputs = self.inputs.read(); - let Some(input) = inputs.get(&target) else { - return Err(PortError::Invalid(target.into())); - }; - - //let mut output = output.write(); - //if !output.state().is_open() { - // return Err(PortError::Invalid(source.into())); - //} - // - //let mut input = input.write(); - //if !input.state().is_open() { - // return Err(PortError::Invalid(source.into())); - //} - - // TODO: send from output, receive and respond from input - - //let (out_recv, out_send) = { - // let (from_worker_send, from_worker_recv) = sync_channel::(1); - // let (to_worker_send, to_worker_recv) = sync_channel::(1); - // - // tokio::task::spawn(async { - // let (output, input) = (from_worker_send, to_worker_recv); - // loop { - // tokio::time::sleep(Duration::from_secs(1)).await; - // } - // }); - // - // (from_worker_recv, to_worker_send) - //}; + } let (from_worker_send, from_worker_recv) = sync_channel::(1); let (to_worker_send, to_worker_recv) = sync_channel::(1); + let to_worker_send = Arc::new(to_worker_send); + let from_worker_recv = Arc::new(Mutex::new(from_worker_recv)); + // Output worker loop: - // 1. Send connection attempts + // 1. Send connection attempt // 2. Send messages // 2.1 Wait for ACK // 2.2. Resend on timeout // 3. Send disconnect events - tokio::task::spawn(async { - let (output, input) = (from_worker_send, to_worker_recv); - loop { - todo!(); - } - }); + { + //let output_state = output_state.clone(); + let to_worker_send = to_worker_send.clone(); + let from_worker_recv = from_worker_recv.clone(); + + let outputs = self.outputs.clone(); + + tokio::task::spawn(async move { + let (output, input) = (from_worker_send, to_worker_recv); + + // connect loop + loop { + let request = input.recv().expect("output worker recv"); + match request { + ZmqTransportEvent::AckConnection(_, input_port_id) => { + let outputs = outputs.read(); + let Some(output_state) = outputs.get(&source) else { + todo!(); + }; + let mut output_state = output_state.write(); + debug_assert!(matches!(*output_state, ZmqOutputPortState::Open(_))); + *output_state = ZmqOutputPortState::Connected( + to_worker_send, + from_worker_recv, + input_port_id, + ); + break; + } + _ => continue, // TODO: when and why would we receive other events? + } + } + + // work loop + loop { + use ZmqTransportEvent::*; + let event = input.recv().expect("output worker recv"); + match event { + AckMessage(output_port_id, input_port_id, seq_id) => { + output + .send(ZmqOutputPortEvent::Ack(seq_id)) + .expect("worker loop ack send"); + } + + CloseInput(input_port_id) => todo!(), + + AckConnection(_, _) => { + unreachable!("already connected") + } + + // ignore input port type events + Connect(_, _) | CloseOutput(_, _) | Message(_, _, _, _) => continue, // TODO + } + } + }); + } + + // send request to connect + self.out_queue + .send(ZmqTransportEvent::Connect(source, target)) + .unwrap(); + // wait for the `Connected` event loop { let msg = from_worker_recv + .lock() .recv() .map_err(|e| PortError::Other(e.to_string()))?; use ZmqOutputPortEvent::*; @@ -384,6 +565,114 @@ impl Transport for ZmqTransport { } } +fn handle_zmq_msg( + msg: ZmqMessage, + outputs: &RwLock>>, + inputs: &RwLock>>, +) -> Result<(), Error> { + let Ok(event) = ZmqTransportEvent::try_from(msg) else { + todo!(); + }; + + use ZmqTransportEvent::*; + match event { + // input ports + Connect(_, input_port_id) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + let input = input.read(); + + use ZmqInputPortState::*; + match &*input { + Closed => todo!(), + Open(sender, _) | Connected(sender, _, _) => { + sender.send(event).unwrap(); + } + }; + } + Message(output_port_id, input_port_id, _, _) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + + let input = input.read(); + let ZmqInputPortState::Connected(sender, _, ids) = &*input else { + todo!(); + }; + + // TODO: probably move to ports worker? no sense having here + if !ids.iter().any(|&id| id == output_port_id) { + todo!(); + } + + sender.send(event).unwrap(); + } + CloseOutput(_, input_port_id) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + let input = input.read(); + + use ZmqInputPortState::*; + match &*input { + Closed => todo!(), + Open(sender, _) | Connected(sender, _, _) => { + sender.send(event).unwrap(); + } + }; + } + + // output ports + AckConnection(output_port_id, _) => { + let outputs = outputs.read(); + let Some(output) = outputs.get(&output_port_id) else { + todo!(); + }; + let output = output.read(); + + let ZmqOutputPortState::Open(sender) = &*output else { + todo!(); + }; + sender.send(event).unwrap(); + } + AckMessage(output_port_id, _, _) => { + let outputs = outputs.read(); + let Some(output) = outputs.get(&output_port_id) else { + todo!(); + }; + let output = output.read(); + + let ZmqOutputPortState::Connected(sender, _, _) = &*output else { + todo!(); + }; + sender.send(event).unwrap(); + } + CloseInput(input_port_id) => { + let outputs = outputs.read(); + + for (_, state) in outputs.iter() { + let state = state.read(); + + let ZmqOutputPortState::Connected(sender, _, id) = &*state else { + todo!(); + }; + + if *id != input_port_id { + todo!(); + } + + sender.send(event.clone()).unwrap(); + } + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; From 44f4eeee22ca65b551b385d6bfb13bbdfc695d1e Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 08:36:26 +0200 Subject: [PATCH 12/17] Add draft implementations for the public `send` and `recv` --- lib/protoflow-zeromq/src/lib.rs | 61 ++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index cb7ca70b..ff33ff7b 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -478,10 +478,10 @@ impl Transport for ZmqTransport { // 2.2. Resend on timeout // 3. Send disconnect events { - //let output_state = output_state.clone(); let to_worker_send = to_worker_send.clone(); let from_worker_recv = from_worker_recv.clone(); + let out_queue = self.out_queue.clone(); let outputs = self.outputs.clone(); tokio::task::spawn(async move { @@ -489,6 +489,11 @@ impl Transport for ZmqTransport { // connect loop loop { + // send request to connect + out_queue + .send(ZmqTransportEvent::Connect(source, target)) + .unwrap(); + let request = input.recv().expect("output worker recv"); match request { ZmqTransportEvent::AckConnection(_, input_port_id) => { @@ -509,10 +514,13 @@ impl Transport for ZmqTransport { } } - // work loop + // work loop for handling events loop { use ZmqTransportEvent::*; let event = input.recv().expect("output worker recv"); + if !matches!(event, Message(_, _, _, _)) { + unreachable!("why are we getting non-Message?"); + } match event { AckMessage(output_port_id, input_port_id, seq_id) => { output @@ -533,11 +541,6 @@ impl Transport for ZmqTransport { }); } - // send request to connect - self.out_queue - .send(ZmqTransportEvent::Connect(source, target)) - .unwrap(); - // wait for the `Connected` event loop { let msg = from_worker_recv @@ -553,11 +556,51 @@ impl Transport for ZmqTransport { } fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()> { - todo!(); + let outputs = self.outputs.read(); + let Some(output) = outputs.get(&output) else { + return Err(PortError::Invalid(output.into())); + }; + let output = output.read(); + + let ZmqOutputPortState::Connected(sender, receiver, input_port_id) = &*output else { + return Err(PortError::Disconnected); + }; + + sender.send(message).unwrap(); + + loop { + let msg = receiver.lock().recv().unwrap(); + + use ZmqOutputPortEvent::*; + match msg { + Ack(_seq_id) => break Ok(()), + _ => continue, // TODO + } + } } fn recv(&self, input: InputPortID) -> PortResult> { - todo!(); + let inputs = self.inputs.read(); + let Some(input) = inputs.get(&input) else { + return Err(PortError::Invalid(input.into())); + }; + let input = input.read(); + + let ZmqInputPortState::Connected(_, receiver, _) = &*input else { + return Err(PortError::Disconnected); + }; + + loop { + use ZmqInputPortEvent::*; + match receiver.lock().recv() { + // ignore + Ok(Opened) | Ok(Connected(_)) => continue, + + Ok(Closed) => break Ok(None), // EOS + Ok(Message(bytes)) => break Ok(Some(bytes)), + Err(e) => break Err(PortError::Other(e.to_string())), + } + } } fn try_recv(&self, _input: InputPortID) -> PortResult> { From 052db1348b06d2c5405b2080294eef0b4563cc06 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 12:27:55 +0200 Subject: [PATCH 13/17] Add separate ports for requests from public API --- lib/protoflow-zeromq/src/lib.rs | 143 ++++++++++++++++++++++---------- 1 file changed, 101 insertions(+), 42 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index ff33ff7b..3603230d 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -39,10 +39,14 @@ pub struct ZmqTransport { #[derive(Debug, Clone)] enum ZmqOutputPortState { - Open(Arc>), + Open(Arc>)>>), Connected( + // channels for public send, contained channel is for the ack back + Arc)>>, + // internal channels for events Arc>, Arc>>, + // id of the connected input port InputPortID, ), Closed, @@ -53,7 +57,7 @@ impl ZmqOutputPortState { use ZmqOutputPortState::*; match self { Open(_) => PortState::Open, - Connected(_, _, _) => PortState::Connected, + Connected(_, _, _, _) => PortState::Connected, Closed => PortState::Closed, } } @@ -66,8 +70,13 @@ enum ZmqInputPortState { Arc>>, ), Connected( + // channels for the public recv + Arc>, + Arc>>, + // internal channels for events Arc>, Arc>>, + // vec of the connected port ids Vec, ), Closed, @@ -78,7 +87,7 @@ impl ZmqInputPortState { use ZmqInputPortState::*; match self { Open(_, _) => PortState::Open, - Connected(_, _, _) => PortState::Connected, + Connected(_, _, _, _, _) => PortState::Connected, Closed => PortState::Closed, } } @@ -318,8 +327,15 @@ impl ZmqTransport { todo!(); }; + let (msgs_send, msgs_recv) = sync_channel(1); + + let msgs_send = Arc::new(msgs_send); + let msgs_recv = Arc::new(Mutex::new(msgs_recv)); + let mut input = input.write(); *input = ZmqInputPortState::Connected( + msgs_send, + msgs_recv, to_worker_send.clone(), from_worker_recv.clone(), Vec::new(), @@ -327,7 +343,23 @@ impl ZmqTransport { } // Message from output port - Message(output_port_id, input_port_id, _, bytes) => todo!(), + Message(output_port_id, input_port_id, _, bytes) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + + let input = input.read(); + use ZmqInputPortState::*; + match &*input { + Open(arc, arc1) => todo!(), + Closed => todo!(), + Connected(sender, _, _, _, _) => { + sender.send(ZmqInputPortEvent::Message(bytes)).unwrap() + } + }; + } + // Output port reports being closed CloseInput(input_port_id) => todo!(), @@ -405,7 +437,7 @@ impl Transport for ZmqTransport { let state = state.read(); - let ZmqInputPortState::Connected(sender, receiver, _) = &*state else { + let ZmqInputPortState::Connected(_, _, sender, receiver, _) = &*state else { return Err(PortError::Disconnected); }; @@ -435,7 +467,7 @@ impl Transport for ZmqTransport { let state = state.write(); - let ZmqOutputPortState::Connected(sender, receiver, input) = &*state else { + let ZmqOutputPortState::Connected(_, sender, receiver, input) = &*state else { return Err(PortError::Disconnected); }; @@ -471,6 +503,10 @@ impl Transport for ZmqTransport { let to_worker_send = Arc::new(to_worker_send); let from_worker_recv = Arc::new(Mutex::new(from_worker_recv)); + let (msg_req_send, msg_req_recv) = sync_channel(1); + let msg_req_send = Arc::new(msg_req_send); + let msg_req_recv = Arc::new(Mutex::new(msg_req_recv)); + // Output worker loop: // 1. Send connection attempt // 2. Send messages @@ -504,6 +540,7 @@ impl Transport for ZmqTransport { let mut output_state = output_state.write(); debug_assert!(matches!(*output_state, ZmqOutputPortState::Open(_))); *output_state = ZmqOutputPortState::Connected( + msg_req_send, to_worker_send, from_worker_recv, input_port_id, @@ -514,30 +551,57 @@ impl Transport for ZmqTransport { } } - // work loop for handling events - loop { - use ZmqTransportEvent::*; - let event = input.recv().expect("output worker recv"); - if !matches!(event, Message(_, _, _, _)) { - unreachable!("why are we getting non-Message?"); + // work loop for sending events + tokio::task::spawn(async move { + let mut seq_id = 1; + loop { + let req = msg_req_recv.lock().recv().expect("output worker req recv"); + + let outputs = outputs.read(); + let Some(output_state) = outputs.get(&source) else { + todo!(); + }; + + let ZmqOutputPortState::Connected(ack_send, sender, _, _) = + &*output_state.read() + else { + todo!(); + }; + + sender + .send(ZmqTransportEvent::Message(source, target, seq_id, req.0)) + .unwrap(); + + seq_id += 1; } - match event { - AckMessage(output_port_id, input_port_id, seq_id) => { - output - .send(ZmqOutputPortEvent::Ack(seq_id)) - .expect("worker loop ack send"); + }); + + // work loop for handling events + tokio::task::spawn(async move { + loop { + use ZmqTransportEvent::*; + let event = input.recv().expect("output worker event recv"); + if !matches!(event, Message(_, _, _, _)) { + unreachable!("why are we getting non-Message?"); } + match event { + AckMessage(output_port_id, input_port_id, seq_id) => { + output + .send(ZmqOutputPortEvent::Ack(seq_id)) + .expect("worker loop ack send"); + } - CloseInput(input_port_id) => todo!(), + CloseInput(input_port_id) => todo!(), - AckConnection(_, _) => { - unreachable!("already connected") - } + AckConnection(_, _) => { + unreachable!("already connected") + } - // ignore input port type events - Connect(_, _) | CloseOutput(_, _) | Message(_, _, _, _) => continue, // TODO + // ignore input port type events + Connect(_, _) | CloseOutput(_, _) | Message(_, _, _, _) => continue, // TODO + } } - } + }); }); } @@ -562,21 +626,16 @@ impl Transport for ZmqTransport { }; let output = output.read(); - let ZmqOutputPortState::Connected(sender, receiver, input_port_id) = &*output else { + let ZmqOutputPortState::Connected(sender, _, _, _) = &*output else { return Err(PortError::Disconnected); }; - sender.send(message).unwrap(); + let (ack_send, ack_recv) = sync_channel(1); - loop { - let msg = receiver.lock().recv().unwrap(); + sender.send((message, ack_send)).unwrap(); - use ZmqOutputPortEvent::*; - match msg { - Ack(_seq_id) => break Ok(()), - _ => continue, // TODO - } - } + ack_recv.recv().unwrap(); + Ok(()) } fn recv(&self, input: InputPortID) -> PortResult> { @@ -586,7 +645,7 @@ impl Transport for ZmqTransport { }; let input = input.read(); - let ZmqInputPortState::Connected(_, receiver, _) = &*input else { + let ZmqInputPortState::Connected(_, receiver, _, _, _) = &*input else { return Err(PortError::Disconnected); }; @@ -630,19 +689,19 @@ fn handle_zmq_msg( use ZmqInputPortState::*; match &*input { Closed => todo!(), - Open(sender, _) | Connected(sender, _, _) => { + Open(sender, _) | Connected(_, _, sender, _, _) => { sender.send(event).unwrap(); } }; } - Message(output_port_id, input_port_id, _, _) => { + Message(output_port_id, input_port_id, _, bytes) => { let inputs = inputs.read(); let Some(input) = inputs.get(&input_port_id) else { todo!(); }; let input = input.read(); - let ZmqInputPortState::Connected(sender, _, ids) = &*input else { + let ZmqInputPortState::Connected(sender, _, _, _, ids) = &*input else { todo!(); }; @@ -651,7 +710,7 @@ fn handle_zmq_msg( todo!(); } - sender.send(event).unwrap(); + sender.send(ZmqInputPortEvent::Message(bytes)).unwrap(); } CloseOutput(_, input_port_id) => { let inputs = inputs.read(); @@ -663,7 +722,7 @@ fn handle_zmq_msg( use ZmqInputPortState::*; match &*input { Closed => todo!(), - Open(sender, _) | Connected(sender, _, _) => { + Open(sender, _) | Connected(_, _, sender, _, _) => { sender.send(event).unwrap(); } }; @@ -689,7 +748,7 @@ fn handle_zmq_msg( }; let output = output.read(); - let ZmqOutputPortState::Connected(sender, _, _) = &*output else { + let ZmqOutputPortState::Connected(_, sender, _, _) = &*output else { todo!(); }; sender.send(event).unwrap(); @@ -700,7 +759,7 @@ fn handle_zmq_msg( for (_, state) in outputs.iter() { let state = state.read(); - let ZmqOutputPortState::Connected(sender, _, id) = &*state else { + let ZmqOutputPortState::Connected(_, sender, _, id) = &*state else { todo!(); }; From 3df484433f2f816156dcf9167ae44345da8e0e8b Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 13:26:35 +0200 Subject: [PATCH 14/17] Separate port channels further --- lib/protoflow-zeromq/src/lib.rs | 147 ++++++++++++++++++++------------ 1 file changed, 93 insertions(+), 54 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 3603230d..10e80f46 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -30,7 +30,7 @@ const DEFAULT_SUB_SOCKET: &str = "tcp://127.0.0.1:10001"; pub struct ZmqTransport { tokio: tokio::runtime::Handle, - out_queue: Arc>, + pub_queue: Arc>, sub_queue: Arc>, outputs: Arc>>>, @@ -42,7 +42,7 @@ enum ZmqOutputPortState { Open(Arc>)>>), Connected( // channels for public send, contained channel is for the ack back - Arc)>>, + Arc>)>>, // internal channels for events Arc>, Arc>>, @@ -52,6 +52,12 @@ enum ZmqOutputPortState { Closed, } +#[derive(Debug, Clone)] +enum ZmqOutputPortRequest { + Close, + Send(Bytes), +} + impl ZmqOutputPortState { fn state(&self) -> PortState { use ZmqOutputPortState::*; @@ -66,10 +72,14 @@ impl ZmqOutputPortState { #[derive(Debug, Clone)] enum ZmqInputPortState { Open( + // TODO: hide these Arc>, Arc>>, ), Connected( + // channels for requests from public close + Arc>)>>, + Arc>)>>>, // channels for the public recv Arc>, Arc>>, @@ -82,12 +92,17 @@ enum ZmqInputPortState { Closed, } +#[derive(Debug, Clone)] +enum ZmqInputPortRequest { + Close, +} + impl ZmqInputPortState { fn state(&self) -> PortState { use ZmqInputPortState::*; match self { Open(_, _) => PortState::Open, - Connected(_, _, _, _, _) => PortState::Connected, + Connected(_, _, _, _, _, _, _) => PortState::Connected, Closed => PortState::Closed, } } @@ -130,13 +145,14 @@ impl From for ZmqMessage { let mut msg = ZmqMessage::from(topic.clone()); // second frame of the message is the payload + use ZmqTransportEvent::*; match value { - ZmqTransportEvent::Connect(output_port_id, input_port_id) => todo!(), - ZmqTransportEvent::AckConnection(output_port_id, input_port_id) => todo!(), - ZmqTransportEvent::Message(_, _, _, bytes) => msg.push_back(bytes), - ZmqTransportEvent::AckMessage(output_port_id, input_port_id, _) => todo!(), - ZmqTransportEvent::CloseOutput(output_port_id, input_port_id) => todo!(), - ZmqTransportEvent::CloseInput(input_port_id) => todo!(), + Connect(output_port_id, input_port_id) => todo!(), + AckConnection(output_port_id, input_port_id) => todo!(), + Message(_, _, _, bytes) => msg.push_back(bytes), + AckMessage(output_port_id, input_port_id, _) => todo!(), + CloseOutput(output_port_id, input_port_id) => todo!(), + CloseInput(input_port_id) => todo!(), }; msg @@ -221,7 +237,7 @@ impl ZmqTransport { let sub_queue = Arc::new(sub_queue); let transport = Self { - out_queue, + pub_queue: out_queue, sub_queue, tokio, outputs, @@ -239,25 +255,21 @@ impl ZmqTransport { let mut psock = psock; tokio::task::spawn(async move { - loop { - let Ok(event) = queue.recv() else { - continue; - }; - - let msg = ZmqMessage::from(event); - - tokio.block_on(psock.send(msg)).expect("zmq send worker") - } + queue + .into_iter() + .map(ZmqMessage::from) + .try_for_each(|msg| tokio.block_on(psock.send(msg))) + .expect("zmq send worker") }); } fn start_recv_worker( &self, ssock: zeromq::SubSocket, - queue: tokio::sync::mpsc::Receiver, + sub_queue: tokio::sync::mpsc::Receiver, ) { let mut ssock = ssock; - let mut queue = queue; + let mut sub_queue = sub_queue; let outputs = self.outputs.clone(); let inputs = self.inputs.clone(); @@ -266,7 +278,7 @@ impl ZmqTransport { loop { tokio::select! { Ok(msg) = ssock.recv() => handle_zmq_msg(msg, &outputs, &inputs).unwrap(), - Some(req) = queue.recv() => { + Some(req) = sub_queue.recv() => { use ZmqSubscriptionRequest::*; match req { Subscribe(topic) => ssock.subscribe(&topic).await.expect("zmq recv worker subscribe"), @@ -321,12 +333,16 @@ impl ZmqTransport { let event = input.recv().expect("input worker recv"); match event { // Connection attempt - Connect(output_port_id, input_port_id) => { + Connect(_, input_port_id) => { let inputs = inputs.read(); let Some(input) = inputs.get(&input_port_id) else { todo!(); }; + let (req_send, req_recv) = sync_channel(1); + let req_send = Arc::new(req_send); + let req_recv = Arc::new(Mutex::new(req_recv)); + let (msgs_send, msgs_recv) = sync_channel(1); let msgs_send = Arc::new(msgs_send); @@ -334,6 +350,8 @@ impl ZmqTransport { let mut input = input.write(); *input = ZmqInputPortState::Connected( + req_send, + req_recv, msgs_send, msgs_recv, to_worker_send.clone(), @@ -343,7 +361,7 @@ impl ZmqTransport { } // Message from output port - Message(output_port_id, input_port_id, _, bytes) => { + Message(_, input_port_id, _, bytes) => { let inputs = inputs.read(); let Some(input) = inputs.get(&input_port_id) else { todo!(); @@ -352,16 +370,31 @@ impl ZmqTransport { let input = input.read(); use ZmqInputPortState::*; match &*input { - Open(arc, arc1) => todo!(), + Open(_, _) => todo!(), Closed => todo!(), - Connected(sender, _, _, _, _) => { + Connected(_, _, sender, _, _, _, _) => { sender.send(ZmqInputPortEvent::Message(bytes)).unwrap() } }; } // Output port reports being closed - CloseInput(input_port_id) => todo!(), + CloseInput(input_port_id) => { + let inputs = inputs.read(); + let Some(input) = inputs.get(&input_port_id) else { + todo!(); + }; + + let input = input.read(); + use ZmqInputPortState::*; + match &*input { + Open(_, _) => todo!(), + Closed => todo!(), + Connected(_, _, sender, _, _, _, _) => { + sender.send(ZmqInputPortEvent::Closed).unwrap() + } + }; + } // ignore output port type events: AckConnection(_, _) | AckMessage(_, _, _) | CloseOutput(_, _) => continue, @@ -437,25 +470,20 @@ impl Transport for ZmqTransport { let state = state.read(); - let ZmqInputPortState::Connected(_, _, sender, receiver, _) = &*state else { + let ZmqInputPortState::Connected(sender, _, _, _, _, _, _) = &*state else { return Err(PortError::Disconnected); }; + let (close_send, close_recv) = sync_channel(1); + sender - .send(ZmqTransportEvent::CloseInput(input)) + .send((ZmqInputPortRequest::Close, close_send)) .map_err(|e| PortError::Other(e.to_string()))?; - loop { - let msg = receiver - .lock() - .recv() - .map_err(|e| PortError::Other(e.to_string()))?; - use ZmqInputPortEvent::*; - match msg { - Closed => break Ok(true), - _ => continue, // TODO - }; - } + close_recv + .recv() + .map_err(|_| PortError::Disconnected)? + .map(|_| true) } fn close_output(&self, output: OutputPortID) -> PortResult { @@ -517,7 +545,7 @@ impl Transport for ZmqTransport { let to_worker_send = to_worker_send.clone(); let from_worker_recv = from_worker_recv.clone(); - let out_queue = self.out_queue.clone(); + let pub_queue = self.pub_queue.clone(); let outputs = self.outputs.clone(); tokio::task::spawn(async move { @@ -526,7 +554,7 @@ impl Transport for ZmqTransport { // connect loop loop { // send request to connect - out_queue + pub_queue .send(ZmqTransportEvent::Connect(source, target)) .unwrap(); @@ -551,6 +579,8 @@ impl Transport for ZmqTransport { } } + // TODO: combine these two spawns by using tokio's channels and `select!` + // work loop for sending events tokio::task::spawn(async move { let mut seq_id = 1; @@ -562,17 +592,25 @@ impl Transport for ZmqTransport { todo!(); }; - let ZmqOutputPortState::Connected(ack_send, sender, _, _) = + let ZmqOutputPortState::Connected(ack_send, sender, _, output_id) = &*output_state.read() else { todo!(); }; - sender - .send(ZmqTransportEvent::Message(source, target, seq_id, req.0)) - .unwrap(); + let resp = req.1; // TODO: respond - seq_id += 1; + match req.0 { + ZmqOutputPortRequest::Send(bytes) => { + sender + .send(ZmqTransportEvent::Message(source, target, seq_id, bytes)) + .unwrap(); + seq_id += 1; + } + ZmqOutputPortRequest::Close => sender + .send(ZmqTransportEvent::CloseOutput(source, *output_id)) + .unwrap(), + }; } }); @@ -632,10 +670,11 @@ impl Transport for ZmqTransport { let (ack_send, ack_recv) = sync_channel(1); - sender.send((message, ack_send)).unwrap(); + sender + .send((ZmqOutputPortRequest::Send(message), ack_send)) + .unwrap(); - ack_recv.recv().unwrap(); - Ok(()) + ack_recv.recv().map_err(|_| PortError::Disconnected)? } fn recv(&self, input: InputPortID) -> PortResult> { @@ -645,7 +684,7 @@ impl Transport for ZmqTransport { }; let input = input.read(); - let ZmqInputPortState::Connected(_, receiver, _, _, _) = &*input else { + let ZmqInputPortState::Connected(_, _, _, receiver, _, _, _) = &*input else { return Err(PortError::Disconnected); }; @@ -689,7 +728,7 @@ fn handle_zmq_msg( use ZmqInputPortState::*; match &*input { Closed => todo!(), - Open(sender, _) | Connected(_, _, sender, _, _) => { + Open(sender, _) | Connected(_, _, _, _, sender, _, _) => { sender.send(event).unwrap(); } }; @@ -701,7 +740,7 @@ fn handle_zmq_msg( }; let input = input.read(); - let ZmqInputPortState::Connected(sender, _, _, _, ids) = &*input else { + let ZmqInputPortState::Connected(_, _, sender, _, _, _, ids) = &*input else { todo!(); }; @@ -722,7 +761,7 @@ fn handle_zmq_msg( use ZmqInputPortState::*; match &*input { Closed => todo!(), - Open(sender, _) | Connected(_, _, sender, _, _) => { + Open(sender, _) | Connected(_, _, _, _, sender, _, _) => { sender.send(event).unwrap(); } }; From e900e29a883d58c7ff020de47f2fd6bd9d6dfde3 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 13:33:48 +0200 Subject: [PATCH 15/17] Send event when output connects --- lib/protoflow-zeromq/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 10e80f46..220015e9 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -573,6 +573,9 @@ impl Transport for ZmqTransport { from_worker_recv, input_port_id, ); + output + .send(ZmqOutputPortEvent::Opened) + .expect("output worker connected send"); break; } _ => continue, // TODO: when and why would we receive other events? From a8a050a223f13095e04bc4271f84b334541a187c Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 13:34:41 +0200 Subject: [PATCH 16/17] Silence unused-var warnings --- lib/protoflow-zeromq/src/lib.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 220015e9..52834dbe 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -147,12 +147,12 @@ impl From for ZmqMessage { // second frame of the message is the payload use ZmqTransportEvent::*; match value { - Connect(output_port_id, input_port_id) => todo!(), - AckConnection(output_port_id, input_port_id) => todo!(), + Connect(_, _) => todo!(), + AckConnection(_, _) => todo!(), Message(_, _, _, bytes) => msg.push_back(bytes), - AckMessage(output_port_id, input_port_id, _) => todo!(), - CloseOutput(output_port_id, input_port_id) => todo!(), - CloseInput(input_port_id) => todo!(), + AckMessage(_, _, _) => todo!(), + CloseOutput(_, _) => todo!(), + CloseInput(_) => todo!(), }; msg @@ -162,7 +162,7 @@ impl From for ZmqMessage { impl TryFrom for ZmqTransportEvent { type Error = protoflow_core::DecodeError; - fn try_from(value: ZmqMessage) -> Result { + fn try_from(_value: ZmqMessage) -> Result { todo!() } } @@ -326,7 +326,7 @@ impl ZmqTransport { let from_worker_recv = from_worker_recv.clone(); tokio::task::spawn(async move { - let (output, input) = (from_worker_send, to_worker_recv); + let (_output, input) = (from_worker_send, to_worker_recv); loop { use ZmqTransportEvent::*; @@ -595,13 +595,13 @@ impl Transport for ZmqTransport { todo!(); }; - let ZmqOutputPortState::Connected(ack_send, sender, _, output_id) = + let ZmqOutputPortState::Connected(_, sender, _, output_id) = &*output_state.read() else { todo!(); }; - let resp = req.1; // TODO: respond + let _resp = req.1; // TODO: respond match req.0 { ZmqOutputPortRequest::Send(bytes) => { @@ -626,13 +626,13 @@ impl Transport for ZmqTransport { unreachable!("why are we getting non-Message?"); } match event { - AckMessage(output_port_id, input_port_id, seq_id) => { + AckMessage(_, _, seq_id) => { output .send(ZmqOutputPortEvent::Ack(seq_id)) .expect("worker loop ack send"); } - CloseInput(input_port_id) => todo!(), + CloseInput(_) => todo!(), AckConnection(_, _) => { unreachable!("already connected") From 053e5dd715710971f728e224cc66ea78e38852d0 Mon Sep 17 00:00:00 2001 From: Samuel Sarle Date: Fri, 29 Nov 2024 14:01:18 +0200 Subject: [PATCH 17/17] Partially refactor input port opening --- lib/protoflow-zeromq/src/lib.rs | 98 +++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 17 deletions(-) diff --git a/lib/protoflow-zeromq/src/lib.rs b/lib/protoflow-zeromq/src/lib.rs index 52834dbe..6797b8cb 100644 --- a/lib/protoflow-zeromq/src/lib.rs +++ b/lib/protoflow-zeromq/src/lib.rs @@ -9,7 +9,7 @@ pub use protoflow_core::prelude; extern crate std; use protoflow_core::{ - prelude::{Arc, BTreeMap, Bytes, String, ToString, Vec}, + prelude::{vec, Arc, BTreeMap, Bytes, String, ToString, Vec}, InputPortID, OutputPortID, PortError, PortResult, PortState, Transport, }; @@ -74,7 +74,7 @@ enum ZmqInputPortState { Open( // TODO: hide these Arc>, - Arc>>, + Arc>>, ), Connected( // channels for requests from public close @@ -85,7 +85,7 @@ enum ZmqInputPortState { Arc>>, // internal channels for events Arc>, - Arc>>, + Arc>>, // vec of the connected port ids Vec, ), @@ -405,6 +405,83 @@ impl ZmqTransport { Ok((to_worker_send, from_worker_recv)) } + + fn start_input_worker(&self, input_port_id: InputPortID) -> Result<(), PortError> { + let topic = format!("{}:", input_port_id); + + let (to_worker_send, to_worker_recv) = sync_channel(1); + let to_worker_send = Arc::new(to_worker_send); + let to_worker_recv = Arc::new(Mutex::new(to_worker_recv)); + + { + let mut inputs = self.inputs.write(); + let state = ZmqInputPortState::Open(to_worker_send.clone(), to_worker_recv.clone()); + let state = RwLock::new(state); + inputs.insert(input_port_id, state); + } + + let inputs = self.inputs.clone(); + tokio::task::spawn(async move { + let input = &to_worker_recv; + + let inputs = inputs; + + loop { + let event: ZmqTransportEvent = input.lock().recv().expect("input worker recv"); + use ZmqTransportEvent::*; + match event { + Connect(output_port_id, input_port_id) => { + let inputs = inputs.read(); + let Some(input_state) = inputs.get(&input_port_id) else { + todo!(); + }; + let input_state = input_state.write(); + + use ZmqInputPortState::*; + match &*input_state { + Open(_, _) => { + let (req_send, req_recv) = sync_channel(1); + let req_send = Arc::new(req_send); + let req_recv = Arc::new(Mutex::new(req_recv)); + + let (msgs_send, msgs_recv) = sync_channel(1); + + let msgs_send = Arc::new(msgs_send); + let msgs_recv = Arc::new(Mutex::new(msgs_recv)); + + let mut input_state = input_state; + + *input_state = ZmqInputPortState::Connected( + req_send, + req_recv, + msgs_send, + msgs_recv, + to_worker_send.clone(), + input.clone(), + vec![output_port_id], + ); + } + Connected(_, _, _, _, _, _, _) => todo!(), + Closed => todo!(), + } + } + AckConnection(output_port_id, input_port_id) => todo!(), + Message(output_port_id, input_port_id, _, bytes) => todo!(), + AckMessage(output_port_id, input_port_id, _) => todo!(), + CloseOutput(output_port_id, input_port_id) => todo!(), + CloseInput(input_port_id) => todo!(), + }; + } + }); + + // send sub request + self.tokio + .block_on( + self.sub_queue + .send(ZmqSubscriptionRequest::Subscribe(topic)), + ) + .map_err(|e| PortError::Other(e.to_string())) + } } impl Transport for ZmqTransport { @@ -430,20 +507,7 @@ impl Transport for ZmqTransport { let new_id = InputPortID::try_from(-(inputs.len() as isize + 1)) .map_err(|e| PortError::Other(e.to_string()))?; - let (_, receiver) = self - .subscribe_for_input_port(new_id) - .map_err(|e| PortError::Other(e.to_string()))?; - - loop { - let msg = receiver - .lock() - .recv() - .map_err(|e| PortError::Other(e.to_string()))?; - match msg { - ZmqInputPortEvent::Opened => break Ok(new_id), - _ => continue, // TODO - } - } + self.start_input_worker(new_id).map(|_| new_id) } fn open_output(&self) -> PortResult {