Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CacheReader for SStable #193

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: optimization with review comments
KKould committed Nov 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 89f5d59a8d67093c0e00b53e396a64b718f42a1c
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,13 +11,14 @@ resolver = "2"
version = "0.2.0"

[package.metadata]
msrv = "1.81.0"
msrv = "1.82.0"

[features]
bench = ["redb", "rocksdb", "sled"]
bench = ["redb", "rocksdb", "sled", "foyer"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
default = ["bytes", "tokio"]
foyer = ["tonbo_ext_reader/foyer"]
load_tbl = []
redb = ["dep:redb"]
rocksdb = ["dep:rocksdb"]
@@ -58,19 +59,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [
fusio = { package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"aws",
"tokio",
] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" }
fusio-parquet = { package = "fusio-parquet", version = "0.2.2" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -9,8 +9,8 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
fusio = { package = "fusio", version = "0.3.3", features = ["aws", "tokio"] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"aws",
"tokio",
] }
4 changes: 2 additions & 2 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use tonbo::{
record::{ColumnDesc, DynRecord},
DB,
};
use tonbo_ext_reader::foyer_reader::FoyerReader;
use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{CommitError, DbError},
@@ -27,7 +27,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor, FoyerReader>>,
db: Arc<DB<DynRecord, PyExecutor, LruReader>>,
}

#[pymethods]
18 changes: 9 additions & 9 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo_ext_reader::foyer_reader::FoyerReader;
use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
@@ -18,14 +18,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
txn: Option<transaction::Transaction<'static, DynRecord, FoyerReader>>,
txn: Option<transaction::Transaction<'static, DynRecord, LruReader>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
txn: transaction::Transaction<'txn, DynRecord, FoyerReader>,
txn: transaction::Transaction<'txn, DynRecord, LruReader>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
@@ -37,8 +37,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
transaction::Transaction<'txn, DynRecord, FoyerReader>,
transaction::Transaction<'static, DynRecord, FoyerReader>,
transaction::Transaction<'txn, DynRecord, LruReader>,
transaction::Transaction<'static, DynRecord, LruReader>,
>(txn)
}),
desc,
@@ -84,8 +84,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
&transaction::Transaction<'_, DynRecord, LruReader>,
&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};

@@ -169,8 +169,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
&transaction::Transaction<'_, DynRecord, LruReader>,
&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.81.0"
channel = "1.82.0"
components = ["clippy", "rust-analyzer", "rustfmt"]
22 changes: 11 additions & 11 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
@@ -525,7 +525,7 @@ pub(crate) mod tests {
use fusio_parquet::writer::AsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use tempfile::TempDir;
use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};
use tonbo_ext_reader::{lru_reader::LruReader, CacheReader};

use crate::{
compaction::Compactor,
@@ -684,7 +684,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<Test, FoyerReader>::minor_compaction(
let scope = Compactor::<Test, LruReader>::minor_compaction(
&option,
None,
&vec![
@@ -748,7 +748,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<DynRecord, FoyerReader>::minor_compaction(
let scope = Compactor::<DynRecord, LruReader>::minor_compaction(
&option,
None,
&vec![
@@ -813,7 +813,7 @@ pub(crate) mod tests {
let max = 5.to_string();
let mut version_edits = Vec::new();

Compactor::<Test, FoyerReader>::major_compaction(
Compactor::<Test, LruReader>::major_compaction(
&version,
&option,
&min,
@@ -859,7 +859,7 @@ pub(crate) mod tests {
manager: &StoreManager,
) -> (
(FileId, FileId, FileId, FileId, FileId),
Version<Test, FoyerReader>,
Version<Test, LruReader>,
) {
let level_0_fs = option
.level_fs_path(0)
@@ -1070,7 +1070,7 @@ pub(crate) mod tests {
.unwrap();

let (sender, _) = bounded(1);
let (meta_cache, range_cache) = FoyerReader::build_caches(
let (meta_cache, range_cache) = LruReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
@@ -1082,7 +1082,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test, FoyerReader>::new(
let mut version = Version::<Test, LruReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
@@ -1205,7 +1205,7 @@ pub(crate) mod tests {

let option = Arc::new(option);
let (sender, _) = bounded(1);
let (meta_cache, range_cache) = FoyerReader::build_caches(
let (meta_cache, range_cache) = LruReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
@@ -1217,7 +1217,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test, FoyerReader>::new(
let mut version = Version::<Test, LruReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
@@ -1241,7 +1241,7 @@ pub(crate) mod tests {
let min = 6.to_string();
let max = 9.to_string();

Compactor::<Test, FoyerReader>::major_compaction(
Compactor::<Test, LruReader>::major_compaction(
&version,
&option,
&min,
@@ -1270,7 +1270,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for i in 5..9 {
6 changes: 3 additions & 3 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs};
use fusio::{buffered::BufWriter, DynFs, DynWrite};
use ulid::Ulid;

use crate::{
@@ -37,7 +37,7 @@ where
R: Record,
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynFile>, R>>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
}

@@ -61,7 +61,7 @@ where
)
.await?,
option.wal_buffer_size,
)) as Box<dyn DynFile>;
)) as Box<dyn DynWrite>;

wal = Some(Mutex::new(WalFile::new(file, file_id)));
};
26 changes: 13 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@
//! use tokio::fs;
//! use tokio_util::bytes::Bytes;
//! use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB};
//! use tonbo_ext_reader::foyer_reader::FoyerReader;
//! use tonbo_ext_reader::lru_reader::LruReader;
//!
//! // use macro to define schema of column family just like ORM
//! // it provides type safety read & write API
@@ -57,7 +57,7 @@
//!
//! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
//! // pluggable async runtime and I/O
//! let db: DB<User, TokioExecutor, FoyerReader> =
//! let db: DB<User, TokioExecutor, LruReader> =
//! DB::new(options, TokioExecutor::default()).await.unwrap();
//! // insert with owned value
//! db.insert(User {
@@ -858,7 +858,7 @@ pub(crate) mod tests {
use once_cell::sync::Lazy;
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
use tempfile::TempDir;
use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};
use tonbo_ext_reader::{lru_reader::LruReader, CacheReader};
use tracing::error;

use crate::{
@@ -1097,7 +1097,7 @@ pub(crate) mod tests {
option: DbOption<Test>,
executor: E,
) -> RecordBatch {
let db: DB<Test, E, FoyerReader> = DB::new(option.clone(), executor).await.unwrap();
let db: DB<Test, E, LruReader> = DB::new(option.clone(), executor).await.unwrap();
let base_fs = db.manager.base_fs();

db.write(
@@ -1534,7 +1534,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for (i, item) in test_items().into_iter().enumerate() {
@@ -1571,7 +1571,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for item in &test_items()[0..10] {
@@ -1621,7 +1621,7 @@ pub(crate) mod tests {
schema.flush_wal().await.unwrap();
drop(schema);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option.as_ref().to_owned(), TokioExecutor::new())
.await
.unwrap();
@@ -1694,7 +1694,7 @@ pub(crate) mod tests {
"id".to_owned(),
primary_key_index,
);
let db: DB<DynRecord, TokioExecutor, FoyerReader> =
let db: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index)
.await
.unwrap();
@@ -1734,7 +1734,7 @@ pub(crate) mod tests {
option.major_threshold_with_sst_size = 3;
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);
let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for (idx, item) in test_items().into_iter().enumerate() {
@@ -1777,7 +1777,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<DynRecord, TokioExecutor, FoyerReader> =
let db: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
@@ -2007,23 +2007,23 @@ pub(crate) mod tests {
option3.major_default_oldest_table_num = 1;
option3.trigger_type = TriggerType::Length(5);

let db1: DB<DynRecord, TokioExecutor, FoyerReader> = DB::with_schema(
let db1: DB<DynRecord, TokioExecutor, LruReader> = DB::with_schema(
option,
TokioExecutor::new(),
cols_desc.clone(),
primary_key_index,
)
.await
.unwrap();
let db2: DB<DynRecord, TokioExecutor, FoyerReader> = DB::with_schema(
let db2: DB<DynRecord, TokioExecutor, LruReader> = DB::with_schema(
option2,
TokioExecutor::new(),
cols_desc.clone(),
primary_key_index,
)
.await
.unwrap();
let db3: DB<DynRecord, TokioExecutor, FoyerReader> =
let db3: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
Loading