From 81df82ed6b8c5470d51c717f320acdc1f09b9ec6 Mon Sep 17 00:00:00 2001 From: ashneverdawn <8341280+ashneverdawn@users.noreply.github.com> Date: Tue, 31 Dec 2024 10:48:46 -0400 Subject: [PATCH 1/2] feat: community invites that anyone can use (#649) Co-authored-by: Darius Co-authored-by: Darius Clark --- extensions/warp-ipfs/src/lib.rs | 8 +- extensions/warp-ipfs/src/store/community.rs | 13 +- extensions/warp-ipfs/src/store/message.rs | 223 ++++--- .../src/store/message/community_task.rs | 311 +++++----- extensions/warp-ipfs/src/store/mod.rs | 28 +- extensions/warp-ipfs/tests/community.rs | 575 ++++++------------ warp/src/error.rs | 6 + warp/src/raygun/community.rs | 6 +- warp/src/raygun/mod.rs | 6 +- warp/src/warp.rs | 10 +- 10 files changed, 547 insertions(+), 639 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 1536f5d72..294139d7c 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -1780,13 +1780,9 @@ impl RayGunCommunity for WarpIpfs { .get_community_invite(community_id, invite_id) .await } - async fn accept_community_invite( - &mut self, - community_id: Uuid, - invite_id: Uuid, - ) -> Result<(), Error> { + async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> { self.messaging_store()? - .accept_community_invite(community_id, invite_id) + .request_join_community(community_id) .await } async fn edit_community_invite( diff --git a/extensions/warp-ipfs/src/store/community.rs b/extensions/warp-ipfs/src/store/community.rs index e2707b003..b71acf270 100644 --- a/extensions/warp-ipfs/src/store/community.rs +++ b/extensions/warp-ipfs/src/store/community.rs @@ -139,6 +139,10 @@ impl CommunityDocument { self.id.exchange_topic(did) } + pub fn join_topic(&self) -> String { + self.id.join_topic() + } + pub fn sign(&mut self, keypair: &Keypair) -> Result<(), Error> { let construct = warp::crypto::hash::sha256_iter( [ @@ -275,12 +279,9 @@ impl From for Community { } impl CommunityDocument { pub fn participants(&self) -> IndexSet { - self.invites - .iter() - .filter_map(|(_, invite)| invite.target_user.clone()) - .chain(self.members.clone()) - .chain(std::iter::once(self.owner.clone())) - .collect::>() + let mut participants = self.members.clone(); + participants.insert(self.owner.clone()); + participants } pub fn has_valid_invite(&self, user: &DID) -> bool { for (_, invite) in &self.invites { diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index e261016d5..dd31d7c5f 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -32,7 +32,10 @@ use rust_ipfs::{Ipfs, PeerId}; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use super::community::CommunityInviteDocument; +use super::topics::ConversationTopic; use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt}; +use crate::store::CommunityJoinEvents; use crate::store::{ conversation::ConversationDocument, discovery::Discovery, @@ -92,6 +95,7 @@ impl MessageStore { ipfs: ipfs.clone(), conversation_task: HashMap::new(), community_task: HashMap::new(), + community_invites: vec![], identity: identity.clone(), root, discovery, @@ -903,8 +907,12 @@ impl MessageStore { inner.list_communities_joined().await } pub async fn list_communities_invited_to(&self) -> Result, Error> { - let inner = &mut *self.inner.write().await; - inner.list_communities_invited_to().await + let inner = &*self.inner.read().await; + Ok(inner + .community_invites + .iter() + .map(|(community_id, i)| (*community_id, CommunityInvite::from(i.clone()))) + .collect()) } pub async fn leave_community(&mut self, community_id: Uuid) -> Result<(), Error> { let inner = &*self.inner.read().await; @@ -1024,20 +1032,38 @@ impl MessageStore { invite_id: Uuid, ) -> Result<(), Error> { let inner = &*self.inner.read().await; - let community_meta = inner - .community_task - .get(&community_id) - .ok_or(Error::InvalidCommunity)?; - let (tx, rx) = oneshot::channel(); - let _ = community_meta - .command_tx - .clone() - .send(CommunityTaskCommand::DeleteCommunityInvite { - invite_id, - response: tx, - }) - .await; - rx.await.map_err(anyhow::Error::from)? + match inner.community_task.get(&community_id) { + None => { + let keypair = inner.root.keypair(); + + let event = CommunityJoinEvents::DeleteInvite { invite_id }; + let payload = PayloadBuilder::new(keypair, event) + .from_ipfs(&inner.ipfs) + .await?; + let bytes = payload.to_bytes()?; + + if let Err(e) = inner + .ipfs + .pubsub_publish(community_id.join_topic(), bytes) + .await + { + tracing::error!(id=%community_id, "Unable to send event: {e}"); + } + } + Some(community_meta) => { + let (tx, rx) = oneshot::channel(); + let _ = community_meta + .command_tx + .clone() + .send(CommunityTaskCommand::DeleteCommunityInvite { + invite_id, + response: tx, + }) + .await; + return rx.await.map_err(anyhow::Error::from)?; + } + } + Ok(()) } pub async fn get_community_invite( &mut self, @@ -1060,26 +1086,24 @@ impl MessageStore { .await; rx.await.map_err(anyhow::Error::from)? } - pub async fn accept_community_invite( - &mut self, - community_id: Uuid, - invite_id: Uuid, - ) -> Result<(), Error> { + pub async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> { let inner = &*self.inner.read().await; - let community_meta = inner - .community_task - .get(&community_id) - .ok_or(Error::InvalidCommunity)?; - let (tx, rx) = oneshot::channel(); - let _ = community_meta - .command_tx - .clone() - .send(CommunityTaskCommand::AcceptCommunityInvite { - invite_id, - response: tx, - }) - .await; - rx.await.map_err(anyhow::Error::from)? + let keypair = inner.root.keypair(); + + let event = CommunityJoinEvents::Join; + let payload = PayloadBuilder::new(keypair, event) + .from_ipfs(&inner.ipfs) + .await?; + let bytes = payload.to_bytes()?; + + if let Err(e) = inner + .ipfs + .pubsub_publish(community_id.join_topic(), bytes) + .await + { + tracing::error!(id=%community_id, "Unable to send event: {e}"); + } + Ok(()) } pub async fn edit_community_invite( &mut self, @@ -2105,6 +2129,7 @@ struct ConversationInner { ipfs: Ipfs, conversation_task: HashMap, community_task: HashMap, + community_invites: Vec<(Uuid, CommunityInviteDocument)>, root: RootDocumentMap, file: FileStore, event: EventSubscription, @@ -3016,8 +3041,7 @@ impl ConversationInner { pub async fn get_community(&mut self, community_id: Uuid) -> Result { let doc = self.get_community_document(community_id).await?; let own_did = &self.identity.did_key(); - if own_did != &doc.owner && !doc.has_valid_invite(own_did) && !doc.members.contains(own_did) - { + if !doc.participants().contains(own_did) { return Err(Error::Unauthorized); } Ok(doc.into()) @@ -3038,24 +3062,6 @@ impl ConversationInner { }) .collect()) } - pub async fn list_communities_invited_to(&self) -> Result, Error> { - let own_did = &self.identity.did_key(); - Ok(self - .list_community() - .await - .iter() - .filter_map(|c| { - for (_, invite) in &c.invites { - if let Some(target) = &invite.target_user { - if target == own_did { - return Some((c.id, CommunityInvite::from(invite.clone()))); - } - } - } - None - }) - .collect()) - } } async fn process_conversation( @@ -3196,30 +3202,18 @@ async fn process_conversation( ConversationEvents::NewCommunityInvite { community_id, invite, - community_document, } => { - let did = this.identity.did_key(); - - if this.contains_community(community_id).await { - return Err(anyhow::anyhow!("Already apart of {community_id}").into()); - } - - let recipients = community_document.participants().clone(); - - for recipient in &recipients { - if !this.discovery.contains(recipient).await { - let _ = this.discovery.insert(recipient).await; + let mut updated = false; + for i in this.community_invites.len()..0 { + let (community, invitation) = this.community_invites[i].clone(); + if community == community_id && invitation.id == invite.id { + this.community_invites[i] = (community_id, invite.clone()); + updated = true; + break; } } - - this.set_community_document(community_document).await?; - - this.create_community_task(community_id).await?; - - for recipient in recipients.iter().filter(|d| did.ne(d)) { - if let Err(e) = this.request_community_key(community_id, recipient).await { - tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request"); - } + if !updated { + this.community_invites.push((community_id, invite.clone())); } this.event @@ -3229,6 +3223,83 @@ async fn process_conversation( }) .await; } + ConversationEvents::DeleteCommunityInvite { + community_id, + invite, + } => { + for i in this.community_invites.len()..0 { + let (community, invitation) = this.community_invites[i].clone(); + if community == community_id && invitation.id == invite.id { + this.community_invites.swap_remove(i); + this.event + .emit(RayGunEventKind::CommunityUninvited { + community_id, + invite_id: invite.id, + }) + .await; + break; + } + } + } + ConversationEvents::JoinCommunity { + community_id, + community_document: result, + } => match result { + None => { + this.event + .emit(RayGunEventKind::CommunityJoinRejected { community_id }) + .await; + return Ok(()); + } + Some(community_document) => { + for i in this.community_invites.len()..0 { + let (community, _) = this.community_invites[i]; + if community == community_id { + this.community_invites.swap_remove(i); + } + } + + let did = this.identity.did_key(); + + if this.contains_community(community_id).await { + return Ok(()); + } + + let recipients = community_document.participants().clone(); + + for recipient in &recipients { + if !this.discovery.contains(recipient).await { + let _ = this.discovery.insert(recipient).await; + } + } + + this.set_community_document(community_document).await?; + + this.create_community_task(community_id).await?; + + for recipient in recipients.iter().filter(|d| did.ne(d)) { + if let Err(e) = this.request_community_key(community_id, recipient).await { + tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request"); + } + } + + let community_meta = this + .community_task + .get(&community_id) + .ok_or(Error::InvalidCommunity)?; + let (tx, rx) = oneshot::channel(); + let _ = community_meta + .command_tx + .clone() + .send(CommunityTaskCommand::SendJoinedCommunityEvent { response: tx }) + .await; + let _ = rx.await.map_err(anyhow::Error::from)?; + + this.event + .emit(RayGunEventKind::CommunityJoined { community_id }) + .await; + } + }, ConversationEvents::DeleteCommunity { community_id } => { tracing::trace!("Delete community event received for {community_id}"); if !this.contains_community(community_id).await { diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index 0084b6320..bb5d9dc76 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -45,9 +45,9 @@ use crate::store::ds_key::DataStoreKey; use crate::store::event_subscription::EventSubscription; use crate::store::topics::PeerTopic; use crate::store::{ - CommunityUpdateKind, ConversationEvents, ConversationImageType, MAX_COMMUNITY_CHANNELS, - MAX_COMMUNITY_DESCRIPTION, MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE, - MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, + CommunityJoinEvents, CommunityUpdateKind, ConversationEvents, ConversationImageType, + MAX_COMMUNITY_CHANNELS, MAX_COMMUNITY_DESCRIPTION, MAX_CONVERSATION_BANNER_SIZE, + MAX_CONVERSATION_ICON_SIZE, MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, }; use crate::utils::{ByteCollection, ExtensionType}; use crate::{ @@ -101,10 +101,6 @@ pub enum CommunityTaskCommand { invite_id: Uuid, response: oneshot::Sender>, }, - AcceptCommunityInvite { - invite_id: Uuid, - response: oneshot::Sender>, - }, EditCommunityInvite { invite_id: Uuid, invite: CommunityInvite, @@ -307,6 +303,9 @@ pub enum CommunityTaskCommand { response: oneshot::Sender>, Error>>, }, + SendJoinedCommunityEvent { + response: oneshot::Sender>, + }, EventHandler { response: oneshot::Sender>, }, @@ -329,6 +328,7 @@ pub struct CommunityTask { messaging_stream: SubscriptionStream, event_stream: SubscriptionStream, request_stream: SubscriptionStream, + join_stream: SubscriptionStream, attachment_tx: futures::channel::mpsc::Sender, attachment_rx: futures::channel::mpsc::Receiver, @@ -386,12 +386,12 @@ impl CommunityTask { let main_topic = document.topic(); let event_topic = document.event_topic(); let request_topic = document.exchange_topic(&identity.did_key()); + let join_topic = document.join_topic(); let messaging_stream = ipfs.pubsub_subscribe(main_topic).await?; - let event_stream = ipfs.pubsub_subscribe(event_topic).await?; - let request_stream = ipfs.pubsub_subscribe(request_topic).await?; + let join_stream = ipfs.pubsub_subscribe(join_topic).await?; let (atx, arx) = futures::channel::mpsc::channel(256); let (btx, _) = tokio::sync::broadcast::channel(1024); @@ -409,6 +409,7 @@ impl CommunityTask { messaging_stream, request_stream, event_stream, + join_stream, attachment_tx: atx, attachment_rx: arx, @@ -504,6 +505,12 @@ impl CommunityTask { tracing::error!(%community_id, sender = ?source, error = %e, name = "msg", "Failed to process payload"); } }, + Some(message) = this.join_stream.next() => { + let source = message.source; + if let Err(e) = this.process_join_event(message).await { + tracing::error!(%community_id, sender = ?source, error = %e, name = "join", "Failed to process payload"); + } + }, _ = &mut queue_timer => { _ = process_queue(this).await; queue_timer.reset(Duration::from_secs(1)); @@ -738,13 +745,6 @@ impl CommunityTask { let result = self.get_community_invite(invite_id).await; let _ = response.send(result); } - CommunityTaskCommand::AcceptCommunityInvite { - response, - invite_id, - } => { - let result = self.accept_community_invite(invite_id).await; - let _ = response.send(result); - } CommunityTaskCommand::EditCommunityInvite { response, invite_id, @@ -1096,6 +1096,14 @@ impl CommunityTask { let _ = response.send(result); } + CommunityTaskCommand::SendJoinedCommunityEvent { response } => { + let event = CommunityMessagingEvents::JoinedCommunity { + community_id: self.community_id, + user: self.identity.did_key(), + }; + let result = self.publish(None, event, true).await; + let _ = response.send(result); + } CommunityTaskCommand::EventHandler { response } => { let sender = self.event_broadcast.clone(); let _ = response.send(sender); @@ -1263,6 +1271,90 @@ impl CommunityTask { Ok(()) } + async fn process_join_event(&mut self, msg: Message) -> Result<(), Error> { + let data = PayloadMessage::::from_bytes(&msg.data)?; + let community_id = self.community_id; + let sender = data.sender().to_did()?; + + match data.message(None)? { + CommunityJoinEvents::Join => { + let now = Utc::now(); + + if !self.document.invites.iter().any(|(_, invite)| { + invite.expiry.is_none_or(|expiry| expiry > now) + && invite + .target_user + .as_ref() + .is_none_or(|target| &sender == target) + }) { + self.send_single_community_event( + &sender, + ConversationEvents::JoinCommunity { + community_id, + community_document: None, + }, + ) + .await?; + return Ok(()); + } + + self.document.members.insert(sender.clone()); + + self.document.invites.retain(|_, invite| { + !invite + .target_user + .as_ref() + .is_some_and(|target| &sender == target) + }); + + self.set_document().await?; + + self.send_single_community_event( + &sender, + ConversationEvents::JoinCommunity { + community_id: self.community_id, + community_document: Some(self.document.clone()), + }, + ) + .await?; + + if !self.discovery.contains(&sender).await { + let _ = self.discovery.insert(&sender).await; + } + if let Err(_e) = self.request_key(&sender).await {} + } + CommunityJoinEvents::DeleteInvite { invite_id } => { + let invite_id = invite_id.to_string(); + let invite = self + .document + .invites + .get(&invite_id) + .ok_or(Error::CommunityInviteDoesntExist)? + .clone(); + + if !invite + .target_user + .clone() + .is_some_and(|target| target == sender) + { + return Err(Error::InvalidCommunityInvite); + } + + self.document.invites.swap_remove(&invite_id); + self.set_document().await?; + + self.send_single_community_event( + &sender, + ConversationEvents::DeleteCommunityInvite { + community_id: self.community_id, + invite, + }, + ) + .await?; + } + } + Ok(()) + } fn community_key(&self, member: Option<&DID>) -> Result, Error> { let keypair = self.root.keypair(); @@ -1367,7 +1459,6 @@ impl CommunityTask { kind: CommunityUpdateKind::LeaveCommunity, }, true, - vec![], ) .await } @@ -1558,7 +1649,7 @@ impl CommunityTask { let _ = self.event_broadcast.send(message_event); - self.publish(None, event, true, vec![]).await + self.publish(None, event, true).await } pub async fn create_community_invite( @@ -1574,6 +1665,12 @@ impl CommunityTask { return Err(Error::Unauthorized); } + if let Some(target) = &target_user { + if self.document.members.contains(target) { + return Err(Error::AlreadyCommunityMember); + } + } + let invite_doc = CommunityInviteDocument::new(target_user.clone(), expiry); self.document .invites @@ -1588,10 +1685,6 @@ impl CommunityTask { invite: CommunityInvite::from(invite_doc.clone()), }); - let mut exclude = vec![]; - if let Some(target) = &target_user { - exclude.push(target.clone()); - } self.publish( None, CommunityMessagingEvents::UpdateCommunity { @@ -1601,22 +1694,18 @@ impl CommunityTask { }, }, true, - exclude, ) .await?; - //TODO: implement non targeted invites if let Some(did_key) = target_user { self.send_single_community_event( &did_key.clone(), ConversationEvents::NewCommunityInvite { community_id: self.community_id, - community_document: self.document.clone(), invite: invite_doc.clone(), }, ) .await?; - if let Err(_e) = self.request_key(&did_key.clone()).await {} } Ok(CommunityInvite::from(invite_doc)) @@ -1640,6 +1729,12 @@ impl CommunityTask { return Err(Error::Unauthorized); } + let invite = self + .document + .invites + .get(&invite_id.to_string()) + .ok_or(Error::CommunityInviteDoesntExist)? + .clone(); self.document.invites.swap_remove(&invite_id.to_string()); self.set_document().await?; @@ -1657,9 +1752,21 @@ impl CommunityTask { kind: CommunityUpdateKind::DeleteCommunityInvite { invite_id }, }, true, - vec![], ) - .await + .await?; + + if let Some(did_key) = &invite.target_user { + self.send_single_community_event( + &did_key.clone(), + ConversationEvents::DeleteCommunityInvite { + community_id: self.community_id, + invite, + }, + ) + .await?; + } + + Ok(()) } pub async fn get_community_invite( &mut self, @@ -1670,55 +1777,6 @@ impl CommunityTask { None => Err(Error::CommunityInviteDoesntExist), } } - pub async fn accept_community_invite(&mut self, invite_id: Uuid) -> Result<(), Error> { - let own_did = &self.identity.did_key(); - let invite_doc = self - .document - .invites - .get(&invite_id.to_string()) - .ok_or(Error::CommunityInviteDoesntExist)?; - - if let Some(target_user) = &invite_doc.target_user { - if own_did != target_user { - return Err(Error::CommunityInviteIncorrectUser); - } - } - if let Some(expiry) = &invite_doc.expiry { - if expiry < &Utc::now() { - return Err(Error::CommunityInviteExpired); - } - } - - self.document.members.insert(own_did.clone()); - if invite_doc.target_user.is_some() { - self.document - .invites - .swap_remove(&invite_doc.id.to_string()); - } - self.set_document().await?; - - let _ = self - .event_broadcast - .send(MessageEventKind::AcceptedCommunityInvite { - community_id: self.community_id, - invite_id, - user: own_did.clone(), - }); - - self.publish( - None, - CommunityMessagingEvents::UpdateCommunity { - community: self.document.clone(), - kind: CommunityUpdateKind::AcceptCommunityInvite { - invite_id, - user: own_did.clone(), - }, - }, - true, - vec![], - ) - .await - } pub async fn edit_community_invite( &mut self, invite_id: Uuid, @@ -1755,9 +1813,26 @@ impl CommunityTask { kind: CommunityUpdateKind::EditCommunityInvite { invite_id }, }, true, - vec![], ) - .await + .await?; + + let invite = self + .document + .invites + .get(&invite_id.to_string()) + .ok_or(Error::CommunityInviteDoesntExist)?; + if let Some(did_key) = &invite.target_user { + self.send_single_community_event( + &did_key.clone(), + ConversationEvents::NewCommunityInvite { + community_id: self.community_id, + invite: invite.clone(), + }, + ) + .await?; + } + + Ok(()) } pub async fn create_community_role(&mut self, name: String) -> Result { @@ -1789,7 +1864,6 @@ impl CommunityTask { kind: CommunityUpdateKind::CreateCommunityRole { role: role.clone() }, }, true, - vec![], ) .await?; @@ -1832,7 +1906,6 @@ impl CommunityTask { kind: CommunityUpdateKind::DeleteCommunityRole { role_id }, }, true, - vec![], ) .await } @@ -1878,7 +1951,6 @@ impl CommunityTask { kind: CommunityUpdateKind::EditCommunityRole { role_id }, }, true, - vec![], ) .await } @@ -1917,7 +1989,6 @@ impl CommunityTask { kind: CommunityUpdateKind::GrantCommunityRole { role_id, user }, }, true, - vec![], ) .await } @@ -1953,7 +2024,6 @@ impl CommunityTask { kind: CommunityUpdateKind::RevokeCommunityRole { role_id, user }, }, true, - vec![], ) .await } @@ -1997,7 +2067,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await?; @@ -2029,7 +2098,6 @@ impl CommunityTask { kind: CommunityUpdateKind::DeleteCommunityChannel { channel_id }, }, true, - vec![], ) .await } @@ -2082,7 +2150,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2128,7 +2195,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2176,7 +2242,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2216,7 +2281,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2253,7 +2317,6 @@ impl CommunityTask { kind: CommunityUpdateKind::GrantCommunityPermissionForAll { permission }, }, true, - vec![], ) .await } @@ -2288,7 +2351,6 @@ impl CommunityTask { kind: CommunityUpdateKind::RevokeCommunityPermissionForAll { permission }, }, true, - vec![], ) .await } @@ -2321,7 +2383,6 @@ impl CommunityTask { kind: CommunityUpdateKind::RemoveCommunityMember { member }, }, true, - vec![], ) .await } @@ -2365,7 +2426,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2408,7 +2468,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2463,7 +2522,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2511,7 +2569,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2558,7 +2615,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2601,7 +2657,6 @@ impl CommunityTask { }, }, true, - vec![], ) .await } @@ -2895,7 +2950,7 @@ impl CommunityTask { // } // } - self.publish(Some(message_id), event, true, vec![]) + self.publish(Some(message_id), event, true) .await .map(|_| message_id) } @@ -3002,7 +3057,7 @@ impl CommunityTask { // } // } - self.publish(None, event, true, vec![]).await + self.publish(None, event, true).await } pub async fn reply_to_community_channel_message( &mut self, @@ -3108,7 +3163,7 @@ impl CommunityTask { // } // } - self.publish(Some(message_id), event, true, vec![]) + self.publish(Some(message_id), event, true) .await .map(|_| message_id) } @@ -3161,7 +3216,7 @@ impl CommunityTask { channel_id, message_id, }); - self.publish(None, event, true, vec![]).await?; + self.publish(None, event, true).await?; Ok(()) } pub async fn pin_community_channel_message( @@ -3250,7 +3305,7 @@ impl CommunityTask { state, }; - self.publish(None, event, true, vec![]).await + self.publish(None, event, true).await } pub async fn react_to_community_channel_message( &mut self, @@ -3348,7 +3403,7 @@ impl CommunityTask { // } // } - self.publish(None, event, true, vec![]).await + self.publish(None, event, true).await } pub async fn send_community_channel_messsage_event( &mut self, @@ -3565,7 +3620,7 @@ impl CommunityTask { // } // } - self.publish(Some(message_id), event, true, vec![]).await + self.publish(Some(message_id), event, true).await } pub async fn publish( @@ -3573,7 +3628,6 @@ impl CommunityTask { message_id: Option, event: CommunityMessagingEvents, queue: bool, - exclude: Vec, ) -> Result<(), Error> { let keypair = self.root.keypair(); let own_did = self.identity.did_key(); @@ -3583,12 +3637,7 @@ impl CommunityTask { let recipients = self.document.participants(); let payload = PayloadBuilder::new(keypair, event) - .add_recipients( - recipients - .iter() - .filter(|did| own_did.ne(did)) - .filter(|did| !exclude.contains(did)), - )? + .add_recipients(recipients.iter().filter(|did| own_did.ne(did)))? .set_key(key) .from_ipfs(&self.ipfs) .await?; @@ -3601,11 +3650,7 @@ impl CommunityTask { let bytes = payload.to_bytes()?; - for recipient in recipients - .iter() - .filter(|did| own_did.ne(did)) - .filter(|did| !exclude.contains(did)) - { + for recipient in recipients.iter().filter(|did| own_did.ne(did)) { let peer_id = recipient.to_peer_id()?; // We want to confirm that there is atleast one peer subscribed before attempting to send a message @@ -3992,6 +4037,14 @@ async fn message_event( } } } + CommunityMessagingEvents::JoinedCommunity { community_id, user } => { + if let Err(e) = this + .event_broadcast + .send(MessageEventKind::CommunityJoined { community_id, user }) + { + tracing::warn!(%community_id, error = %e, "Error broadcasting event"); + } + } CommunityMessagingEvents::UpdateCommunity { community, kind } => { match kind { CommunityUpdateKind::LeaveCommunity => { @@ -4005,15 +4058,6 @@ async fn message_event( } CommunityUpdateKind::CreateCommunityInvite { invite } => { this.replace_document(community).await?; - if let Some(did) = &invite.target_user { - if !this.discovery.contains(did).await { - let _ = this.discovery.insert(did).await; - } - if let Err(e) = this.request_key(did).await { - tracing::error!(%community_id, error = %e, "error requesting key"); - } - } - if let Err(e) = this.event_broadcast .send(MessageEventKind::CreatedCommunityInvite { @@ -4036,19 +4080,6 @@ async fn message_event( tracing::warn!(%community_id, error = %e, "Error broadcasting event"); } } - CommunityUpdateKind::AcceptCommunityInvite { invite_id, user } => { - this.replace_document(community).await?; - if let Err(e) = - this.event_broadcast - .send(MessageEventKind::AcceptedCommunityInvite { - community_id, - invite_id, - user, - }) - { - tracing::warn!(%community_id, error = %e, "Error broadcasting event"); - } - } CommunityUpdateKind::EditCommunityInvite { invite_id } => { this.replace_document(community).await?; if let Err(e) = diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index 5ca8e00a9..ba3fbb919 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -118,6 +118,9 @@ pub(super) mod topics { fn exchange_topic(&self, did: &DID) -> String { format!("{}/exchange/{}", self.base(), did) } + fn join_topic(&self) -> String { + format!("{}/join", self.base()) + } } impl ConversationTopic for Uuid { @@ -322,9 +325,16 @@ pub enum ConversationEvents { NewCommunityInvite { community_id: Uuid, - community_document: CommunityDocument, invite: CommunityInviteDocument, }, + DeleteCommunityInvite { + community_id: Uuid, + invite: CommunityInviteDocument, + }, + JoinCommunity { + community_id: Uuid, + community_document: Option, + }, DeleteCommunity { community_id: Uuid, }, @@ -455,6 +465,10 @@ pub enum CommunityMessagingEvents { state: ReactionState, emoji: String, }, + JoinedCommunity { + community_id: Uuid, + user: DID, + }, UpdateCommunity { community: CommunityDocument, kind: CommunityUpdateKind, @@ -468,6 +482,14 @@ pub enum CommunityMessagingEvents { }, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[allow(clippy::large_enum_variant)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum CommunityJoinEvents { + Join, + DeleteInvite { invite_id: Uuid }, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case", tag = "type")] pub enum ConversationUpdateKind { @@ -494,10 +516,6 @@ pub enum CommunityUpdateKind { DeleteCommunityInvite { invite_id: Uuid, }, - AcceptCommunityInvite { - invite_id: Uuid, - user: DID, - }, EditCommunityInvite { invite_id: Uuid, }, diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index 470b34963..31c8a4e14 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -93,26 +93,16 @@ mod test { } ); - let invite_list = instance_b.list_communities_invited_to().await?; - assert_eq!(invite_list.len(), 1); - - let mut stream_b = instance_b.get_community_stream(community.id()).await?; - for (community_id, invite) in invite_list { - instance_b - .accept_community_invite(community_id, invite.id()) - .await?; - } - assert_next_msg_event( - vec![&mut stream_a, &mut stream_b], - Duration::from_secs(60), - MessageEventKind::AcceptedCommunityInvite { + instance_b.request_join_community(community.id()).await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone(), - }, - ) - .await?; + } + ); + let mut stream_b = instance_b.get_community_stream(community.id()).await?; let mut rg_stream_c = instance_c.raygun_subscribe().await?; let invite = instance_a .create_community_invite(community.id(), Some(did_c.clone()), None) @@ -134,21 +124,13 @@ mod test { } ); - let invite_list = instance_c.list_communities_invited_to().await?; - assert_eq!(invite_list.len(), 1); + instance_c.request_join_community(community.id()).await?; - let mut stream_c = instance_c.get_community_stream(community.id()).await?; - for (community_id, invite) in invite_list { - instance_c - .accept_community_invite(community_id, invite.id()) - .await?; - } assert_next_msg_event( - vec![&mut stream_a, &mut stream_b, &mut stream_c], + vec![&mut stream_a, &mut stream_b], Duration::from_secs(60), - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_c.clone(), }, ) @@ -198,17 +180,20 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityJoined { + community_id: community.id() + } + ); instance_a.delete_community(community.id()).await?; assert_eq!( @@ -255,14 +240,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -333,8 +315,11 @@ mod test { } ); - let got_community = instance_b.get_community(community.id()).await?; - assert!(got_community.invites().contains(&invite.id())); + let result = instance_b.get_community(community.id()).await; + assert_eq!( + format!("{:?}", result), + format!("{:?}", Err::(Error::InvalidCommunity)), + ); Ok(()) } #[async_test] @@ -358,9 +343,7 @@ mod test { } ); - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; let got_community = instance_b.get_community(community.id()).await?; assert!(got_community.members().contains(&did_b.clone())); @@ -388,9 +371,7 @@ mod test { } ); - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; let list = instance_b.list_communities_joined().await?; assert!(list.len() == 1); @@ -399,7 +380,7 @@ mod test { } #[async_test] - async fn list_community_invited_to() -> anyhow::Result<()> { + async fn list_communites_invited_to() -> anyhow::Result<()> { let context = Some("test::list_communites_invited_to".into()); let acc = (None, None, context); let accounts = create_accounts(vec![acc.clone(), acc]).await?; @@ -456,14 +437,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -508,14 +486,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -566,14 +541,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -618,14 +590,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -679,14 +648,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -734,17 +700,20 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityJoined { + community_id: community.id() + } + ); let mut stream_b = instance_b.get_community_stream(community.id()).await?; instance_a @@ -780,17 +749,20 @@ mod test { } ); - instance_c - .accept_community_invite(community.id(), invite_for_c.id()) - .await?; + instance_c.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_c.id(), user: did_c.clone() } ); + assert_eq!( + next_event(&mut rg_stream_c, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityJoined { + community_id: community.id() + } + ); Ok(()) } #[async_test] @@ -815,14 +787,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); @@ -863,14 +832,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); @@ -921,7 +887,7 @@ mod test { } ); - let got_invite = instance_b + let got_invite = instance_a .get_community_invite(community.id(), invite.id()) .await?; assert_eq!(invite, got_invite); @@ -949,14 +915,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); @@ -996,14 +959,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); @@ -1054,14 +1014,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite_for_b.id(), user: did_b.clone() } ); @@ -1089,12 +1046,13 @@ mod test { } ); - let result = instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await; + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( - format!("{:?}", result), - format!("{:?}", Err::(Error::InvalidCommunity)) + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityJoinRejected { + community_id: community.id(), + } ); Ok(()) } @@ -1123,15 +1081,12 @@ mod test { } ); - let result = instance_b - .accept_community_invite(community.id(), invite_for_b.id()) - .await; + instance_b.request_join_community(community.id()).await?; assert_eq!( - format!("{:?}", result), - format!( - "{:?}", - Err::(Error::CommunityInviteExpired) - ) + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityJoinRejected { + community_id: community.id(), + } ); Ok(()) } @@ -1169,14 +1124,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1216,14 +1168,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1257,14 +1206,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1300,14 +1246,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1359,14 +1302,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1405,14 +1345,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1467,14 +1404,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1510,14 +1444,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1562,14 +1493,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1605,14 +1533,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1683,14 +1608,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1726,14 +1648,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1784,14 +1703,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1827,14 +1743,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1887,14 +1800,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -1912,14 +1822,11 @@ mod test { ); let mut stream_b = instance_b.get_community_stream(community.id()).await?; - instance_c - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_c.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_b, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_c.clone() } ); @@ -1967,14 +1874,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2015,15 +1919,12 @@ mod test { } ); - instance_c - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_c.request_join_community(community.id()).await?; assert_next_msg_event( vec![&mut stream_a, &mut stream_b], Duration::from_secs(60), - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_c.clone(), }, ) @@ -2072,14 +1973,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2119,14 +2017,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2182,14 +2077,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2233,14 +2125,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2311,14 +2200,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2369,14 +2255,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2446,14 +2329,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2525,14 +2405,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2610,14 +2487,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2689,14 +2563,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2781,14 +2652,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2860,14 +2728,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -2943,14 +2808,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3022,14 +2884,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3107,14 +2966,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3190,14 +3046,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3281,14 +3134,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3328,14 +3178,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3408,14 +3255,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3491,14 +3335,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3565,14 +3406,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3665,14 +3503,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3757,14 +3592,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3857,14 +3689,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -3972,14 +3801,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4053,14 +3879,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4147,14 +3970,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4246,14 +4066,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4342,14 +4159,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4448,14 +4262,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4544,14 +4355,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4625,14 +4433,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4680,14 +4485,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4789,14 +4591,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -4908,14 +4707,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); @@ -5017,14 +4813,11 @@ mod test { ); let mut stream_a = instance_a.get_community_stream(community.id()).await?; - instance_b - .accept_community_invite(community.id(), invite.id()) - .await?; + instance_b.request_join_community(community.id()).await?; assert_eq!( next_event(&mut stream_a, Duration::from_secs(60)).await?, - MessageEventKind::AcceptedCommunityInvite { + MessageEventKind::CommunityJoined { community_id: community.id(), - invite_id: invite.id(), user: did_b.clone() } ); diff --git a/warp/src/error.rs b/warp/src/error.rs index 713fa8ca9..583fecdf8 100644 --- a/warp/src/error.rs +++ b/warp/src/error.rs @@ -33,6 +33,8 @@ pub enum Error { InvalidConversion, #[error("Attempted community is invalid")] InvalidCommunity, + #[error("Attempted community invite is invalid")] + InvalidCommunityInvite, #[error("Path supplied is invalid")] InvalidPath, #[error("Directory already exist")] @@ -163,10 +165,14 @@ pub enum Error { InvalidGroupId, #[error("Invalid Group Member")] InvalidGroupMember, + #[error("Already Community Member")] + AlreadyCommunityMember, #[error("Invalid Community Member")] InvalidCommunityMember, #[error("Invite is invalid")] InvalidInvite, + #[error("No valid Community invite")] + NoValidCommunityInvite, #[error("Invite is targeting a different user")] CommunityInviteIncorrectUser, #[error("Invite is expired")] diff --git a/warp/src/raygun/community.rs b/warp/src/raygun/community.rs index 219470a05..fa6234dc6 100644 --- a/warp/src/raygun/community.rs +++ b/warp/src/raygun/community.rs @@ -349,11 +349,7 @@ pub trait RayGunCommunity: Sync + Send { ) -> Result { Err(Error::Unimplemented) } - async fn accept_community_invite( - &mut self, - _community_id: Uuid, - _invite_id: Uuid, - ) -> Result<(), Error> { + async fn request_join_community(&mut self, _community_id: Uuid) -> Result<(), Error> { Err(Error::Unimplemented) } async fn edit_community_invite( diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index 9c78945c9..e26febe1d 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -37,6 +37,9 @@ pub enum RayGunEventKind { ConversationDeleted { conversation_id: Uuid }, CommunityCreated { community_id: Uuid }, CommunityInvited { community_id: Uuid, invite_id: Uuid }, + CommunityUninvited { community_id: Uuid, invite_id: Uuid }, + CommunityJoined { community_id: Uuid }, + CommunityJoinRejected { community_id: Uuid }, CommunityDeleted { community_id: Uuid }, } @@ -141,9 +144,8 @@ pub enum MessageEventKind { community_id: Uuid, invite_id: Uuid, }, - AcceptedCommunityInvite { + CommunityJoined { community_id: Uuid, - invite_id: Uuid, user: DID, }, EditedCommunityInvite { diff --git a/warp/src/warp.rs b/warp/src/warp.rs index 84e0c6aa9..a5f0c5c94 100644 --- a/warp/src/warp.rs +++ b/warp/src/warp.rs @@ -650,14 +650,8 @@ where .get_community_invite(community_id, invite_id) .await } - async fn accept_community_invite( - &mut self, - community_id: Uuid, - invite_id: Uuid, - ) -> Result<(), Error> { - self.raygun - .accept_community_invite(community_id, invite_id) - .await + async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> { + self.raygun.request_join_community(community_id).await } async fn edit_community_invite( &mut self, From 90a6e83b2328c42434d21888991a349a4d864d7b Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 2 Jan 2025 09:24:35 -0500 Subject: [PATCH 2/2] chore: bump async-rt to 0.1.3 --- extensions/warp-ipfs/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/Cargo.toml b/extensions/warp-ipfs/Cargo.toml index d12941b3a..e40a5d5b9 100644 --- a/extensions/warp-ipfs/Cargo.toml +++ b/extensions/warp-ipfs/Cargo.toml @@ -38,7 +38,7 @@ image = { workspace = true } derive_more.workspace = true mediatype.workspace = true -async-rt = "0.1.2" +async-rt = "0.1.3" bincode.workspace = true bytes.workspace = true