-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c14b0a1
commit c3be050
Showing
5 changed files
with
165 additions
and
211 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,89 +1,72 @@ | ||
use bus::{Bus, BusReader}; | ||
// use chrono::Utc; | ||
use std::collections::HashMap; | ||
use fred::{ | ||
clients::{RedisClient, SubscriberClient}, | ||
error::RedisError, | ||
prelude::*, | ||
types::Message, | ||
}; | ||
|
||
use openmls::framing::MlsMessageIn; | ||
// use waku_bindings::*; | ||
use tokio::sync::broadcast::{error::RecvError, Receiver}; | ||
|
||
use sc_key_store::{pks::PublicKeyStorage, KeyStoreError}; | ||
use openmls::{ | ||
framing::{MlsMessageIn, MlsMessageOut}, | ||
prelude::{TlsDeserializeTrait, TlsSerializeTrait}, | ||
}; | ||
// use waku_bindings::*; | ||
|
||
#[derive(Debug)] | ||
pub struct DSClient { | ||
// node: WakuNodeHandle<Running>, | ||
pub pub_node: Bus<MlsMessageIn>, | ||
pub sub_node: HashMap<Vec<u8>, BusReader<MlsMessageIn>>, | ||
pub struct RClient { | ||
group_id: String, | ||
client: RedisClient, | ||
sub_client: SubscriberClient, | ||
broadcaster: Receiver<Message>, | ||
} | ||
|
||
impl DSClient { | ||
pub fn new_with_subscriber(id: Vec<u8>) -> Result<DSClient, DeliveryServiceError> { | ||
let mut ds = DSClient { | ||
pub_node: Bus::new(10), | ||
sub_node: HashMap::new(), | ||
}; | ||
ds.add_subscriber(id)?; | ||
Ok(ds) | ||
impl RClient { | ||
pub async fn new_for_group(group_id: String) -> Result<Self, DeliveryServiceError> { | ||
let redis_client = RedisClient::default(); | ||
let subscriber: SubscriberClient = | ||
Builder::default_centralized().build_subscriber_client()?; | ||
redis_client.init().await?; | ||
subscriber.init().await?; | ||
subscriber.subscribe(group_id.clone()).await?; | ||
Ok(RClient { | ||
group_id, | ||
client: redis_client, | ||
sub_client: subscriber.clone(), | ||
broadcaster: subscriber.message_rx(), | ||
}) | ||
} | ||
|
||
pub fn add_subscriber(&mut self, id: Vec<u8>) -> Result<(), DeliveryServiceError> { | ||
let rx = self.pub_node.add_rx(); | ||
self.sub_node.insert(id, rx); | ||
pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> { | ||
self.sub_client.unsubscribe(self.group_id.clone()).await?; | ||
self.sub_client.quit().await?; | ||
self.client.quit().await?; | ||
Ok(()) | ||
} | ||
|
||
pub fn msg_send( | ||
&mut self, | ||
msg: MlsMessageIn, | ||
// pubsub_topic: WakuPubSubTopic, | ||
// content_topic: WakuContentTopic, | ||
) -> Result<(), DeliveryServiceError> { | ||
// let buff = self.msg.tls_serialize_detached().unwrap(); | ||
// Message::encode(&self.msg, &mut buff).expect("Could not encode :("); | ||
self.pub_node.broadcast(msg); | ||
|
||
// let waku_message = WakuMessage::new( | ||
// buff, | ||
// content_topic, | ||
// 2, | ||
// Utc::now().timestamp() as usize, | ||
// vec![], | ||
// true, | ||
// ); | ||
|
||
// node_handle | ||
// .node | ||
// .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None) | ||
// .map_err(|e| { | ||
// debug!( | ||
// error = tracing::field::debug(&e), | ||
// "Failed to relay publish the message" | ||
// ); | ||
// WakuHandlingError::PublishMessage(e) | ||
// }); | ||
pub async fn msg_send(&mut self, msg: MlsMessageOut) -> Result<(), DeliveryServiceError> { | ||
let buf = msg.tls_serialize_detached().unwrap(); | ||
self.client | ||
.publish(self.group_id.clone(), buf.as_slice()) | ||
.await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
pub fn msg_recv( | ||
&mut self, | ||
id: &[u8], | ||
pks: &PublicKeyStorage, | ||
) -> Result<MlsMessageIn, DeliveryServiceError> { | ||
let node: &mut BusReader<MlsMessageIn> = match self.sub_node.get_mut(id) { | ||
Some(node) => node, | ||
None => return Err(DeliveryServiceError::EmptySubNodeError), | ||
}; | ||
node.recv().map_err(DeliveryServiceError::RecieveError) | ||
pub async fn msg_recv(&mut self) -> Result<MlsMessageIn, DeliveryServiceError> { | ||
// check only one message | ||
let msg = self.broadcaster.recv().await?; | ||
let bytes: Vec<u8> = msg.value.convert()?; | ||
let res = MlsMessageIn::tls_deserialize_bytes(bytes).unwrap(); | ||
Ok(res) | ||
} | ||
} | ||
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum DeliveryServiceError { | ||
#[error("Could not get data from Key Store: {0}")] | ||
KeyStoreError(KeyStoreError), | ||
#[error("Unauthorized User")] | ||
UnauthorizedUserError, | ||
#[error("Subscriber doesn't exist")] | ||
EmptySubNodeError, | ||
#[error("Reciever error: {0}")] | ||
RecieveError(#[from] std::sync::mpsc::RecvError), | ||
#[error("Redis error: {0}")] | ||
RedisError(#[from] RedisError), | ||
#[error("Tokyo error: {0}")] | ||
TokyoRecieveError(#[from] RecvError), | ||
#[error("Serialization problem: {0}")] | ||
TlsError(#[from] tls_codec::Error), | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.