Skip to content

Commit

Permalink
add redis connection
Browse files Browse the repository at this point in the history
  • Loading branch information
seemenkina committed Jul 1, 2024
1 parent 30b8ebb commit 2ec6b10
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 211 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = [ "sc_key_store","ds"] }
workspace = { members = ["sc_key_store", "ds"] }
[package]
name = "de-mls"
version = "0.1.0"
Expand All @@ -14,11 +14,12 @@ openmls_traits = "=0.2.0"

# waku-bindings = "0.6.0"
bus = "=2.4.1"
tokio = "=1.38.0"

rand = "=0.8.5"

anyhow = "=1.0.71"
thiserror = "=1.0.61"

ds = { path = "ds" }
sc_key_store = {path = "sc_key_store" }
sc_key_store = { path = "sc_key_store" }
6 changes: 5 additions & 1 deletion ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ edition = "2021"
# chrono = "=0.4.38"
# waku-bindings = "=0.6.0"
bus = "=2.4.1"
fred = { version = "=9.0.3", features = ["subscriber-client"] }
tokio = "=1.38.0"

openmls = "=0.5.0"
openmls = { version = "=0.5.0", features = ["test-utils"] }
rand = { version = "^0.8" }

anyhow = "=1.0.71"
thiserror = "=1.0.61"

tls_codec = "=0.4.1"

sc_key_store = { path = "../sc_key_store" }
121 changes: 52 additions & 69 deletions ds/src/ds.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,72 @@
use bus::{Bus, BusReader};
// use chrono::Utc;
use std::collections::HashMap;
use fred::{
clients::{RedisClient, SubscriberClient},
error::RedisError,
prelude::*,
types::Message,
};

use openmls::framing::MlsMessageIn;
// use waku_bindings::*;
use tokio::sync::broadcast::{error::RecvError, Receiver};

use sc_key_store::{pks::PublicKeyStorage, KeyStoreError};
use openmls::{
framing::{MlsMessageIn, MlsMessageOut},
prelude::{TlsDeserializeTrait, TlsSerializeTrait},
};
// use waku_bindings::*;

#[derive(Debug)]
pub struct DSClient {
// node: WakuNodeHandle<Running>,
pub pub_node: Bus<MlsMessageIn>,
pub sub_node: HashMap<Vec<u8>, BusReader<MlsMessageIn>>,
pub struct RClient {
group_id: String,
client: RedisClient,
sub_client: SubscriberClient,
broadcaster: Receiver<Message>,
}

impl DSClient {
pub fn new_with_subscriber(id: Vec<u8>) -> Result<DSClient, DeliveryServiceError> {
let mut ds = DSClient {
pub_node: Bus::new(10),
sub_node: HashMap::new(),
};
ds.add_subscriber(id)?;
Ok(ds)
impl RClient {
pub async fn new_for_group(group_id: String) -> Result<Self, DeliveryServiceError> {
let redis_client = RedisClient::default();
let subscriber: SubscriberClient =
Builder::default_centralized().build_subscriber_client()?;
redis_client.init().await?;
subscriber.init().await?;
subscriber.subscribe(group_id.clone()).await?;
Ok(RClient {
group_id,
client: redis_client,
sub_client: subscriber.clone(),
broadcaster: subscriber.message_rx(),
})
}

pub fn add_subscriber(&mut self, id: Vec<u8>) -> Result<(), DeliveryServiceError> {
let rx = self.pub_node.add_rx();
self.sub_node.insert(id, rx);
pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> {
self.sub_client.unsubscribe(self.group_id.clone()).await?;
self.sub_client.quit().await?;
self.client.quit().await?;
Ok(())
}

pub fn msg_send(
&mut self,
msg: MlsMessageIn,
// pubsub_topic: WakuPubSubTopic,
// content_topic: WakuContentTopic,
) -> Result<(), DeliveryServiceError> {
// let buff = self.msg.tls_serialize_detached().unwrap();
// Message::encode(&self.msg, &mut buff).expect("Could not encode :(");
self.pub_node.broadcast(msg);

// let waku_message = WakuMessage::new(
// buff,
// content_topic,
// 2,
// Utc::now().timestamp() as usize,
// vec![],
// true,
// );

// node_handle
// .node
// .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None)
// .map_err(|e| {
// debug!(
// error = tracing::field::debug(&e),
// "Failed to relay publish the message"
// );
// WakuHandlingError::PublishMessage(e)
// });
pub async fn msg_send(&mut self, msg: MlsMessageOut) -> Result<(), DeliveryServiceError> {
let buf = msg.tls_serialize_detached().unwrap();
self.client
.publish(self.group_id.clone(), buf.as_slice())
.await?;

Ok(())
}

pub fn msg_recv(
&mut self,
id: &[u8],
pks: &PublicKeyStorage,
) -> Result<MlsMessageIn, DeliveryServiceError> {
let node: &mut BusReader<MlsMessageIn> = match self.sub_node.get_mut(id) {
Some(node) => node,
None => return Err(DeliveryServiceError::EmptySubNodeError),
};
node.recv().map_err(DeliveryServiceError::RecieveError)
pub async fn msg_recv(&mut self) -> Result<MlsMessageIn, DeliveryServiceError> {
// check only one message
let msg = self.broadcaster.recv().await?;
let bytes: Vec<u8> = msg.value.convert()?;
let res = MlsMessageIn::tls_deserialize_bytes(bytes).unwrap();
Ok(res)
}
}

#[derive(Debug, thiserror::Error)]
pub enum DeliveryServiceError {
#[error("Could not get data from Key Store: {0}")]
KeyStoreError(KeyStoreError),
#[error("Unauthorized User")]
UnauthorizedUserError,
#[error("Subscriber doesn't exist")]
EmptySubNodeError,
#[error("Reciever error: {0}")]
RecieveError(#[from] std::sync::mpsc::RecvError),
#[error("Redis error: {0}")]
RedisError(#[from] RedisError),
#[error("Tokyo error: {0}")]
TokyoRecieveError(#[from] RecvError),
#[error("Serialization problem: {0}")]
TlsError(#[from] tls_codec::Error),
}
59 changes: 28 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ mod identity;
mod openmls_provider;
mod user;

use std::{rc::Rc, str::FromStr};
use std::str::FromStr;

use bus::Bus;
use openmls::framing::{MlsMessageIn, MlsMessageInBody};
use sc_key_store::pks::PublicKeyStorage;
use user::User;

fn main() {
#[tokio::main]
async fn main() {
let mut pks = PublicKeyStorage::default();
// This channel for message before adding to group.
// Message are still encrypted, but this channel not attached to any group
Expand Down Expand Up @@ -41,19 +42,21 @@ fn main() {
//// Alice create group: Alice_Group
println!("Start create group");
let group_name = String::from_str("Alice_Group").unwrap();
let res = a_user.create_group(group_name.clone());
let res = a_user.create_group(group_name.clone()).await;
assert!(res.is_ok());
assert!(a_user.groups.borrow().contains_key("Alice_Group"));
assert!(a_user.groups.contains_key("Alice_Group"));
println!("Create group successfully");
//////

//// Alice invite Bob
println!("Alice inviting Bob");
let welcome = a_user.invite(b_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(b_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

//// Send welcome message to system broadcast. Only Bob can use it
Expand All @@ -63,13 +66,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = b_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = b_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(b_user.groups.borrow().contains_key("Alice_Group"));
assert!(b_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
Expand All @@ -78,26 +77,26 @@ fn main() {
/////

//// Bob send message and Alice recieve it
let res = b_user.send_msg("Hi!", group_name.clone());
let res = b_user.send_msg("Hi!", group_name.clone()).await;
assert!(res.is_ok());

// Bob also get the message but he cant decrypt it (regarding the mls rfc)
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
// Expected error with invalid decryption
assert!(res.is_err());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////

//// Alice send message and Bob recieve it
let res = a_user.send_msg("Hi Bob!", group_name.clone());
let res = a_user.send_msg("Hi Bob!", group_name.clone()).await;
assert!(res.is_ok());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////

Expand All @@ -119,13 +118,15 @@ fn main() {

//// Alice invite Carla
println!("Alice inviting Carla");
let welcome = a_user.invite(c_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(c_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());

//// Send welcome message to system broadcast. Only Bob can use it
Expand All @@ -136,13 +137,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = c_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = c_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(c_user.groups.borrow().contains_key("Alice_Group"));
assert!(c_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
Expand All @@ -151,16 +148,16 @@ fn main() {
/////

//// Carla send message and Alice and Bob recieve it
let res = c_user.send_msg("Hi all!", group_name.clone());
let res = c_user.send_msg("Hi all!", group_name.clone()).await;
assert!(res.is_ok());

let res = c_user.recieve_msg(group_name.clone(), &pks);
let res = c_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());

let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
////

Expand Down
Loading

0 comments on commit 2ec6b10

Please sign in to comment.