Skip to content

Commit

Permalink
1, fix encode bug
Browse files Browse the repository at this point in the history
2, and write batch sync
  • Loading branch information
jackzhhuang committed Nov 28, 2024
1 parent de84d7b commit 262ba00
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
25 changes: 11 additions & 14 deletions flexidag/src/consensusdb/consensus_relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ use super::{
prelude::{CachedDbAccess, StoreError},
};
use crate::define_schema;
use bcs_ext::BCSCodec;
use starcoin_crypto::HashValue as Hash;
use starcoin_storage::batch::{WriteBatch, WriteBatchData, WriteBatchWithColumn};
use starcoin_storage::storage::{InnerStore, WriteOp};
use starcoin_types::blockhash::{BlockHashes, BlockLevel};
use std::collections::HashMap;
use std::sync::Arc;

/// Reader API for `RelationsStore`.
pub trait RelationsStoreReader {
fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
Expand Down Expand Up @@ -122,15 +120,17 @@ impl RelationsStore for DbRelationsStore {

let mut parent_to_children = HashMap::new();
for parent in parents.iter().cloned() {
let mut children = (*self.get_children(parent)?).clone();
let mut children = match self.get_children(parent) {
Ok(children) => (*children).clone(),
Err(e) => match e {
StoreError::KeyNotFound(_) => vec![],
_ => return std::result::Result::Err(e),
},
};
children.push(hash);
parent_to_children.insert(
parent
.encode()
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
children
.encode()
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
parent.to_vec(),
bcs_ext::to_bytes(&children).map_err(|e| StoreError::EncodeError(e.to_string()))?,
);
}

Expand All @@ -139,11 +139,9 @@ impl RelationsStore for DbRelationsStore {
WriteBatchData {
column: PARENTS_CF.to_string(),
row_data: WriteBatch::new_with_rows(vec![(
hash.encode()
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
hash.to_vec(),
WriteOp::Value(
parents
.encode()
bcs_ext::to_bytes(&parents)
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
),
)]),
Expand All @@ -162,7 +160,6 @@ impl RelationsStore for DbRelationsStore {
self.db
.write_batch_with_column(batch)
.map_err(|e| StoreError::DBIoError(e.to_string()))?;

Ok(())
}
}
Expand Down
4 changes: 4 additions & 0 deletions storage/src/cache_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ impl InnerStore for CacheStorage {
self.write_batch(prefix_name, batch)
}

fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> {
self.write_batch_with_column(batch)
}

fn multi_get(&self, prefix_name: &str, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>> {
let composed_keys = keys
.into_iter()
Expand Down
23 changes: 23 additions & 0 deletions storage/src/db_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,29 @@ impl InnerStore for DBStorage {
})
}

fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> {
let mut db_batch = DBWriteBatch::default();
batch.data.into_iter().for_each(|data| {
let cf_handle = self.get_cf_handle(&data.column);
for (key, write_op) in data.row_data.rows {
match write_op {
WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value),
WriteOp::Deletion => db_batch.delete_cf(cf_handle, key),
};
}
});
record_metrics(
"db",
"write_batch_column",
"write_batch",
self.metrics.as_ref(),
)
.call(|| {
self.db.write_opt(db_batch, &Self::sync_write_options())?;
Ok(())
})
}

fn get_len(&self) -> Result<u64> {
unimplemented!()
}
Expand Down
12 changes: 12 additions & 0 deletions storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub trait InnerStore: Send + Sync {
fn keys(&self) -> Result<Vec<Vec<u8>>>;
fn put_sync(&self, prefix_name: &str, key: Vec<u8>, value: Vec<u8>) -> Result<()>;
fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()>;
fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()>;
fn multi_get(&self, prefix_name: &str, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>>;
}

Expand Down Expand Up @@ -254,6 +255,17 @@ impl InnerStore for StorageInstance {
}
}

fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> {
match self {
Self::CACHE { cache } => cache.write_batch_with_column_sync(batch),
Self::DB { db } => db.write_batch_with_column_sync(batch),
Self::CacheAndDb { cache, db } => {
db.write_batch_with_column_sync(batch.clone())?;
cache.write_batch_with_column_sync(batch)
}
}
}

fn multi_get(&self, prefix_name: &str, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>> {
match self {
Self::CACHE { cache } => cache.multi_get(prefix_name, keys),
Expand Down

0 comments on commit 262ba00

Please sign in to comment.