composite is a lightweight framework for building componentized streaming applications. It provides a modular approach to constructing streaming workflows.
- Modular Architecture: Build applications by composing reusable components.
- Lightweight Design: Minimal overhead ensures high performance in streaming scenarios.
- Efficient Memory Management: Minimize copies with smart pointer movement between component ports.
Ensure you have the following installed:
- CMake (version 3.15 or higher)
- A compatible C++ compiler (e.g., GCC, Clang) with C++20 support
- OpenSSL (version 3.0 or higher) if compiling with
-DCOMPOSITE_USE_OPENSSL=ON
- nats.c if compiling with
-DCOMPOSITE_USE_NATS=ON
cmake -B build
cmake --build build [--parallel N]
cmake --install build
COMPOSITE_USE_NATS
: Enable components to publish data to a NATS server on a defined subjectCOMPOSITE_USE_OPENSSL
: Compile with OpenSSL support to enable a secure REST server
The composite framework is designed around a component-based architecture. Each component follows a well-defined interface that allows it to be integrated into a larger streaming pipeline. The key aspects of the component interface include:
- Lifecycle Management: Each component follows a structured lifecycle, including creation, execution, and teardown.
- Configuration: Components can be configured via properties, allowing for flexible runtime behavior.
- Initialization: Components define an initialization phase where necessary resources are allocated.
- Data Processing: Components process incoming data and produce outputs, which are streamed to downstream components.
The composite framework provides a type-safe, smart-pointer-based port system for connecting components.
The system facilitates the transfer of time-stamped contiguous data buffers and associated metadata between components.
It handles smart pointer ownership semantics (std::unique_ptr
and std::shared_ptr
) to optimize memory use and performance.
Each port is either an input_port<T>
or an output_port<T>
, where T
is a smart pointer to a contiguous buffer type
such that T::element_type
satisfies std::ranges::contiguous_range
(e.g., std::unique_ptr<std::vector<float>>
).
The output_port
class is responsible for publishing time-stamped buffer data to one or more connected input_port<T>
instances or to a NATS subject if configured. It supports efficient transfer semantics by minimizing copies and
adjusting behavior based on the pointer types of connected inputs.
- Compatible with
std::unique_ptr
andstd::shared_ptr
- Ability to send metadata independently to connected input ports via
send_metadata()
- Sorts connected inputs internally to handle ownership safety and optimize move semantics
- Forwards data intelligently by choosing to move, copy, or promote based on smart pointer types
- Respects range and type constraints for safety and flexibility
- Optionally publishes raw byte buffers over NATS
Behavior is determined by the smart pointer types of the output and input ports. Connected ports are internally sorted
such that unique_ptr
destinations are processed last to enable efficient move semantics for the final unique_ptr
recipient.
From output_port | To input_port | Behavior |
---|---|---|
unique_ptr | unique_ptr | Move (last), Deep-copy (others) |
unique_ptr | shared_ptr | Promote to shared_ptr once (by copy or release), reuse shared_ptr |
shared_ptr | shared_ptr | Share reference (no copy) |
shared_ptr | unique_ptr | Deep-copy buffer into unique_ptr |
- An
output_port
can send metadata to all its connectedinput_port
instances using thesend_metadata(const metadata&)
function.- Updated metadata must be sent before the next data packet so that it can be associated correctly
- This metadata is "latched" by the receiving input ports and is intended to be associated with the next data packet that is subsequently enqueued and retrieved from those input ports.
The input_port
class provides a thread-safe queue to receive time-stamped data buffers from an output_port
.
It can be configured with a depth limit and exposes methods for inspection and clearing.
- Compatible with
std::unique_ptr
andstd::shared_ptr
- Thread-safe receive queue with condition variable
- Optional bounded queue depth (default: unbounded, i.e.,
std::numeric_limits<std::size_t>::max()
) - Blocking
get_data()
retrieves a tuple containing the data buffer, its timestamp, and optional associated metadata, with 1-second timeout - Methods to clear and inspect the current queue state
- Data, along with its timestamp, is enqueued by an
output_port
'ssend_data()
method. If metadata was previously sent by the output port, that metadata is packaged with this incoming data during theadd_data
call. - The internal queue honors the configured
depth
limit; data arriving when the queue is full (i.e.,m_queue.size() >= m_depth when
add_data
is called) is dropped.- The queue depth can be configured dynamically at runtime with the
input_port
'sdepth(std::size_t)
method. - Setting a depth of 0 "disables" the port because all incoming data will be dropped.
- The queue depth can be configured dynamically at runtime with the
- Consumers call get_data() to retrieve a
std::tuple<buffer_type, timestamp, std::optional<metadata>>
.- If data is available, the tuple contains the data, its timestamp, and any metadata that was associated with it at the time of enqueuing.
- If no data is received within the 1-second timeout, the
buffer_type
element of the tuple will be null (or equivalent, e.g. anullptr
for smart pointers), and the timestamp and metadata will be default/empty (asget_data()
returns {}).
- Metadata sent by an
output_port
is received by theinput_port
and stored in its internalm_metadata
member. This is the "latching" mechanism. - When the next data packet is enqueued into the
input_port
, this latchedm_metadata
is bundled with that data packet and timestamp into a tuple, which is then added to the queue. - Immediately after the latched m_metadata is used to form this tuple, the
input_port
's internal m_metadata member is reset. This makes the input port ready to latch new metadata for any subsequent data packets.
Components in the composite framework are configurable through a property system managed by the property_set
class. This allows for flexible
adaptation of component behavior at initialization or, for certain properties, during runtime.
Properties are typically defined with a component's constructor by linking them to member variable. This is done using the add_property()
method
provided by the component
base class:
#include <composite/component.hpp>
#include <optional>
#include <string>
class MyConfigurableComponent : public composite::component {
public:
MyConfigurableComponent() : composite::component("MyConfigurableComponent") {
// Define a mandatory integer property with units and runtime configurability
add_property("threshold", &m_threshold)
.units("dB")
.configurability(composite::properties::config_type::RUNTIME);
// Define an optional string property (m_api_key is std::optional<std::string>)
// Default configurability is INITIALIZE, no units specified
add_property("api_key", &m_api_key);
// Define a property that can only be set at initialization (default behavior)
add_property("buffer_size", &m_buffer_size).units("elements");
}
// ... process() and other methods ...
private:
// Member variables for properties
int32_t m_threshold{};
std::optional<std::string> m_api_key{}; // Initially no value
uint32_t m_buffer_size{1024};
};
- Type System: The system automatically deduces the property type from the member variable's C++ type
(e.g.,
int
becomes"int32"
,float
becomes"float"
,std::string
becomes"string"
).std::optional<T>
is supported for properties that may not always have a value. Its type will be represented as"<type>?"
(e.g.,std::optional<int>
corresponds to type string"int32?"
).
- Fluent Configuration:
add_property()
returns a reference that allows for chained calls to set metadata:.units(std::string_view)
: Specifies units for the property (e.g., "ms", "items", "percent"). This is for informational purposes..configurability(composite::properties::config_type)
: Defines when the property can be changed:composite::properties::config_type::INITIALIZE
(default): The property can only be set during initialization configuration of values from JSON file.composite::properties::config_type::RUNTIME
: The property can be modified while the component is running.
- Pointers: Properties are registered by passing a pointer to the component's member variable that will store the actual value.
The
property_set
directly manipulates this memory location.
For more complex configurations, properties can be grouped into structures using add_struct_property()
. This allows for namespaced properties
(e.g., "network.host"
, "network.port"
) and better organization.
#include <composite/component.hpp>
#include <string>
struct NetworkConfig {
std::string host{"localhost"};
uint16_t port{8080};
std::optional<std::string> protocol{};
};
class MyComponentWithStructProp : public composite::component {
public:
MyComponentWithStructProp() : composite::component("MyComponentWithStructProp") {
add_struct_property("network", &m_net_config,
// This lambda registers the fields of the NetworkConfig struct
[](auto& ps, auto* conf) {
ps.add_property("host", &conf->host).configurability(composite::properties::config_type::RUNTIME);
ps.add_property("port", &conf->port); // Default: INITIALIZE
ps.add_property("protocol", &conf->protocol); // Optional property
}
);
}
// ... other methods and members ...
private:
NetworkConfig m_net_config;
};
While properties are defined within the component, their values are typically set externally (e.g., from a configuration file or via REST APIs).
The component
base class provides a set_properties()
method that accepts a list of string-based key-value pairs. This method handles:
- Resolving property names, including structured paths like
"network.port"
- Performing type conversion from the input string to the target property's actual C++ type
- Validating changes against the property's configurability rules (INITIALIZE vs RUNTIME)
- Invoking registered change listeners (see below)
Components can react to changes in their properties in two main ways:
-
Change Listeners: A specific callback function can be attached to an individual property using the
property
'schange_listener()
method. This callback is invoked byset_property
before the property is finalized but after the pointed-to-member variable has been tenatively updated. If the callback returnsfalse
, the change is rejected, and the property value is reverted to its previous state.// Use the change_listener() method to add a callback // Assume m_threshold is an int32_t member variable add_property("threshold", &m_threshold) .units("percentage") .configurability(composite::properties::config_type::RUNTIME) .change_listener([this]() { // Inside the listener, m_threshold already holds the new, proposed value if (m_threshold < 0 || m_threshold > 100) { logger()->warn("Proposed threshold {} is out of range [0, 100]. Change will be rejected.", m_threshold); // Returning false will cause property_set to revert m_threshold to its previous value return false; // Reject change } logger()->info("Threshold will be changed to: {}. Change accepted.", m_threshold); // Perform any immediate actions needed due to this specific change // For example: self->reconfigure_threshold_dependent_logic(); return true; // Accept change });
-
property_change_handler()
: Thecomponent
class provides avirtual void property_change_handler()
method. This method is called once at the end of a successfulset_properties()
call, after all specified properties have been updated and their individual change listeners (if any) have approved the changes. Subclasses can override this method to perform more complex or coordinated reconfigurations based on the new overall state of multiple properties.// In MyComponent class void property_change_handler() override { // This method is called after one or more properties have been successfully updated. logger()->info("Properties updated. Component will reconfigure based on new state."); // Example: if m_buffer_size or other related properties changed, reallocate buffers or update internal structures. // this->reinitialize_buffers_if_needed(); // this->update_processing_parameters(); }
To create a new component, developers must implement the required interface functions, ensuring compatibility with the composite framework. Example:
#include <composite/component.hpp>
class MyComponent : public composite::component {
using input_t = std::unique_ptr<std::vector<float>>;
using output_t = input_t;
public:
MyComponent() : composite::component("MyComponent") {
// Add ports to port set
add_port(&m_in_port);
add_port(&m_out_port);
// Add properties to configure
add_property("processing_gain", &m_processing_gain)
.units("factor")
.configurability(composite::properties::config_type::RUNTIME)
.change_listener([this]() {
logger()->info("Change listener validating new processing_gain value: {}", m_processing_gain);
// Add validation logic as needed
// ...
// return false; // reject change if invalid value
return true; // accept change
});
}
~MyComponent() final = default;
// Implement the pure virtual function defined in composite::component
auto process() -> composite::retval override {
using enum composite::retval;
// Get data from an input port (if available)
// get_data() returns a tuple: {data_buffer, timestamp, optional_metadata}
auto [data, ts, metadata] = m_in_port.get_data();
if (data == nullptr) {
// No data received within the timeout
return NOOP; // Indicate no operation was performed, component will sleep briefly
}
// Check is metadata was received with this data packet
if (metadata.has_value()) {
// Printing metadata for debug purposes
logger()->debug("Received metadata with data packet: {}", metadata->to_string());
// Process metadata as needed
// ...
// Send metadata downstream for follow-on components
// Any updated metadata must be sent before the next data packet is sent
m_out_port.send_metadata(metadata.value());
}
// User-defined processing logic
// Example: Apply gain (actual processing depends on data content)
logger()->debug("Processing data (size: {}) with gain: {}", data->size(), m_processing_gain);
// This example assumes the data processing modifies the data in-place
for (auto& val : *data) {
val *= m_processing_gain;
}
// Send data via an output port
m_out_port.send_data(std::move(data), ts);
return NORMAL; // indicate normal processing occurred, component will yield
}
auto property_change_handler() -> void override {
logger()->info("Properties have been updated. Current gain: {}", m_processing_gain);
// Potentially reconfigure aspects of the component based on new property values
}
private:
// Ports
composite::input_port<input_t> m_in_port{"data_in"};
composite::output_port<output_t> m_out_port{"data_out"};
// Properties
float m_processing_gain{1}; // example property with a default value
}; // class MyComponent