From 8e0b090197f2d7360034eb3d344e2a0a1fa6a3f5 Mon Sep 17 00:00:00 2001 From: Amr Bashir Date: Wed, 4 Oct 2023 06:19:25 +0300 Subject: [PATCH 1/5] chore: `tauri-mobile` -> `cargo-mobile2` (#331) --- Dockerfile.aarch64-android | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.aarch64-android b/Dockerfile.aarch64-android index 21334cc71..d216b1e3a 100644 --- a/Dockerfile.aarch64-android +++ b/Dockerfile.aarch64-android @@ -24,7 +24,7 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | bash -s -- -y # RUN . $HOME/.cargo/env && rustup target add aarch64-unknown-linux-gnu # RUN . $HOME/.cargo/env && rustup toolchain install stable-aarch64-unknown-linux-gnu RUN . $HOME/.cargo/env && rustup target add aarch64-linux-android -RUN . $HOME/.cargo/env && cargo install --git https://github.com/tauri-apps/tauri-mobile +RUN . $HOME/.cargo/env && cargo install --git https://github.com/tauri-apps/cargo-mobile2 WORKDIR /root/cmake RUN wget https://github.com/Kitware/CMake/releases/download/v3.23.1/cmake-3.23.1.tar.gz From 8bcb537ff205e4443f0501bc92a01b075905d393 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 4 Oct 2023 14:58:53 -0400 Subject: [PATCH 2/5] refactor: Dont download attachment using constellation api (#332) --- extensions/warp-ipfs/src/store/message.rs | 173 ++++++---------------- 1 file changed, 48 insertions(+), 125 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 219074f70..37186388a 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -11,12 +11,10 @@ use futures::channel::mpsc::{unbounded, Sender, UnboundedSender}; use futures::channel::oneshot::{self, Sender as OneshotSender}; use futures::stream::{FuturesUnordered, SelectAll}; use futures::{SinkExt, Stream, StreamExt}; -use rust_ipfs::libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; -use rust_ipfs::{Ipfs, PeerId, SubscriptionStream}; +use rust_ipfs::{Ipfs, IpfsPath, PeerId, SubscriptionStream}; use libipld::Cid; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncWriteExt; use tokio::sync::broadcast::{self, Receiver as BroadcastReceiver, Sender as BroadcastSender}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_stream::wrappers::ReadDirStream; @@ -47,7 +45,7 @@ use super::document::utils::{GetLocalDag, ToCid}; use super::friends::FriendsStore; use super::identity::IdentityStore; use super::keystore::Keystore; -use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, DidExt, MessagingEvents}; +use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, MessagingEvents}; const PERMIT_AMOUNT: usize = 1; @@ -926,32 +924,6 @@ impl MessageStore { spam_check(&mut message, self.spam_filter.clone())?; let conversation_id = message.conversation_id(); - if message.message_type() == MessageType::Attachment - && direction == MessageDirection::In - { - if let Some(fs) = self.filesystem.clone() { - let dir = fs.root_directory(); - for file in message.attachments() { - let original = file.name(); - let mut inc = 0; - loop { - if dir.has_item(&original) { - if inc >= 20 { - break; - } - inc += 1; - file.set_name(&format!("{original}-{inc}")); - continue; - } - break; - } - if let Err(e) = dir.add_file(file) { - error!("Error adding file to constellation: {e}"); - } - } - } - } - let message_id = message.id(); let message_document = @@ -3506,111 +3478,62 @@ impl MessageStore { .cloned() .ok_or(Error::FileNotFound)?; - let root = constellation.root_directory(); - if !root.has_item(&attachment.name()) { - root.add_file(attachment.clone())?; - } + let _root = constellation.root_directory(); + + let reference = attachment + .reference() + .and_then(|reference| IpfsPath::from_str(&reference).ok()) + .ok_or(Error::FileNotFound)?; let ipfs = self.ipfs.clone(); - let constellation = constellation.clone(); - let own_did = self.did.clone(); + let _constellation = constellation.clone(); let progress_stream = async_stream::stream! { - yield Progression::CurrentProgress { - name: attachment.name(), - current: 0, - total: Some(attachment.size()), - }; - - let did = message.sender(); - if !did.eq(&own_did) { - if let Ok(peer_id) = did.to_peer_id() { - //This is done to insure we can successfully exchange blocks - let opt = DialOpts::peer_id(peer_id).condition(PeerCondition::NotDialing).build(); - if let Err(e) = ipfs.connect(opt).await { - warn!("Issue performing a connection to peer: {e}"); - } - } + let stream = match ipfs.get_unixfs(reference, &path).await { + Ok(stream) => stream, + Err(e) => { + yield Progression::ProgressFailed { + name: attachment.name(), + last_size: None, + error: Some(e.to_string()), + }; + return; } + }; - let mut file = match tokio::fs::File::create(&path).await { - Ok(file) => file, - Err(e) => { - error!("Error creating file: {e}"); - yield Progression::ProgressFailed { - name: attachment.name(), - last_size: None, - error: Some(e.to_string()), - }; - return; - } - }; + yield Progression::CurrentProgress { + name: attachment.name(), + current: 0, + total: Some(attachment.size()), + }; - let stream = match constellation.get_stream(&attachment.name()).await { - Ok(s) => s, - Err(e) => { - error!("Error creating stream: {e}"); - yield Progression::ProgressFailed { - name: attachment.name(), - last_size: None, - error: Some(e.to_string()), + for await event in stream { + match event { + rust_ipfs::unixfs::UnixfsStatus::ProgressStatus { written, total_size } => { + yield Progression::CurrentProgress { + name: attachment.name(), + current: written, + total: total_size }; - return; - } - }; - - let mut written = 0; - let mut failed = false; - for await res in stream { - match res { - Ok(bytes) => match file.write_all(&bytes).await { - Ok(_) => { - written += bytes.len(); - yield Progression::CurrentProgress { - name: attachment.name(), - current: written, - total: Some(attachment.size()), - }; - } - Err(e) => { - error!("Error writing to disk: {e}"); - yield Progression::ProgressFailed { - name: attachment.name(), - last_size: Some(written), - error: Some(e.to_string()), - }; - failed = true; - break; - } - }, - Err(e) => { - error!("Error reading from stream: {e}"); - yield Progression::ProgressFailed { - name: attachment.name(), - last_size: Some(written), - error: Some(e.to_string()), - }; - failed = true; - break; + }, + rust_ipfs::unixfs::UnixfsStatus::CompletedStatus { total_size, .. } => { + yield Progression::ProgressComplete { + name: attachment.name(), + total: total_size, + }; + }, + rust_ipfs::unixfs::UnixfsStatus::FailedStatus { written, error, .. } => { + if let Err(e) = tokio::fs::remove_file(&path).await { + error!("Error removing file: {e}"); } - } - } - - if failed { - if let Err(e) = tokio::fs::remove_file(&path).await { - error!("Error removing file: {e}"); - } - } - - if !failed { - if let Err(e) = file.flush().await { - error!("Error flushing stream: {e}"); - } - yield Progression::ProgressComplete { - name: attachment.name(), - total: Some(written), - }; + yield Progression::ProgressFailed { + name: attachment.name(), + last_size: Some(written), + error: error.map(|e| e.to_string()), + }; + }, } + } }; Ok(ConstellationProgressStream(progress_stream.boxed())) From 92e52b26fd0a168c8bfee03326fab065b75c81db Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 4 Oct 2023 15:24:12 -0400 Subject: [PATCH 3/5] chore: Update dependency (#333) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a5a8e7035..87ab302c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ either = "1" void = "1" #ipfs dependency -rust-ipfs = "0.4.4" +rust-ipfs = "0.4.6" # Blink related crates # av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data From 2ae7f3884dbbfc04edbac8c0f3727d2982150bba Mon Sep 17 00:00:00 2001 From: sdwoodbury Date: Wed, 4 Oct 2023 17:25:35 -0400 Subject: [PATCH 4/5] feat(raygun): use MessageOptions.limit when fetching messages (#330) Co-authored-by: Darius Clark Co-authored-by: Darius --- .../warp-ipfs/src/store/conversation.rs | 19 ++++++++++++------- warp/src/raygun/mod.rs | 6 +++--- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index caa0ff4cc..e59292bc1 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -376,8 +376,11 @@ impl ConversationDocument { let ipfs = ipfs.clone(); let stream = async_stream::stream! { - + let mut remaining = option.limit(); for (index, document) in messages.iter().enumerate() { + if remaining.as_ref().map(|x| *x == 0).unwrap_or_default() { + break; + } if let Some(range) = option.range() { if range.start > index || range.end < index { continue @@ -388,20 +391,22 @@ impl ConversationDocument { continue } } - if let Ok(message) = document.resolve(&ipfs, &did, keystore.as_ref()).await { if option.pinned() && !message.pinned() { continue; } - if let Some(keyword) = option.keyword() { - if message + let should_yield = if let Some(keyword) = option.keyword() { + message .value() .iter() .any(|line| line.to_lowercase().contains(&keyword.to_lowercase())) - { - yield message; - } } else { + true + }; + if should_yield { + if let Some(remaining) = remaining.as_mut() { + *remaining = remaining.saturating_sub(1); + } yield message; } } diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index 5c638fdcc..f872ab01e 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -189,7 +189,7 @@ pub struct MessageOptions { keyword: Option, pinned: bool, range: Option>, - limit: Option, + limit: Option, skip: Option, } @@ -204,7 +204,7 @@ impl MessageOptions { self } - pub fn set_limit(mut self, limit: i64) -> MessageOptions { + pub fn set_limit(mut self, limit: u8) -> MessageOptions { self.limit = Some(limit); self } @@ -253,7 +253,7 @@ impl MessageOptions { } impl MessageOptions { - pub fn limit(&self) -> Option { + pub fn limit(&self) -> Option { self.limit } From 15c125a92712438cd8c919b87c4073c28442f16a Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 6 Oct 2023 02:10:30 -0400 Subject: [PATCH 5/5] chore: Update dependency (#334) --- Cargo.toml | 2 +- .../warp-ipfs/src/store/document/identity.rs | 2 +- extensions/warp-ipfs/src/store/files.rs | 2 +- extensions/warp-ipfs/src/store/identity.rs | 6 ++++-- extensions/warp-ipfs/src/store/message.rs | 15 +++++++-------- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 87ab302c3..9d8995a6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ either = "1" void = "1" #ipfs dependency -rust-ipfs = "0.4.6" +rust-ipfs = "0.5.0" # Blink related crates # av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data diff --git a/extensions/warp-ipfs/src/store/document/identity.rs b/extensions/warp-ipfs/src/store/document/identity.rs index 2ccd7c7d4..e4a69c035 100644 --- a/extensions/warp-ipfs/src/store/document/identity.rs +++ b/extensions/warp-ipfs/src/store/document/identity.rs @@ -145,7 +145,7 @@ pub async fn unixfs_fetch( let fut = async { let stream = ipfs .unixfs() - .cat(IpfsPath::from(cid), None, &[], local) + .cat(IpfsPath::from(cid), None, &[], local, None) .await .map_err(anyhow::Error::from)?; diff --git a/extensions/warp-ipfs/src/store/files.rs b/extensions/warp-ipfs/src/store/files.rs index 62290e788..26800f6d2 100644 --- a/extensions/warp-ipfs/src/store/files.rs +++ b/extensions/warp-ipfs/src/store/files.rs @@ -127,7 +127,7 @@ impl FileStore { let mut index_stream = self .ipfs .unixfs() - .cat(IpfsPath::from(cid), None, &[], true) + .cat(IpfsPath::from(cid), None, &[], true, None) .await .map_err(anyhow::Error::from)? .boxed(); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 6680f4df5..ea64e282a 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -1107,6 +1107,7 @@ impl IdentityStore { None, &[], false, + None, ) .await? .boxed(); @@ -1163,6 +1164,7 @@ impl IdentityStore { None, &[], false, + None, ) .await? .boxed(); @@ -1263,7 +1265,7 @@ impl IdentityStore { async move { let mut stream = ipfs .unixfs() - .cat(picture, None, &[], false) + .cat(picture, None, &[], false, None) .await? .boxed(); @@ -1291,7 +1293,7 @@ impl IdentityStore { async move { let mut stream = ipfs .unixfs() - .cat(banner, None, &[], false) + .cat(banner, None, &[], false, None) .await? .boxed(); while let Some(_d) = stream.next().await { diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 37186388a..a6745129f 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -3487,8 +3487,13 @@ impl MessageStore { let ipfs = self.ipfs.clone(); let _constellation = constellation.clone(); - let progress_stream = async_stream::stream! { + yield Progression::CurrentProgress { + name: attachment.name(), + current: 0, + total: Some(attachment.size()), + }; + let stream = match ipfs.get_unixfs(reference, &path).await { Ok(stream) => stream, Err(e) => { @@ -3498,13 +3503,7 @@ impl MessageStore { error: Some(e.to_string()), }; return; - } - }; - - yield Progression::CurrentProgress { - name: attachment.name(), - current: 0, - total: Some(attachment.size()), + }, }; for await event in stream {