Skip to content

Commit

Permalink
feat(mater): add ReadOnlyBlockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
jmg-duarte committed Feb 24, 2025
1 parent 179379c commit 3473dfb
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 43 deletions.
4 changes: 2 additions & 2 deletions mater/lib/src/cid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {
}

/// Async implementation of
/// https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22
/// <https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22>
async fn read_bytes_async<R>(mut r: R) -> Result<(Self, usize), Error>
where
R: AsyncRead + Unpin,
Expand Down Expand Up @@ -63,7 +63,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {

impl<const S: usize> MultihashExt for Multihash<S> {
/// Async implementation of
/// https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271
/// <https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271>
async fn read_async<R>(mut r: R) -> Result<(Self, usize), Error>
where
R: AsyncRead + Unpin,
Expand Down
255 changes: 236 additions & 19 deletions mater/lib/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,40 @@ use crate::{multicodec, v1::BlockMetadata, v2, Error};

/// Extracts the raw data from a CARv2 file.
/// It expects the CAR file to have only 1 root.
pub struct CarExtractor<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
pub struct CarExtractor<R> {
reader: v2::Reader<R>,
index: HashMap<Cid, BlockMetadata>,
}

impl<R> CarExtractor<R> {
/// Creates a new [`CarExtractor`] from the given reader.
pub async fn new(reader: R) -> Result<Self, Error>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut self_ = Self {
reader: v2::Reader::new(reader),
index: HashMap::with_capacity(1),
};
self_.naive_build_index().await?;
Ok(self_)
}
}

impl CarExtractor<File> {
/// Creates a [`CarExtractor`] from the given file path.
pub async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<Path>,
{
let file = File::open(path).await?;
let mut loader = Self {
reader: v2::Reader::new(file),
index: HashMap::new(),
};
loader.naive_build_index().await?;
Ok(loader)
Self::new(File::open(path).await?).await
}
}

impl CarExtractor<Cursor<Vec<u8>>> {
/// Creates a [`CarExtractor`] from a vector of bytes.
pub async fn from_vec(vec: Vec<u8>) -> Result<Self, Error> {
let mut loader = Self {
reader: v2::Reader::new(Cursor::new(vec)),
index: HashMap::new(),
};
loader.naive_build_index().await?;
Ok(loader)
Self::new(Cursor::new(vec)).await
}
}

Expand Down Expand Up @@ -170,11 +171,227 @@ where
}
}

#[cfg(feature = "blockstore")]
pub(crate) mod blockstore {
use std::{any::type_name, ops::Deref, path::Path};

use blockstore::Blockstore;
use futures::TryFutureExt;
use ipld_core::cid::Cid;
use tokio::{
fs::File,
io::{AsyncRead, AsyncSeek, AsyncSeekExt},
sync::RwLock,
};

use crate::{stores::to_blockstore_cid, CarExtractor, CidExt, Error};

// Methods in here are marked as unused in the "main" `impl` because they're only used here.
impl<R> CarExtractor<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
fn has(&self, cid: &Cid) -> bool {
if cid.get_identity_data().is_some() {
return true;
}
// Since we're using the naive index, if the Cid isn't in the index, there is no Cid inside
self.index.contains_key(cid)
}

async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>, Error> {
if let Some(identity_data) = cid.get_identity_data() {
return Ok(Some(identity_data.to_vec()));
}

match self.index.get(&cid) {
Some(metadata) => {
// We could seek directly to the data and not read the Cid, but this is "canonical"
self.reader
.get_inner_mut()
.seek(std::io::SeekFrom::Start(metadata.block_offset))
.await?;
let (_, block) = self.reader.read_block().await?;
Ok(Some(block))
}
None => Ok(None),
}
}
}

/// A read-only [`blockstore::Blockstore`] implementation of [`CarExtractor`].
pub struct ReadOnlyBlockstore<R> {
inner: RwLock<CarExtractor<R>>,
}

impl<R> ReadOnlyBlockstore<R>
where
R: AsyncRead + AsyncSeek + Unpin + blockstore::cond_send::CondSync,
{
/// Create a new [`CarReadOnlyBlockstore`] from the given reader.
pub async fn new(reader: R) -> Result<Self, Error> {
Ok(Self {
inner: RwLock::new(CarExtractor::new(reader).await?),
})
}
}

impl ReadOnlyBlockstore<File> {
/// Create a new [`CarReadOnlyBlockstore<tokio::io::File>`](CarReadOnlyBlockstore) from the given path.
pub async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Self::new(File::open(path).await?).await
}
}

impl<R> Deref for ReadOnlyBlockstore<R> {
type Target = RwLock<CarExtractor<R>>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<R> Blockstore for ReadOnlyBlockstore<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
async fn get<const S: usize>(
&self,
cid: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<Option<Vec<u8>>> {
let cid = to_blockstore_cid(cid)?;
self.inner
.write()
.await
.get(&cid)
.map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string()))
.await
}

async fn has<const S: usize>(
&self,
cid: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<bool> {
let cid = to_blockstore_cid(cid)?;
Ok(self.inner.read().await.has(&cid))
}

async fn put_keyed<const S: usize>(
&self,
_: &ipld_core::cid::CidGeneric<S>,
_: &[u8],
) -> blockstore::Result<()> {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}

async fn remove<const S: usize>(
&self,
_: &ipld_core::cid::CidGeneric<S>,
) -> blockstore::Result<()> {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}

async fn close(self) -> blockstore::Result<()> {
Ok(())
}
}

#[cfg(test)]
mod test {
use std::{str::FromStr, sync::Arc};

use ipld_core::cid::{multihash::Multihash, Cid};
use sha2::Sha256;
use tokio::fs::File;

use super::*;
use crate::{
multicodec::generate_multihash, test_utils::assert_buffer_eq, IDENTITY_CODE, RAW_CODE,
};

type FileBlockstore = ReadOnlyBlockstore<File>;

#[tokio::test]
async fn test_identity_cid() {
let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();

let payload = b"Hello World!";
let multihash = Multihash::wrap(IDENTITY_CODE, payload).unwrap();
let identity_cid = Cid::new_v1(RAW_CODE, multihash);

let has_block = blockstore.has(&identity_cid).await.unwrap();
assert!(has_block);

let content = blockstore.get(&identity_cid).await.unwrap().unwrap();
assert_buffer_eq!(&payload, &content);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_parallel_readers() {
let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();
let blockstore = Arc::new(blockstore);

// CIDs of the content blocks that the spaceglenda.car contains. We are
// only looking at the raw content so that our validation is easier later.
let cids = vec![
Cid::from_str("bafkreic6kcrue6ms42ykrisq6or24pbrubnyouvmgvk7ft73fjd4ynslxi")
.unwrap(),
Cid::from_str("bafkreicvuc5rwwjqzix7saaia55du44qqsnphdugvjxlbe446mjmupekl4")
.unwrap(),
Cid::from_str("bafkreiepxrkqexuff4vhc4vp6co73ubbp2vmskbwwazaihln6wws2z4wly")
.unwrap(),
];

// Request many blocks
let handles = (0..100)
.into_iter()
.map(|i| {
let requested = cids[i % cids.len()];
tokio::spawn({
let blockstore = Arc::clone(&blockstore);
async move {
(
requested,
blockstore.get(&requested).await.unwrap().unwrap(),
)
}
})
})
.collect::<Vec<_>>();

// Validate if the blocks received are correct
for handle in handles {
let (requested_cid, block_bytes) = handle.await.expect("Panic in task");

// Generate the CID form the bytes. That way we can check if the
// block data returned is correct.
let multihash = generate_multihash::<Sha256, _>(&block_bytes);
let generated_cid = Cid::new_v1(RAW_CODE, multihash);

assert_eq!(requested_cid, generated_cid);
}
}
}
}

#[cfg(test)]
mod test {
use std::{io::Cursor, path::Path};

use crate::CarExtractor;
use crate::{test_utils::assert_buffer_eq, CarExtractor};

#[tokio::test]
async fn read_duplicated_blocks() {
Expand All @@ -189,7 +406,7 @@ mod test {
let inner = out_check.into_inner();
let result = inner.as_slice();

assert_eq!(expected, result);
assert_buffer_eq!(expected, result);
}

async fn load_and_compare<P1, P2>(original: P1, path: P2)
Expand Down
12 changes: 9 additions & 3 deletions mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! A library to handle CAR files.
//! Both version 1 and version 2 are supported.
//!
//! You can make use of the lower-level utilities such as [`CarV2Reader`] to read a CARv2 file,
//! though these utilities were designed to be used in higher-level abstractions, like the [`Blockstore`].
#![warn(unused_crate_dependencies)]
#![warn(missing_docs)]
Expand Down Expand Up @@ -33,6 +30,15 @@ pub use v2::{
MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer,
};

/// [`blockstore`] abstractions over CAR files.
#[cfg(feature = "blockstore")]
pub mod blockstore {
// Re-export the API so users don't need to add an extra crate in Cargo.toml
pub use blockstore::{Blockstore, Error};

pub use crate::file_reader::blockstore::ReadOnlyBlockstore;
}

/// CAR handling errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down
18 changes: 3 additions & 15 deletions mater/lib/src/stores/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ impl FileBlockstore {

#[cfg(feature = "blockstore")]
mod blockstore {
use blockstore::{block::CidError, Blockstore, Error};
use ipld_core::cid::{Cid, CidGeneric};
use blockstore::{Blockstore, Error};
use ipld_core::cid::CidGeneric;

use crate::FileBlockstore;
use crate::{stores::to_blockstore_cid, FileBlockstore};

impl Blockstore for FileBlockstore {
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>, Error> {
Expand Down Expand Up @@ -313,18 +313,6 @@ mod blockstore {
.map_err(|err| Error::FatalDatabaseError(err.to_string()))
}
}

/// Convert CID with the generic Multihash size to the CID with the specific
/// Multihash size that the underlying blockstore expects.
fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
let digest_size = cid.hash().size() as usize;
let hash = cid
.hash()
.resize::<64>()
.map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;

Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
}
}

#[cfg(test)]
Expand Down
21 changes: 21 additions & 0 deletions mater/lib/src/stores/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,24 @@ impl Default for Config {
}
}
}

#[cfg(feature = "blockstore")]
mod _blockstore {
use blockstore::{block::CidError, Error};
use ipld_core::cid::{Cid, CidGeneric};

/// Convert CID with the generic Multihash size to the CID with the specific
/// Multihash size that the underlying blockstore expects.
pub fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
let digest_size = cid.hash().size() as usize;
let hash = cid
.hash()
.resize::<64>()
.map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;

Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
}
}

#[cfg(feature = "blockstore")]
pub(crate) use _blockstore::to_blockstore_cid;
Loading

0 comments on commit 3473dfb

Please sign in to comment.