From 4247e1b2938df4f22604579caea5da5f7f15c1d2 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Sun, 22 Oct 2023 07:12:34 -0400 Subject: [PATCH] chore: Implement conversation timestamp (#343) --- extensions/warp-ipfs/examples/messenger.rs | 6 +- .../warp-ipfs/src/store/conversation.rs | 28 ++- extensions/warp-ipfs/src/store/message.rs | 200 +++++++++++------- warp/src/raygun/mod.rs | 21 ++ 4 files changed, 165 insertions(+), 90 deletions(-) diff --git a/extensions/warp-ipfs/examples/messenger.rs b/extensions/warp-ipfs/examples/messenger.rs index 5f301c6f3..2678a35d9 100644 --- a/extensions/warp-ipfs/examples/messenger.rs +++ b/extensions/warp-ipfs/examples/messenger.rs @@ -439,7 +439,7 @@ async fn main() -> anyhow::Result<()> { } Some("/list-conversations") => { let mut table = Table::new(); - table.set_header(vec!["Name", "ID", "Recipients"]); + table.set_header(vec!["Name", "ID", "Created", "Updated", "Recipients"]); let list = chat.list_conversations().await?; for convo in list.iter() { let mut recipients = vec![]; @@ -447,7 +447,9 @@ async fn main() -> anyhow::Result<()> { let username = get_username(new_account.clone(), recipient.clone()).await.unwrap_or_else(|_| recipient.to_string()); recipients.push(username); } - table.add_row(vec![convo.name().unwrap_or_default(), convo.id().to_string(), recipients.join(",").to_string()]); + let created = convo.created(); + let modified = convo.modified(); + table.add_row(vec![convo.name().unwrap_or_default(), convo.id().to_string(), created.to_string(), modified.to_string(), recipients.join(",").to_string()]); } writeln!(stdout, "{table}")?; }, diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index fb69b773a..eeb70829d 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -36,6 +36,8 @@ pub struct ConversationDocument { pub name: Option, #[serde(skip_serializing_if = "Option::is_none")] pub creator: Option, + pub created: DateTime, + pub modified: DateTime, pub conversation_type: ConversationType, pub recipients: Vec, pub excluded: HashMap, @@ -57,7 +59,8 @@ impl From<&Conversation> for ConversationDocument { id: conversation.id(), name: conversation.name(), creator: conversation.creator(), - + created: conversation.created(), + modified: conversation.modified(), conversation_type: conversation.conversation_type(), recipients: conversation.recipients(), excluded: Default::default(), @@ -130,12 +133,15 @@ impl ConversationDocument { } impl ConversationDocument { + #[allow(clippy::too_many_arguments)] pub fn new( did: &DID, name: Option, mut recipients: Vec, id: Option, conversation_type: ConversationType, + created: Option>, + modified: Option>, creator: Option, signature: Option, ) -> Result { @@ -152,11 +158,16 @@ impl ConversationDocument { let messages = None; let excluded = Default::default(); + let created = created.unwrap_or(Utc::now()); + let modified = modified.unwrap_or(created); + let mut document = Self { id, name, recipients, creator, + created, + modified, conversation_type, excluded, messages, @@ -196,6 +207,8 @@ impl ConversationDocument { ConversationType::Direct, None, None, + None, + None, ) } @@ -207,6 +220,8 @@ impl ConversationDocument { recipients.to_vec(), conversation_id, ConversationType::Group, + None, + None, Some(did.clone()), None, ) @@ -301,6 +316,7 @@ impl ConversationDocument { ipfs: &Ipfs, list: BTreeSet, ) -> Result<(), Error> { + self.modified = Utc::now(); let cid = list.to_cid(ipfs).await?; self.messages = Some(cid); Ok(()) @@ -534,13 +550,7 @@ impl ConversationDocument { impl From for Conversation { fn from(document: ConversationDocument) -> Self { - let mut conversation = Conversation::default(); - conversation.set_id(document.id); - conversation.set_name(document.name()); - conversation.set_creator(document.creator.clone()); - conversation.set_conversation_type(document.conversation_type); - conversation.set_recipients(document.recipients()); - conversation + Conversation::from(&document) } } @@ -552,6 +562,8 @@ impl From<&ConversationDocument> for Conversation { conversation.set_creator(document.creator.clone()); conversation.set_conversation_type(document.conversation_type); conversation.set_recipients(document.recipients()); + conversation.set_created(document.created); + conversation.set_modified(document.modified); conversation } } diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index fb1446c33..cbe324fd7 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -211,26 +211,32 @@ impl MessageStore { async fn start_event_task(&self, conversation_id: Uuid) { info!("Event Task started for {conversation_id}"); - let did = self.did.clone(); - let Ok(mut conversation) = self.conversations.get(conversation_id).await else { - return; - }; - conversation.recipients.clear(); - - let Ok(tx) = self.get_conversation_sender(conversation_id).await else { - return; - }; - let Ok(stream) = self.ipfs.pubsub_subscribe(conversation.event_topic()).await else { - return; - }; - - let conversation_type = conversation.conversation_type; - - drop(conversation); let task = tokio::spawn({ let store = self.clone(); async move { + let did = store.did.clone(); + + let (topic, conversation_type) = store + .conversations + .get(conversation_id) + .await + .map(|conversation| { + (conversation.event_topic(), conversation.conversation_type) + }) + .expect("Conversation exist"); + + let stream = store + .ipfs + .pubsub_subscribe(topic) + .await + .expect("Topic isnt subscribed"); + + let tx = store + .get_conversation_sender(conversation_id) + .await + .expect("Conversation exist"); + futures::pin_mut!(stream); while let Some(stream) = stream.next().await { @@ -245,8 +251,7 @@ impl MessageStore { .conversations .get(conversation_id) .await - .map(|c| c.recipients()) - .unwrap_or_default() + .map(|c| c.recipients())? .iter() .filter(|did| own_did.ne(did)) .cloned() @@ -267,29 +272,56 @@ impl MessageStore { } }; - if let Ok(data) = bytes.await { - if let Ok(MessagingEvents::Event { - conversation_id, - member, - event, - cancelled, - }) = serde_json::from_slice::(&data) - { - let ev = match cancelled { - true => MessageEventKind::EventCancelled { - conversation_id, - did_key: member, - event, - }, - false => MessageEventKind::EventReceived { - conversation_id, - did_key: member, - event, - }, - }; - if let Err(e) = tx.send(ev) { - error!("Error broadcasting event: {e}"); - } + let data = match bytes.await { + Ok(data) => data, + Err(e) => { + warn!( + "Failed to decrypt payload from {} in {}: {e}", + payload.sender(), + conversation_id + ); + continue; + } + }; + + let event = match serde_json::from_slice::(&data) { + Ok(event @ MessagingEvents::Event { .. }) => event, + Ok(_) => { + warn!("Unreachable event in {conversation_id}"); + continue; + } + Err(e) => { + warn!( + "Failed to deserialize payload from {} in {}: {e}", + payload.sender(), + conversation_id + ); + continue; + } + }; + + if let MessagingEvents::Event { + conversation_id, + member, + event, + cancelled, + } = event + { + let ev = match cancelled { + true => MessageEventKind::EventCancelled { + conversation_id, + did_key: member, + event, + }, + false => MessageEventKind::EventReceived { + conversation_id, + did_key: member, + event, + }, + }; + + if let Err(e) = tx.send(ev) { + error!("Error broadcasting event: {e}"); } } } @@ -304,23 +336,25 @@ impl MessageStore { async fn start_reqres_task(&self, conversation_id: Uuid) { info!("RequestResponse Task started for {conversation_id}"); - let did = self.did.clone(); - let Ok(conversation) = self.conversations.get(conversation_id).await else { - return; - }; - - let Ok(stream) = self - .ipfs - .pubsub_subscribe(conversation.reqres_topic(&did)) - .await - else { - return; - }; - drop(conversation); let task = tokio::spawn({ let store = self.clone(); async move { + let did = store.did.clone(); + + let topic = store + .conversations + .get(conversation_id) + .await + .map(|conversation| conversation.reqres_topic(&did)) + .expect("Conversation exist"); + + let stream = store + .ipfs + .pubsub_subscribe(topic) + .await + .expect("Topic isnt subscribed"); + futures::pin_mut!(stream); while let Some(stream) = stream.next().await { @@ -338,11 +372,11 @@ impl MessageStore { kind, } => match kind { ConversationRequestKind::Key => { - let Ok(conversation) = - store.conversations.get(conversation_id).await - else { - continue; - }; + let conversation = store + .conversations + .get(conversation_id) + .await + .expect("Conversation exist"); if !matches!( conversation.conversation_type, @@ -356,6 +390,7 @@ impl MessageStore { .recipients() .contains(&payload.sender()) { + warn!("{} is not apart of conversation {conversation_id}", payload.sender()); continue; } @@ -404,7 +439,7 @@ impl MessageStore { match ecdh_encrypt(&did, Some(&sender), raw_key) { Ok(key) => key, Err(e) => { - error!("Error: {e}"); + error!("Failed to encrypt response: {e}"); continue; } }; @@ -641,27 +676,17 @@ impl MessageStore { loop { let (direction, event, ret) = tokio::select! { biased; - event = rx.next() => { - let Some((event, ret)) = event else { - continue; - }; - + Some((event, ret)) = rx.next() => { (MessageDirection::Out, event, ret) } - event = stream.next() => { - let Some(event) = event else { - continue; - }; - + Some(event) = stream.next() => { let Ok(data) = Payload::from_bytes(&event.data) else { continue; }; let own_did = &*did; - let Ok(conversation) = store.conversations.get(conversation_id).await else { - continue; - }; + let conversation = store.conversations.get(conversation_id).await.expect("Conversation exist"); let bytes_results = match conversation.conversation_type { ConversationType::Direct => { @@ -673,26 +698,39 @@ impl MessageStore { .collect::>() .first() .cloned() else { + tracing::log::warn!("participant is not in {}", conversation_id); continue; }; ecdh_decrypt(own_did, Some(&recipient), data.data()) } ConversationType::Group => { - - let Ok(key) = store.conversation_keystore(conversation.id()).await.and_then(|keystore| keystore.get_latest(own_did, &data.sender())) else { - continue; + let key = match store.conversation_keystore(conversation.id()).await.and_then(|store| store.get_latest(own_did, &data.sender())) { + Ok(key) => key, + Err(e) => { + tracing::log::warn!("Failed to obtain key for {}: {e}", data.sender()); + continue; + } }; + Cipher::direct_decrypt(data.data(), &key) } }; drop(conversation); - let Ok(bytes) = bytes_results else { - continue; + let bytes = match bytes_results { + Ok(b) => b, + Err(e) => { + tracing::log::warn!("Failed to decrypt payload from {} in {conversation_id}: {e}", data.sender()); + continue; + } }; - let Ok(event) = serde_json::from_slice::(&bytes) else { - continue; + let event = match serde_json::from_slice::(&bytes) { + Ok(e) => e, + Err(e) => { + tracing::log::warn!("Failed to deserialize message from {} in {conversation_id}: {e}", data.sender()); + continue; + } }; (MessageDirection::In, event, None) @@ -1320,6 +1358,8 @@ impl MessageStore { list.clone(), Some(conversation_id), ConversationType::Group, + None, + None, Some(creator), signature, )?; diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index 5b3875785..a9734be1d 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -403,6 +403,8 @@ pub struct Conversation { #[serde(skip_serializing_if = "Option::is_none")] name: Option, creator: Option, + created: DateTime, + modified: DateTime, conversation_type: ConversationType, recipients: Vec, } @@ -426,10 +428,13 @@ impl Default for Conversation { let creator = None; let conversation_type = ConversationType::Direct; let recipients = Vec::new(); + let timestamp = Utc::now(); Self { id, name, creator, + created: timestamp, + modified: timestamp, conversation_type, recipients, } @@ -449,6 +454,14 @@ impl Conversation { self.creator.clone() } + pub fn created(&self) -> DateTime { + self.created + } + + pub fn modified(&self) -> DateTime { + self.modified + } + pub fn conversation_type(&self) -> ConversationType { self.conversation_type } @@ -471,6 +484,14 @@ impl Conversation { self.creator = creator; } + pub fn set_created(&mut self, created: DateTime) { + self.created = created; + } + + pub fn set_modified(&mut self, modified: DateTime) { + self.modified = modified; + } + pub fn set_conversation_type(&mut self, conversation_type: ConversationType) { self.conversation_type = conversation_type; }