Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the ZeroMQ transport #27

Open
wants to merge 64 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
e8d9b83
ZMQ skeleton
SamuelSarle Nov 16, 2024
838bdeb
Add rudimentary `send` and `recv`
SamuelSarle Nov 18, 2024
8cd9b4f
wip
SamuelSarle Nov 19, 2024
d3d5c6d
Reset impl again
SamuelSarle Nov 25, 2024
e7496c2
Add topics
SamuelSarle Nov 25, 2024
d812570
Add port operation skeletons
SamuelSarle Nov 25, 2024
56e063f
Add port worker skeletons
SamuelSarle Nov 26, 2024
ea1d54d
Add ZMQ socket workers for listening and sending
SamuelSarle Nov 26, 2024
00eef9b
Add draft implementations for the public `send` and `recv`
SamuelSarle Nov 29, 2024
a65edd1
Add separate ports for requests from public API
SamuelSarle Nov 29, 2024
264414c
Separate port channels further
SamuelSarle Nov 29, 2024
03145ab
Send event when output connects
SamuelSarle Nov 29, 2024
33431fd
Silence unused-var warnings
SamuelSarle Nov 29, 2024
0f5c529
Partially refactor input port opening
SamuelSarle Nov 29, 2024
b19ce07
Finish input worker refactor
SamuelSarle Nov 29, 2024
ffb39a3
Send connection ack from input port
SamuelSarle Nov 29, 2024
42816de
Partially refactor output port worker
SamuelSarle Nov 29, 2024
a532995
Reimplement public `connect` method
SamuelSarle Nov 29, 2024
8c9fe14
Remove unused channels from InputPortState
SamuelSarle Nov 29, 2024
b5f13c3
Remove needless `Arc<Mutex<_>>`
SamuelSarle Nov 29, 2024
90713cb
Simplify sub socket worker
SamuelSarle Nov 29, 2024
2d146b6
Handle events from socket in input port worker
SamuelSarle Nov 29, 2024
a0617da
Begin transition from std::sync:mpsc to tokio::sync:mpsc
SamuelSarle Nov 29, 2024
9045d93
Send InputPortEvent::Closed
SamuelSarle Nov 29, 2024
c927e03
Remove `Arc`s
SamuelSarle Nov 29, 2024
9bd59b2
Report port closure from output worker
SamuelSarle Nov 29, 2024
ceddf93
Remove unused port event enum fields
SamuelSarle Nov 29, 2024
0e6a50b
Refactor input port worker
SamuelSarle Nov 29, 2024
561a8e1
Send sub and unsub topics
SamuelSarle Nov 29, 2024
d2a930c
Simplify input worker's inner fn signature
SamuelSarle Nov 29, 2024
3d8d714
Add serialization layer
SamuelSarle Nov 29, 2024
3bfdafd
Change to tokio's mutices
SamuelSarle Dec 2, 2024
2fc2154
Use await inside async block
SamuelSarle Dec 2, 2024
50b06cb
Add test for sending and receiving
SamuelSarle Dec 2, 2024
dbbf11d
Fix message sending: first working version 🎉
SamuelSarle Dec 2, 2024
386f233
Unsubscribe from topics when input closes
SamuelSarle Dec 2, 2024
33fb3de
Fix deadlock when closing ports
SamuelSarle Dec 2, 2024
1520e2b
Implement `try_recv`
SamuelSarle Dec 2, 2024
9fc2a26
Remove explicit `drop`s
SamuelSarle Dec 2, 2024
be6afca
Rename protobuf file
SamuelSarle Dec 2, 2024
3f3d5a0
Consume response channel in output worker
SamuelSarle Dec 2, 2024
93dc100
Add tracing
SamuelSarle Dec 2, 2024
24cd68e
Move InputPort to it's own module
SamuelSarle Dec 2, 2024
b2c4844
Move OutputPort to it's own module
SamuelSarle Dec 2, 2024
e141688
Move the socket helpers and messages to their own modules
SamuelSarle Dec 2, 2024
1ae285f
Refactor topic subscriptions
SamuelSarle Dec 2, 2024
3a4e836
Refactor output port worker
SamuelSarle Dec 3, 2024
a9e9ffc
Refactor input port worker
SamuelSarle Dec 3, 2024
7961ea0
Refactor socket worker
SamuelSarle Dec 3, 2024
33528bc
Remove `todo!` from message parsing
SamuelSarle Dec 3, 2024
0c34c6a
Make open input ports closable
SamuelSarle Dec 4, 2024
c068ada
Default back to `Open` for disconnected input ports
SamuelSarle Dec 4, 2024
ce2f6ee
Fix input port worker exit
SamuelSarle Dec 4, 2024
896b093
Add trace for output worker exit
SamuelSarle Dec 4, 2024
7f26864
Make open output ports closable
SamuelSarle Dec 4, 2024
2c9289c
Implement message redelivery in output port
SamuelSarle Dec 4, 2024
a994872
Handle message redelivery and re-acknowledgment in input worker
SamuelSarle Dec 4, 2024
af6d207
Shortcut sending data to network if input port is reachable locally
SamuelSarle Dec 4, 2024
a6c4faa
Refactor port event sender access helpers
SamuelSarle Dec 4, 2024
c133cfd
Separate proto and rust files
SamuelSarle Dec 5, 2024
dcbe449
Improve test reliability
SamuelSarle Dec 5, 2024
b09c7f1
Add test for multiple outputs to single input
SamuelSarle Dec 7, 2024
2d26531
Add test for redelivery to input worker
SamuelSarle Dec 7, 2024
9280574
Merge branch 'master' into samuel/impl-zmq-transport
arto-asimov Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions lib/protoflow-zeromq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@ publish.workspace = true
[features]
default = ["all", "std"]
all = ["tracing"]
std = ["protoflow-core/std", "tracing?/std"] #, "zeromq/default"]
std = ["protoflow-core/std", "tracing?/std"]
tracing = ["protoflow-core/tracing", "dep:tracing"]
unstable = ["protoflow-core/unstable"]

[build-dependencies]
cfg_aliases.workspace = true
prost-build = "0.13.2"

[dependencies]
protoflow-core.workspace = true
tracing = { version = "0.1", default-features = false, optional = true }
#zeromq = { version = "0.4", default-features = false }
zeromq = { version = "0.4.1", default-features = false, features = [
"tokio-runtime",
"all-transport",
] }
tokio = { version = "1.40.0", default-features = false }
prost = "0.13.2"
prost-types = "0.13.2"

[dev-dependencies]
futures-util = "0.3.31"
tracing-subscriber = "0.3.19"
6 changes: 6 additions & 0 deletions lib/protoflow-zeromq/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use std::io::Result;
fn main() -> Result<()> {
prost_build::Config::default()
.out_dir("src/")
.compile_protos(&["proto/transport_event.proto"], &["proto/"])
}
46 changes: 46 additions & 0 deletions lib/protoflow-zeromq/proto/transport_event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
syntax = "proto3";

package protoflow.zmq;

message Connect {
int64 output = 1;
int64 input = 2;
}

message AckConnection {
int64 output = 1;
int64 input = 2;
}

message Message {
int64 output = 1;
int64 input = 2;
uint64 sequence = 3;
bytes message = 4;
}

message AckMessage {
int64 output = 1;
int64 input = 2;
uint64 sequence = 3;
}

message CloseOutput {
int64 output = 1;
int64 input = 2;
}

message CloseInput {
int64 input = 1;
}

message Event {
oneof payload {
Connect connect = 1;
AckConnection ack_connection = 2;
Message message = 3;
AckMessage ack_message = 4;
CloseOutput close_output = 5;
CloseInput close_input = 6;
}
}
160 changes: 160 additions & 0 deletions lib/protoflow-zeromq/src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// This is free and unencumbered software released into the public domain.

use protoflow_core::{
prelude::{Bytes, Vec},
InputPortID, OutputPortID,
};
use zeromq::ZmqMessage;

pub type SequenceID = u64;

/// ZmqTransportEvent represents the data that goes over the wire from one port to another.
#[derive(Clone, Debug, PartialEq)]
pub enum ZmqTransportEvent {
Connect(OutputPortID, InputPortID),
AckConnection(OutputPortID, InputPortID),
Message(OutputPortID, InputPortID, SequenceID, Bytes),
AckMessage(OutputPortID, InputPortID, SequenceID),
CloseOutput(OutputPortID, InputPortID),
CloseInput(InputPortID),
}

impl ZmqTransportEvent {
fn write_topic<W: std::io::Write + ?Sized>(&self, f: &mut W) -> Result<(), std::io::Error> {
use ZmqTransportEvent::*;
match self {
Connect(o, i) => write!(f, "{}:conn:{}", i, o),
AckConnection(o, i) => write!(f, "{}:ackConn:{}", i, o),
Message(o, i, seq, _) => write!(f, "{}:msg:{}:{}", i, o, seq),
AckMessage(o, i, seq) => write!(f, "{}:ackMsg:{}:{}", i, o, seq),
CloseOutput(o, i) => write!(f, "{}:closeOut:{}", i, o),
CloseInput(i) => write!(f, "{}:closeIn", i),
}
}
}

impl From<ZmqTransportEvent> for ZmqMessage {
fn from(value: ZmqTransportEvent) -> Self {
let mut topic = Vec::new();
value.write_topic(&mut topic).unwrap();

// first frame of the message is the topic
let mut msg = ZmqMessage::from(topic);

fn map_id<T>(id: T) -> i64
where
isize: From<T>,
{
isize::from(id) as i64
}

// second frame of the message is the payload
use crate::protoflow_zmq::{self, event::Payload, Event};
use prost::Message;
use ZmqTransportEvent::*;
let payload = match value {
Connect(output, input) => Payload::Connect(protoflow_zmq::Connect {
output: map_id(output),
input: map_id(input),
}),
AckConnection(output, input) => Payload::AckConnection(protoflow_zmq::AckConnection {
output: map_id(output),
input: map_id(input),
}),
Message(output, input, sequence, message) => Payload::Message(protoflow_zmq::Message {
output: map_id(output),
input: map_id(input),
sequence,
message: message.to_vec(),
}),
AckMessage(output, input, sequence) => Payload::AckMessage(protoflow_zmq::AckMessage {
output: map_id(output),
input: map_id(input),
sequence,
}),
CloseOutput(output, input) => Payload::CloseOutput(protoflow_zmq::CloseOutput {
output: map_id(output),
input: map_id(input),
}),
CloseInput(input) => Payload::CloseInput(protoflow_zmq::CloseInput {
input: map_id(input),
}),
};

let bytes = Event {
payload: Some(payload),
}
.encode_to_vec();
msg.push_back(bytes.into());

msg
}
}

impl TryFrom<ZmqMessage> for ZmqTransportEvent {
type Error = protoflow_core::DecodeError;

fn try_from(value: ZmqMessage) -> Result<Self, Self::Error> {
use crate::protoflow_zmq::{self, event::Payload, Event};
use prost::Message;
use protoflow_core::DecodeError;

fn map_id<T>(id: i64) -> Result<T, DecodeError>
where
T: TryFrom<isize>,
std::borrow::Cow<'static, str>: From<<T as TryFrom<isize>>::Error>,
{
(id as isize).try_into().map_err(DecodeError::new)
}

value
.get(1)
.ok_or_else(|| {
protoflow_core::DecodeError::new("message contains less than two frames")
})
.and_then(|bytes| {
let event = Event::decode(bytes.as_ref())?;

use ZmqTransportEvent::*;
Ok(match event.payload {
None => {
return Err(protoflow_core::DecodeError::new("message payload is empty"))
}
Some(Payload::Connect(protoflow_zmq::Connect { output, input })) => {
Connect(map_id(output)?, map_id(input)?)
}

Some(Payload::AckConnection(protoflow_zmq::AckConnection {
output,
input,
})) => AckConnection(map_id(output)?, map_id(input)?),

Some(Payload::Message(protoflow_zmq::Message {
output,
input,
sequence,
message,
})) => Message(
map_id(output)?,
map_id(input)?,
sequence,
Bytes::from(message),
),

Some(Payload::AckMessage(protoflow_zmq::AckMessage {
output,
input,
sequence,
})) => AckMessage(map_id(output)?, map_id(input)?, sequence),

Some(Payload::CloseOutput(protoflow_zmq::CloseOutput { output, input })) => {
CloseOutput(map_id(output)?, map_id(input)?)
}

Some(Payload::CloseInput(protoflow_zmq::CloseInput { input })) => {
CloseInput(map_id(input)?)
}
})
})
}
}
Loading