Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement delivery service using redis #11

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = [ "sc_key_store","ds"] }
workspace = { members = ["sc_key_store", "ds"] }
[package]
name = "de-mls"
version = "0.1.0"
Expand All @@ -14,11 +14,12 @@ openmls_traits = "=0.2.0"

# waku-bindings = "0.6.0"
bus = "=2.4.1"
tokio = "=1.38.0"

rand = "=0.8.5"

anyhow = "=1.0.71"
thiserror = "=1.0.61"

ds = { path = "ds" }
sc_key_store = {path = "sc_key_store" }
sc_key_store = { path = "sc_key_store" }
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# de-mls
Decentralized MLS PoC using a smart contract for group coordination

## Run Redis Server

`docker-compose up`

## Install deps

1. `Foundry`
Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.8"
services:
redis:
image: redis:7-alpine
ports:
- 6379:6379
6 changes: 5 additions & 1 deletion ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ edition = "2021"
# chrono = "=0.4.38"
# waku-bindings = "=0.6.0"
bus = "=2.4.1"
fred = { version = "=9.0.3", features = ["subscriber-client"] }
tokio = "=1.38.0"

openmls = "=0.5.0"
openmls = { version = "=0.5.0", features = ["test-utils"] }
rand = { version = "^0.8" }

anyhow = "=1.0.71"
thiserror = "=1.0.61"

tls_codec = "=0.3.0"

sc_key_store = { path = "../sc_key_store" }
123 changes: 54 additions & 69 deletions ds/src/ds.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,74 @@
use bus::{Bus, BusReader};
// use chrono::Utc;
use std::collections::HashMap;
use fred::{
clients::{RedisClient, SubscriberClient},
error::RedisError,
prelude::*,
types::Message,
};

use openmls::framing::MlsMessageIn;
// use waku_bindings::*;
use tokio::sync::broadcast::{error::RecvError, Receiver};

use sc_key_store::{pks::PublicKeyStorage, KeyStoreError};
use openmls::{
framing::{MlsMessageIn, MlsMessageOut},
prelude::{TlsDeserializeTrait, TlsSerializeTrait},
};
// use waku_bindings::*;

#[derive(Debug)]
pub struct DSClient {
// node: WakuNodeHandle<Running>,
pub pub_node: Bus<MlsMessageIn>,
pub sub_node: HashMap<Vec<u8>, BusReader<MlsMessageIn>>,
pub struct RClient {
group_id: String,
client: RedisClient,
sub_client: SubscriberClient,
broadcaster: Receiver<Message>,
}

impl DSClient {
pub fn new_with_subscriber(id: Vec<u8>) -> Result<DSClient, DeliveryServiceError> {
let mut ds = DSClient {
pub_node: Bus::new(10),
sub_node: HashMap::new(),
};
ds.add_subscriber(id)?;
Ok(ds)
impl RClient {
pub async fn new_for_group(group_id: String) -> Result<Self, DeliveryServiceError> {
let redis_client = RedisClient::default();
let subscriber: SubscriberClient =
Builder::default_centralized().build_subscriber_client()?;
redis_client.init().await?;
subscriber.init().await?;
subscriber.subscribe(group_id.clone()).await?;
Ok(RClient {
group_id,
client: redis_client,
sub_client: subscriber.clone(),
broadcaster: subscriber.message_rx(),
})
}

pub fn add_subscriber(&mut self, id: Vec<u8>) -> Result<(), DeliveryServiceError> {
let rx = self.pub_node.add_rx();
self.sub_node.insert(id, rx);
pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> {
self.sub_client.unsubscribe(self.group_id.clone()).await?;
self.sub_client.quit().await?;
self.client.quit().await?;
Ok(())
}

pub fn msg_send(
&mut self,
msg: MlsMessageIn,
// pubsub_topic: WakuPubSubTopic,
// content_topic: WakuContentTopic,
) -> Result<(), DeliveryServiceError> {
// let buff = self.msg.tls_serialize_detached().unwrap();
// Message::encode(&self.msg, &mut buff).expect("Could not encode :(");
self.pub_node.broadcast(msg);

// let waku_message = WakuMessage::new(
// buff,
// content_topic,
// 2,
// Utc::now().timestamp() as usize,
// vec![],
// true,
// );

// node_handle
// .node
// .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None)
// .map_err(|e| {
// debug!(
// error = tracing::field::debug(&e),
// "Failed to relay publish the message"
// );
// WakuHandlingError::PublishMessage(e)
// });
pub async fn msg_send(&mut self, msg: MlsMessageOut) -> Result<(), DeliveryServiceError> {
let buf = msg.tls_serialize_detached()?;
self.client
.publish(self.group_id.clone(), buf.as_slice())
.await?;
rymnc marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

pub fn msg_recv(
&mut self,
id: &[u8],
pks: &PublicKeyStorage,
) -> Result<MlsMessageIn, DeliveryServiceError> {
let node: &mut BusReader<MlsMessageIn> = match self.sub_node.get_mut(id) {
Some(node) => node,
None => return Err(DeliveryServiceError::EmptySubNodeError),
};
node.recv().map_err(DeliveryServiceError::RecieveError)
pub async fn msg_recv(&mut self) -> Result<MlsMessageIn, DeliveryServiceError> {
// check only one message
let msg = self.broadcaster.recv().await?;
let bytes: Vec<u8> = msg.value.convert()?;
let res = MlsMessageIn::tls_deserialize_bytes(bytes)?;
Ok(res)
}
}

#[derive(Debug, thiserror::Error)]
pub enum DeliveryServiceError {
#[error("Could not get data from Key Store: {0}")]
KeyStoreError(KeyStoreError),
#[error("Unauthorized User")]
UnauthorizedUserError,
#[error("Subscriber doesn't exist")]
EmptySubNodeError,
#[error("Reciever error: {0}")]
RecieveError(#[from] std::sync::mpsc::RecvError),
#[error("Redis error: {0}")]
RedisError(#[from] RedisError),
#[error("Tokio error: {0}")]
TokioRecieveError(#[from] RecvError),
#[error("Serialization problem: {0}")]
TlsError(#[from] tls_codec::Error),
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}
59 changes: 28 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ mod identity;
mod openmls_provider;
mod user;

use std::{rc::Rc, str::FromStr};
use std::str::FromStr;

use bus::Bus;
use openmls::framing::{MlsMessageIn, MlsMessageInBody};
use sc_key_store::pks::PublicKeyStorage;
use user::User;

fn main() {
#[tokio::main]
async fn main() {
let mut pks = PublicKeyStorage::default();
// This channel for message before adding to group.
// Message are still encrypted, but this channel not attached to any group
Expand Down Expand Up @@ -41,19 +42,21 @@ fn main() {
//// Alice create group: Alice_Group
println!("Start create group");
let group_name = String::from_str("Alice_Group").unwrap();
let res = a_user.create_group(group_name.clone());
let res = a_user.create_group(group_name.clone()).await;
assert!(res.is_ok());
assert!(a_user.groups.borrow().contains_key("Alice_Group"));
assert!(a_user.groups.contains_key("Alice_Group"));
println!("Create group successfully");
//////

//// Alice invite Bob
println!("Alice inviting Bob");
let welcome = a_user.invite(b_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(b_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

//// Send welcome message to system broadcast. Only Bob can use it
Expand All @@ -63,13 +66,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = b_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = b_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(b_user.groups.borrow().contains_key("Alice_Group"));
assert!(b_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
Expand All @@ -78,26 +77,26 @@ fn main() {
/////

//// Bob send message and Alice recieve it
let res = b_user.send_msg("Hi!", group_name.clone());
let res = b_user.send_msg("Hi!", group_name.clone()).await;
assert!(res.is_ok());

// Bob also get the message but he cant decrypt it (regarding the mls rfc)
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
// Expected error with invalid decryption
assert!(res.is_err());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////

//// Alice send message and Bob recieve it
let res = a_user.send_msg("Hi Bob!", group_name.clone());
let res = a_user.send_msg("Hi Bob!", group_name.clone()).await;
assert!(res.is_ok());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////

Expand All @@ -119,13 +118,15 @@ fn main() {

//// Alice invite Carla
println!("Alice inviting Carla");
let welcome = a_user.invite(c_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(c_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());

//// Send welcome message to system broadcast. Only Bob can use it
Expand All @@ -136,13 +137,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = c_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = c_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(c_user.groups.borrow().contains_key("Alice_Group"));
assert!(c_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
Expand All @@ -151,16 +148,16 @@ fn main() {
/////

//// Carla send message and Alice and Bob recieve it
let res = c_user.send_msg("Hi all!", group_name.clone());
let res = c_user.send_msg("Hi all!", group_name.clone()).await;
assert!(res.is_ok());

let res = c_user.recieve_msg(group_name.clone(), &pks);
let res = c_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());

let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());

let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
////

Expand Down
Loading
Loading