Commit

add DbSetAccess
* mark empty children for new block
simonjiao committed Jul 11, 2024
1 parent 66d60c0 commit 0050f48
Showing 4 changed files with 194 additions and 39 deletions.
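Note (not part of this commit's text): the change replaces the single children row per block (`Hash -> Arc<Vec<Hash>>`, which required a read-modify-write on every insert) with a set layout in which each (parent, child) pair is its own row in the children column family, and reading children becomes a prefix scan. A minimal sketch of the key layout, mirroring `DbSetAccess::write` below (illustration only):

fn set_entry_key(set_key: &[u8], member: &[u8]) -> Vec<u8> {
    // Row key = set key bytes ++ member bytes; the row's value is empty.
    set_key.iter().chain(member.iter()).copied().collect()
}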
66 changes: 27 additions & 39 deletions flexidag/src/consensusdb/consensus_relations.rs
@@ -3,7 +3,9 @@ use super::{
db::DBStorage,
prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter, StoreError},
};
use crate::consensusdb::set_access::CachedDbSetAccess;
use crate::define_schema;
use itertools::Itertools;
use rocksdb::WriteBatch;
use starcoin_crypto::HashValue as Hash;
use starcoin_types::blockhash::{BlockHashes, BlockLevel};
@@ -28,7 +30,8 @@ pub(crate) const PARENTS_CF: &str = "block-parents";
pub(crate) const CHILDREN_CF: &str = "block-children";

define_schema!(RelationParent, Hash, Arc<Vec<Hash>>, PARENTS_CF);
define_schema!(RelationChildren, Hash, Arc<Vec<Hash>>, CHILDREN_CF);
//define_schema!(LegacyRelationChildren, Hash, Arc<Vec<Hash>>, CHILDREN_CF);
define_schema!(RelationChildren, Hash, Hash, CHILDREN_CF);

impl KeyCodec<RelationParent> for Hash {
fn encode_key(&self) -> Result<Vec<u8>, StoreError> {
@@ -58,13 +61,13 @@ impl KeyCodec<RelationChildren> for Hash {
}
}

impl ValueCodec<RelationChildren> for Arc<Vec<Hash>> {
impl ValueCodec<RelationChildren> for Hash {
fn encode_value(&self) -> Result<Vec<u8>, StoreError> {
bcs_ext::to_bytes(self).map_err(|e| StoreError::EncodeError(e.to_string()))
Ok(self.to_vec())
}

fn decode_value(data: &[u8]) -> Result<Self, StoreError> {
bcs_ext::from_bytes(data).map_err(|e| StoreError::DecodeError(e.to_string()))
Hash::from_slice(data).map_err(|e| StoreError::DecodeError(e.to_string()))
}
}

@@ -74,7 +77,7 @@ pub struct DbRelationsStore {
db: Arc<DBStorage>,
level: BlockLevel,
parents_access: CachedDbAccess<RelationParent>,
children_access: CachedDbAccess<RelationChildren>,
children_access: CachedDbSetAccess<RelationChildren>,
}

impl DbRelationsStore {
@@ -83,7 +86,7 @@ impl DbRelationsStore {
db: Arc::clone(&db),
level,
parents_access: CachedDbAccess::new(Arc::clone(&db), cache_size),
children_access: CachedDbAccess::new(db, cache_size),
children_access: CachedDbSetAccess::new(db, cache_size),
}
}

@@ -105,22 +108,13 @@ impl DbRelationsStore {
self.parents_access
.write(BatchDbWriter::new(batch), hash, parents.clone())?;

// The new hash has no children yet
self.children_access.write(
BatchDbWriter::new(batch),
hash,
BlockHashes::new(Vec::new()),
)?;
// Mark empty children for `hash`; no DB write happens.
self.children_access.initialize(hash);

// Update `children` for each parent
for parent in parents.iter().cloned() {
let mut children = (*self.get_children(parent)?).clone();
children.push(hash);
self.children_access.write(
BatchDbWriter::new(batch),
parent,
BlockHashes::new(children),
)?;
self.children_access
.write(BatchDbWriter::new(batch), parent, hash)?;
}

Ok(())
@@ -132,17 +126,20 @@ impl RelationsStoreReader for DbRelationsStore {
self.parents_access.read(hash)
}

// todo: use a more efficient way to get children
fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
self.children_access.read(hash)
Ok(Arc::new(
self.children_access
.read(hash)?
.read()
.iter()
.cloned()
.collect_vec(),
))
}

fn has(&self, hash: Hash) -> Result<bool, StoreError> {
if self.parents_access.has(hash)? {
debug_assert!(self.children_access.has(hash)?);
Ok(true)
} else {
Ok(false)
}
self.parents_access.has(hash)
}
}

@@ -158,22 +155,13 @@ impl RelationsStore for DbRelationsStore {
self.parents_access
.write(DirectDbWriter::new(&self.db), hash, parents.clone())?;

// The new hash has no children yet
self.children_access.write(
DirectDbWriter::new(&self.db),
hash,
BlockHashes::new(Vec::new()),
)?;
// Mark empty children for `hash`; no DB write happens.
self.children_access.initialize(hash);

// Update `children` for each parent
for parent in parents.iter().cloned() {
let mut children = (*self.get_children(parent)?).clone();
children.push(hash);
self.children_access.write(
DirectDbWriter::new(&self.db),
parent,
BlockHashes::new(children),
)?;
self.children_access
.write(DirectDbWriter::new(&self.db), parent, hash)?;
}

Ok(())
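Note (not part of this commit's text): two behavioral changes above are easy to miss. `get_children` now rebuilds its result from a cached `HashSet`, so children are no longer returned in insertion order; a caller that needs a stable order must impose one, as in this sketch (with `u64` standing in for `Hash`, purely for illustration):

use std::collections::HashSet;

fn deterministic_children(set: &HashSet<u64>) -> Vec<u64> {
    // HashSet iteration order is arbitrary; sort for a reproducible result.
    let mut v: Vec<u64> = set.iter().copied().collect();
    v.sort_unstable();
    v
}

And `has` now consults only the parents column: the old debug assertion that a children entry always exists would no longer hold, since `initialize` only seeds an empty set in the cache and writes nothing to the DB.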
1 change: 1 addition & 0 deletions flexidag/src/consensusdb/mod.rs
@@ -9,6 +9,7 @@ mod db;
mod error;
mod item;
pub mod schema;
mod set_access;
mod writer;

pub mod prelude {
129 changes: 129 additions & 0 deletions flexidag/src/consensusdb/set_access.rs
@@ -0,0 +1,129 @@
use crate::consensusdb::schema::{KeyCodec, ValueCodec};
use crate::consensusdb::{cache::DagCache, error::StoreError, schema::Schema, writer::DbWriter};
use parking_lot::RwLock;
use rocksdb::{IteratorMode, ReadOptions};
use starcoin_storage::db_storage::DBStorage;
use std::collections::HashSet;
use std::error::Error;
use std::{collections::hash_map::RandomState, marker::PhantomData, sync::Arc};

#[derive(Clone)]
pub struct DbSetAccess<S: Schema, R = RandomState> {
db: Arc<DBStorage>,
_phantom: PhantomData<(S, R)>,
}

impl<S: Schema> DbSetAccess<S> {
pub fn new(db: Arc<DBStorage>) -> Self {
Self {
db,
_phantom: Default::default(),
}
}

pub fn read(&self, key: S::Key) -> Result<HashSet<S::Value>, StoreError>
where
S::Value: std::cmp::Eq + std::hash::Hash,
{
self.seek_iterator(key, usize::MAX, false)
.map(|iter| iter.filter_map(Result::ok).collect::<HashSet<S::Value>>())
.map_err(Into::into)
}

pub fn write(
&self,
mut writer: impl DbWriter,
key: S::Key,
value: S::Value,
) -> Result<(), StoreError>
where
S::Value: std::cmp::Eq + std::hash::Hash,
{
let db_key = key
.encode_key()?
.iter()
.chain(value.encode_value()?.iter())
.copied()
.collect::<Vec<_>>();

writer.put_inner::<S>(&db_key, &[])
}

fn seek_iterator(
&self,
db_key: S::Key,
limit: usize, // amount to take.
skip_first: bool, // skip the first value (useful with a seek key, to avoid re-retrieving it).
) -> Result<impl Iterator<Item = Result<S::Value, Box<dyn Error>>> + '_, StoreError> {
let db_key = db_key.encode_key()?;
let mut read_opts = ReadOptions::default();
read_opts.set_iterate_range(rocksdb::PrefixRange::<&[u8]>(db_key.as_ref()));

let mut db_iterator = self
.db
.raw_iterator_cf_opt(S::COLUMN_FAMILY, IteratorMode::Start, read_opts)
.map_err(|e| StoreError::CFNotExist(e.to_string()))?;

if skip_first {
db_iterator.next();
}

Ok(db_iterator.take(limit).map(move |item| match item {
Ok((key_bytes, _)) => {
S::Value::decode_value(&key_bytes[db_key.len()..]).map_err(Into::into)
}
Err(err) => Err(err.into()),
}))
}
}

#[derive(Clone)]
pub struct CachedDbSetAccess<S: Schema, R = RandomState> {
inner: DbSetAccess<S, R>,
cache: DagCache<S::Key, Arc<RwLock<HashSet<S::Value>>>>,
}
impl<S: Schema> CachedDbSetAccess<S> {
pub fn new(db: Arc<DBStorage>, cache_size: usize) -> Self {
Self {
inner: DbSetAccess::<S>::new(db),
cache: DagCache::new_with_capacity(cache_size),
}
}

// Mark the key as initialized in memory to speed up subsequent reads.
pub fn initialize(&self, key: S::Key) {
self.cache
.insert(key, Arc::new(RwLock::new(HashSet::new())));
}

pub fn read(&self, key: S::Key) -> Result<Arc<RwLock<HashSet<S::Value>>>, StoreError>
where
S::Value: std::cmp::Eq + std::hash::Hash,
{
self.cache.get(&key).map_or_else(
|| {
self.inner.read(key.clone()).map(|v| {
let v = Arc::new(RwLock::new(v));
self.cache.insert(key, v.clone());
v
})
},
Ok,
)
}

pub fn write(
&self,
writer: impl DbWriter,
key: S::Key,
value: S::Value,
) -> Result<(), StoreError>
where
S::Value: std::cmp::Eq + std::hash::Hash,
{
let data = self.read(key.clone())?;
self.inner.write(writer, key, value.clone())?;
data.write().insert(value);
Ok(())
}
}
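Note (not part of this commit's text): a minimal usage sketch of `CachedDbSetAccess`, mirroring how `DbRelationsStore` uses it above. The `RelationChildren` schema, the `DBStorage` handle, and the cache size 16 are assumptions for illustration:

fn children_example(db: Arc<DBStorage>, parent: Hash, child: Hash) -> Result<(), StoreError> {
    let access = CachedDbSetAccess::<RelationChildren>::new(db.clone(), 16);
    // Seed an empty set for `parent` in the cache; nothing is written to the DB.
    access.initialize(parent);
    // Persist one (parent, child) row and update the cached set.
    access.write(DirectDbWriter::new(&db), parent, child)?;
    // Read back an Arc<RwLock<HashSet<Hash>>>, served from the cache.
    let children = access.read(parent)?;
    assert!(children.read().contains(&child));
    Ok(())
}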
37 changes: 37 additions & 0 deletions flexidag/src/consensusdb/writer.rs
@@ -8,6 +8,11 @@ use super::{db::DBStorage, error::StoreError};
pub trait DbWriter {
fn put<S: Schema>(&mut self, key: &S::Key, value: &S::Value) -> Result<(), StoreError>;
fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<(), StoreError>;
fn put_inner<S: Schema>(
&mut self,
key: &dyn AsRef<[u8]>,
value: &dyn AsRef<[u8]>,
) -> Result<(), StoreError>;
}

pub struct DirectDbWriter<'a> {
@@ -35,6 +40,20 @@ impl DbWriter for DirectDbWriter<'_> {
.remove(S::COLUMN_FAMILY, key)
.map_err(|e| StoreError::DBIoError(e.to_string()))
}

fn put_inner<S: Schema>(
&mut self,
key: &dyn AsRef<[u8]>,
value: &dyn AsRef<[u8]>,
) -> Result<(), StoreError> {
self.db
.put(
S::COLUMN_FAMILY,
key.as_ref().to_vec(),
value.as_ref().to_vec(),
)
.map_err(|e| StoreError::DBIoError(e.to_string()))
}
}

pub struct BatchDbWriter<'a> {
@@ -60,6 +79,15 @@ impl DbWriter for BatchDbWriter<'_> {
self.batch.delete(key);
Ok(())
}

fn put_inner<S: Schema>(
&mut self,
key: &dyn AsRef<[u8]>,
value: &dyn AsRef<[u8]>,
) -> Result<(), StoreError> {
self.batch.put(key, value);
Ok(())
}
}

impl<T: DbWriter> DbWriter for &mut T {
@@ -72,4 +100,13 @@ impl<T: DbWriter> DbWriter for &mut T {
fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<(), StoreError> {
(*self).delete::<S>(key)
}

#[inline]
fn put_inner<S: Schema>(
&mut self,
key: &dyn AsRef<[u8]>,
value: &dyn AsRef<[u8]>,
) -> Result<(), StoreError> {
(*self).put_inner::<S>(key, value)
}
}
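Note (not part of this commit's text): `put_inner` exists so that set entries can bypass the schema's value codec, because the whole entry is encoded into the row key. A sketch of the pattern, assuming this crate's `Schema`, `KeyCodec`, and `ValueCodec` traits; `write_set_entry` is a hypothetical helper:

fn write_set_entry<S: Schema>(
    mut writer: impl DbWriter,
    set_key: &S::Key,
    member: &S::Value,
) -> Result<(), StoreError> {
    // Encode both parts into the key and store an empty value,
    // exactly the shape DbSetAccess::write produces above.
    let mut db_key = set_key.encode_key()?;
    db_key.extend(member.encode_value()?);
    writer.put_inner::<S>(&db_key, &[])
}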
