Skip to content

Commit

Permalink
feat(monofs): add seeking support to FileInputStream (#108)
Browse files Browse the repository at this point in the history
- Move SeekableReader trait from monoutils-store to monoutils
- Add AsyncSeek implementation for FileInputStream
- Move StoreSwitchable trait to store.rs and remove storeswitch.rs
- Update IpldStoreSeekable to use Send + Sync bounds
- Add EmptySeekableReader implementation
- Add seek tests for FileInputStream
  • Loading branch information
appcypher authored Jan 14, 2025
1 parent 35b5928 commit 51114d0
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions monofs/lib/filesystem/dir/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ where
/// dir.create_entity("parent", Entity::Dir(Dir::new(store.clone()))).await?;
///
/// // Now create a file in the parent directory
/// let file = dir.create_entity("parent/file.txt", Entity::File(File::new(store))).await?;
/// let file = dir.create_entity("parent/file.txt", Entity::File(File::new(store.clone()))).await?;
/// assert!(matches!(file, Entity::File(_)));
///
/// // This would fail because intermediate directory doesn't exist
/// assert!(dir.create_entity("nonexistent/file.txt", Entity::File(File::new(store))).await.is_err());
/// assert!(dir.create_entity("nonexistent/file.txt", Entity::File(File::new(store.clone()))).await.is_err());
/// # Ok(())
/// # }
/// ```
Expand Down
65 changes: 56 additions & 9 deletions monofs/lib/filesystem/file/io.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{
io,
io::{self, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use futures::Future;
use monoutils_store::IpldStore;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use monoutils::{EmptySeekableReader, SeekableReader};
use monoutils_store::{IpldStore, IpldStoreSeekable};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};

use crate::filesystem::{File, FsResult};

Expand All @@ -16,7 +17,7 @@ use crate::filesystem::{File, FsResult};

/// A stream for reading from a `File` asynchronously.
pub struct FileInputStream<'a> {
reader: Pin<Box<dyn AsyncRead + Send + Sync + 'a>>,
reader: Pin<Box<dyn SeekableReader + Send + Sync + 'a>>,
}

/// A stream for writing to a `File` asynchronously.
Expand All @@ -36,15 +37,17 @@ impl<'a> FileInputStream<'a> {
/// Creates a new `FileInputStream` from a `File`.
pub async fn new<S>(file: &'a File<S>) -> io::Result<Self>
where
S: IpldStore + Send + Sync + 'static,
S: IpldStoreSeekable + Send + Sync + 'a,
{
let store = file.get_store();
let reader = match file.get_content() {
Some(cid) => store
.get_bytes(cid)
.get_seekable_bytes(cid)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
None => Box::pin(tokio::io::empty()) as Pin<Box<dyn AsyncRead + Send + Sync>>,
None => {
Box::pin(EmptySeekableReader) as Pin<Box<dyn SeekableReader + Send + Sync + 'a>>
}
};

Ok(Self { reader })
Expand Down Expand Up @@ -85,7 +88,17 @@ impl AsyncRead for FileInputStream<'_> {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.reader.as_mut().poll_read(cx, buf)
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}

impl AsyncSeek for FileInputStream<'_> {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Pin::new(&mut self.reader).start_seek(position)
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Pin::new(&mut self.reader).poll_complete(cx)
}
}

Expand Down Expand Up @@ -127,7 +140,7 @@ where
mod tests {
use anyhow::Result;
use monoutils_store::MemoryStore;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};

use crate::filesystem::File;

Expand Down Expand Up @@ -177,4 +190,38 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_file_input_stream_seek() -> Result<()> {
let store = MemoryStore::default();
let mut file = File::new(store.clone());

// Create some content for the file
let content = b"Hello, world!";
let cid = store.put_bytes(content.as_slice()).await?;
file.set_content(Some(cid));

// Create an input stream from the file
let mut input_stream = FileInputStream::new(&file).await?;

// Test seeking from start
input_stream.seek(SeekFrom::Start(7)).await?;
let mut buffer = [0u8; 6];
input_stream.read_exact(&mut buffer).await?;
assert_eq!(&buffer, b"world!");

// Test seeking from current position
input_stream.seek(SeekFrom::Current(-6)).await?;
let mut buffer = [0u8; 5];
input_stream.read_exact(&mut buffer).await?;
assert_eq!(&buffer, b"world");

// Test seeking from end
input_stream.seek(SeekFrom::End(-13)).await?;
let mut buffer = [0u8; 5];
input_stream.read_exact(&mut buffer).await?;
assert_eq!(&buffer, b"Hello");

Ok(())
}
}
2 changes: 0 additions & 2 deletions monofs/lib/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mod error;
mod file;
mod kind;
mod metadata;
mod storeswitch;
mod symcidlink;
mod sympathlink;

Expand All @@ -24,6 +23,5 @@ pub use error::*;
pub use file::*;
pub use kind::*;
pub use metadata::*;
pub use storeswitch::*;
pub use symcidlink::*;
pub use sympathlink::*;
14 changes: 0 additions & 14 deletions monofs/lib/filesystem/storeswitch.rs

This file was deleted.

21 changes: 17 additions & 4 deletions monofs/lib/store/flatfsstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use libipld::{
multihash::{Code, MultihashDigest},
Cid,
};
use monoutils::SeekableReader;
use monoutils_store::{
Chunker, Codec, FixedSizeChunker, FlatLayout, IpldReferences, IpldStore, Layout, StoreError,
StoreResult,
Chunker, Codec, FixedSizeChunker, FlatLayout, IpldReferences, IpldStore, IpldStoreSeekable,
Layout, LayoutSeekable, StoreError, StoreResult,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::fs::{create_dir_all, File};
Expand Down Expand Up @@ -373,6 +374,19 @@ where
}
}

impl<C, L> IpldStoreSeekable for FlatFsStore<C, L>
where
C: Chunker + Clone + Send + Sync,
L: LayoutSeekable + Clone + Send + Sync,
{
async fn get_seekable_bytes<'a>(
&'a self,
cid: &'a Cid,
) -> StoreResult<Pin<Box<dyn SeekableReader + Send + Sync + 'a>>> {
self.layout.retrieve_seekable(cid, self.clone()).await
}
}

//--------------------------------------------------------------------------------------------------
// Tests
//--------------------------------------------------------------------------------------------------
Expand All @@ -383,8 +397,8 @@ mod tests {
use std::fs;
use tokio::io::AsyncReadExt;

use super::*;
use super::fixtures::{self, TestNode};
use super::*;

#[tokio::test]
async fn test_flatfsstore_raw_block() -> anyhow::Result<()> {
Expand Down Expand Up @@ -550,7 +564,6 @@ mod tests {
}
}


#[cfg(test)]
mod fixtures {
use serde::Deserialize;
Expand Down
1 change: 1 addition & 0 deletions monoutils-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ serde_ipld_dagcbor.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true, features = ["io"] }
monoutils.workspace = true
8 changes: 3 additions & 5 deletions monoutils-store/lib/implementations/layouts/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ use async_stream::try_stream;
use bytes::Bytes;
use futures::{ready, stream::BoxStream, Future, StreamExt};
use libipld::Cid;
use monoutils::SeekableReader;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};

use crate::{
IpldStore, Layout, LayoutError, LayoutSeekable, MerkleNode, SeekableReader, StoreError,
StoreResult,
};
use crate::{IpldStore, Layout, LayoutError, LayoutSeekable, MerkleNode, StoreError, StoreResult};

//--------------------------------------------------------------------------------------------------
// Types
Expand Down Expand Up @@ -297,7 +295,7 @@ impl LayoutSeekable for FlatLayout {
&self,
cid: &'a Cid,
store: impl IpldStore + Send + Sync + 'a,
) -> StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>> {
) -> StoreResult<Pin<Box<dyn SeekableReader + Send + Sync + 'a>>> {
let node = store.get_node(cid).await?;
let reader = FlatLayoutReader::new(node, store)?;
Ok(Box::pin(reader))
Expand Down
5 changes: 3 additions & 2 deletions monoutils-store/lib/implementations/stores/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use std::{
use bytes::Bytes;
use futures::StreamExt;
use libipld::Cid;
use monoutils::SeekableReader;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{io::AsyncRead, sync::RwLock};

use crate::{
utils, Chunker, Codec, FixedSizeChunker, FlatLayout, IpldReferences, IpldStore,
IpldStoreSeekable, Layout, LayoutSeekable, SeekableReader, StoreError, StoreResult,
IpldStoreSeekable, Layout, LayoutSeekable, StoreError, StoreResult,
};

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -224,7 +225,7 @@ where
async fn get_seekable_bytes<'a>(
&'a self,
cid: &'a Cid,
) -> StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>> {
) -> StoreResult<Pin<Box<dyn SeekableReader + Send + Sync + 'a>>> {
self.layout.retrieve_seekable(cid, self.clone()).await
}
}
Expand Down
5 changes: 3 additions & 2 deletions monoutils-store/lib/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::pin::Pin;
use bytes::Bytes;
use futures::{stream::BoxStream, Future};
use libipld::Cid;
use monoutils::SeekableReader;
use tokio::io::AsyncRead;

use super::{IpldStore, SeekableReader, StoreResult};
use super::{IpldStore, StoreResult};

//--------------------------------------------------------------------------------------------------
// Traits
Expand Down Expand Up @@ -41,5 +42,5 @@ pub trait LayoutSeekable: Layout {
&self,
cid: &'a Cid,
store: impl IpldStore + Send + Sync + 'a,
) -> impl Future<Output = StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>>> + Send;
) -> impl Future<Output = StoreResult<Pin<Box<dyn SeekableReader + Send + Sync + 'a>>>> + Send;
}
2 changes: 0 additions & 2 deletions monoutils-store/lib/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod implementations;
mod layout;
mod merkle;
mod references;
mod seekable;
mod storable;
mod store;
pub(crate) mod utils;
Expand All @@ -24,7 +23,6 @@ pub use implementations::*;
pub use layout::*;
pub use merkle::*;
pub use references::*;
pub use seekable::*;
pub use storable::*;
pub use store::*;

Expand Down
14 changes: 0 additions & 14 deletions monoutils-store/lib/seekable.rs

This file was deleted.

16 changes: 13 additions & 3 deletions monoutils-store/lib/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::{collections::HashSet, future::Future, pin::Pin};

use bytes::Bytes;
use libipld::Cid;
use monoutils::SeekableReader;
use serde::{de::DeserializeOwned, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt};

use super::{IpldReferences, SeekableReader, StoreError, StoreResult};
use super::{IpldReferences, StoreError, StoreResult};

//--------------------------------------------------------------------------------------------------
// Types
Expand All @@ -28,7 +29,7 @@ pub enum Codec {
}

//--------------------------------------------------------------------------------------------------
// Traits: IpldStore, IpldStoreSeekable, IpldStoreExt
// Traits: IpldStore, IpldStoreSeekable, IpldStoreExt, *
//--------------------------------------------------------------------------------------------------

/// `IpldStore` is a content-addressable store for [`IPLD` (InterPlanetary Linked Data)][ipld] that
Expand Down Expand Up @@ -167,7 +168,16 @@ pub trait IpldStoreSeekable: IpldStore {
fn get_seekable_bytes<'a>(
&'a self,
cid: &'a Cid,
) -> impl Future<Output = StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>>>;
) -> impl Future<Output = StoreResult<Pin<Box<dyn SeekableReader + Send + Sync + 'a>>>>;
}

/// A trait for types that can be changed to a different store.
pub trait StoreSwitchable {
/// The type of the entity.
type WithStore<U: IpldStore>;

/// Change the store used to persist the entity.
fn change_store<U: IpldStore>(self, new_store: U) -> Self::WithStore<U>;
}

//--------------------------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions monoutils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ anyhow.workspace = true
thiserror.workspace = true
typed-path.workspace = true
pretty-error-debug.workspace = true
tokio.workspace = true
2 changes: 2 additions & 0 deletions monoutils/lib/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

mod error;
mod path;
mod seekable;

//--------------------------------------------------------------------------------------------------
// Exports
//--------------------------------------------------------------------------------------------------

pub use error::*;
pub use path::*;
pub use seekable::*;
Loading

0 comments on commit 51114d0

Please sign in to comment.