Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mater): add mater::ReadOnlyBlockstore #774

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mater/lib/src/cid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {
}

/// Async implementation of
/// https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22
/// <https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22>
async fn read_bytes_async<R>(mut r: R) -> Result<(Self, usize), Error>
where
R: AsyncRead + Unpin,
Expand Down Expand Up @@ -63,7 +63,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {

impl<const S: usize> MultihashExt for Multihash<S> {
/// Async implementation of
/// https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271
/// <https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271>
async fn read_async<R>(mut r: R) -> Result<(Self, usize), Error>
where
R: AsyncRead + Unpin,
Expand Down
255 changes: 236 additions & 19 deletions mater/lib/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,40 @@ use crate::{multicodec, v1::BlockMetadata, v2, Error};

/// Extracts the raw data from a CARv2 file.
/// It expects the CAR file to have only 1 root.
pub struct CarExtractor<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
pub struct CarExtractor<R> {
reader: v2::Reader<R>,
index: HashMap<Cid, BlockMetadata>,
}

impl<R> CarExtractor<R> {
/// Creates a new [`CarExtractor`] from the given reader.
pub async fn new(reader: R) -> Result<Self, Error>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut self_ = Self {
reader: v2::Reader::new(reader),
index: HashMap::with_capacity(1),
};
self_.naive_build_index().await?;
Ok(self_)
}
}

impl CarExtractor<File> {
/// Creates a [`CarExtractor`] from the given file path.
pub async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<Path>,
{
let file = File::open(path).await?;
let mut loader = Self {
reader: v2::Reader::new(file),
index: HashMap::new(),
};
loader.naive_build_index().await?;
Ok(loader)
Self::new(File::open(path).await?).await
}
}

impl CarExtractor<Cursor<Vec<u8>>> {
/// Creates a [`CarExtractor`] from a vector of bytes.
pub async fn from_vec(vec: Vec<u8>) -> Result<Self, Error> {
let mut loader = Self {
reader: v2::Reader::new(Cursor::new(vec)),
index: HashMap::new(),
};
loader.naive_build_index().await?;
Ok(loader)
Self::new(Cursor::new(vec)).await
}
}

Expand Down Expand Up @@ -170,11 +171,227 @@ where
}
}

#[cfg(feature = "blockstore")]
pub(crate) mod blockstore {
use std::{any::type_name, ops::Deref, path::Path};

use blockstore::Blockstore;
use futures::TryFutureExt;
use ipld_core::cid::Cid;
use tokio::{
fs::File,
io::{AsyncRead, AsyncSeek, AsyncSeekExt},
sync::RwLock,
};

use crate::{stores::to_blockstore_cid, CarExtractor, CidExt, Error};

// Methods in here are marked as unused in the "main" `impl` because they're only used here.
impl<R> CarExtractor<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
fn has(&self, cid: &Cid) -> bool {
if cid.get_identity_data().is_some() {
return true;
}
// Since we're using the naive index, if the Cid isn't in the index, there is no Cid inside
self.index.contains_key(cid)
}

async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>, Error> {
if let Some(identity_data) = cid.get_identity_data() {
return Ok(Some(identity_data.to_vec()));
}

match self.index.get(&cid) {
Some(metadata) => {
// We could seek directly to the data and not read the Cid, but this is "canonical"
self.reader
.get_inner_mut()
.seek(std::io::SeekFrom::Start(metadata.block_offset))
.await?;
let (_, block) = self.reader.read_block().await?;
Ok(Some(block))
}
None => Ok(None),
}
}
}

/// A read-only [`blockstore::Blockstore`] implementation of [`CarExtractor`].
pub struct ReadOnlyBlockstore<R> {
inner: RwLock<CarExtractor<R>>,
}

impl<R> ReadOnlyBlockstore<R>
where
R: AsyncRead + AsyncSeek + Unpin + blockstore::cond_send::CondSync,
{
/// Create a new [`CarReadOnlyBlockstore`] from the given reader.
pub async fn new(reader: R) -> Result<Self, Error> {
Ok(Self {
inner: RwLock::new(CarExtractor::new(reader).await?),
})
}
}

impl ReadOnlyBlockstore<File> {
/// Create a new [`CarReadOnlyBlockstore<tokio::io::File>`](CarReadOnlyBlockstore) from the given path.
pub async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Self::new(File::open(path).await?).await
}
}

impl<R> Deref for ReadOnlyBlockstore<R> {
type Target = RwLock<CarExtractor<R>>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<R> Blockstore for ReadOnlyBlockstore<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
async fn get<const S: usize>(
&self,
cid: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<Option<Vec<u8>>> {
let cid = to_blockstore_cid(cid)?;
self.inner
.write()
.await
.get(&cid)
.map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string()))
.await
}

async fn has<const S: usize>(
&self,
cid: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<bool> {
let cid = to_blockstore_cid(cid)?;
Ok(self.inner.read().await.has(&cid))
}

async fn put_keyed<const S: usize>(
&self,
_: &ipld_core::cid::CidGeneric<S>,
_: &[u8],
) -> blockstore::Result<()> {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}

async fn remove<const S: usize>(
&self,
_: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<()> {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}

async fn close(self) -> blockstore::Result<()> {
Ok(())
}
}

#[cfg(test)]
mod test {
use std::{str::FromStr, sync::Arc};

use ipld_core::cid::{multihash::Multihash, Cid};
use sha2::Sha256;
use tokio::fs::File;

use super::*;
use crate::{
multicodec::generate_multihash, test_utils::assert_buffer_eq, IDENTITY_CODE, RAW_CODE,
};

type FileBlockstore = ReadOnlyBlockstore<File>;

#[tokio::test]
async fn test_identity_cid() {
let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();

let payload = b"Hello World!";
let multihash = Multihash::wrap(IDENTITY_CODE, payload).unwrap();
let identity_cid = Cid::new_v1(RAW_CODE, multihash);

let has_block = blockstore.has(&identity_cid).await.unwrap();
assert!(has_block);

let content = blockstore.get(&identity_cid).await.unwrap().unwrap();
assert_buffer_eq!(&payload, &content);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_parallel_readers() {
let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();
let blockstore = Arc::new(blockstore);

// CIDs of the content blocks that the spaceglenda.car contains. We are
// only looking at the raw content so that our validation is easier later.
let cids = vec![
Cid::from_str("bafkreic6kcrue6ms42ykrisq6or24pbrubnyouvmgvk7ft73fjd4ynslxi")
.unwrap(),
Cid::from_str("bafkreicvuc5rwwjqzix7saaia55du44qqsnphdugvjxlbe446mjmupekl4")
.unwrap(),
Cid::from_str("bafkreiepxrkqexuff4vhc4vp6co73ubbp2vmskbwwazaihln6wws2z4wly")
.unwrap(),
];

// Request many blocks
let handles = (0..100)
.into_iter()
.map(|i| {
let requested = cids[i % cids.len()];
tokio::spawn({
let blockstore = Arc::clone(&blockstore);
async move {
(
requested,
blockstore.get(&requested).await.unwrap().unwrap(),
)
}
})
})
.collect::<Vec<_>>();

// Validate if the blocks received are correct
for handle in handles {
let (requested_cid, block_bytes) = handle.await.expect("Panic in task");

// Generate the CID form the bytes. That way we can check if the
// block data returned is correct.
let multihash = generate_multihash::<Sha256, _>(&block_bytes);
let generated_cid = Cid::new_v1(RAW_CODE, multihash);

assert_eq!(requested_cid, generated_cid);
}
}
}
}

#[cfg(test)]
mod test {
use std::{io::Cursor, path::Path};

use crate::CarExtractor;
use crate::{test_utils::assert_buffer_eq, CarExtractor};

#[tokio::test]
async fn read_duplicated_blocks() {
Expand All @@ -189,7 +406,7 @@ mod test {
let inner = out_check.into_inner();
let result = inner.as_slice();

assert_eq!(expected, result);
assert_buffer_eq!(expected, result);
}

async fn load_and_compare<P1, P2>(original: P1, path: P2)
Expand Down
12 changes: 9 additions & 3 deletions mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! A library to handle CAR files.
//! Both version 1 and version 2 are supported.
//!
//! You can make use of the lower-level utilities such as [`CarV2Reader`] to read a CARv2 file,
//! though these utilities were designed to be used in higher-level abstractions, like the [`Blockstore`].

#![warn(unused_crate_dependencies)]
#![warn(missing_docs)]
Expand Down Expand Up @@ -33,6 +30,15 @@ pub use v2::{
MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer,
};

/// [`blockstore`] abstractions over CAR files.
#[cfg(feature = "blockstore")]
pub mod blockstore {
// Re-export the API so users don't need to add an extra crate in Cargo.toml
pub use blockstore::{Blockstore, Error};

pub use crate::file_reader::blockstore::ReadOnlyBlockstore;
}

/// CAR handling errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down
18 changes: 3 additions & 15 deletions mater/lib/src/stores/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ impl FileBlockstore {

#[cfg(feature = "blockstore")]
mod blockstore {
use blockstore::{block::CidError, Blockstore, Error};
use ipld_core::cid::{Cid, CidGeneric};
use blockstore::{Blockstore, Error};
use ipld_core::cid::CidGeneric;

use crate::FileBlockstore;
use crate::{stores::to_blockstore_cid, FileBlockstore};

impl Blockstore for FileBlockstore {
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>, Error> {
Expand Down Expand Up @@ -313,18 +313,6 @@ mod blockstore {
.map_err(|err| Error::FatalDatabaseError(err.to_string()))
}
}

/// Convert CID with the generic Multihash size to the CID with the specific
/// Multihash size that the underlying blockstore expects.
fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
let digest_size = cid.hash().size() as usize;
let hash = cid
.hash()
.resize::<64>()
.map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;

Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
}
}

#[cfg(test)]
Expand Down
21 changes: 21 additions & 0 deletions mater/lib/src/stores/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,24 @@ impl Default for Config {
}
}
}

#[cfg(feature = "blockstore")]
mod _blockstore {
use blockstore::{block::CidError, Error};
use ipld_core::cid::{Cid, CidGeneric};

/// Convert CID with the generic Multihash size to the CID with the specific
/// Multihash size that the underlying blockstore expects.
pub fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
let digest_size = cid.hash().size() as usize;
let hash = cid
.hash()
.resize::<64>()
.map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;

Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
}
}

#[cfg(feature = "blockstore")]
pub(crate) use _blockstore::to_blockstore_cid;
Loading
Loading