Skip to content

Commit

Permalink
fix: Include module
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Oct 14, 2023
1 parent 0e0edab commit 99f937d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 12 deletions.
8 changes: 4 additions & 4 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ impl MultiPass for WarpIpfs {
.await
.map_err(anyhow::Error::from)??;

let cid = store::document::image::store_photo(
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
futures::stream::iter(Ok::<_, std::io::Error>(Ok(data))).boxed(),
format.into(),
Expand Down Expand Up @@ -782,7 +782,7 @@ impl MultiPass for WarpIpfs {
}
};

let cid = store::document::image::store_photo(
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
stream.boxed(),
extension.into(),
Expand Down Expand Up @@ -846,7 +846,7 @@ impl MultiPass for WarpIpfs {
.await
.map_err(anyhow::Error::from)??;

let cid = store::document::image::store_photo(
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
futures::stream::iter(Ok::<_, std::io::Error>(Ok(data))).boxed(),
format.into(),
Expand Down Expand Up @@ -921,7 +921,7 @@ impl MultiPass for WarpIpfs {
}
};

let cid = store::document::image::store_photo(
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
stream.boxed(),
extension.into(),
Expand Down
2 changes: 1 addition & 1 deletion extensions/warp-ipfs/src/store/document.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod cache;
pub mod identity;
pub mod image_dag;
pub mod root;
pub mod utils;
pub mod image;

use chrono::{DateTime, Utc};
use futures::TryFutureExt;
Expand Down
121 changes: 121 additions & 0 deletions extensions/warp-ipfs/src/store/document/image_dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use futures::{stream::BoxStream, StreamExt};
use libipld::{serde::to_ipld, Cid};
use rust_ipfs::Ipfs;
use serde::{Deserialize, Serialize};
use std::task::Poll;
use tracing::log;
use warp::{constellation::file::FileType, error::Error, multipass::identity::IdentityImage};

use super::{
identity::unixfs_fetch,
utils::{GetDag, GetLocalDag},
};

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ImageDag {
pub link: Cid,
pub size: u64,
pub mime: FileType,
}

#[tracing::instrument(skip(ipfs, stream))]
pub async fn store_photo(
ipfs: &Ipfs,
stream: BoxStream<'static, std::io::Result<Vec<u8>>>,
file_type: FileType,
limit: Option<usize>,
) -> Result<Cid, Error> {
let mut stream = ipfs.add_unixfs(stream).await?;

let mut size = 0;

let cid = futures::future::poll_fn(|cx| loop {
match stream.poll_next_unpin(cx) {
Poll::Ready(Some(rust_ipfs::unixfs::UnixfsStatus::ProgressStatus {
written, ..
})) => {
if let Some(limit) = limit {
if written > limit {
return Poll::Ready(Err(Error::InvalidLength {
context: "photo".into(),
current: written,
minimum: Some(1),
maximum: Some(limit),
}));
}
}
log::trace!("{written} bytes written");
}
Poll::Ready(Some(rust_ipfs::unixfs::UnixfsStatus::CompletedStatus {
path,
written,
..
})) => {
size = written;
log::debug!("Image is written with {written} bytes - stored at {path}");
return Poll::Ready(path.root().cid().copied().ok_or(Error::Other));
}
Poll::Ready(Some(rust_ipfs::unixfs::UnixfsStatus::FailedStatus {
written,
error,
..
})) => {
let err = match error {
Some(e) => {
log::error!(
"Error uploading picture with {written} bytes written with error: {e}"
);
e.into()
}
None => {
log::error!("Error uploading picture with {written} bytes written");
Error::OtherWithContext("Error uploading photo".into())
}
};

return Poll::Ready(Err(err));
}
Poll::Ready(None) => return Poll::Ready(Err(Error::ReceiverChannelUnavailable)),
Poll::Pending => return Poll::Pending,
}
})
.await?;

let dag = ImageDag {
link: cid,
size: size as _,
mime: file_type,
};

let cid = ipfs
.put_dag(to_ipld(dag).map_err(anyhow::Error::from)?)
.await?;

if !ipfs.is_pinned(&cid).await? {
ipfs.insert_pin(&cid, true).await?;
}

Ok(cid)
}

#[tracing::instrument(skip(ipfs))]
pub async fn get_image(
ipfs: &Ipfs,
cid: Cid,
local: bool,
limit: Option<usize>,
) -> Result<IdentityImage, Error> {
let dag: ImageDag = match local {
true => cid.get_local_dag(&ipfs).await?,
false => cid.get_dag(&ipfs, None).await?,
};

let image = unixfs_fetch(ipfs, dag.link, None, local, limit).await?;

let mut id_img = IdentityImage::default();

id_img.set_data(image);
id_img.set_image_type(dag.mime);

Ok(id_img)
}
16 changes: 9 additions & 7 deletions extensions/warp-ipfs/src/store/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use warp::{
use super::{
connected_to_peer, did_keypair,
document::{
cache::IdentityCache, identity::IdentityDocument, image::get_image, root::RootDocumentMap,
utils::GetLocalDag, ExtractedRootDocument, RootDocument, ToCid,
cache::IdentityCache, identity::IdentityDocument, image_dag::get_image,
root::RootDocumentMap, utils::GetLocalDag, ExtractedRootDocument, RootDocument, ToCid,
},
ecdh_decrypt, ecdh_encrypt, libp2p_pub_to_did,
phonebook::PhoneBook,
Expand Down Expand Up @@ -793,7 +793,8 @@ impl IdentityStore {
}

let image =
super::document::image::get_image(&self.ipfs, cid, true, Some(2 * 1024 * 1024)).await?;
super::document::image_dag::get_image(&self.ipfs, cid, true, Some(2 * 1024 * 1024))
.await?;

let event = IdentityEvent::Receive {
option: ResponseOption::Image {
Expand Down Expand Up @@ -844,7 +845,8 @@ impl IdentityStore {
}

let image =
super::document::image::get_image(&self.ipfs, cid, true, Some(2 * 1024 * 1024)).await?;
super::document::image_dag::get_image(&self.ipfs, cid, true, Some(2 * 1024 * 1024))
.await?;

let event = IdentityEvent::Receive {
option: ResponseOption::Image {
Expand Down Expand Up @@ -1125,7 +1127,7 @@ impl IdentityStore {
let did = in_did.clone();
let store = self.clone();
async move {
let _ = super::document::image::get_image(
let _ = super::document::image_dag::get_image(
&ipfs,
picture,
false,
Expand All @@ -1148,7 +1150,7 @@ impl IdentityStore {

let did = in_did.clone();
async move {
let _ = super::document::image::get_image(
let _ = super::document::image_dag::get_image(
&ipfs,
banner,
false,
Expand Down Expand Up @@ -1181,7 +1183,7 @@ impl IdentityStore {
let cid = cid;
let store = self.clone();
async move {
let added_cid = super::document::image::store_photo(
let added_cid = super::document::image_dag::store_photo(
&store.ipfs,
futures::stream::iter(Ok::<_, std::io::Error>(Ok(data))).boxed(),
ty,
Expand Down

0 comments on commit 99f937d

Please sign in to comment.