Skip to content

Commit

Permalink
Merge branch 'main' into feat/ref-list-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Jan 2, 2025
2 parents 439ed87 + 90a6e83 commit 7c7316e
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 640 deletions.
2 changes: 1 addition & 1 deletion extensions/warp-ipfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ image = { workspace = true }
derive_more.workspace = true
mediatype.workspace = true

async-rt = "0.1.2"
async-rt = "0.1.3"

bincode.workspace = true
bytes.workspace = true
Expand Down
8 changes: 2 additions & 6 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1780,13 +1780,9 @@ impl RayGunCommunity for WarpIpfs {
.get_community_invite(community_id, invite_id)
.await
}
async fn accept_community_invite(
&mut self,
community_id: Uuid,
invite_id: Uuid,
) -> Result<(), Error> {
async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> {
self.messaging_store()?
.accept_community_invite(community_id, invite_id)
.request_join_community(community_id)
.await
}
async fn edit_community_invite(
Expand Down
13 changes: 7 additions & 6 deletions extensions/warp-ipfs/src/store/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl CommunityDocument {
self.id.exchange_topic(did)
}

pub fn join_topic(&self) -> String {
self.id.join_topic()
}

pub fn sign(&mut self, keypair: &Keypair) -> Result<(), Error> {
let construct = warp::crypto::hash::sha256_iter(
[
Expand Down Expand Up @@ -275,12 +279,9 @@ impl From<CommunityDocument> for Community {
}
impl CommunityDocument {
pub fn participants(&self) -> IndexSet<DID> {
self.invites
.iter()
.filter_map(|(_, invite)| invite.target_user.clone())
.chain(self.members.clone())
.chain(std::iter::once(self.owner.clone()))
.collect::<IndexSet<_>>()
let mut participants = self.members.clone();
participants.insert(self.owner.clone());
participants
}
pub fn has_valid_invite(&self, user: &DID) -> bool {
for (_, invite) in &self.invites {
Expand Down
223 changes: 147 additions & 76 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use rust_ipfs::{Ipfs, PeerId};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::community::CommunityInviteDocument;
use super::topics::ConversationTopic;
use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt};
use crate::store::CommunityJoinEvents;
use crate::store::{
conversation::ConversationDocument,
discovery::Discovery,
Expand Down Expand Up @@ -92,6 +95,7 @@ impl MessageStore {
ipfs: ipfs.clone(),
conversation_task: HashMap::new(),
community_task: HashMap::new(),
community_invites: vec![],
identity: identity.clone(),
root,
discovery,
Expand Down Expand Up @@ -903,8 +907,12 @@ impl MessageStore {
inner.list_communities_joined().await
}
pub async fn list_communities_invited_to(&self) -> Result<Vec<(Uuid, CommunityInvite)>, Error> {
let inner = &mut *self.inner.write().await;
inner.list_communities_invited_to().await
let inner = &*self.inner.read().await;
Ok(inner
.community_invites
.iter()
.map(|(community_id, i)| (*community_id, CommunityInvite::from(i.clone())))
.collect())
}
pub async fn leave_community(&mut self, community_id: Uuid) -> Result<(), Error> {
let inner = &*self.inner.read().await;
Expand Down Expand Up @@ -1024,20 +1032,38 @@ impl MessageStore {
invite_id: Uuid,
) -> Result<(), Error> {
let inner = &*self.inner.read().await;
let community_meta = inner
.community_task
.get(&community_id)
.ok_or(Error::InvalidCommunity)?;
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::DeleteCommunityInvite {
invite_id,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
match inner.community_task.get(&community_id) {
None => {
let keypair = inner.root.keypair();

let event = CommunityJoinEvents::DeleteInvite { invite_id };
let payload = PayloadBuilder::new(keypair, event)
.from_ipfs(&inner.ipfs)
.await?;
let bytes = payload.to_bytes()?;

if let Err(e) = inner
.ipfs
.pubsub_publish(community_id.join_topic(), bytes)
.await
{
tracing::error!(id=%community_id, "Unable to send event: {e}");
}
}
Some(community_meta) => {
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::DeleteCommunityInvite {
invite_id,
response: tx,
})
.await;
return rx.await.map_err(anyhow::Error::from)?;
}
}
Ok(())
}
pub async fn get_community_invite(
&mut self,
Expand All @@ -1060,26 +1086,24 @@ impl MessageStore {
.await;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn accept_community_invite(
&mut self,
community_id: Uuid,
invite_id: Uuid,
) -> Result<(), Error> {
pub async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> {
let inner = &*self.inner.read().await;
let community_meta = inner
.community_task
.get(&community_id)
.ok_or(Error::InvalidCommunity)?;
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::AcceptCommunityInvite {
invite_id,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
let keypair = inner.root.keypair();

let event = CommunityJoinEvents::Join;
let payload = PayloadBuilder::new(keypair, event)
.from_ipfs(&inner.ipfs)
.await?;
let bytes = payload.to_bytes()?;

if let Err(e) = inner
.ipfs
.pubsub_publish(community_id.join_topic(), bytes)
.await
{
tracing::error!(id=%community_id, "Unable to send event: {e}");
}
Ok(())
}
pub async fn edit_community_invite(
&mut self,
Expand Down Expand Up @@ -2105,6 +2129,7 @@ struct ConversationInner {
ipfs: Ipfs,
conversation_task: HashMap<Uuid, ConversationInnerMeta>,
community_task: HashMap<Uuid, CommunityInnerMeta>,
community_invites: Vec<(Uuid, CommunityInviteDocument)>,
root: RootDocumentMap,
file: FileStore,
event: EventSubscription<RayGunEventKind>,
Expand Down Expand Up @@ -3016,8 +3041,7 @@ impl ConversationInner {
pub async fn get_community(&mut self, community_id: Uuid) -> Result<Community, Error> {
let doc = self.get_community_document(community_id).await?;
let own_did = &self.identity.did_key();
if own_did != &doc.owner && !doc.has_valid_invite(own_did) && !doc.members.contains(own_did)
{
if !doc.participants().contains(own_did) {
return Err(Error::Unauthorized);
}
Ok(doc.into())
Expand All @@ -3038,24 +3062,6 @@ impl ConversationInner {
})
.collect())
}
pub async fn list_communities_invited_to(&self) -> Result<Vec<(Uuid, CommunityInvite)>, Error> {
let own_did = &self.identity.did_key();
Ok(self
.list_community()
.await
.iter()
.filter_map(|c| {
for (_, invite) in &c.invites {
if let Some(target) = &invite.target_user {
if target == own_did {
return Some((c.id, CommunityInvite::from(invite.clone())));
}
}
}
None
})
.collect())
}
}

async fn process_conversation(
Expand Down Expand Up @@ -3196,30 +3202,18 @@ async fn process_conversation(
ConversationEvents::NewCommunityInvite {
community_id,
invite,
community_document,
} => {
let did = this.identity.did_key();

if this.contains_community(community_id).await {
return Err(anyhow::anyhow!("Already apart of {community_id}").into());
}

let recipients = community_document.participants().clone();

for recipient in &recipients {
if !this.discovery.contains(recipient).await {
let _ = this.discovery.insert(recipient).await;
let mut updated = false;
for i in this.community_invites.len()..0 {
let (community, invitation) = this.community_invites[i].clone();
if community == community_id && invitation.id == invite.id {
this.community_invites[i] = (community_id, invite.clone());
updated = true;
break;
}
}

this.set_community_document(community_document).await?;

this.create_community_task(community_id).await?;

for recipient in recipients.iter().filter(|d| did.ne(d)) {
if let Err(e) = this.request_community_key(community_id, recipient).await {
tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request");
}
if !updated {
this.community_invites.push((community_id, invite.clone()));
}

this.event
Expand All @@ -3229,6 +3223,83 @@ async fn process_conversation(
})
.await;
}
ConversationEvents::DeleteCommunityInvite {
community_id,
invite,
} => {
for i in this.community_invites.len()..0 {
let (community, invitation) = this.community_invites[i].clone();
if community == community_id && invitation.id == invite.id {
this.community_invites.swap_remove(i);
this.event
.emit(RayGunEventKind::CommunityUninvited {
community_id,
invite_id: invite.id,
})
.await;
break;
}
}
}
ConversationEvents::JoinCommunity {
community_id,
community_document: result,
} => match result {
None => {
this.event
.emit(RayGunEventKind::CommunityJoinRejected { community_id })
.await;
return Ok(());
}
Some(community_document) => {
for i in this.community_invites.len()..0 {
let (community, _) = this.community_invites[i];
if community == community_id {
this.community_invites.swap_remove(i);
}
}

let did = this.identity.did_key();

if this.contains_community(community_id).await {
return Ok(());
}

let recipients = community_document.participants().clone();

for recipient in &recipients {
if !this.discovery.contains(recipient).await {
let _ = this.discovery.insert(recipient).await;
}
}

this.set_community_document(community_document).await?;

this.create_community_task(community_id).await?;

for recipient in recipients.iter().filter(|d| did.ne(d)) {
if let Err(e) = this.request_community_key(community_id, recipient).await {
tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request");
}
}

let community_meta = this
.community_task
.get(&community_id)
.ok_or(Error::InvalidCommunity)?;
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::SendJoinedCommunityEvent { response: tx })
.await;
let _ = rx.await.map_err(anyhow::Error::from)?;

this.event
.emit(RayGunEventKind::CommunityJoined { community_id })
.await;
}
},
ConversationEvents::DeleteCommunity { community_id } => {
tracing::trace!("Delete community event received for {community_id}");
if !this.contains_community(community_id).await {
Expand Down
Loading

0 comments on commit 7c7316e

Please sign in to comment.