Skip to content

Commit

Permalink
Merge master; Reset unrelated changes from branch
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSarle committed Nov 29, 2024
2 parents 393d990 + 0f5c529 commit 50647e8
Show file tree
Hide file tree
Showing 601 changed files with 26,987 additions and 3 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
218 changes: 218 additions & 0 deletions .jjconflict-base-0/lib/protoflow-core/src/system.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// This is free and unencumbered software released into the public domain.

use crate::{
prelude::{fmt, Arc, BTreeSet, Box, Bytes, PhantomData, Rc, String, Vec, VecDeque},
runtimes::StdRuntime,
transports::MpscTransport,
types::Any,
Block, BlockID, BlockResult, BoxedBlock, BoxedBlockType, InputPort, InputPortID, Message,
OutputPort, OutputPortID, PortID, PortResult, Process, Runtime, Transport,
};

#[cfg(feature = "tokio")]
use crate::{AsyncBlock, BoxedAsyncBlock};

#[cfg(feature = "tokio")]
pub type RuntimeHandle = tokio::runtime::Handle;

pub trait SystemBuilding {
fn input_any(&self) -> InputPort<Any> {
self.input()
}

fn input_bytes(&self) -> InputPort<Bytes> {
self.input()
}

fn input_string(&self) -> InputPort<String> {
self.input()
}

/// Creates a new input port inside the system.
fn input<M: Message + 'static>(&self) -> InputPort<M>;

fn output_any(&self) -> OutputPort<Any> {
self.output()
}

fn output_bytes(&self) -> OutputPort<Bytes> {
self.output()
}

fn output_string(&self) -> OutputPort<String> {
self.output()
}

/// Creates a new output port inside the system.
fn output<M: Message + 'static>(&self) -> OutputPort<M>;

/// Instantiates a block inside the system.
fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B;

///
#[cfg(feature = "tokio")]
fn block_async<B: AsyncBlock + Clone + 'static>(&mut self, block: B) -> B;

/// Connects two ports of two blocks in the system.
///
/// Both ports must be of the same message type.
fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool;

fn connections(&self) -> Vec<(OutputPortID, InputPortID)>;

/// Validates system for execution.
fn validate(self) -> BlockResult<()>;
}

pub trait SystemExecution {
/// Prepare:
/// - Calls the transport layer to connect all the output->input ports.
/// The connections are defined by `SystemBuilding.connect()`.
fn prepare(&self) -> BlockResult<()>;
/// Executes the system, returning the system process.
fn execute(self) -> BlockResult<Rc<dyn Process>>;
}

/// A system is a collection of blocks that are connected together.
pub struct System<X: Transport + Default + 'static = MpscTransport> {
pub(crate) runtime: Arc<StdRuntime<X>>,

/// The registered blocks in the system.
pub(crate) blocks: VecDeque<BoxedBlockType>,

pub(crate) connections: BTreeSet<(OutputPortID, InputPortID)>,

_phantom: PhantomData<X>,
}

pub type Subsystem<X> = System<X>;

impl fmt::Debug for System {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("System")
.field("blocks", &self.blocks)
.finish()
}
}

impl<X: Transport + Default + 'static> System<X> {
/// Builds a new system.
pub fn build<F: FnOnce(&mut System<X>)>(f: F) -> Self {
let transport = X::default();
let runtime = StdRuntime::new(transport).unwrap();
let mut system = System::new(&runtime);
f(&mut system);
system
}

/// Instantiates a new system.
pub fn new(runtime: &Arc<StdRuntime<X>>) -> Self {
Self {
runtime: runtime.clone(),
blocks: VecDeque::new(),
connections: BTreeSet::default(),
_phantom: PhantomData,
}
}

pub fn execute(self) -> BlockResult<Rc<dyn Process>> {
let mut runtime = self.runtime.clone();
runtime.execute(self)
}

pub fn input<M: Message + 'static>(&self) -> InputPort<M> {
InputPort::new(self)
}

pub fn output<M: Message + 'static>(&self) -> OutputPort<M> {
OutputPort::new(self)
}

pub fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
self.add_block(Box::new(block.clone()));
block
}

#[cfg(feature = "tokio")]
pub fn block_async<B: AsyncBlock + Clone + 'static>(&mut self, block: B) -> B {
self.add_block_async(Box::new(block.clone()));
block
}

#[doc(hidden)]
pub fn add_block(&mut self, block: BoxedBlock) -> BlockID {
let block_id = BlockID::from(self.blocks.len());
self.blocks.push_back(BoxedBlockType::Normal(block));
block_id
}

#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn add_block_async(&mut self, block: BoxedAsyncBlock) -> BlockID {
let block_id = BlockID::from(self.blocks.len());
self.blocks.push_back(BoxedBlockType::Async(block));
block_id
}

#[doc(hidden)]
pub fn get_block(&self, block_id: BlockID) -> Option<&BoxedBlockType> {
self.blocks.get(block_id.into())
}

pub fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id))
.unwrap()
}

#[doc(hidden)]
pub fn connect_by_id(&mut self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
self.connections.insert((
OutputPortID(source_id.into()),
InputPortID(target_id.into()),
));
Ok(true)
}
}

impl SystemBuilding for System {
fn input<M: Message + 'static>(&self) -> InputPort<M> {
System::input(self)
}

fn output<M: Message + 'static>(&self) -> OutputPort<M> {
System::output(self)
}

fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
System::block(self, block)
}

#[cfg(feature = "tokio")]
fn block_async<B: AsyncBlock + Clone + 'static>(&mut self, block: B) -> B {
System::block_async(self, block)
}

fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
System::connect(self, source, target)
}

fn connections(&self) -> Vec<(OutputPortID, InputPortID)> {
self.connections.clone().into_iter().collect()
}

fn validate(self) -> BlockResult<()> {
Ok(()) // TODO
}
}

impl SystemExecution for System {
fn prepare(&self) -> BlockResult<()> {
self.runtime.transport.connect_system(self)?;
Ok(())
}

fn execute(self) -> BlockResult<Rc<dyn Process>> {
SystemExecution::prepare(&self)?;
self.execute()
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
38 changes: 38 additions & 0 deletions .jjconflict-base-0/lib/protoflow-zeromq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "protoflow-zeromq"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
description.workspace = true
#documentation.workspace = true
readme.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
publish.workspace = true

[features]
default = ["all", "std"]
all = ["tracing"]
std = ["protoflow-core/std", "tracing?/std"]
tracing = ["protoflow-core/tracing", "dep:tracing"]
unstable = ["protoflow-core/unstable"]

[build-dependencies]
cfg_aliases.workspace = true

[dependencies]
protoflow-core.workspace = true
tracing = { version = "0.1", default-features = false, optional = true }
zeromq = { version = "0.4.1", default-features = false, features = [
"tokio-runtime",
"all-transport",
] }
tokio = { version = "1.40.0", default-features = false }
parking_lot = "0.12"

[dev-dependencies]
futures-util = "0.3.31"
Loading

0 comments on commit 50647e8

Please sign in to comment.