From 9da584a8c1868434dc7c2d83afe51ccc89b514e7 Mon Sep 17 00:00:00 2001 From: Congyu WANG Date: Tue, 14 May 2024 16:59:59 +0800 Subject: [PATCH] add widecolumns get put and columns api --- .gitmodules | 2 +- librocksdb-sys/rocksdb | 2 +- librocksdb-sys/rocksdb_lib_sources.txt | 11 +++ src/db.rs | 83 +++++++++++++++++++++- src/db_iterator.rs | 20 +++++- src/lib.rs | 2 + src/wide_columns.rs | 96 ++++++++++++++++++++++++++ 7 files changed, 210 insertions(+), 6 deletions(-) create mode 100644 src/wide_columns.rs diff --git a/.gitmodules b/.gitmodules index 2c818bdcc..7a4a6fb48 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/google/snappy.git [submodule "rocksdb_sys/rocksdb"] path = librocksdb-sys/rocksdb - url = https://github.com/facebook/rocksdb.git + url = https://github.com/Congyuwang/rocksdb.git diff --git a/librocksdb-sys/rocksdb b/librocksdb-sys/rocksdb index f44419665..e5bc3a895 160000 --- a/librocksdb-sys/rocksdb +++ b/librocksdb-sys/rocksdb @@ -1 +1 @@ -Subproject commit f4441966592636253fd5ab0bb9ed44fc2697fc53 +Subproject commit e5bc3a89534eca6ed341ccca7f970065168ac666 diff --git a/librocksdb-sys/rocksdb_lib_sources.txt b/librocksdb-sys/rocksdb_lib_sources.txt index f7159dddc..f59ba8d15 100644 --- a/librocksdb-sys/rocksdb_lib_sources.txt +++ b/librocksdb-sys/rocksdb_lib_sources.txt @@ -12,6 +12,7 @@ cache/secondary_cache_adapter.cc cache/sharded_cache.cc cache/tiered_secondary_cache.cc db/arena_wrapped_db_iter.cc +db/attribute_group_iterator_impl.cc db/blob/blob_contents.cc db/blob/blob_fetcher.cc db/blob/blob_file_addition.cc @@ -28,6 +29,7 @@ db/blob/blob_source.cc db/blob/prefetch_buffer_collection.cc db/builder.cc db/c.cc +db/coalescing_iterator.cc db/column_family.cc db/compaction/compaction.cc db/compaction/compaction_iterator.cc @@ -49,6 +51,7 @@ db/db_impl/db_impl_compaction_flush.cc db/db_impl/db_impl_debug.cc db/db_impl/db_impl_experimental.cc db/db_impl/db_impl_files.cc +db/db_impl/db_impl_follower.cc db/db_impl/db_impl_open.cc db/db_impl/db_impl_readonly.cc db/db_impl/db_impl_secondary.cc @@ -105,6 +108,7 @@ env/env_chroot.cc env/env_encryption.cc env/env_posix.cc env/file_system.cc +env/fs_on_demand.cc env/fs_posix.cc env/fs_remap.cc env/file_system_tracer.cc @@ -160,6 +164,12 @@ options/options_helper.cc options/options_parser.cc port/mmap.cc port/port_posix.cc +port/win/env_default.cc +port/win/env_win.cc +port/win/io_win.cc +port/win/port_win.cc +port/win/win_logger.cc +port/win/win_thread.cc port/stack_trace.cc table/adaptive/adaptive_table_factory.cc table/block_based/binary_search_index_reader.cc @@ -311,6 +321,7 @@ utilities/transactions/write_prepared_txn_db.cc utilities/transactions/write_unprepared_txn.cc utilities/transactions/write_unprepared_txn_db.cc utilities/ttl/db_ttl_impl.cc +utilities/types_util.cc utilities/wal_filter.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc diff --git a/src/db.rs b/src/db.rs index f2cb2fcf2..9e7524059 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,12 +14,11 @@ // use crate::{ - column_family::AsColumnFamilyRef, - column_family::BoundColumnFamily, - column_family::UnboundColumnFamily, + column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily}, db_options::OptionsMustOutliveDB, ffi, ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike}, + wide_columns::PinnableWideColumns, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode, @@ -1064,6 +1063,40 @@ impl DBCommon { } } + /// Return the value associated with a key using RocksDB's PinnableSlice + /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but + /// allows specifying ColumnFamily + pub fn get_entity_cf_opt>( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + readopts: &ReadOptions, + ) -> Result, Error> { + if readopts.inner.is_null() { + return Err(Error::new( + "Unable to create RocksDB read options. This is a fairly trivial call, and its \ + failure may be indicative of a mis-compiled or mis-loaded RocksDB library." + .to_owned(), + )); + } + + let key = key.as_ref(); + unsafe { + let val = ffi_try!(ffi::rocksdb_get_entity_cf( + self.inner.inner(), + readopts.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + )); + if val.is_null() { + Ok(None) + } else { + Ok(Some(PinnableWideColumns::from_c(val))) + } + } + } + /// Return the value associated with a key using RocksDB's PinnableSlice /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but /// leverages default options. @@ -1536,6 +1569,50 @@ impl DBCommon { } } + pub fn put_entity_cf_opt( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + names: N, + values: V, + writeopts: &WriteOptions, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + N: AsRef<[S]>, + V: AsRef<[S]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let names = names.as_ref(); + let values = values.as_ref(); + + if names.len() != values.len() { + return Err(Error::new( + "columns names and values length mismatch".to_string(), + )); + } + + let names_sizes: Vec = names.iter().map(|x| x.as_ref().len()).collect(); + let values_sizes: Vec = values.iter().map(|x| x.as_ref().len()).collect(); + + unsafe { + ffi_try!(ffi::rocksdb_put_entity_cf( + self.inner.inner(), + writeopts.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + names.len(), + names.as_ptr() as *const *const c_char, + names_sizes.as_ptr() as *const usize, + values.as_ptr() as *const *const c_char, + values_sizes.as_ptr() as *const usize, + )); + Ok(()) + } + } + pub fn merge_opt(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error> where K: AsRef<[u8]>, diff --git a/src/db_iterator.rs b/src/db_iterator.rs index 846cfcf25..704f9fc65 100644 --- a/src/db_iterator.rs +++ b/src/db_iterator.rs @@ -14,7 +14,9 @@ use crate::{ db::{DBAccess, DB}, - ffi, Error, ReadOptions, WriteBatch, + ffi, + wide_columns::WideColumns, + Error, ReadOptions, WriteBatch, }; use libc::{c_char, c_uchar, size_t}; use std::{marker::PhantomData, slice}; @@ -332,6 +334,15 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> { } } + /// Returns pair with slice of the current key and current value. + pub fn columns(&self) -> Option { + if self.valid() { + Some(self.columns_impl()) + } else { + None + } + } + /// Returns a slice of the current key; assumes the iterator is valid. fn key_impl(&self) -> &[u8] { // Safety Note: This is safe as all methods that may invalidate the buffer returned @@ -355,6 +366,13 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> { slice::from_raw_parts(val_ptr as *const c_uchar, val_len) } } + + fn columns_impl(&self) -> WideColumns { + unsafe { + let columns = ffi::rocksdb_iter_columns(self.inner.as_ptr()); + WideColumns::from_c(columns) + } + } } impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> { diff --git a/src/lib.rs b/src/lib.rs index b0173e649..ce457d587 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ mod snapshot; mod sst_file_writer; pub mod statistics; mod transactions; +mod wide_columns; mod write_batch; pub use crate::{ @@ -133,6 +134,7 @@ pub use crate::{ OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions, }, + wide_columns::{PinnableWideColumns, WideColumn, WideColumns}, write_batch::{WriteBatch, WriteBatchIterator, WriteBatchWithTransaction}, }; diff --git a/src/wide_columns.rs b/src/wide_columns.rs new file mode 100644 index 000000000..e457e7fc8 --- /dev/null +++ b/src/wide_columns.rs @@ -0,0 +1,96 @@ +use crate::ffi; +use std::{ + marker::PhantomData, + ptr::{null, slice_from_raw_parts}, +}; + +pub struct WideColumn<'a> { + pub name: &'a [u8], + pub value: &'a [u8], +} + +pub struct WideColumns<'a> { + inner: *const ffi::rocksdb_widecolumns_t, + columns_size: usize, + iter: PhantomData<&'a ()>, +} + +impl<'a> WideColumns<'a> { + pub(crate) unsafe fn from_c(inner: *const ffi::rocksdb_widecolumns_t) -> Self { + Self { + inner, + columns_size: ffi::rocksdb_widecolumns_len(inner), + iter: PhantomData, + } + } + + pub fn len(&self) -> usize { + self.columns_size + } + + pub unsafe fn get_column_name_unchecked(&self, idx: usize) -> &[u8] { + let mut name_len: usize = 0; + let name_len_ptr: *mut usize = &mut name_len; + let name = null::<*const i8>() as *mut *const i8; + ffi::rocksdb_widecolumns_name(self.inner, idx, name, name_len_ptr); + &*slice_from_raw_parts(name as *const u8, name_len) + } + + pub unsafe fn get_column_value_unchecked(&self, idx: usize) -> &[u8] { + let mut value_len: usize = 0; + let value_len_ptr: *mut usize = &mut value_len; + let value = null::<*const i8>() as *mut *const i8; + ffi::rocksdb_widecolumns_value(self.inner, idx, value, value_len_ptr); + &*slice_from_raw_parts(value as *const u8, value_len) + } + + pub unsafe fn get_column_unchecked(&self, idx: usize) -> WideColumn { + WideColumn { + name: self.get_column_name_unchecked(idx), + value: self.get_column_value_unchecked(idx), + } + } +} + +pub struct PinnableWideColumns<'a> { + inner: *const ffi::rocksdb_pinnablewidecolumns_t, + columns_size: usize, + iter: PhantomData<&'a ()>, +} + +impl<'a> PinnableWideColumns<'a> { + pub(crate) unsafe fn from_c(inner: *const ffi::rocksdb_pinnablewidecolumns_t) -> Self { + Self { + inner, + columns_size: ffi::rocksdb_pinnablewidecolumns_len(inner), + iter: PhantomData, + } + } + + pub fn len(&self) -> usize { + self.columns_size + } + + pub unsafe fn get_column_name_unchecked(&self, idx: usize) -> &[u8] { + let mut name_len: usize = 0; + let name_len_ptr: *mut usize = &mut name_len; + let name = null::<*const i8>() as *mut *const i8; + ffi::rocksdb_pinnablewidecolumns_name(self.inner, idx, name, name_len_ptr); + &*slice_from_raw_parts(name as *const u8, name_len) + } + + pub unsafe fn get_column_value_unchecked(&self, idx: usize) -> &[u8] { + let mut value_len: usize = 0; + let value_len_ptr: *mut usize = &mut value_len; + let value = null::<*const i8>() as *mut *const i8; + ffi::rocksdb_pinnablewidecolumns_value(self.inner, idx, value, value_len_ptr); + &*slice_from_raw_parts(value as *const u8, value_len) + } + + pub unsafe fn get_column_unchecked(&self, idx: usize) -> WideColumn { + WideColumn { + name: self.get_column_name_unchecked(idx), + value: self.get_column_value_unchecked(idx), + } + } +}