feat: thread-safe storage #283

Draft · wants to merge 1 commit into base: develop
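This draft makes the RocksDB storage backend shareable across threads: the `Db` alias becomes `OptimisticTransactionDB<MultiThreaded>`, and the free `cf_aux`/`cf_roots`/`cf_meta` helpers that returned `&ColumnFamily` are replaced by `RocksDbStorage` methods returning `Arc<BoundColumnFamily>`, which is what `cf_handle` yields in `MultiThreaded` mode. The sketch below is not part of the diff; it only illustrates the sharing pattern this enables, and the column family names and database path are placeholders rather than the crate's `AUX_CF_NAME`/`ROOTS_CF_NAME`/`META_CF_NAME` constants.

```rust
// Minimal sketch (not part of this diff): sharing one MultiThreaded database
// handle across worker threads. The column family names and the path are
// placeholders, not the constants the storage crate actually uses.
use std::{sync::Arc, thread};

use rocksdb::{ColumnFamilyDescriptor, MultiThreaded, OptimisticTransactionDB, Options};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    let cfs = ["aux", "roots", "meta"]
        .into_iter()
        .map(|name| ColumnFamilyDescriptor::new(name, Options::default()));

    // In MultiThreaded mode the database is Send + Sync, so a single handle
    // can live behind an Arc instead of being confined to one thread.
    let db: Arc<OptimisticTransactionDB<MultiThreaded>> = Arc::new(
        OptimisticTransactionDB::open_cf_descriptors(&opts, "/tmp/thread_safe_demo", cfs)?,
    );

    let workers: Vec<_> = (0..4)
        .map(|i| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                // cf_handle returns Arc<BoundColumnFamily> in this mode, so the
                // handle is looked up where it is used instead of being cached
                // as a &ColumnFamily for the lifetime of the database.
                let aux = db.cf_handle("aux").expect("aux column family must exist");
                let tx = db.transaction();
                tx.put_cf(&aux, format!("key-{i}"), b"value")
                    .expect("put must succeed");
                tx.commit().expect("commit must succeed");
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    Ok(())
}
```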
74 changes: 35 additions & 39 deletions storage/src/rocksdb_storage/storage.rs
@@ -28,7 +28,7 @@

//! Implementation for a storage abstraction over RocksDB.

use std::path::Path;
use std::{path::Path, sync::Arc};

use error::Error;
use grovedb_costs::{
@@ -40,8 +40,8 @@
use integer_encoding::VarInt;
use lazy_static::lazy_static;
use rocksdb::{
checkpoint::Checkpoint, ColumnFamily, ColumnFamilyDescriptor, OptimisticTransactionDB,
Transaction, WriteBatchWithTransaction, DEFAULT_COLUMN_FAMILY_NAME,
checkpoint::Checkpoint, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, MultiThreaded,

GitHub Actions / clippy warning: unused import: `ColumnFamily` (storage/src/rocksdb_storage/storage.rs:43)
OptimisticTransactionDB, Transaction, WriteBatchWithTransaction, DEFAULT_COLUMN_FAMILY_NAME,
};

use super::{
@@ -89,14 +89,14 @@
}

/// Type alias for a database
pub(crate) type Db = OptimisticTransactionDB;
pub(crate) type Db = OptimisticTransactionDB<MultiThreaded>;

/// Type alias for a transaction
pub(crate) type Tx<'db> = Transaction<'db, Db>;

/// Storage which uses RocksDB as its backend.
pub struct RocksDbStorage {
db: OptimisticTransactionDB,
db: Db,
}

impl RocksDbStorage {
@@ -207,7 +207,7 @@
value,
cost_info,
} => {
db_batch.put_cf(cf_aux(&self.db), &key, &value);
db_batch.put_cf(&self.cf_aux(), &key, &value);
cost.seek_count += 1;
cost_return_on_error_no_add!(
&cost,
@@ -226,7 +226,7 @@
value,
cost_info,
} => {
db_batch.put_cf(cf_roots(&self.db), &key, &value);
db_batch.put_cf(&self.cf_roots(), &key, &value);
cost.seek_count += 1;
// We only add costs for put root if they are set, otherwise it is free
if cost_info.is_some() {
@@ -248,7 +248,7 @@
value,
cost_info,
} => {
db_batch.put_cf(cf_meta(&self.db), &key, &value);
db_batch.put_cf(&self.cf_meta(), &key, &value);
cost.seek_count += 1;
cost_return_on_error_no_add!(
&cost,
@@ -292,7 +292,7 @@
}
}
AbstractBatchOperation::DeleteAux { key, cost_info } => {
db_batch.delete_cf(cf_aux(&self.db), &key);
db_batch.delete_cf(&self.cf_aux(), &key);

// TODO: fix not atomic freed size computation
if let Some(key_value_removed_bytes) = cost_info {
@@ -303,7 +303,7 @@
cost.seek_count += 2;
let value_len = cost_return_on_error_no_add!(
&cost,
self.db.get_cf(cf_aux(&self.db), &key).map_err(RocksDBError)
self.db.get_cf(&self.cf_aux(), &key).map_err(RocksDBError)
)
.map(|x| x.len() as u32)
.unwrap_or(0);
@@ -320,7 +320,7 @@
}
}
AbstractBatchOperation::DeleteRoot { key, cost_info } => {
db_batch.delete_cf(cf_roots(&self.db), &key);
db_batch.delete_cf(&self.cf_roots(), &key);

// TODO: fix not atomic freed size computation
if let Some(key_value_removed_bytes) = cost_info {
@@ -331,9 +331,7 @@
cost.seek_count += 2;
let value_len = cost_return_on_error_no_add!(
&cost,
self.db
.get_cf(cf_roots(&self.db), &key)
.map_err(RocksDBError)
self.db.get_cf(&self.cf_roots(), &key).map_err(RocksDBError)
)
.map(|x| x.len() as u32)
.unwrap_or(0);
@@ -350,7 +348,7 @@
}
}
AbstractBatchOperation::DeleteMeta { key, cost_info } => {
db_batch.delete_cf(cf_meta(&self.db), &key);
db_batch.delete_cf(&self.cf_meta(), &key);

// TODO: fix not atomic freed size computation
if let Some(key_value_removed_bytes) = cost_info {
@@ -361,9 +359,7 @@
cost.seek_count += 2;
let value_len = cost_return_on_error_no_add!(
&cost,
self.db
.get_cf(cf_meta(&self.db), &key)
.map_err(RocksDBError)
self.db.get_cf(&self.cf_meta(), &key).map_err(RocksDBError)
)
.map(|x| x.len() as u32)
.unwrap_or(0);
@@ -432,6 +428,27 @@
}
Ok(())
}

/// Get auxiliary data column family
fn cf_aux(&self) -> Arc<BoundColumnFamily> {
self.db
.cf_handle(AUX_CF_NAME)
.expect("aux column family must exist")
}

/// Get trees roots data column family
fn cf_roots(&self) -> Arc<BoundColumnFamily> {
self.db
.cf_handle(ROOTS_CF_NAME)
.expect("roots column family must exist")
}

/// Get metadata column family
fn cf_meta(&self) -> Arc<BoundColumnFamily> {
self.db
.cf_handle(META_CF_NAME)
.expect("meta column family must exist")
}
}

impl<'db> Storage<'db> for RocksDbStorage {
@@ -530,27 +547,6 @@
}
}

/// Get auxiliary data column family
fn cf_aux(storage: &Db) -> &ColumnFamily {
storage
.cf_handle(AUX_CF_NAME)
.expect("aux column family must exist")
}

/// Get trees roots data column family
fn cf_roots(storage: &Db) -> &ColumnFamily {
storage
.cf_handle(ROOTS_CF_NAME)
.expect("roots column family must exist")
}

/// Get metadata column family
fn cf_meta(storage: &Db) -> &ColumnFamily {
storage
.cf_handle(META_CF_NAME)
.expect("meta column family must exist")
}

#[cfg(test)]
mod tests {
use super::*;
16 changes: 9 additions & 7 deletions storage/src/rocksdb_storage/storage_context/batch.rs
@@ -28,11 +28,13 @@

//! Prefixed storage batch implementation for RocksDB backend.

use std::sync::Arc;

use grovedb_costs::{
storage_cost::key_value_cost::KeyValueStorageCost, ChildrenSizesWithIsSumTree, OperationCost,
};
use integer_encoding::VarInt;
use rocksdb::{ColumnFamily, WriteBatchWithTransaction};
use rocksdb::{BoundColumnFamily, ColumnFamily, WriteBatchWithTransaction};

GitHub Actions / clippy warning: unused import: `ColumnFamily` (storage/src/rocksdb_storage/storage_context/batch.rs:37)

use super::make_prefixed_key;
use crate::{rocksdb_storage::storage::SubtreePrefix, Batch, StorageBatch};
@@ -43,8 +45,8 @@
pub struct PrefixedRocksDbBatch<'db> {
pub(crate) prefix: SubtreePrefix,
pub(crate) batch: WriteBatchWithTransaction<true>,
pub(crate) cf_aux: &'db ColumnFamily,
pub(crate) cf_roots: &'db ColumnFamily,
pub(crate) cf_aux: Arc<BoundColumnFamily<'db>>,
pub(crate) cf_roots: Arc<BoundColumnFamily<'db>>,

/// As a batch to be commited is a RocksDB batch and there is no way to get
/// what it will do, we collect costs at the moment we append something to
@@ -109,7 +111,7 @@
cost_info,
)?;

self.batch.put_cf(self.cf_aux, prefixed_key, value);
self.batch.put_cf(&self.cf_aux, prefixed_key, value);
Ok(())
}

@@ -132,7 +134,7 @@
)?;
}

self.batch.put_cf(self.cf_roots, prefixed_key, value);
self.batch.put_cf(&self.cf_roots, prefixed_key, value);
Ok(())
}

@@ -157,7 +159,7 @@
self.cost_acc.storage_cost.removed_bytes += removed_bytes.combined_removed_bytes();
}

self.batch.delete_cf(self.cf_aux, prefixed_key);
self.batch.delete_cf(&self.cf_aux, prefixed_key);
}

fn delete_root<K: AsRef<[u8]>>(&mut self, key: K, cost_info: Option<KeyValueStorageCost>) {
@@ -169,7 +171,7 @@
self.cost_acc.storage_cost.removed_bytes += removed_bytes.combined_removed_bytes();
}

self.batch.delete_cf(self.cf_roots, prefixed_key);
self.batch.delete_cf(&self.cf_roots, prefixed_key);
}
}

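In batch.rs the prefixed batch now owns its column family handles as `Arc<BoundColumnFamily<'db>>` instead of borrowing `&'db ColumnFamily`, and every `put_cf`/`delete_cf` call passes the handle by reference. A rough sketch of that ownership pattern, using hypothetical names rather than the crate's actual types, is:

```rust
use std::sync::Arc;

use rocksdb::{BoundColumnFamily, WriteBatchWithTransaction};

// Hypothetical stand-in for PrefixedRocksDbBatch: the batch keeps a
// reference-counted bound handle alive for as long as it needs it.
struct BatchSketch<'db> {
    prefix: Vec<u8>,
    batch: WriteBatchWithTransaction<true>,
    cf_aux: Arc<BoundColumnFamily<'db>>,
}

impl<'db> BatchSketch<'db> {
    fn put_aux(&mut self, key: &[u8], value: &[u8]) {
        let mut prefixed = self.prefix.clone();
        prefixed.extend_from_slice(key);
        // Arc<BoundColumnFamily> implements AsColumnFamilyRef, so it is passed
        // by reference here, where a &ColumnFamily used to be handed in directly.
        self.batch.put_cf(&self.cf_aux, prefixed, value);
    }
}
```

A caller would fill `cf_aux` from `cf_handle(AUX_CF_NAME)` on the MultiThreaded database, as the new `RocksDbStorage::cf_aux` method in this diff does.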
34 changes: 21 additions & 13 deletions storage/src/rocksdb_storage/storage_context/context_immediate.rs
@@ -28,12 +28,16 @@

//! Storage context implementation with a transaction.

use std::sync::Arc;

use error::Error;
use grovedb_costs::{
storage_cost::key_value_cost::KeyValueStorageCost, ChildrenSizesWithIsSumTree, CostResult,
CostsExt,
};
use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, WriteBatchWithTransaction};
use rocksdb::{
BoundColumnFamily, ColumnFamily, DBRawIteratorWithThreadMode, WriteBatchWithTransaction,

GitHub Actions / clippy warning: unused import: `ColumnFamily` (storage/src/rocksdb_storage/storage_context/context_immediate.rs:39)
};

use super::{make_prefixed_key, PrefixedRocksDbBatch, PrefixedRocksDbRawIterator};
use crate::{
@@ -64,21 +68,21 @@

impl<'db> PrefixedRocksDbImmediateStorageContext<'db> {
/// Get auxiliary data column family
fn cf_aux(&self) -> &'db ColumnFamily {
fn cf_aux(&self) -> Arc<BoundColumnFamily<'db>> {
self.storage
.cf_handle(AUX_CF_NAME)
.expect("aux column family must exist")
}

/// Get trees roots data column family
fn cf_roots(&self) -> &'db ColumnFamily {
fn cf_roots(&self) -> Arc<BoundColumnFamily<'db>> {
self.storage
.cf_handle(ROOTS_CF_NAME)
.expect("roots column family must exist")
}

/// Get metadata column family
fn cf_meta(&self) -> &'db ColumnFamily {
fn cf_meta(&self) -> Arc<BoundColumnFamily<'db>> {
self.storage
.cf_handle(META_CF_NAME)
.expect("meta column family must exist")
@@ -109,7 +113,7 @@
_cost_info: Option<KeyValueStorageCost>,
) -> CostResult<(), Error> {
self.transaction
.put_cf(self.cf_aux(), make_prefixed_key(&self.prefix, &key), value)
.put_cf(&self.cf_aux(), make_prefixed_key(&self.prefix, &key), value)
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
@@ -122,7 +126,7 @@
) -> CostResult<(), Error> {
self.transaction
.put_cf(
self.cf_roots(),
&self.cf_roots(),
make_prefixed_key(&self.prefix, &key),
value,
)
@@ -137,7 +141,7 @@
_cost_info: Option<KeyValueStorageCost>,
) -> CostResult<(), Error> {
self.transaction
.put_cf(self.cf_meta(), make_prefixed_key(&self.prefix, &key), value)
.put_cf(
&self.cf_meta(),
make_prefixed_key(&self.prefix, &key),
value,
)
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
@@ -159,7 +167,7 @@
_cost_info: Option<KeyValueStorageCost>,
) -> CostResult<(), Error> {
self.transaction
.delete_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key))
.delete_cf(&self.cf_aux(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
@@ -170,7 +178,7 @@
_cost_info: Option<KeyValueStorageCost>,
) -> CostResult<(), Error> {
self.transaction
.delete_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key))
.delete_cf(&self.cf_roots(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
@@ -181,7 +189,7 @@
_cost_info: Option<KeyValueStorageCost>,
) -> CostResult<(), Error> {
self.transaction
.delete_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key))
.delete_cf(&self.cf_meta(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
@@ -195,21 +203,21 @@

fn get_aux<K: AsRef<[u8]>>(&self, key: K) -> CostResult<Option<Vec<u8>>, Error> {
self.transaction
.get_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key))
.get_cf(&self.cf_aux(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}

fn get_root<K: AsRef<[u8]>>(&self, key: K) -> CostResult<Option<Vec<u8>>, Error> {
self.transaction
.get_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key))
.get_cf(&self.cf_roots(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}

fn get_meta<K: AsRef<[u8]>>(&self, key: K) -> CostResult<Option<Vec<u8>>, Error> {
self.transaction
.get_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key))
.get_cf(&self.cf_meta(), make_prefixed_key(&self.prefix, key))
.map_err(RocksDBError)
.wrap_with_cost(Default::default())
}
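The immediate storage context follows the same pattern: its `cf_aux`/`cf_roots`/`cf_meta` helpers now return `Arc<BoundColumnFamily<'db>>`, looked up from the shared database on each use and passed by reference to the transaction. A compressed sketch of that lookup-then-use flow, assuming a literal "aux" name in place of the crate's `AUX_CF_NAME` constant:

```rust
use std::sync::Arc;

use rocksdb::{BoundColumnFamily, MultiThreaded, OptimisticTransactionDB};

type Db = OptimisticTransactionDB<MultiThreaded>;

// The bound handle borrows the database and is re-fetched per use, which is
// why the helpers became methods instead of caching a &ColumnFamily for the
// whole database lifetime.
fn cf_aux(db: &Db) -> Arc<BoundColumnFamily<'_>> {
    db.cf_handle("aux").expect("aux column family must exist")
}

fn put_aux_in_tx(db: &Db, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
    let tx = db.transaction();
    tx.put_cf(&cf_aux(db), key, value)?;
    tx.commit()
}
```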