Skip to content

Commit

Permalink
Remove Config type parameter from MessageSink
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Mar 13, 2024
1 parent 9bbb6c8 commit bf4fac9
Show file tree
Hide file tree
Showing 26 changed files with 48 additions and 70 deletions.
2 changes: 1 addition & 1 deletion crates/common/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<B: Batchable> BatchingActorBuilder<B> {
}
}

impl<B: Batchable> MessageSink<BatchDriverInput<B>, NoConfig> for BatchingActorBuilder<B> {
impl<B: Batchable> MessageSink<BatchDriverInput<B>> for BatchingActorBuilder<B> {
fn get_sender(&self) -> DynSender<BatchDriverInput<B>> {
self.message_box.get_sender()
}
Expand Down
21 changes: 10 additions & 11 deletions crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,14 @@ pub struct NoConfig;
///
/// An actor whose builder is a `MessageSink<M, C>` can be connected to any other actor
/// whose builder is a `MessageSource<M, C>` so that the sink can receive messages from that source.
///
/// A sink might be interested only in a subset of the messages emitted by the source.
/// For that purpose each source implementation defines a `Config` type parameter,
/// and the sink has to provide the configuration value specific to its needs.
pub trait MessageSink<M: Message, Config> {
pub trait MessageSink<M: Message> {
/// Return the sender that can be used by peers to send messages to this actor
fn get_sender(&self) -> DynSender<M>;

/// Add a source of messages to the actor under construction
///
/// A sink might be interested only in a subset of the messages emitted by the source.
/// This subset is defined by the config parameter.
fn add_input<N, C>(&mut self, config: C, source: &mut impl MessageSource<N, C>)
where
N: Message,
Expand Down Expand Up @@ -178,13 +177,13 @@ pub trait MessageSink<M: Message, Config> {
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
/// for every message type that actor can send to its peers.
///
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<M, C>`.
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<M>`.
pub trait MessageSource<M: Message, Config> {
/// The message will be sent to the peer using the provided `sender`
fn register_peer(&mut self, config: Config, sender: DynSender<M>);

/// Connect a peer actor that will consume the message produced by this actor
fn add_sink<C>(&mut self, config: Config, peer: &impl MessageSink<M, C>) {
fn add_sink(&mut self, config: Config, peer: &impl MessageSink<M>) {
self.register_peer(config, peer.get_sender());
}
}
Expand Down Expand Up @@ -295,7 +294,7 @@ pub trait RuntimeRequestSink {
/// # self.messages.register_peer(config, sender)
/// # }
/// # }
/// # impl MessageSink<MyActorInput, NoConfig> for MyActorBuilder {
/// # impl MessageSink<MyActorInput> for MyActorBuilder {
/// # fn get_sender(&self) -> DynSender<MyActorInput> {
/// # self.messages.get_sender()
/// # }
Expand Down Expand Up @@ -378,7 +377,7 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
pub fn set_connection<Config>(
&mut self,
config: Config,
service: &mut (impl MessageSink<O, NoConfig> + MessageSource<I, Config>),
service: &mut (impl MessageSink<O> + MessageSource<I, Config>),
) {
service.register_peer(config, self.input_sender.sender_clone());
self.register_peer(NoConfig, service.get_sender());
Expand All @@ -391,7 +390,7 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
pub fn with_connection<Config>(
mut self,
config: Config,
service: &mut (impl MessageSink<O, NoConfig> + MessageSource<I, Config>),
service: &mut (impl MessageSink<O> + MessageSource<I, Config>),
) -> Self {
self.set_connection(config, service);
self
Expand All @@ -406,7 +405,7 @@ impl<I: Message, O: Message, C> MessageSource<O, C> for SimpleMessageBoxBuilder<
}

/// A `SimpleMessageBoxBuilder<Input,Output>` is a [MessageSink] of `Input` messages with no specific config.
impl<I: Message, O: Message> MessageSink<I, NoConfig> for SimpleMessageBoxBuilder<I, O> {
impl<I: Message, O: Message> MessageSink<I> for SimpleMessageBoxBuilder<I, O> {
fn get_sender(&self) -> DynSender<I> {
self.input_sender.sender_clone()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<C: Converter> ConvertingActor<C> {
///
/// /// Return a converting actor connected to a peer that is both a source and sink of MQTT messages
/// fn new_converter(
/// mut mqtt_builder: impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage, NoConfig>
/// mut mqtt_builder: impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>
/// ) -> ConvertingActor<MyConverter> {
/// // Use a builder to connect and build the actor
/// let mut converter_builder = ConvertingActor::builder(
Expand Down Expand Up @@ -264,7 +264,7 @@ impl<C: Converter> MessageSource<C::Output, NoConfig> for ConvertingActorBuilder
}
}

impl<C: Converter> MessageSink<C::Input, NoConfig> for ConvertingActorBuilder<C> {
impl<C: Converter> MessageSink<C::Input> for ConvertingActorBuilder<C> {
fn get_sender(&self) -> DynSender<C::Input> {
self.message_box.get_sender()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@
//! }
//!
//! /// An actor builder also declares that it consumes messages
//! /// by implementing the `MessageSink` trait for the appropriate input and config types.
//! impl MessageSink<SomeMessage, SomeConfig> for SomeOtherActorBuilder {
//! /// by implementing the `MessageSink` trait for the appropriate input type.
//! impl MessageSink<SomeMessage> for SomeOtherActorBuilder {
//! /// Return a sender where to send the input messages.
//! fn get_sender(&self) -> DynSender<SomeMessage> {
//! todo!()
Expand Down
3 changes: 1 addition & 2 deletions crates/core/tedge_actors/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::Builder;
use crate::ChannelError;
use crate::DynSender;
use crate::MessageSink;
use crate::NoConfig;
use crate::RuntimeError;
use crate::RuntimeRequestSink;
use futures::channel::mpsc;
Expand Down Expand Up @@ -138,7 +137,7 @@ impl RuntimeHandle {
}
}

impl MessageSink<RuntimeAction, NoConfig> for RuntimeHandle {
impl MessageSink<RuntimeAction> for RuntimeHandle {
fn get_sender(&self) -> DynSender<RuntimeAction> {
self.actions_sender.clone().into()
}
Expand Down
5 changes: 2 additions & 3 deletions crates/core/tedge_actors/src/servers/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::DynSender;
use crate::LoggingReceiver;
use crate::Message;
use crate::MessageSink;
use crate::NoConfig;
use crate::RequestEnvelope;
use crate::RuntimeError;
use crate::RuntimeRequest;
Expand Down Expand Up @@ -74,7 +73,7 @@ impl<Req: Message, Res: Message> RuntimeRequestSink for ServerMessageBoxBuilder<
}
}

impl<Req: Message, Res: Message> MessageSink<RequestEnvelope<Req, Res>, NoConfig>
impl<Req: Message, Res: Message> MessageSink<RequestEnvelope<Req, Res>>
for ServerMessageBoxBuilder<Req, Res>
{
fn get_sender(&self) -> DynSender<RequestEnvelope<Req, Res>> {
Expand Down Expand Up @@ -184,7 +183,7 @@ impl<S: Server + Clone> Builder<ConcurrentServerActor<S>> for ServerActorBuilder
}
}

impl<S: Server, K> MessageSink<RequestEnvelope<S::Request, S::Response>, NoConfig>
impl<S: Server, K> MessageSink<RequestEnvelope<S::Request, S::Response>>
for ServerActorBuilder<S, K>
{
fn get_sender(&self) -> DynSender<RequestEnvelope<S::Request, S::Response>> {
Expand Down
5 changes: 1 addition & 4 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::LoggingReceiver;
use crate::Message;
use crate::MessageReceiver;
use crate::MessageSink;
use crate::NoConfig;
use crate::RequestEnvelope;
use crate::RuntimeRequest;
use crate::Sender;
Expand Down Expand Up @@ -111,9 +110,7 @@ impl<Request: Message, Response: Message> Clone for ClientMessageBox<Request, Re

impl<Request: Message, Response: Message> ClientMessageBox<Request, Response> {
/// Create a [ClientMessageBox] connected to a given [Server]
pub fn new(
server: &mut impl MessageSink<RequestEnvelope<Request, Response>, NoConfig>,
) -> Self {
pub fn new(server: &mut impl MessageSink<RequestEnvelope<Request, Response>>) -> Self {
ClientMessageBox {
sender: server.get_sender(),
}
Expand Down
8 changes: 4 additions & 4 deletions crates/core/tedge_actors/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,20 @@ pub type DynRequestSender<Request, Response> = DynSender<RequestEnvelope<Request

/// A connector to a [Server] expecting Request and returning Response.
pub trait Service<Request: Message, Response: Message>:
MessageSink<RequestEnvelope<Request, Response>, NoConfig>
MessageSink<RequestEnvelope<Request, Response>>
{
/// Connect a request message box to the server box under construction
fn add_requester(&mut self, response_sender: DynSender<Response>) -> DynSender<Request>;

fn add_client(
&mut self,
client: &mut (impl MessageSource<Request, NoConfig> + MessageSink<Response, NoConfig>),
client: &mut (impl MessageSource<Request, NoConfig> + MessageSink<Response>),
);
}

impl<T, Request: Message, Response: Message> Service<Request, Response> for T
where
T: MessageSink<RequestEnvelope<Request, Response>, NoConfig>,
T: MessageSink<RequestEnvelope<Request, Response>>,
{
fn add_requester(&mut self, reply_to: DynSender<Response>) -> DynSender<Request> {
let request_sender = RequestSender {
Expand All @@ -174,7 +174,7 @@ where

fn add_client(
&mut self,
client: &mut (impl MessageSource<Request, NoConfig> + MessageSink<Response, NoConfig>),
client: &mut (impl MessageSource<Request, NoConfig> + MessageSink<Response>),
) {
let request_sender = self.add_requester(client.get_sender());
client.register_peer(NoConfig, request_sender);
Expand Down
8 changes: 4 additions & 4 deletions crates/core/tedge_actors/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ impl<I: MessagePlus, O: MessagePlus> Probe<I, O> {
&mut self,
config: C,
source: &mut impl MessageSource<I, C>,
sink: &mut impl MessageSink<O, NoConfig>,
sink: &mut impl MessageSink<O>,
) {
self.output_forwarder = sink.get_sender();
source.register_peer(config, self.input_interceptor.clone().into());
Expand Down Expand Up @@ -633,7 +633,7 @@ pub trait ServiceConsumerExt<Request: MessagePlus, Response: MessagePlus> {
impl<T, Request: MessagePlus, Response: MessagePlus> ServiceConsumerExt<Request, Response> for T
where
T: MessageSource<Request, NoConfig>,
T: MessageSink<Response, NoConfig>,
T: MessageSink<Response>,
{
fn with_probe<'a>(
&'a mut self,
Expand All @@ -651,7 +651,7 @@ impl<I: MessagePlus, O: MessagePlus> MessageSource<O, NoConfig> for Probe<I, O>
}
}

impl<I: MessagePlus, O: MessagePlus> MessageSink<I, NoConfig> for Probe<I, O> {
impl<I: MessagePlus, O: MessagePlus> MessageSink<I> for Probe<I, O> {
fn get_sender(&self) -> DynSender<I> {
self.input_interceptor.clone().into()
}
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<Request: Message, Response: Message> Default for FakeServerBoxBuilder<Reque
}
}

impl<Request: Message, Response: Message> MessageSink<RequestEnvelope<Request, Response>, NoConfig>
impl<Request: Message, Response: Message> MessageSink<RequestEnvelope<Request, Response>>
for FakeServerBoxBuilder<Request, Response>
{
fn get_sender(&self) -> DynSender<RequestEnvelope<Request, Response>> {
Expand Down
6 changes: 2 additions & 4 deletions crates/core/tedge_agent/src/operation_file_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use tedge_actors::LoggingReceiver;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeError;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
Expand Down Expand Up @@ -291,8 +290,7 @@ impl FileCacheActorBuilder {
tedge_http_host: Arc<str>,
data_dir: DataDir,
downloader_actor: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter>
+ MessageSink<MqttMessage, NoConfig>),
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) -> Self {
let message_box = SimpleMessageBoxBuilder::new("RestartManager", 10);

Expand Down Expand Up @@ -323,7 +321,7 @@ impl FileCacheActorBuilder {
}
}

impl MessageSink<FileCacheInput, NoConfig> for FileCacheActorBuilder {
impl MessageSink<FileCacheInput> for FileCacheActorBuilder {
fn get_sender(&self) -> DynSender<FileCacheInput> {
self.message_box.get_sender()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/restart_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl RestartManagerBuilder {
}
}

impl MessageSink<RestartCommand, NoConfig> for RestartManagerBuilder {
impl MessageSink<RestartCommand> for RestartManagerBuilder {
fn get_sender(&self) -> DynSender<RestartCommand> {
self.message_box.get_sender()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/software_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl SoftwareManagerBuilder {
}
}

impl MessageSink<SoftwareCommand, NoConfig> for SoftwareManagerBuilder {
impl MessageSink<SoftwareCommand> for SoftwareManagerBuilder {
fn get_sender(&self) -> DynSender<SoftwareCommand> {
self.message_box.get_sender()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ impl TedgeOperationConverterBuilder {
pub fn new(
config: OperationConfig,
mut workflows: WorkflowSupervisor,
software_actor: &mut (impl MessageSink<SoftwareCommand, NoConfig>
software_actor: &mut (impl MessageSink<SoftwareCommand>
+ MessageSource<SoftwareCommand, NoConfig>),
restart_actor: &mut (impl MessageSink<RestartCommand, NoConfig>
+ MessageSource<RestartCommand, NoConfig>),
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter>
+ MessageSink<MqttMessage, NoConfig>),
restart_actor: &mut (impl MessageSink<RestartCommand> + MessageSource<RestartCommand, NoConfig>),
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
script_runner: &mut impl Service<Execute, std::io::Result<Output>>,
) -> Self {
let (input_sender, input_receiver) = mpsc::unbounded();
Expand Down
4 changes: 1 addition & 3 deletions crates/extensions/c8y_firmware_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tedge_actors::LinkError;
use tedge_actors::LoggingReceiver;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Service;
Expand All @@ -47,8 +46,7 @@ pub struct FirmwareManagerBuilder {
impl FirmwareManagerBuilder {
pub fn try_new(
config: FirmwareManagerConfig,
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter>
+ MessageSink<MqttMessage, NoConfig>),
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
jwt_actor: &mut impl Service<(), JwtResult>,
downloader_actor: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
) -> Result<FirmwareManagerBuilder, FileError> {
Expand Down
3 changes: 1 addition & 2 deletions crates/extensions/c8y_http_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::DynSender;
use tedge_actors::MessageSink;
use tedge_actors::NoConfig;
use tedge_actors::RequestEnvelope;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
Expand Down Expand Up @@ -126,7 +125,7 @@ impl Builder<C8YHttpProxyActor> for C8YHttpProxyBuilder {
}
}

impl MessageSink<RequestEnvelope<C8YRestRequest, C8YRestResult>, NoConfig> for C8YHttpProxyBuilder {
impl MessageSink<RequestEnvelope<C8YRestRequest, C8YRestResult>> for C8YHttpProxyBuilder {
fn get_sender(&self) -> DynSender<RequestEnvelope<C8YRestRequest, C8YRestResult>> {
self.clients.get_sender()
}
Expand Down
6 changes: 2 additions & 4 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeError;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
Expand Down Expand Up @@ -294,14 +293,13 @@ impl C8yMapperBuilder {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
config: C8yMapperConfig,
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage, NoConfig>),
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
http: &mut impl Service<C8YRestRequest, C8YRestResult>,
timer: &mut impl Service<SyncStart, SyncComplete>,
uploader: &mut impl Service<IdUploadRequest, IdUploadResult>,
downloader: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
fs_watcher: &mut impl MessageSource<FsWatchEvent, PathBuf>,
service_monitor: &mut (impl MessageSource<MqttMessage, TopicFilter>
+ MessageSink<MqttMessage, NoConfig>),
service_monitor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) -> Result<Self, FileError> {
Self::init(&config)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct OldAgentAdapter;

impl OldAgentAdapter {
pub fn builder(
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage, NoConfig>),
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) -> ConvertingActorBuilder<OldAgentAdapter> {
let mut builder = ConvertingActor::builder("OldAgentAdapter", OldAgentAdapter);
builder.add_input(old_and_new_command_topics(), mqtt);
Expand Down
3 changes: 1 addition & 2 deletions crates/extensions/tedge_config_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Service;
Expand Down Expand Up @@ -46,7 +45,7 @@ pub struct ConfigManagerBuilder {
impl ConfigManagerBuilder {
pub async fn try_new(
config: ConfigManagerConfig,
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage, NoConfig>),
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
fs_notify: &mut impl MessageSource<FsWatchEvent, PathBuf>,
downloader_actor: &mut impl Service<ConfigDownloadRequest, ConfigDownloadResult>,
uploader_actor: &mut impl Service<ConfigUploadRequest, ConfigUploadResult>,
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/tedge_health_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl HealthMonitorBuilder {
pub fn from_service_topic_id(
service: Service,
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter>
+ MessageSink<MqttMessage, NoConfig>
+ MessageSink<MqttMessage>
+ AsMut<MqttConfig>),
// TODO: pass it less annoying way
mqtt_schema: &MqttSchema,
Expand Down
Loading

0 comments on commit bf4fac9

Please sign in to comment.