From 2ec6b10f39d59f6ba42ebe2544e2373edb49e108 Mon Sep 17 00:00:00 2001 From: seemenkina Date: Mon, 1 Jul 2024 12:21:14 +0700 Subject: [PATCH] add redis connection --- Cargo.toml | 5 +- ds/Cargo.toml | 6 +- ds/src/ds.rs | 121 ++++++++++++++------------------- src/main.rs | 59 ++++++++-------- src/user.rs | 185 +++++++++++++++++++++----------------------------- 5 files changed, 165 insertions(+), 211 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c2fac4c..d857fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = [ "sc_key_store","ds"] } +workspace = { members = ["sc_key_store", "ds"] } [package] name = "de-mls" version = "0.1.0" @@ -14,6 +14,7 @@ openmls_traits = "=0.2.0" # waku-bindings = "0.6.0" bus = "=2.4.1" +tokio = "=1.38.0" rand = "=0.8.5" @@ -21,4 +22,4 @@ anyhow = "=1.0.71" thiserror = "=1.0.61" ds = { path = "ds" } -sc_key_store = {path = "sc_key_store" } +sc_key_store = { path = "sc_key_store" } diff --git a/ds/Cargo.toml b/ds/Cargo.toml index 04d2df8..06ca4a0 100644 --- a/ds/Cargo.toml +++ b/ds/Cargo.toml @@ -9,11 +9,15 @@ edition = "2021" # chrono = "=0.4.38" # waku-bindings = "=0.6.0" bus = "=2.4.1" +fred = { version = "=9.0.3", features = ["subscriber-client"] } +tokio = "=1.38.0" -openmls = "=0.5.0" +openmls = { version = "=0.5.0", features = ["test-utils"] } rand = { version = "^0.8" } anyhow = "=1.0.71" thiserror = "=1.0.61" +tls_codec = "=0.4.1" + sc_key_store = { path = "../sc_key_store" } diff --git a/ds/src/ds.rs b/ds/src/ds.rs index 40dddf7..2bcba44 100644 --- a/ds/src/ds.rs +++ b/ds/src/ds.rs @@ -1,89 +1,72 @@ -use bus::{Bus, BusReader}; -// use chrono::Utc; -use std::collections::HashMap; +use fred::{ + clients::{RedisClient, SubscriberClient}, + error::RedisError, + prelude::*, + types::Message, +}; -use openmls::framing::MlsMessageIn; -// use waku_bindings::*; +use tokio::sync::broadcast::{error::RecvError, Receiver}; -use sc_key_store::{pks::PublicKeyStorage, KeyStoreError}; +use openmls::{ + framing::{MlsMessageIn, MlsMessageOut}, + prelude::{TlsDeserializeTrait, TlsSerializeTrait}, +}; +// use waku_bindings::*; -#[derive(Debug)] -pub struct DSClient { - // node: WakuNodeHandle, - pub pub_node: Bus, - pub sub_node: HashMap, BusReader>, +pub struct RClient { + group_id: String, + client: RedisClient, + sub_client: SubscriberClient, + broadcaster: Receiver, } -impl DSClient { - pub fn new_with_subscriber(id: Vec) -> Result { - let mut ds = DSClient { - pub_node: Bus::new(10), - sub_node: HashMap::new(), - }; - ds.add_subscriber(id)?; - Ok(ds) +impl RClient { + pub async fn new_for_group(group_id: String) -> Result { + let redis_client = RedisClient::default(); + let subscriber: SubscriberClient = + Builder::default_centralized().build_subscriber_client()?; + redis_client.init().await?; + subscriber.init().await?; + subscriber.subscribe(group_id.clone()).await?; + Ok(RClient { + group_id, + client: redis_client, + sub_client: subscriber.clone(), + broadcaster: subscriber.message_rx(), + }) } - pub fn add_subscriber(&mut self, id: Vec) -> Result<(), DeliveryServiceError> { - let rx = self.pub_node.add_rx(); - self.sub_node.insert(id, rx); + pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> { + self.sub_client.unsubscribe(self.group_id.clone()).await?; + self.sub_client.quit().await?; + self.client.quit().await?; Ok(()) } - pub fn msg_send( - &mut self, - msg: MlsMessageIn, - // pubsub_topic: WakuPubSubTopic, - // content_topic: WakuContentTopic, - ) -> Result<(), DeliveryServiceError> { - // let buff = self.msg.tls_serialize_detached().unwrap(); - // Message::encode(&self.msg, &mut buff).expect("Could not encode :("); - self.pub_node.broadcast(msg); - - // let waku_message = WakuMessage::new( - // buff, - // content_topic, - // 2, - // Utc::now().timestamp() as usize, - // vec![], - // true, - // ); - - // node_handle - // .node - // .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None) - // .map_err(|e| { - // debug!( - // error = tracing::field::debug(&e), - // "Failed to relay publish the message" - // ); - // WakuHandlingError::PublishMessage(e) - // }); + pub async fn msg_send(&mut self, msg: MlsMessageOut) -> Result<(), DeliveryServiceError> { + let buf = msg.tls_serialize_detached().unwrap(); + self.client + .publish(self.group_id.clone(), buf.as_slice()) + .await?; Ok(()) } - pub fn msg_recv( - &mut self, - id: &[u8], - pks: &PublicKeyStorage, - ) -> Result { - let node: &mut BusReader = match self.sub_node.get_mut(id) { - Some(node) => node, - None => return Err(DeliveryServiceError::EmptySubNodeError), - }; - node.recv().map_err(DeliveryServiceError::RecieveError) + pub async fn msg_recv(&mut self) -> Result { + // check only one message + let msg = self.broadcaster.recv().await?; + let bytes: Vec = msg.value.convert()?; + let res = MlsMessageIn::tls_deserialize_bytes(bytes).unwrap(); + Ok(res) } } #[derive(Debug, thiserror::Error)] pub enum DeliveryServiceError { - #[error("Could not get data from Key Store: {0}")] - KeyStoreError(KeyStoreError), - #[error("Unauthorized User")] - UnauthorizedUserError, - #[error("Subscriber doesn't exist")] - EmptySubNodeError, - #[error("Reciever error: {0}")] - RecieveError(#[from] std::sync::mpsc::RecvError), + #[error("Redis error: {0}")] + RedisError(#[from] RedisError), + #[error("Tokyo error: {0}")] + TokyoRecieveError(#[from] RecvError), + #[error("Serialization problem: {0}")] + TlsError(#[from] tls_codec::Error), } diff --git a/src/main.rs b/src/main.rs index 5c27299..4c97c71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,14 +3,15 @@ mod identity; mod openmls_provider; mod user; -use std::{rc::Rc, str::FromStr}; +use std::str::FromStr; use bus::Bus; use openmls::framing::{MlsMessageIn, MlsMessageInBody}; use sc_key_store::pks::PublicKeyStorage; use user::User; -fn main() { +#[tokio::main] +async fn main() { let mut pks = PublicKeyStorage::default(); // This channel for message before adding to group. // Message are still encrypted, but this channel not attached to any group @@ -41,19 +42,21 @@ fn main() { //// Alice create group: Alice_Group println!("Start create group"); let group_name = String::from_str("Alice_Group").unwrap(); - let res = a_user.create_group(group_name.clone()); + let res = a_user.create_group(group_name.clone()).await; assert!(res.is_ok()); - assert!(a_user.groups.borrow().contains_key("Alice_Group")); + assert!(a_user.groups.contains_key("Alice_Group")); println!("Create group successfully"); ////// //// Alice invite Bob println!("Alice inviting Bob"); - let welcome = a_user.invite(b_user.username(), group_name.clone(), &mut pks); + let welcome = a_user + .invite(b_user.username(), group_name.clone(), &mut pks) + .await; assert!(welcome.is_ok()); // Alice should skip message with invite update because she already update her instance // It is failed because of wrong epoch - let res = a_user.recieve_msg(group_name.clone(), &pks); + let res = a_user.recieve_msg(group_name.clone()).await; assert!(res.is_err()); //// Send welcome message to system broadcast. Only Bob can use it @@ -63,13 +66,9 @@ fn main() { assert!(welc.is_ok()); let _ = match welc.unwrap().extract() { MlsMessageInBody::Welcome(welcome) => { - let res = b_user.join_group( - welcome, - // same ds_node, need to think how to process this - Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node), - ); + let res = b_user.join_group(welcome).await; assert!(res.is_ok()); - assert!(b_user.groups.borrow().contains_key("Alice_Group")); + assert!(b_user.groups.contains_key("Alice_Group")); Ok(()) } _ => Err("do nothing".to_string()), @@ -78,26 +77,26 @@ fn main() { ///// //// Bob send message and Alice recieve it - let res = b_user.send_msg("Hi!", group_name.clone()); + let res = b_user.send_msg("Hi!", group_name.clone()).await; assert!(res.is_ok()); // Bob also get the message but he cant decrypt it (regarding the mls rfc) - let res = b_user.recieve_msg(group_name.clone(), &pks); + let res = b_user.recieve_msg(group_name.clone()).await; // Expected error with invalid decryption assert!(res.is_err()); - let res = a_user.recieve_msg(group_name.clone(), &pks); + let res = a_user.recieve_msg(group_name.clone()).await; assert!(res.is_ok()); ///// //// Alice send message and Bob recieve it - let res = a_user.send_msg("Hi Bob!", group_name.clone()); + let res = a_user.send_msg("Hi Bob!", group_name.clone()).await; assert!(res.is_ok()); - let res = a_user.recieve_msg(group_name.clone(), &pks); + let res = a_user.recieve_msg(group_name.clone()).await; assert!(res.is_err()); - let res = b_user.recieve_msg(group_name.clone(), &pks); + let res = b_user.recieve_msg(group_name.clone()).await; assert!(res.is_ok()); ///// @@ -119,13 +118,15 @@ fn main() { //// Alice invite Carla println!("Alice inviting Carla"); - let welcome = a_user.invite(c_user.username(), group_name.clone(), &mut pks); + let welcome = a_user + .invite(c_user.username(), group_name.clone(), &mut pks) + .await; assert!(welcome.is_ok()); // Alice should skip message with invite update because she already update her instance // It is failed because of wrong epoch - let res = a_user.recieve_msg(group_name.clone(), &pks); + let res = a_user.recieve_msg(group_name.clone()).await; assert!(res.is_err()); - let res = b_user.recieve_msg(group_name.clone(), &pks); + let res = b_user.recieve_msg(group_name.clone()).await; assert!(res.is_ok()); //// Send welcome message to system broadcast. Only Bob can use it @@ -136,13 +137,9 @@ fn main() { assert!(welc.is_ok()); let _ = match welc.unwrap().extract() { MlsMessageInBody::Welcome(welcome) => { - let res = c_user.join_group( - welcome, - // same ds_node, need to think how to process this - Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node), - ); + let res = c_user.join_group(welcome).await; assert!(res.is_ok()); - assert!(c_user.groups.borrow().contains_key("Alice_Group")); + assert!(c_user.groups.contains_key("Alice_Group")); Ok(()) } _ => Err("do nothing".to_string()), @@ -151,16 +148,16 @@ fn main() { ///// //// Carla send message and Alice and Bob recieve it - let res = c_user.send_msg("Hi all!", group_name.clone()); + let res = c_user.send_msg("Hi all!", group_name.clone()).await; assert!(res.is_ok()); - let res = c_user.recieve_msg(group_name.clone(), &pks); + let res = c_user.recieve_msg(group_name.clone()).await; assert!(res.is_err()); - let res = a_user.recieve_msg(group_name.clone(), &pks); + let res = a_user.recieve_msg(group_name.clone()).await; assert!(res.is_ok()); - let res = b_user.recieve_msg(group_name.clone(), &pks); + let res = b_user.recieve_msg(group_name.clone()).await; assert!(res.is_ok()); //// diff --git a/src/user.rs b/src/user.rs index b0d7edd..5ffa525 100644 --- a/src/user.rs +++ b/src/user.rs @@ -1,6 +1,6 @@ use std::str::Utf8Error; use std::string::FromUtf8Error; -use std::{cell::RefCell, collections::HashMap, rc::Rc, str}; +use std::{cell::RefCell, collections::HashMap, str}; use ds::ds::*; use openmls::{group::*, prelude::*}; @@ -13,19 +13,18 @@ use crate::conversation::*; use crate::identity::{Identity, IdentityError}; use crate::openmls_provider::{CryptoProvider, CIPHERSUITE}; -#[derive(Debug)] pub struct Group { group_name: String, conversation: Conversation, mls_group: RefCell, - pub ds_node: Rc>, + rc_client: RClient, // pubsub_topic: WakuPubSubTopic, // content_topics: Vec, } pub struct User { - pub(crate) identity: RefCell, - pub(crate) groups: RefCell>, + pub(crate) identity: Identity, + pub(crate) groups: HashMap, provider: CryptoProvider, local_ks: LocalCache, // pub(crate) contacts: HashMap, WakuPeers>, @@ -37,8 +36,8 @@ impl User { let crypto = CryptoProvider::default(); let id = Identity::new(CIPHERSUITE, &crypto, username)?; Ok(User { - groups: RefCell::new(HashMap::new()), - identity: RefCell::new(id), + groups: HashMap::new(), + identity: id, provider: crypto, local_ks: LocalCache::empty_key_store(username), // contacts: HashMap::new(), @@ -46,26 +45,17 @@ impl User { } pub(crate) fn username(&self) -> String { - self.identity.borrow().to_string() + self.identity.to_string() } pub(crate) fn id(&self) -> Vec { - self.identity.borrow().identity() + self.identity.identity() } - fn is_signature_eq(&self, sign: &Vec) -> bool { - self.identity - .borrow() - .credential_with_key - .signature_key - .as_slice() - == sign - } - - pub fn create_group(&mut self, group_name: String) -> Result<(), UserError> { + pub async fn create_group(&mut self, group_name: String) -> Result<(), UserError> { let group_id = group_name.as_bytes(); - if self.groups.borrow().contains_key(&group_name) { + if self.groups.contains_key(&group_name) { return Err(UserError::UnknownGroupError(group_name)); } @@ -75,39 +65,39 @@ impl User { let mls_group = MlsGroup::new_with_group_id( &self.provider, - &self.identity.borrow().signer, + &self.identity.signer, &group_config, GroupId::from_slice(group_id), - self.identity.borrow().credential_with_key.clone(), + self.identity.credential_with_key.clone(), )?; - let ds = DSClient::new_with_subscriber(self.id())?; + let rc = RClient::new_for_group(group_name.clone()).await?; let group = Group { group_name: group_name.clone(), conversation: Conversation::default(), mls_group: RefCell::new(mls_group), - ds_node: Rc::new(RefCell::new(ds)), + rc_client: rc, // pubsub_topic: WakuPubSubTopic::new(), // content_topics: Vec::new(), }; - self.groups.borrow_mut().insert(group_name, group); + self.groups.insert(group_name, group); Ok(()) } pub fn register(&mut self, mut pks: &mut PublicKeyStorage) -> Result<(), UserError> { - pks.add_user(self.key_packages(), self.identity.borrow().signer.public())?; + pks.add_user(self.key_packages(), self.identity.signer.public())?; self.local_ks.get_update_from_smart_contract(pks)?; Ok(()) } /// Get the key packages fo this user. pub fn key_packages(&self) -> UserKeyPackages { - let mut kpgs = self.identity.borrow().kp.clone(); + let mut kpgs = self.identity.kp.clone(); UserKeyPackages(kpgs.drain().collect::, KeyPackage)>>()) } - pub fn invite( + pub async fn invite( &mut self, username: String, group_name: String, @@ -122,20 +112,18 @@ impl User { let joiner_key_package = pks.get_avaliable_user_kp(username.as_bytes())?; // Build a proposal with this key package and do the MLS bits. - let mut groups = self.groups.borrow_mut(); - let group = match groups.get_mut(&group_name) { + let group = match self.groups.get_mut(&group_name) { Some(g) => g, None => return Err(UserError::UnknownGroupError(group_name)), }; let (out_messages, welcome, _group_info) = group.mls_group.borrow_mut().add_members( &self.provider, - &self.identity.borrow().signer, + &self.identity.signer, &[joiner_key_package], )?; - let msg = out_messages.into(); - group.ds_node.as_ref().borrow_mut().msg_send(msg)?; + group.rc_client.msg_send(out_messages).await?; // Second, process the invitation on our end. group .mls_group @@ -143,28 +131,18 @@ impl User { .merge_pending_commit(&self.provider)?; // Put sending welcome by p2p here - // group.ds_node.as_ref().borrow_mut().msg_send(welcome.into())?; - - drop(groups); Ok(welcome.into()) } - pub fn recieve_msg(&self, group_name: String, pks: &PublicKeyStorage) -> Result<(), UserError> { - let msg = { - let groups = self.groups.borrow(); - let group = match groups.get(&group_name) { - Some(g) => g, - None => return Err(UserError::UnknownGroupError(group_name)), - }; - let msg = group - .ds_node - .as_ref() - .borrow_mut() - .msg_recv(self.id().as_ref(), pks)?; - msg + pub async fn recieve_msg(&mut self, group_name: String) -> Result<(), UserError> { + let group = match self.groups.get_mut(&group_name) { + Some(g) => g, + None => return Err(UserError::UnknownGroupError(group_name)), }; + let msg = group.rc_client.msg_recv().await?; + match msg.extract() { MlsMessageInBody::Welcome(_welcome) => { // Now irrelevant because message are attached to group @@ -181,10 +159,9 @@ impl User { Ok(()) } - fn process_protocol_msg(&self, message: ProtocolMessage) -> Result<(), UserError> { - let mut groups = self.groups.borrow_mut(); + fn process_protocol_msg(&mut self, message: ProtocolMessage) -> Result<(), UserError> { let group_name = str::from_utf8(message.group_id().as_slice())?; - let group = match groups.get_mut(group_name) { + let group = match self.groups.get_mut(group_name) { Some(g) => g, None => return Err(UserError::UnknownGroupError(group_name.to_string())), }; @@ -202,7 +179,6 @@ impl User { == processed_message_credential.identity() && (self .identity - .borrow() .credential_with_key .signature_key .as_slice() @@ -236,7 +212,7 @@ impl User { if remove_proposal { println!( "update::Processing StagedCommitMessage removing {} from group {} ", - self.username(), + self.identity.to_string(), group.group_name ); return Ok(()); @@ -246,29 +222,23 @@ impl User { Ok(()) } - pub fn send_msg(&mut self, msg: &str, group_name: String) -> Result<(), UserError> { - let groups = self.groups.borrow(); - let group = match groups.get(&group_name) { + pub async fn send_msg(&mut self, msg: &str, group_name: String) -> Result<(), UserError> { + let group = match self.groups.get_mut(&group_name) { Some(g) => g, None => return Err(UserError::UnknownGroupError(group_name)), }; let message_out = group.mls_group.borrow_mut().create_message( &self.provider, - &self.identity.borrow().signer, + &self.identity.signer, msg.as_bytes(), )?; - group - .ds_node - .as_ref() - .borrow_mut() - .msg_send(message_out.into())?; - + group.rc_client.msg_send(message_out).await?; Ok(()) } - pub fn join_group(&self, welcome: Welcome, ds: Rc>) -> Result<(), UserError> { + pub async fn join_group(&mut self, welcome: Welcome) -> Result<(), UserError> { let group_config = MlsGroupConfig::builder() .use_ratchet_tree_extension(true) .build(); @@ -278,67 +248,38 @@ impl User { let group_id = mls_group.group_id().to_vec(); let group_name = String::from_utf8(group_id)?; - ds.borrow_mut() - .add_subscriber(self.identity.borrow().identity())?; - + let rc = RClient::new_for_group(group_name.clone()).await?; let group = Group { group_name: group_name.clone(), conversation: Conversation::default(), mls_group: RefCell::new(mls_group), - ds_node: ds, + rc_client: rc, }; - match self.groups.borrow_mut().insert(group_name, group) { + match self.groups.insert(group_name, group) { Some(old) => Err(UserError::AlreadyExistedGroupError(old.group_name)), None => Ok(()), } } - /// Get a member - fn find_member_index(&self, name: String, group: &Group) -> Result { - let mls_group = group.mls_group.borrow(); - - let member = mls_group - .members() - .find(|m| m.credential.identity().eq(name.as_bytes())); - - match member { - Some(m) => Ok(m.index), - None => Err(UserError::UnknownGroupMemberError(name)), - } - } - - fn group_members(&self, group: &Group) -> Result>, UserError> { - let mls_group = group.mls_group.borrow(); - - let members: Vec> = mls_group - .members() - .filter(|m| self.is_signature_eq(&m.signature_key)) - .map(|m| m.credential.identity().to_vec()) - .collect(); - Ok(members) - } - - pub fn remove(&mut self, name: String, group_name: String) -> Result<(), UserError> { + pub async fn remove(&mut self, name: String, group_name: String) -> Result<(), UserError> { // Get the group ID - let mut groups = self.groups.borrow_mut(); - let group = match groups.get_mut(&group_name) { + let group = match self.groups.get_mut(&group_name) { Some(g) => g, None => return Err(UserError::UnknownGroupError(group_name)), }; // Get the user leaf index - let leaf_index = self.find_member_index(name, group)?; + let leaf_index = group.find_member_index(name)?; // Remove operation on the mls group let (remove_message, _welcome, _group_info) = group.mls_group.borrow_mut().remove_members( &self.provider, - &self.identity.borrow().signer, + &self.identity.signer, &[leaf_index], )?; - let msg = remove_message.into(); - group.ds_node.as_ref().borrow_mut().msg_send(msg)?; + group.rc_client.msg_send(remove_message).await?; // Second, process the removal on our end. group @@ -346,8 +287,6 @@ impl User { .borrow_mut() .merge_pending_commit(&self.provider)?; - drop(groups); - Ok(()) } @@ -356,8 +295,7 @@ impl User { &self, group_name: String, ) -> Result>, UserError> { - let groups = self.groups.borrow(); - groups.get(&group_name).map_or_else( + self.groups.get(&group_name).map_or_else( || Err(UserError::UnknownGroupError(group_name)), |g| { Ok(g.conversation @@ -368,20 +306,51 @@ impl User { } } +impl Group { + /// Get a member + fn find_member_index(&self, name: String) -> Result { + let member = self + .mls_group + .borrow() + .members() + .find(|m| m.credential.identity().eq(name.as_bytes())); + + match member { + Some(m) => Ok(m.index), + None => Err(GroupError::UnknownGroupMemberError(name)), + } + } + + fn group_members(&self, user_signature: &[u8]) -> Vec> { + self.mls_group + .borrow() + .members() + .filter(|m| m.signature_key == user_signature) + .map(|m| m.credential.identity().to_vec()) + .collect::>>() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum GroupError { + #[error("Unknown group member : {0}")] + UnknownGroupMemberError(String), +} + #[derive(Debug, thiserror::Error)] pub enum UserError { #[error("Unknown group: {0}")] UnknownGroupError(String), #[error("Group already exist: {0}")] AlreadyExistedGroupError(String), - #[error("Unknown group member : {0}")] - UnknownGroupMemberError(String), #[error("Unsupported message type")] MessageTypeError, #[error("Unknown user")] UnknownUserError, #[error("Delivery Service error: {0}")] DeliveryServiceError(#[from] DeliveryServiceError), + #[error(transparent)] + GroupError(#[from] GroupError), #[error("Key Store error: {0}")] KeyStoreError(#[from] KeyStoreError), #[error("Identity error: {0}")]