Skip to content

Commit

Permalink
Complete rewrite without subscriber logic
Browse files Browse the repository at this point in the history
  • Loading branch information
erer1243 committed Jan 9, 2025
1 parent 17c5aec commit 241be4f
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 251 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/swss-common-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tokio.workspace = true
tokio-util.workspace = true
serde.workspace = true
serde_json.workspace = true
replace_with = "0.1"

[lints]
workspace = true
76 changes: 76 additions & 0 deletions crates/swss-common-bridge/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::future::Future;
use swbus_actor::prelude::*;
use swss_common::{ConsumerStateTable, KeyOpFieldValues, SubscriberStateTable, ZmqConsumerStateTable};
use tokio_util::task::AbortOnDropHandle;

/// A bridge that converts swss consumer table updates to Swbus messages.
///
/// That is, a [`ConsumerStateTable`], [`SubscriberStateTable`], or [`ZmqConsumerStateTable`].
pub struct ConsumerTableBridge<T>(State<T>);

enum State<T> {
WaitingForInit { table: T, destination: ServicePath },
Running { _watcher_task: AbortOnDropHandle<()> },
}

impl<T: ConsumerTable + 'static> ConsumerTableBridge<T> {
pub fn new(table: T, destination: ServicePath) -> ConsumerTableBridge<T> {
ConsumerTableBridge(State::WaitingForInit { table, destination })
}
}

impl<T: ConsumerTable + 'static> Actor for ConsumerTableBridge<T> {
async fn init(&mut self, outbox: Outbox) {
replace_with::replace_with_or_abort(&mut self.0, |state| match state {
State::WaitingForInit { table, destination } => State::Running {
_watcher_task: AbortOnDropHandle::new(tokio::spawn(watch_table(table, outbox, destination))),
},
_ => unreachable!(),
});
}

async fn handle_message(&mut self, message: IncomingMessage, outbox: Outbox) {
let err = "ConsumerTableBridge doesn't expect any messages".to_string();
let msg = OutgoingMessage::error_response(message, SwbusErrorCode::InvalidPayload, err);
outbox.send(msg).await;
}

async fn handle_message_failure(&mut self, _id: MessageId, _addr: ServicePath, _outbox: Outbox) {}
}

async fn watch_table<T: ConsumerTable>(mut table: T, outbox: Outbox, destination: ServicePath) {
loop {
table.read_data_async().await;
for kfvs in table.pops() {
let payload = crate::encode_kfvs(&kfvs);
outbox
.send(OutgoingMessage::request(destination.clone(), payload))
.await;
}
}
}

pub trait ConsumerTable: Send {
fn read_data_async(&mut self) -> impl Future<Output = ()> + Send;
fn pops(&mut self) -> Vec<KeyOpFieldValues>;
}

macro_rules! impl_consumertable {
($t:ty) => {
impl ConsumerTable for $t {
async fn read_data_async(&mut self) {
<$t>::read_data_async(self)
.await
.expect("read_data_async io error");
}

fn pops(&mut self) -> Vec<KeyOpFieldValues> {
<$t>::pops(self)
}
}
};
}

impl_consumertable!(ConsumerStateTable);
impl_consumertable!(SubscriberStateTable);
impl_consumertable!(ZmqConsumerStateTable);
83 changes: 9 additions & 74 deletions crates/swss-common-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,80 +1,15 @@
pub mod payload;
mod consumer;
mod producer;

mod oneshot_lazy;
mod table;
mod table_watcher;
use std::error::Error;
use swss_common::KeyOpFieldValues;

use swbus_actor::prelude::*;
use swss_common::{ConsumerStateTable, SubscriberStateTable, ZmqConsumerStateTable};
use table::Table;
use table_watcher::{TableWatcher, TableWatcherMessage};
use tokio::sync::mpsc;
use tokio_util::task::AbortOnDropHandle;
pub use consumer::ConsumerTableBridge;

/// A bridge that converts swss consumer table updates to Swbus messages.
pub struct ConsumerBridge {
outbox_tx: oneshot_lazy::Sender<Outbox>,
tw_msg_tx: mpsc::Sender<TableWatcherMessage>,
_table_watcher_task: AbortOnDropHandle<()>,
pub fn encode_kfvs(kfvs: &KeyOpFieldValues) -> Vec<u8> {
serde_json::to_vec(kfvs).unwrap()
}

impl ConsumerBridge {
pub fn new_consumer_state_table(table: ConsumerStateTable) -> Self {
Self::new(table)
}

pub fn new_subscriber_state_table(table: SubscriberStateTable) -> Self {
Self::new(table)
}

pub fn new_zmq_consumer_state_table(table: ZmqConsumerStateTable) -> Self {
Self::new(table)
}

fn new<T: Table + 'static>(table: T) -> Self {
let (tw_msg_tx, tw_msg_rx) = mpsc::channel(1024);
let (outbox_tx, outbox_rx) = oneshot_lazy::channel();
let table_watcher = TableWatcher::new(table, outbox_rx, tw_msg_rx);
let _table_watcher_task = AbortOnDropHandle::new(tokio::spawn(table_watcher.run()));
Self {
tw_msg_tx,
outbox_tx,
_table_watcher_task,
}
}
}

impl Actor for ConsumerBridge {
async fn init(&mut self, outbox: Outbox) {
self.outbox_tx
.send(outbox.clone())
.unwrap_or_else(|_| unreachable!("outbox_tx.send failed"));
}

async fn handle_message(&mut self, message: IncomingMessage, outbox: Outbox) {
match &message.body {
// Requests are encoded TableWatcherMessages. Decode it and send it to the TableWatcher
MessageBody::Request(req) => match payload::decode_table_watcher_message(&req.payload) {
Ok(tw_msg) => {
self.tw_msg_tx.send(tw_msg).await.expect("TableWatcher task died");
let msg = OutgoingMessage::ok_response(message);
outbox.send(msg).await;
}

Err(e) => {
let msg = OutgoingMessage::error_response(message, SwbusErrorCode::InvalidPayload, e.to_string());
outbox.send(msg).await;
}
},
MessageBody::Response(_) => (),
}
}

async fn handle_message_failure(&mut self, _id: MessageId, subscriber: ServicePath, _outbox: Outbox) {
// If a message failed to be delivered, we unsubscribe the client so we don't waste bandwidth on them in the future
self.tw_msg_tx
.send(TableWatcherMessage::Unsubscribe { subscriber })
.await
.expect("TableWatcher task died");
}
pub fn decode_kfvs(payload: &[u8]) -> Result<KeyOpFieldValues, Box<dyn Error>> {
Ok(serde_json::from_slice(payload)?)
}
37 changes: 0 additions & 37 deletions crates/swss-common-bridge/src/oneshot_lazy.rs

This file was deleted.

29 changes: 0 additions & 29 deletions crates/swss-common-bridge/src/payload.rs

This file was deleted.

Empty file.
37 changes: 0 additions & 37 deletions crates/swss-common-bridge/src/table.rs

This file was deleted.

74 changes: 0 additions & 74 deletions crates/swss-common-bridge/src/table_watcher.rs

This file was deleted.

0 comments on commit 241be4f

Please sign in to comment.