Skip to content

Commit

Permalink
Merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSarle committed Nov 29, 2024
2 parents 393d990 + 053e5dd commit 4ebc035
Show file tree
Hide file tree
Showing 9 changed files with 1,000 additions and 28 deletions.
5 changes: 3 additions & 2 deletions lib/protoflow-blocks/src/blocks/sys/read_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl StdioSystem for ReadFile {
#[cfg(test)]
mod tests {
extern crate std;

use super::ReadFile;
use crate::{System, SystemBuilding, SystemExecution};

Expand Down Expand Up @@ -122,7 +123,7 @@ mod tests {
system.connect(&path, &read_file.path);
system.connect(&read_file.output, &output);

let thrd = std::thread::spawn(move || system.execute().and_then(|p| p.join()).unwrap());
let process = system.execute().unwrap();

path.send(&temp_file.path().to_string_lossy().into())
.unwrap();
Expand All @@ -143,6 +144,6 @@ mod tests {
"want EOS signal after path port is closed"
);

thrd.join().unwrap()
process.join().unwrap();
}
}
20 changes: 16 additions & 4 deletions lib/protoflow-blocks/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
#![allow(dead_code)]

use crate::{
prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString},
prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString, Vec},
types::{DelayType, Encoding},
AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex,
DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks,
IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadSocket, ReadStdin, SplitString,
SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, WriteStdout,
};
use protoflow_core::{
Block, BlockID, BlockResult, BoxedBlockType, InputPort, Message, OutputPort, PortID,
PortResult, Process, SystemBuilding, SystemExecution,
Block, BlockID, BlockResult, BoxedBlockType, InputPort, InputPortID, Message, OutputPort,
OutputPortID, PortID, PortResult, Process, SystemBuilding, SystemExecution,
};

#[cfg(any(
Expand Down Expand Up @@ -98,8 +98,12 @@ impl fmt::Debug for System {
}

impl SystemExecution for System {
fn prepare(&self) -> BlockResult<()> {
SystemExecution::prepare(&self.0)
}

fn execute(self) -> BlockResult<Rc<dyn Process>> {
self.0.execute()
SystemExecution::execute(self.0)
}
}

Expand All @@ -124,6 +128,14 @@ impl SystemBuilding for System {
fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
self.0.connect(source, target)
}

fn connections(&self) -> Vec<(OutputPortID, InputPortID)> {
self.0.connections()
}

fn validate(self) -> BlockResult<()> {
self.0.validate()
}
}

impl AllBlocks for System {}
Expand Down
4 changes: 2 additions & 2 deletions lib/protoflow-blocks/tests/json_roundtrip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ fn json_roundtrip() -> Result<(), ()> {
system.connect(&decode.output, &encode.input);
system.connect(&encode.output, &output);

let thread = std::thread::spawn(|| system.execute().unwrap().join().unwrap());
let process = system.execute().unwrap();

let message = output.recv().unwrap().unwrap();

thread.join().unwrap();
process.join().unwrap();

assert_eq!(input_bytes, message);

Expand Down
41 changes: 33 additions & 8 deletions lib/protoflow-core/src/system.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This is free and unencumbered software released into the public domain.

use crate::{
prelude::{fmt, Arc, Box, Bytes, PhantomData, Rc, String, VecDeque},
prelude::{fmt, Arc, BTreeSet, Box, Bytes, PhantomData, Rc, String, Vec, VecDeque},
runtimes::StdRuntime,
transports::MpscTransport,
types::Any,
Expand Down Expand Up @@ -57,9 +57,18 @@ pub trait SystemBuilding {
///
/// Both ports must be of the same message type.
fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool;

fn connections(&self) -> Vec<(OutputPortID, InputPortID)>;

/// Validates system for execution.
fn validate(self) -> BlockResult<()>;
}

pub trait SystemExecution {
/// Prepare:
/// - Calls the transport layer to connect all the output->input ports.
/// The connections are defined by `SystemBuilding.connect()`.
fn prepare(&self) -> BlockResult<()>;
/// Executes the system, returning the system process.
fn execute(self) -> BlockResult<Rc<dyn Process>>;
}
Expand All @@ -71,6 +80,8 @@ pub struct System<X: Transport + Default + 'static = MpscTransport> {
/// The registered blocks in the system.
pub(crate) blocks: VecDeque<BoxedBlockType>,

pub(crate) connections: BTreeSet<(OutputPortID, InputPortID)>,

_phantom: PhantomData<X>,
}

Expand Down Expand Up @@ -99,6 +110,7 @@ impl<X: Transport + Default + 'static> System<X> {
Self {
runtime: runtime.clone(),
blocks: VecDeque::new(),
connections: BTreeSet::default(),
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -147,19 +159,18 @@ impl<X: Transport + Default + 'static> System<X> {
self.blocks.get(block_id.into())
}

pub fn connect<M: Message>(&self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
pub fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id))
.unwrap()
}

#[doc(hidden)]
pub fn connect_by_id(&self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
let runtime = self.runtime.as_ref();
let transport = runtime.transport.as_ref();
transport.connect(
pub fn connect_by_id(&mut self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
self.connections.insert((
OutputPortID(source_id.into()),
InputPortID(target_id.into()),
)
));
Ok(true)
}
}

Expand All @@ -184,10 +195,24 @@ impl SystemBuilding for System {
fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
System::connect(self, source, target)
}

fn connections(&self) -> Vec<(OutputPortID, InputPortID)> {
self.connections.clone().into_iter().collect()
}

fn validate(self) -> BlockResult<()> {
Ok(()) // TODO
}
}

impl SystemExecution for System {
fn prepare(&self) -> BlockResult<()> {
self.runtime.transport.connect_system(self)?;
Ok(())
}

fn execute(self) -> BlockResult<Rc<dyn Process>> {
System::execute(self)
SystemExecution::prepare(&self)?;
self.execute()
}
}
9 changes: 8 additions & 1 deletion lib/protoflow-core/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This is free and unencumbered software released into the public domain.

use crate::{prelude::Bytes, InputPortID, OutputPortID, PortID, PortResult, PortState};
use crate::{prelude::Bytes, InputPortID, OutputPortID, PortID, PortResult, PortState, System};

#[allow(unused)]
pub trait Transport: AsTransport + Send + Sync {
Expand Down Expand Up @@ -30,6 +30,13 @@ pub trait Transport: AsTransport + Send + Sync {
fn send(&self, output: OutputPortID, message: Bytes) -> PortResult<()>;
fn recv(&self, input: InputPortID) -> PortResult<Option<Bytes>>;
fn try_recv(&self, input: InputPortID) -> PortResult<Option<Bytes>>;

fn connect_system(&self, system: &System) -> PortResult<()> {
system
.connections
.iter()
.try_for_each(|&(output, input)| self.connect(output, input).map(|_| ()))
}
}

pub trait AsTransport {
Expand Down
10 changes: 8 additions & 2 deletions lib/protoflow-zeromq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ publish.workspace = true
[features]
default = ["all", "std"]
all = ["tracing"]
std = ["protoflow-core/std", "tracing?/std"] #, "zeromq/default"]
std = ["protoflow-core/std", "tracing?/std"]
tracing = ["protoflow-core/tracing", "dep:tracing"]
unstable = ["protoflow-core/unstable"]

Expand All @@ -27,6 +27,12 @@ cfg_aliases.workspace = true
[dependencies]
protoflow-core.workspace = true
tracing = { version = "0.1", default-features = false, optional = true }
#zeromq = { version = "0.4", default-features = false }
zeromq = { version = "0.4.1", default-features = false, features = [
"tokio-runtime",
"all-transport",
] }
tokio = { version = "1.40.0", default-features = false }
parking_lot = "0.12"

[dev-dependencies]
futures-util = "0.3.31"
Loading

0 comments on commit 4ebc035

Please sign in to comment.