Skip to content

Commit

Permalink
add widecolumns get put and columns api
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang committed May 14, 2024
1 parent 7b031f4 commit 9da584a
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
url = https://github.com/google/snappy.git
[submodule "rocksdb_sys/rocksdb"]
path = librocksdb-sys/rocksdb
url = https://github.com/facebook/rocksdb.git
url = https://github.com/Congyuwang/rocksdb.git
2 changes: 1 addition & 1 deletion librocksdb-sys/rocksdb
Submodule rocksdb updated 429 files
11 changes: 11 additions & 0 deletions librocksdb-sys/rocksdb_lib_sources.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cache/secondary_cache_adapter.cc
cache/sharded_cache.cc
cache/tiered_secondary_cache.cc
db/arena_wrapped_db_iter.cc
db/attribute_group_iterator_impl.cc
db/blob/blob_contents.cc
db/blob/blob_fetcher.cc
db/blob/blob_file_addition.cc
Expand All @@ -28,6 +29,7 @@ db/blob/blob_source.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
db/coalescing_iterator.cc
db/column_family.cc
db/compaction/compaction.cc
db/compaction/compaction_iterator.cc
Expand All @@ -49,6 +51,7 @@ db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_debug.cc
db/db_impl/db_impl_experimental.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_follower.cc
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
Expand Down Expand Up @@ -105,6 +108,7 @@ env/env_chroot.cc
env/env_encryption.cc
env/env_posix.cc
env/file_system.cc
env/fs_on_demand.cc
env/fs_posix.cc
env/fs_remap.cc
env/file_system_tracer.cc
Expand Down Expand Up @@ -160,6 +164,12 @@ options/options_helper.cc
options/options_parser.cc
port/mmap.cc
port/port_posix.cc
port/win/env_default.cc
port/win/env_win.cc
port/win/io_win.cc
port/win/port_win.cc
port/win/win_logger.cc
port/win/win_thread.cc
port/stack_trace.cc
table/adaptive/adaptive_table_factory.cc
table/block_based/binary_search_index_reader.cc
Expand Down Expand Up @@ -311,6 +321,7 @@ utilities/transactions/write_prepared_txn_db.cc
utilities/transactions/write_unprepared_txn.cc
utilities/transactions/write_unprepared_txn_db.cc
utilities/ttl/db_ttl_impl.cc
utilities/types_util.cc
utilities/wal_filter.cc
utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc
83 changes: 80 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
//

use crate::{
column_family::AsColumnFamilyRef,
column_family::BoundColumnFamily,
column_family::UnboundColumnFamily,
column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily},
db_options::OptionsMustOutliveDB,
ffi,
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
wide_columns::PinnableWideColumns,
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
Expand Down Expand Up @@ -1064,6 +1063,40 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}

/// Return the value associated with a key using RocksDB's PinnableSlice
/// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
/// allows specifying ColumnFamily
pub fn get_entity_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<PinnableWideColumns>, Error> {
if readopts.inner.is_null() {
return Err(Error::new(
"Unable to create RocksDB read options. This is a fairly trivial call, and its \
failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
.to_owned(),
));
}

let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_entity_cf(
self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
));
if val.is_null() {
Ok(None)
} else {
Ok(Some(PinnableWideColumns::from_c(val)))
}
}
}

/// Return the value associated with a key using RocksDB's PinnableSlice
/// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
/// leverages default options.
Expand Down Expand Up @@ -1536,6 +1569,50 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}

pub fn put_entity_cf_opt<K, N, V, S>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
names: N,
values: V,
writeopts: &WriteOptions,
) -> Result<(), Error>
where
K: AsRef<[u8]>,
N: AsRef<[S]>,
V: AsRef<[S]>,
S: AsRef<[u8]>,
{
let key = key.as_ref();
let names = names.as_ref();
let values = values.as_ref();

if names.len() != values.len() {
return Err(Error::new(
"columns names and values length mismatch".to_string(),
));
}

let names_sizes: Vec<usize> = names.iter().map(|x| x.as_ref().len()).collect();
let values_sizes: Vec<usize> = values.iter().map(|x| x.as_ref().len()).collect();

unsafe {
ffi_try!(ffi::rocksdb_put_entity_cf(
self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
names.len(),
names.as_ptr() as *const *const c_char,
names_sizes.as_ptr() as *const usize,
values.as_ptr() as *const *const c_char,
values_sizes.as_ptr() as *const usize,
));
Ok(())
}
}

pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
where
K: AsRef<[u8]>,
Expand Down
20 changes: 19 additions & 1 deletion src/db_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use crate::{
db::{DBAccess, DB},
ffi, Error, ReadOptions, WriteBatch,
ffi,
wide_columns::WideColumns,
Error, ReadOptions, WriteBatch,
};
use libc::{c_char, c_uchar, size_t};
use std::{marker::PhantomData, slice};
Expand Down Expand Up @@ -332,6 +334,15 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
}
}

/// Returns the wide columns of the entry the iterator currently points at,
/// or `None` when the iterator is not valid.
pub fn columns(&self) -> Option<WideColumns> {
    if !self.valid() {
        return None;
    }
    Some(self.columns_impl())
}

/// Returns a slice of the current key; assumes the iterator is valid.
fn key_impl(&self) -> &[u8] {
// Safety Note: This is safe as all methods that may invalidate the buffer returned
Expand All @@ -355,6 +366,13 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
}
}

/// Wraps the handle reported by `rocksdb_iter_columns` for this iterator in
/// a `WideColumns` view. Callers are expected to have checked `valid()`
/// first, as `columns()` does.
fn columns_impl(&self) -> WideColumns {
    let raw_iter = self.inner.as_ptr();
    unsafe { WideColumns::from_c(ffi::rocksdb_iter_columns(raw_iter)) }
}
}

impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod snapshot;
mod sst_file_writer;
pub mod statistics;
mod transactions;
mod wide_columns;
mod write_batch;

pub use crate::{
Expand Down Expand Up @@ -133,6 +134,7 @@ pub use crate::{
OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions,
},
wide_columns::{PinnableWideColumns, WideColumn, WideColumns},
write_batch::{WriteBatch, WriteBatchIterator, WriteBatchWithTransaction},
};

Expand Down
96 changes: 96 additions & 0 deletions src/wide_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use crate::ffi;
use std::{
marker::PhantomData,
ptr::{null, slice_from_raw_parts},
};

/// A single wide column: a borrowed column name together with its borrowed
/// value. Both slices live for `'a`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WideColumn<'a> {
    /// Raw bytes of the column name.
    pub name: &'a [u8],
    /// Raw bytes of the column value.
    pub value: &'a [u8],
}

/// View over a set of wide columns reached through a raw
/// `rocksdb_widecolumns_t` handle.
///
/// NOTE(review): nothing in this file frees `inner`; presumably the handle is
/// owned by whatever produced it (e.g. an iterator) and `'a` is meant to keep
/// this view from outliving that owner — confirm against the C API.
pub struct WideColumns<'a> {
// Raw handle passed to the `rocksdb_widecolumns_*` FFI functions.
inner: *const ffi::rocksdb_widecolumns_t,
// Number of columns, queried once at construction time.
columns_size: usize,
// Zero-sized marker that carries the `'a` lifetime, since the raw pointer
// itself has none.
iter: PhantomData<&'a ()>,
}

impl<'a> WideColumns<'a> {
pub(crate) unsafe fn from_c(inner: *const ffi::rocksdb_widecolumns_t) -> Self {
Self {
inner,
columns_size: ffi::rocksdb_widecolumns_len(inner),
iter: PhantomData,
}
}

pub fn len(&self) -> usize {

Check failure on line 27 in src/wide_columns.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `WideColumns` has a public `len` method, but no `is_empty` method

error: struct `WideColumns` has a public `len` method, but no `is_empty` method --> src/wide_columns.rs:27:5 | 27 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty = note: `-D clippy::len-without-is-empty` implied by `-D warnings`

Check failure on line 27 in src/wide_columns.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `WideColumns` has a public `len` method, but no `is_empty` method

error: struct `WideColumns` has a public `len` method, but no `is_empty` method --> src/wide_columns.rs:27:5 | 27 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty = note: `-D clippy::len-without-is-empty` implied by `-D warnings`
self.columns_size
}

pub unsafe fn get_column_name_unchecked(&self, idx: usize) -> &[u8] {
let mut name_len: usize = 0;
let name_len_ptr: *mut usize = &mut name_len;
let name = null::<*const i8>() as *mut *const i8;
ffi::rocksdb_widecolumns_name(self.inner, idx, name, name_len_ptr);
&*slice_from_raw_parts(name as *const u8, name_len)
}

pub unsafe fn get_column_value_unchecked(&self, idx: usize) -> &[u8] {
let mut value_len: usize = 0;
let value_len_ptr: *mut usize = &mut value_len;
let value = null::<*const i8>() as *mut *const i8;
ffi::rocksdb_widecolumns_value(self.inner, idx, value, value_len_ptr);
&*slice_from_raw_parts(value as *const u8, value_len)
}

pub unsafe fn get_column_unchecked(&self, idx: usize) -> WideColumn {
WideColumn {
name: self.get_column_name_unchecked(idx),
value: self.get_column_value_unchecked(idx),
}
}
}

/// Set of wide columns reached through a raw
/// `rocksdb_pinnablewidecolumns_t` handle.
///
/// NOTE(review): no `Drop` impl or destroy call for `inner` is visible in
/// this file; if the C API expects the caller to release the handle, it is
/// currently leaked — confirm against the C header.
pub struct PinnableWideColumns<'a> {
// Raw handle passed to the `rocksdb_pinnablewidecolumns_*` FFI functions.
inner: *const ffi::rocksdb_pinnablewidecolumns_t,
// Number of columns, queried once at construction time.
columns_size: usize,
// Zero-sized marker that carries the `'a` lifetime, since the raw pointer
// itself has none.
iter: PhantomData<&'a ()>,
}

impl<'a> PinnableWideColumns<'a> {
pub(crate) unsafe fn from_c(inner: *const ffi::rocksdb_pinnablewidecolumns_t) -> Self {
Self {
inner,
columns_size: ffi::rocksdb_pinnablewidecolumns_len(inner),
iter: PhantomData,
}
}

pub fn len(&self) -> usize {

Check failure on line 70 in src/wide_columns.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `PinnableWideColumns` has a public `len` method, but no `is_empty` method

error: struct `PinnableWideColumns` has a public `len` method, but no `is_empty` method --> src/wide_columns.rs:70:5 | 70 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty

Check failure on line 70 in src/wide_columns.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `PinnableWideColumns` has a public `len` method, but no `is_empty` method

error: struct `PinnableWideColumns` has a public `len` method, but no `is_empty` method --> src/wide_columns.rs:70:5 | 70 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty
self.columns_size
}

pub unsafe fn get_column_name_unchecked(&self, idx: usize) -> &[u8] {
let mut name_len: usize = 0;
let name_len_ptr: *mut usize = &mut name_len;
let name = null::<*const i8>() as *mut *const i8;
ffi::rocksdb_pinnablewidecolumns_name(self.inner, idx, name, name_len_ptr);
&*slice_from_raw_parts(name as *const u8, name_len)
}

pub unsafe fn get_column_value_unchecked(&self, idx: usize) -> &[u8] {
let mut value_len: usize = 0;
let value_len_ptr: *mut usize = &mut value_len;
let value = null::<*const i8>() as *mut *const i8;
ffi::rocksdb_pinnablewidecolumns_value(self.inner, idx, value, value_len_ptr);
&*slice_from_raw_parts(value as *const u8, value_len)
}

pub unsafe fn get_column_unchecked(&self, idx: usize) -> WideColumn {
WideColumn {
name: self.get_column_name_unchecked(idx),
value: self.get_column_value_unchecked(idx),
}
}
}

0 comments on commit 9da584a

Please sign in to comment.