From b067f74c096a45a91f38f7f66c690b05765cf421 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 27 Oct 2023 16:16:07 -0400 Subject: [PATCH] chore: Split raw data from processing functions (#348) --- .../host_media/audio/opus/source/framer.rs | 3 +- .../src/host_media/audio/opus/source/mod.rs | 21 +- .../warp-ipfs/src/behaviour/discovery.rs | 375 ------------------ extensions/warp-ipfs/src/behaviour/mod.rs | 3 - extensions/warp-ipfs/src/lib.rs | 6 +- .../warp-ipfs/src/store/document/cache.rs | 4 +- .../src/store/document/conversation.rs | 2 +- .../warp-ipfs/src/store/document/identity.rs | 15 +- .../warp-ipfs/src/store/document/root.rs | 2 +- extensions/warp-ipfs/src/store/identity.rs | 240 ++++++----- extensions/warp-ipfs/src/store/message.rs | 24 +- extensions/warp-ipfs/tests/common.rs | 14 +- warp/src/constellation/file.rs | 2 +- warp/src/crypto/cipher.rs | 6 +- warp/src/lib.rs | 1 + 15 files changed, 171 insertions(+), 547 deletions(-) delete mode 100644 extensions/warp-ipfs/src/behaviour/discovery.rs diff --git a/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/framer.rs b/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/framer.rs index 5e3c968bd..04dffac79 100644 --- a/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/framer.rs +++ b/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/framer.rs @@ -40,8 +40,7 @@ impl Framer { let loudness_calculator = loudness::Calculator::new(frame_size); let mut buf: Vec = Vec::new(); buf.reserve(frame_size); - let mut opus_out: Vec = Vec::new(); - opus_out.resize(frame_size * 4, 0); + let opus_out: Vec = vec![0; frame_size * 4]; let mut encoder = opus::Encoder::new( webrtc_codec.sample_rate(), opus::Channels::Mono, diff --git a/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/mod.rs b/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/mod.rs index 05465754e..0c05fee38 100644 --- a/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/mod.rs +++ b/extensions/warp-blink-wrtc/src/host_media/audio/opus/source/mod.rs @@ -327,8 +327,7 @@ mod test { let mut buf1: Vec = Vec::new(); buf1.resize(buff_size, 0_f32); - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 4, 0); + let mut buf2: Vec = vec![0; buff_size * 4]; encoder .encode_float(buf1.as_slice(), buf2.as_mut_slice()) @@ -343,8 +342,7 @@ mod test { let mut buf1: Vec = Vec::new(); buf1.resize(buff_size, 0_f32); - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 4, 0); + let mut buf2: Vec = vec![0; buff_size * 4]; encoder .encode_float(buf1.as_slice(), buf2.as_mut_slice()) @@ -359,8 +357,7 @@ mod test { let mut buf1: Vec = Vec::new(); buf1.resize(buff_size, 0_f32); - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 4, 0); + let mut buf2: Vec = vec![0; buff_size * 4]; encoder .encode_float(buf1.as_slice(), buf2.as_mut_slice()) @@ -375,8 +372,7 @@ mod test { let mut buf1: Vec = Vec::new(); buf1.resize(buff_size, 0_f32); - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 4, 0); + let mut buf2: Vec = vec![0; buff_size * 4]; encoder .encode_float(buf1.as_slice(), buf2.as_mut_slice()) @@ -391,8 +387,7 @@ mod test { let mut buf1: Vec = Vec::new(); buf1.resize(buff_size, 0_f32); - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 4, 0); + let mut buf2: Vec = vec![0; buff_size * 4]; encoder .encode_float(buf1.as_slice(), buf2.as_mut_slice()) @@ -404,11 +399,9 @@ mod test { let mut encoder = opus::Encoder::new(48000, opus::Channels::Mono, opus::Application::Voip).unwrap(); let buff_size = 120; - let mut buf1: Vec = Vec::new(); - buf1.resize(buff_size, 0); + let buf1: Vec = vec![0; buff_size]; - let mut buf2: Vec = Vec::new(); - buf2.resize(buff_size * 2, 0); + let mut buf2: Vec = vec![0; buff_size * 2]; encoder .encode(buf1.as_slice(), buf2.as_mut_slice()) diff --git a/extensions/warp-ipfs/src/behaviour/discovery.rs b/extensions/warp-ipfs/src/behaviour/discovery.rs deleted file mode 100644 index 416147733..000000000 --- a/extensions/warp-ipfs/src/behaviour/discovery.rs +++ /dev/null @@ -1,375 +0,0 @@ -use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - task::{Context, Poll}, -}; - -use rust_ipfs::libp2p::{ - core::Endpoint, - rendezvous::{Cookie, Namespace}, - swarm::{ - derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId, - FromSwarm, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, - }, - Multiaddr, PeerId, -}; -use rust_ipfs::NetworkBehaviour; - -use futures::{channel::oneshot::Sender as OneshotSender, StreamExt}; - -use tracing::info; -use warp::error::Error; - -#[allow(dead_code)] -pub enum DiscoveryCommand { - RegisterDiscoveryNode { - peer_id: PeerId, - addr: Multiaddr, - namespace: String, - response: OneshotSender>, - }, - RegisterNamespace { - namespace: String, - peer_id: PeerId, - response: OneshotSender>, - }, - Refresh { - namespace: String, - response: OneshotSender>, - }, -} - -pub struct Behaviour { - events: VecDeque::ToSwarm, THandlerInEvent>>, - inner: rust_ipfs::libp2p::rendezvous::client::Behaviour, - command: futures::channel::mpsc::Receiver, - - /// External nodes - namespace_peer_addrs: HashMap>, - namespace_point: HashMap>, - - /// Cookie of the namespace - cookie: HashMap>>, - - /// Discovered peers from a given namespace - discovered_peers: HashMap>>, - discovery_responses: HashMap>>>, - pending_registration: HashMap>>>, - connections: HashMap>, -} - -#[allow(dead_code)] -impl Behaviour { - pub fn new( - keypair: rust_ipfs::Keypair, - command: futures::channel::mpsc::Receiver, - ) -> Self { - Behaviour { - events: Default::default(), - namespace_peer_addrs: HashMap::new(), - inner: rust_ipfs::libp2p::rendezvous::client::Behaviour::new(keypair), - namespace_point: Default::default(), - command, - cookie: Default::default(), - discovered_peers: Default::default(), - discovery_responses: Default::default(), - pending_registration: Default::default(), - connections: HashMap::default(), - } - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - ::ConnectionHandler; - type ToSwarm = void::Void; - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, ConnectionDenied> { - self.inner.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: Endpoint, - ) -> Result, ConnectionDenied> { - self.inner - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } - - fn handle_pending_outbound_connection( - &mut self, - connection_id: ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: Endpoint, - ) -> Result, ConnectionDenied> { - self.inner.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.inner - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - .. - }) => match self.connections.entry(peer_id) { - Entry::Occupied(mut entry) => { - let connections = entry.get_mut(); - if !connections.contains(&connection_id) { - connections.push(connection_id); - } - } - Entry::Vacant(entry) => { - entry.insert(vec![connection_id]); - } - }, - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - .. - }) => { - if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) { - let connections = entry.get_mut(); - connections.retain(|conn| conn != &connection_id); - if connections.is_empty() { - entry.remove(); - } - } - } - _ => {} - } - - self.inner.on_swarm_event(event); - } - - fn poll( - &mut self, - cx: &mut Context, - params: &mut impl PollParameters, - ) -> Poll>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } - - loop { - match self.inner.poll(cx, params) { - Poll::Ready(ToSwarm::GenerateEvent(event)) => match event { - rust_ipfs::libp2p::rendezvous::client::Event::Discovered { - rendezvous_node, - registrations, - cookie, - } => { - if !self.namespace_peer_addrs.contains_key(&rendezvous_node) { - continue; - } - - let mut namespaces = HashSet::new(); - - for registeration in registrations { - let namespace = registeration.namespace.clone(); - let peer_id = registeration.record.peer_id(); - let addresses = registeration.record.addresses(); - let entry = self - .discovered_peers - .entry(namespace.clone()) - .or_default() - .entry(peer_id) - .or_default(); - - for addr in addresses { - if !entry.contains(addr) { - entry.push(addr.clone()); - } - } - - let entry = self - .cookie - .entry(namespace.clone()) - .or_default() - .entry(rendezvous_node) - .or_default(); - - *entry = Some(cookie.clone()); - - namespaces.insert(namespace); - } - - for ns in namespaces { - let channels = match self.discovery_responses.remove(&ns) { - Some(chs) => chs, - None => continue, - }; - - for ch in channels { - let _ = ch.send(Ok(())); - } - } - } - rust_ipfs::libp2p::rendezvous::client::Event::DiscoverFailed { - namespace, - error, - .. - } => { - let namespace = match namespace { - Some(ns) => ns, - None => continue, - }; - - let channels = match self.discovery_responses.remove(&namespace) { - Some(chs) => chs, - None => continue, - }; - - for ch in channels { - let _ = ch.send(Err(Error::Any(anyhow::anyhow!( - "Error performing discovery: {:?}", - error - )))); - } - } - rust_ipfs::libp2p::rendezvous::client::Event::Registered { - rendezvous_node, - ttl, - namespace, - } => { - info!("Registered to {rendezvous_node} for namespace {namespace} with ttl {ttl}"); - if let Some(channels) = self.pending_registration.remove(&namespace) { - for ch in channels { - let _ = ch.send(Ok(())); - } - } - } - rust_ipfs::libp2p::rendezvous::client::Event::RegisterFailed { - rendezvous_node, - namespace, - error, - } => { - if let Some(channels) = self.pending_registration.remove(&namespace) { - for ch in channels { - let _ = ch.send(Err(Error::Any(anyhow::anyhow!( - "Error registering to discovery node {rendezvous_node}: {:?}", - error - )))); - } - } - } - rust_ipfs::libp2p::rendezvous::client::Event::Expired { .. } => {} - }, - Poll::Ready(event) => { - let new_to_swarm = - event.map_out(|_| unreachable!("we manually map `GenerateEvent` variants")); - return Poll::Ready(new_to_swarm); - } - Poll::Pending => break, - } - } - - loop { - match self.command.poll_next_unpin(cx) { - Poll::Ready(Some(DiscoveryCommand::RegisterDiscoveryNode { - peer_id: _, - addr: _, - namespace: _, - response, - })) => { - let _ = response.send(Ok(())); - } - Poll::Ready(Some(DiscoveryCommand::RegisterNamespace { - namespace, - peer_id, - response, - })) => { - let namespace = match Namespace::new(namespace) { - Ok(ns) => ns, - Err(e) => { - let _ = response.send(Err(Error::Boxed(Box::new(e)))); - continue; - } - }; - - if let Err(e) = self.inner.register(namespace.clone(), peer_id, None) { - let _ = response.send(Err(Error::Boxed(Box::new(e)))); - continue; - } - - self.pending_registration - .entry(namespace) - .or_default() - .push(response); - } - Poll::Ready(Some(DiscoveryCommand::Refresh { - namespace, - response, - })) => { - let namespace = match Namespace::new(namespace) { - Ok(ns) => ns, - Err(e) => { - let _ = response.send(Err(Error::Boxed(Box::new(e)))); - continue; - } - }; - - let nodes = match self.namespace_point.get(&namespace) { - Some(nodes) => nodes, - None => { - let _ = response.send(Err(Error::OtherWithContext( - "Namespace is not registered".into(), - ))); - continue; - } - }; - - for peer_id in nodes.iter() { - let cookie_store = match self.cookie.get(&namespace) { - Some(cs) => cs, - None => continue, - }; - - let cookie = cookie_store.get(peer_id).cloned().flatten(); - if cookie.is_none() { - continue; - } - self.inner - .discover(Some(namespace.clone()), cookie, None, *peer_id); - } - - self.discovery_responses - .entry(namespace) - .or_default() - .push(response); - } - Poll::Ready(None) => unreachable!("Channels are owned"), - Poll::Pending => break, - } - } - Poll::Pending - } -} diff --git a/extensions/warp-ipfs/src/behaviour/mod.rs b/extensions/warp-ipfs/src/behaviour/mod.rs index 373911a0e..91555f5ee 100644 --- a/extensions/warp-ipfs/src/behaviour/mod.rs +++ b/extensions/warp-ipfs/src/behaviour/mod.rs @@ -1,13 +1,10 @@ -pub mod discovery; pub mod phonebook; use libp2p::swarm::NetworkBehaviour; use rust_ipfs::libp2p; -use rust_ipfs::libp2p::swarm::behaviour::toggle::Toggle; #[derive(NetworkBehaviour)] #[behaviour(prelude = "libp2p::swarm::derive_prelude", to_swarm = "void::Void")] pub struct Behaviour { pub phonebook: phonebook::Behaviour, - pub rz_discovery: Toggle, } diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 01d4cb605..718dbf560 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -69,6 +69,7 @@ use warp::multipass::{ use crate::config::Bootstrap; use crate::store::discovery::Discovery; +use crate::store::phonebook::PhoneBook; use crate::store::{ecdh_decrypt, ecdh_encrypt}; #[derive(Clone)] @@ -267,7 +268,6 @@ impl WarpIpfs { let behaviour = behaviour::Behaviour { phonebook: behaviour::phonebook::Behaviour::new(self.multipass_tx.clone(), pb_rx), - rz_discovery: None.into(), }; info!("Starting ipfs"); @@ -454,13 +454,15 @@ impl WarpIpfs { relays.clone(), ); + let phonebook = PhoneBook::new(discovery.clone(), pb_tx); + info!("Initializing identity profile"); let identity_store = IdentityStore::new( ipfs.clone(), config.path.clone(), tesseract.clone(), self.multipass_tx.clone(), - pb_tx, + phonebook, &config, discovery.clone(), ) diff --git a/extensions/warp-ipfs/src/store/document/cache.rs b/extensions/warp-ipfs/src/store/document/cache.rs index 52cf8863d..e71fa5a2f 100644 --- a/extensions/warp-ipfs/src/store/document/cache.rs +++ b/extensions/warp-ipfs/src/store/document/cache.rs @@ -57,7 +57,7 @@ impl IdentityCache { None => None, }; - let (tx, rx) = futures::channel::mpsc::channel(1); + let (tx, rx) = futures::channel::mpsc::channel(0); let mut task = IdentityCacheTask { ipfs: ipfs.clone(), @@ -169,7 +169,7 @@ impl IdentityCacheTask { match old_document { Some(old_document) => { if !old_document.different(&document) { - let _ = response.send(Err(Error::IdentityExist)); + let _ = response.send(Ok(None)); continue; } diff --git a/extensions/warp-ipfs/src/store/document/conversation.rs b/extensions/warp-ipfs/src/store/document/conversation.rs index c413cde8d..efc066267 100644 --- a/extensions/warp-ipfs/src/store/document/conversation.rs +++ b/extensions/warp-ipfs/src/store/document/conversation.rs @@ -89,7 +89,7 @@ impl Conversations { None => None, }; - let (tx, rx) = futures::channel::mpsc::channel(1); + let (tx, rx) = futures::channel::mpsc::channel(0); let mut task = ConversationTask { ipfs: ipfs.clone(), diff --git a/extensions/warp-ipfs/src/store/document/identity.rs b/extensions/warp-ipfs/src/store/document/identity.rs index 0e74d1d08..5951192dd 100644 --- a/extensions/warp-ipfs/src/store/document/identity.rs +++ b/extensions/warp-ipfs/src/store/document/identity.rs @@ -107,17 +107,20 @@ impl IdentityDocument { return false; } - if self.username != other.username + if other.verify().is_err() { + tracing::warn!( + "identity for {} is not valid, corrupted or been tampered with.", + self.did + ); + return false; + } + + self.username != other.username || self.status_message != other.status_message || self.status != other.status || self.profile_banner != other.profile_banner || self.profile_picture != other.profile_picture || self.platform != other.platform - { - return other.verify().is_ok(); - } - - false } } diff --git a/extensions/warp-ipfs/src/store/document/root.rs b/extensions/warp-ipfs/src/store/document/root.rs index 4d616197f..89963da7a 100644 --- a/extensions/warp-ipfs/src/store/document/root.rs +++ b/extensions/warp-ipfs/src/store/document/root.rs @@ -121,7 +121,7 @@ impl RootDocumentMap { None => None, }; - let (tx, rx) = futures::channel::mpsc::channel(1); + let (tx, rx) = futures::channel::mpsc::channel(0); let mut task = RootDocumentTask { ipfs: ipfs.clone(), diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 132ba7c85..a98032e21 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -2,7 +2,6 @@ //onto the lock. #![allow(clippy::clone_on_copy)] use crate::{ - behaviour::phonebook::PhoneBookCommand, config::{self, Discovery as DiscoveryConfig, UpdateEvents}, store::{did_to_libp2p_pub, discovery::Discovery, DidExt, PeerIdExt, PeerTopic}, }; @@ -149,24 +148,6 @@ pub enum RequestType { Outgoing, } -#[allow(clippy::large_enum_variant)] -pub enum RootDocumentEvents { - Get(oneshot::Sender>), - Set(RootDocument, oneshot::Sender>), - AddFriend(DID, oneshot::Sender>), - RemoveFriend(DID, oneshot::Sender>), - GetFriendList(oneshot::Sender, Error>>), - AddRequest(Request, oneshot::Sender>), - RemoveRequest(Request, oneshot::Sender>), - GetRequestList(oneshot::Sender, Error>>), - AddBlock(DID, oneshot::Sender>), - RemoveBlock(DID, oneshot::Sender>), - GetBlockList(oneshot::Sender, Error>>), - AddBlockBy(DID, oneshot::Sender>), - RemoveBlockBy(DID, oneshot::Sender>), - GetBlockByList(oneshot::Sender, Error>>), -} - #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] pub enum LookupBy { @@ -234,7 +215,7 @@ impl IdentityStore { path: Option, tesseract: Tesseract, tx: broadcast::Sender, - pb_tx: futures::channel::mpsc::Sender, + phonebook: PhoneBook, config: &config::Config, discovery: Discovery, ) -> Result { @@ -262,8 +243,6 @@ impl IdentityStore { discovery.clone(), ); - let phonebook = PhoneBook::new(discovery.clone(), pb_tx); - let signal = Default::default(); let store = Self { @@ -362,12 +341,30 @@ impl IdentityStore { }, None => continue, }; - if let Some(in_did) = entry { - if let Err(e) = store.process_message(in_did, &message.data).await { - error!("Error: {e}"); + + let Some(in_did) = entry else { + continue; + }; + + log::info!("Received event from {in_did}"); + + let event = match ecdh_decrypt(&store.did_key, Some(&in_did), &message.data).and_then(|bytes| { + serde_json::from_slice::(&bytes).map_err(Error::from) + }) { + Ok(e) => e, + Err(e) => { + error!("Failed to decrypt payload from {in_did}: {e}"); + continue; } + }; + + log::debug!("Event: {event:?}"); + + if let Err(e) = store.process_message(&in_did, event).await { + error!("Failed to process identity message from {in_did}: {e}"); } + } Some(event) = friend_stream.next() => { let Some(peer_id) = event.source else { @@ -382,8 +379,33 @@ impl IdentityStore { continue; }; - if let Err(e) = store.check_request_message(&did, &event.data).await { - error!("Error: {e}"); + let mut signal = store.signal.write().await.remove(&did); + + log::trace!("received payload size: {} bytes", event.data.len()); + + log::info!("Received event from {did}"); + + let data = match ecdh_decrypt(&store.did_key, Some(&did), &event.data).and_then(|bytes| { + serde_json::from_slice::(&bytes).map_err(Error::from) + }) { + Ok(pl) => pl, + Err(e) => { + if let Some(tx) = signal { + let _ = tx.send(Err(e)); + } + continue; + } + }; + + log::debug!("Event from {did}: {:?}", data.event); + + let result = store.check_request_message(&did, data, &mut signal).await.map_err(|e| { + error!("Error processing message: {e}"); + e + }); + + if let Some(tx) = signal { + let _ = tx.send(result); } } // Used as the initial request/push @@ -414,18 +436,13 @@ impl IdentityStore { } //TODO: Implement Errors - #[tracing::instrument(skip(self, data))] - async fn check_request_message(&mut self, did: &DID, data: &[u8]) -> anyhow::Result<()> { - let pk_did = &*self.did_key; - - let bytes = ecdh_decrypt(pk_did, Some(did), data)?; - - log::trace!("received payload size: {} bytes", bytes.len()); - - let data = serde_json::from_slice::(&bytes)?; - - log::info!("Received event from {did}"); - + #[tracing::instrument(skip(self, data, signal))] + async fn check_request_message( + &mut self, + did: &DID, + data: RequestResponsePayload, + signal: &mut Option>>, + ) -> Result<(), Error> { if self .list_incoming_request() .await @@ -437,11 +454,6 @@ impl IdentityStore { return Ok(()); } - //TODO: Send error if dropped early due to error when processing request - let mut signal = self.signal.write().await.remove(&data.sender); - - log::debug!("Event {:?}", data.event); - // Before we validate the request, we should check to see if the key is blocked // If it is, skip the request so we dont wait resources storing it. if self.is_blocked(&data.sender).await? && !matches!(data.event, Event::Block) { @@ -451,10 +463,9 @@ impl IdentityStore { event: Event::Block, }; - self.broadcast_request((&data.sender, &payload), false, true) - .await?; - - return Ok(()); + return self + .broadcast_request((&data.sender, &payload), false, true) + .await; } match data.event { @@ -467,16 +478,16 @@ impl IdentityStore { .find(|req| data.sender.eq(req.did())) .cloned() else { - anyhow::bail!( + return Err(Error::from(anyhow::anyhow!( "Unable to locate pending request. Already been accepted or rejected?" - ) + ))); }; // Maybe just try the function instead and have it be a hard error? if self.root_document.remove_request(&item).await.is_err() { - anyhow::bail!( + return Err(Error::from(anyhow::anyhow!( "Unable to locate pending request. Already been accepted or rejected?" - ) + ))); } self.add_friend(item.did()).await?; @@ -489,10 +500,9 @@ impl IdentityStore { event: Event::Accept, }; - self.broadcast_request((&data.sender, &payload), false, false) - .await?; - - return Ok(()); + return self + .broadcast_request((&data.sender, &payload), false, false) + .await; } let list = self.list_all_raw_request().await?; @@ -513,28 +523,13 @@ impl IdentityStore { .add_request(&Request::In(data.sender.clone())) .await?; - tokio::spawn({ - let store = self.clone(); - let from = data.sender.clone(); - async move { - let _ = tokio::time::timeout(Duration::from_secs(10), async { - loop { - if let Ok(list) = - store.lookup(LookupBy::DidKey(from.clone())).await - { - if !list.is_empty() { - break; - } - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - }) - .await - .ok(); + let from = data.sender.clone(); - store.emit_event(MultiPassEventKind::FriendRequestReceived { from }); - } - }); + if self.identity_cache.get(&from).await.is_err() { + self.request(&from, RequestOption::Identity).await?; + } + + self.emit_event(MultiPassEventKind::FriendRequestReceived { from }); } let payload = RequestResponsePayload { sender: (*self.did_key).clone(), @@ -611,7 +606,7 @@ impl IdentityStore { self.emit_event(MultiPassEventKind::BlockedBy { did: data.sender }); } - if let Some(tx) = std::mem::take(&mut signal) { + if let Some(tx) = signal.take() { log::debug!("Signaling broadcast of response..."); let _ = tx.send(Err(Error::BlockedByUser)); } @@ -633,16 +628,12 @@ impl IdentityStore { } } Event::Response => { - if let Some(tx) = std::mem::take(&mut signal) { + if let Some(tx) = signal.take() { log::debug!("Signaling broadcast of response..."); let _ = tx.send(Ok(())); } } }; - if let Some(tx) = std::mem::take(&mut signal) { - log::debug!("Signaling broadcast of response..."); - let _ = tx.send(Ok(())); - } Ok(()) } @@ -668,20 +659,24 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn request(&self, out_did: &DID, option: RequestOption) -> Result<(), Error> { - let pk_did = self.get_keypair_did()?; + let out_peer_id = out_did.to_peer_id()?; + + if !self.ipfs.is_connected(out_peer_id).await? { + return Err(Error::IdentityDoesntExist); + } + + let pk_did = &*self.did_key; let event = IdentityEvent::Request { option }; let payload_bytes = serde_json::to_vec(&event)?; - let bytes = ecdh_encrypt(&pk_did, Some(out_did), payload_bytes)?; + let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; log::trace!("Payload size: {} bytes", bytes.len()); log::info!("Sending event to {out_did}"); - let out_peer_id = did_to_libp2p_pub(out_did)?.to_peer_id(); - if self .ipfs .pubsub_peers(Some(out_did.events())) @@ -700,7 +695,13 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn push(&self, out_did: &DID) -> Result<(), Error> { - let pk_did = self.get_keypair_did()?; + let out_peer_id = out_did.to_peer_id()?; + + if !self.ipfs.is_connected(out_peer_id).await? { + return Err(Error::IdentityDoesntExist); + } + + let pk_did = &*self.did_key; let mut identity = self.own_identity_document().await?; @@ -753,14 +754,12 @@ impl IdentityStore { let payload_bytes = serde_json::to_vec(&event)?; - let bytes = ecdh_encrypt(&pk_did, Some(out_did), payload_bytes)?; + let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; log::trace!("Payload size: {} bytes", bytes.len()); log::info!("Sending event to {out_did}"); - let out_peer_id = did_to_libp2p_pub(out_did)?.to_peer_id(); - if self .ipfs .pubsub_peers(Some(out_did.events())) @@ -779,7 +778,13 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn push_profile_picture(&self, out_did: &DID, cid: Cid) -> Result<(), Error> { - let pk_did = self.get_keypair_did()?; + let out_peer_id = out_did.to_peer_id()?; + + if !self.ipfs.is_connected(out_peer_id).await? { + return Err(Error::IdentityDoesntExist); + } + + let pk_did = &*self.did_key; let identity = self.own_identity_document().await?; @@ -811,14 +816,12 @@ impl IdentityStore { let payload_bytes = serde_json::to_vec(&event)?; - let bytes = ecdh_encrypt(&pk_did, Some(out_did), payload_bytes)?; + let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; log::trace!("Payload size: {} bytes", bytes.len()); log::info!("Sending event to {out_did}"); - let out_peer_id = did_to_libp2p_pub(out_did)?.to_peer_id(); - if self .ipfs .pubsub_peers(Some(out_did.events())) @@ -837,7 +840,13 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn push_profile_banner(&self, out_did: &DID, cid: Cid) -> Result<(), Error> { - let pk_did = self.get_keypair_did()?; + let out_peer_id = out_did.to_peer_id()?; + + if !self.ipfs.is_connected(out_peer_id).await? { + return Err(Error::IdentityDoesntExist); + } + + let pk_did = &*self.did_key; let identity = self.own_identity_document().await?; @@ -868,14 +877,12 @@ impl IdentityStore { let payload_bytes = serde_json::to_vec(&event)?; - let bytes = ecdh_encrypt(&pk_did, Some(out_did), payload_bytes)?; + let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; log::trace!("Payload size: {} bytes", bytes.len()); log::info!("Sending event to {out_did}"); - let out_peer_id = did_to_libp2p_pub(out_did)?.to_peer_id(); - if self .ipfs .pubsub_peers(Some(out_did.events())) @@ -892,26 +899,18 @@ impl IdentityStore { Ok(()) } - #[tracing::instrument(skip(self, message))] + #[tracing::instrument(skip(self))] #[allow(clippy::if_same_then_else)] - async fn process_message(&mut self, in_did: DID, message: &[u8]) -> anyhow::Result<()> { - let pk_did = self.get_keypair_did()?; - - let bytes = ecdh_decrypt(&pk_did, Some(&in_did), message)?; - - log::info!("Received event from {in_did}"); - let event = serde_json::from_slice::(&bytes)?; - - log::debug!("Event: {event:?}"); + async fn process_message(&mut self, in_did: &DID, event: IdentityEvent) -> anyhow::Result<()> { match event { IdentityEvent::Request { option } => match option { - RequestOption::Identity => self.push(&in_did).await?, + RequestOption::Identity => self.push(in_did).await?, RequestOption::Image { banner, picture } => { if let Some(cid) = banner { - self.push_profile_banner(&in_did, cid).await?; + self.push_profile_banner(in_did, cid).await?; } if let Some(cid) = picture { - self.push_profile_picture(&in_did, cid).await?; + self.push_profile_picture(in_did, cid).await?; } } }, @@ -922,7 +921,7 @@ impl IdentityStore { // let _pk = did_to_libp2p_pub(&raw_object.did)?; //TODO: Remove upon offline implementation - anyhow::ensure!(identity.did == in_did, "Payload doesnt match identity"); + anyhow::ensure!(identity.did.eq(in_did), "Payload doesnt match identity"); // Validate after making sure the identity did matches the payload identity.verify()?; @@ -974,7 +973,7 @@ impl IdentityStore { if !self.config.store_setting.fetch_over_bitswap { if let Err(e) = self .request( - &in_did, + in_did, RequestOption::Image { banner: None, picture: identity.profile_picture, @@ -1030,7 +1029,7 @@ impl IdentityStore { if !self.config.store_setting.fetch_over_bitswap { if let Err(e) = self .request( - &in_did, + in_did, RequestOption::Image { banner: identity.profile_banner, picture: None, @@ -1129,7 +1128,7 @@ impl IdentityStore { if banner.is_some() || picture.is_some() { if !self.config.store_setting.fetch_over_bitswap { - self.request(&in_did, RequestOption::Image { banner, picture }) + self.request(in_did, RequestOption::Image { banner, picture }) .await?; } else { if let Some(picture) = picture { @@ -1207,11 +1206,12 @@ impl IdentityStore { IdentityEvent::Receive { option: ResponseOption::Image { cid, ty, data }, } => { - let cache = self.identity_cache.get(&in_did).await?; + let cache = self.identity_cache.get(in_did).await?; if cache.profile_picture == Some(cid) || cache.profile_banner == Some(cid) { tokio::spawn({ let store = self.clone(); + let did = in_did.clone(); async move { let added_cid = super::document::image_dag::store_photo( &store.ipfs, @@ -1222,9 +1222,7 @@ impl IdentityStore { .await?; debug_assert_eq!(added_cid, cid); - store.emit_event(MultiPassEventKind::IdentityUpdate { - did: in_did.clone(), - }); + store.emit_event(MultiPassEventKind::IdentityUpdate { did }); Ok::<_, Error>(()) } }); @@ -1476,7 +1474,7 @@ impl IdentityStore { let identity = identity.sign(&kp)?; - log::debug!("Updateing document"); + log::debug!("Updating document"); let mut root_document = self.root_document.get().await?; let ident_cid = identity.to_cid(&self.ipfs).await?; root_document.identity = ident_cid; diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 58e1ac7ce..fc14e0602 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -741,7 +741,7 @@ impl MessageStore { .message_event(conversation_id, &event, direction, Default::default()) .await { - error!("Error processing message: {e}"); + error!("Failure while processing message in {conversation_id}: {e}"); if let Some(ret) = ret { let _ = ret.send(Err(e)).ok(); } @@ -765,7 +765,7 @@ impl MessageStore { events: &MessagingEvents, direction: MessageDirection, opt: EventOpt, - ) -> Result { + ) -> Result<(), Error> { let tx = self.get_conversation_sender(conversation_id).await?; let mut document = self.conversations.get(conversation_id).await?; @@ -1003,7 +1003,7 @@ impl MessageStore { let event = match state { PinState::Pin => { if message.pinned() { - return Ok(false); + return Ok(()); } *message.pinned_mut() = true; MessageEventKind::MessagePinned { @@ -1013,7 +1013,7 @@ impl MessageStore { } PinState::Unpin => { if !message.pinned() { - return Ok(false); + return Ok(()); } *message.pinned_mut() = false; MessageEventKind::MessageUnpinned { @@ -1154,15 +1154,9 @@ impl MessageStore { document.signature = Some(signature); self.conversations.set(document).await?; - tokio::spawn({ - let store = self.clone(); - let recipient = recipient.clone(); - async move { - if let Err(e) = store.request_key(conversation_id, &recipient).await { - error!("Error requesting key: {e}"); - } - } - }); + if let Err(e) = self.request_key(conversation_id, &recipient).await { + error!("Error requesting key: {e}"); + } if let Err(e) = tx.send(MessageEventKind::RecipientAdded { conversation_id, @@ -1216,7 +1210,7 @@ impl MessageStore { } if let Some(current_name) = document.name() { if current_name.eq(&name) { - return Ok(false); + return Ok(()); } } @@ -1234,7 +1228,7 @@ impl MessageStore { } _ => {} } - Ok(false) + Ok(()) } async fn end_task(&self, conversation_id: Uuid) { diff --git a/extensions/warp-ipfs/tests/common.rs b/extensions/warp-ipfs/tests/common.rs index 83bf7e3a0..bf0f88762 100644 --- a/extensions/warp-ipfs/tests/common.rs +++ b/extensions/warp-ipfs/tests/common.rs @@ -11,6 +11,8 @@ use warp_ipfs::{ WarpIpfsBuilder, }; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + pub async fn node_info(nodes: Vec) -> Vec<(Ipfs, PeerId, Vec)> { stream::iter(nodes) .filter_map(|node| async move { @@ -78,6 +80,11 @@ pub async fn create_account( pub async fn create_accounts( infos: Vec<(Option<&str>, Option<&str>, Option)>, ) -> anyhow::Result, DID, Identity)>> { + let _ = tracing_subscriber::registry() + .with(fmt::layer().pretty()) + .with(EnvFilter::from_default_env()) + .try_init(); + let mut accounts = vec![]; let mut nodes = vec![]; for (username, passphrase, context) in infos { @@ -132,6 +139,11 @@ pub async fn create_account_and_chat( pub async fn create_accounts_and_chat( infos: Vec<(Option<&str>, Option<&str>, Option)>, ) -> anyhow::Result, Box, DID, Identity)>> { + let _ = tracing_subscriber::registry() + .with(fmt::layer().pretty()) + .with(EnvFilter::from_default_env()) + .try_init(); + let mut accounts = vec![]; let mut nodes = vec![]; for (username, passphrase, context) in infos { @@ -546,4 +558,4 @@ pub const PROFILE_IMAGE: &[u8] = &[ 1, 80, 6, 133, 5, 64, 25, 20, 22, 0, 101, 80, 88, 0, 148, 65, 97, 1, 80, 6, 133, 5, 64, 25, 20, 22, 0, 101, 80, 88, 0, 148, 241, 127, 2, 0, 0, 255, 255, 236, 61, 134, 14, 191, 70, 226, 46, 0, 0, 0, 0, 73, 69, 78, 68, 174, 66, 96, 130, -]; \ No newline at end of file +]; diff --git a/warp/src/constellation/file.rs b/warp/src/constellation/file.rs index caac9d47c..cc5b38660 100644 --- a/warp/src/constellation/file.rs +++ b/warp/src/constellation/file.rs @@ -466,6 +466,6 @@ pub mod ffi { false => CStr::from_ptr(name).to_string_lossy().to_string(), }; let file = Box::new(File::new(name.as_str())); - Box::into_raw(file) as *mut File + Box::into_raw(file) } } diff --git a/warp/src/crypto/cipher.rs b/warp/src/crypto/cipher.rs index 464819627..6d8bb1031 100644 --- a/warp/src/crypto/cipher.rs +++ b/warp/src/crypto/cipher.rs @@ -252,7 +252,7 @@ impl Cipher { } }; } - Ok(read_count) if read_count == 0 => break, + Ok(0) => break, Ok(read_count) => { match stream.decrypt_last(&buffer[..read_count]).map_err(|_| Error::DecryptionStreamError) { Ok(data) => { @@ -350,7 +350,7 @@ impl Cipher { } }; } - Ok(read_count) if read_count == 0 => break, + Ok(0) => break, Ok(read_count) => { match stream.decrypt_last(&buffer[..read_count]).map_err(|_| Error::DecryptionStreamError) { Ok(data) => { @@ -460,7 +460,7 @@ impl Cipher { writer.write_all(&plaintext)? } - Ok(read_count) if read_count == 0 => break, + Ok(0) => break, Ok(read_count) => { let plaintext = stream .decrypt_last(&buffer[..read_count]) diff --git a/warp/src/lib.rs b/warp/src/lib.rs index e2ded726c..c032f4a15 100644 --- a/warp/src/lib.rs +++ b/warp/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(non_camel_case_types)] #![allow(clippy::result_large_err)] pub mod sync { pub use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};