Skip to content

Commit

Permalink
fix(cubestore): Eviction - schedule compaction as blocking task (cube…
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr authored Sep 14, 2023
1 parent ae80eb1 commit 009b1d6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
24 changes: 17 additions & 7 deletions rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::RwLockWriteGuard;

Expand Down Expand Up @@ -508,7 +509,7 @@ impl CacheEvictionManager {
} else {
log::trace!("Nothing to evict");

self.check_compaction_trigger(&store);
self.check_compaction_trigger(&store).await;

return Ok(EvictionResult::Finished(EvictionFinishedResult {
total_keys_removed: 0,
Expand All @@ -522,7 +523,7 @@ impl CacheEvictionManager {

let result = eviction_fut.await?;

self.check_compaction_trigger(&store);
self.check_compaction_trigger(&store).await;

log::debug!(
"Eviction finished, total_keys: {}, total_size: {}",
Expand Down Expand Up @@ -1101,7 +1102,7 @@ impl CacheEvictionManager {
}
}

fn check_compaction_trigger(&self, store: &Arc<RocksStore>) {
async fn check_compaction_trigger(&self, store: &Arc<RocksStore>) {
let default_cf_metadata = store.db.get_column_family_metadata();

log::trace!(
Expand All @@ -1110,16 +1111,25 @@ impl CacheEvictionManager {
);

if default_cf_metadata.size > self.compaction_trigger_size {
let start: Option<&[u8]> = None;
let end: Option<&[u8]> = None;

log::debug!(
"Triggering compaction, CF default size: {} > {}",
humansize::format_size(default_cf_metadata.size, humansize::DECIMAL),
humansize::format_size(self.compaction_trigger_size, humansize::DECIMAL)
);

store.db.compact_range(start, end)
let _ = store
.read_operation_out_of_queue_opt(
|db_ref| {
let start: Option<&[u8]> = None;
let end: Option<&[u8]> = None;

db_ref.db.compact_range(start, end);

Ok(())
},
Duration::from_secs(60),
)
.await;
}
}
}
Expand Down
24 changes: 17 additions & 7 deletions rust/cubestore/cubestore/src/metastore/rocks_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ impl RocksStore {
let (tx, rx) = oneshot::channel::<Result<R, CubeError>>();

let res = rw_loop_sender.send(Box::new(move || {
let db_span = warn_long("metastore read operation", Duration::from_millis(100));
let db_span = warn_long("store read operation", Duration::from_millis(100));

let snapshot = db_to_send.snapshot();
let res = f(DbTableRef {
Expand Down Expand Up @@ -1279,7 +1279,11 @@ impl RocksStore {
})?
}

pub async fn read_operation_out_of_queue<F, R>(&self, f: F) -> Result<R, CubeError>
pub async fn read_operation_out_of_queue_opt<F, R>(
&self,
f: F,
timeout: Duration,
) -> Result<R, CubeError>
where
F: for<'a> FnOnce(DbTableRef<'a>) -> Result<R, CubeError> + Send + Sync + 'static,
R: Send + Sync + 'static,
Expand All @@ -1288,11 +1292,8 @@ impl RocksStore {
let db_to_send = self.db.clone();

cube_ext::spawn_blocking(move || {
let db_span = warn_long(
"metastore read operation out of queue",
Duration::from_millis(100),
);
let span = tracing::trace_span!("metastore read operation out of queue");
let db_span = warn_long("store read operation out of queue", timeout);
let span = tracing::trace_span!("store read operation out of queue");
let span_holder = span.enter();

let snapshot = db_to_send.snapshot();
Expand All @@ -1311,6 +1312,15 @@ impl RocksStore {
.await?
}

pub async fn read_operation_out_of_queue<F, R>(&self, f: F) -> Result<R, CubeError>
where
F: for<'a> FnOnce(DbTableRef<'a>) -> Result<R, CubeError> + Send + Sync + 'static,
R: Send + Sync + 'static,
{
self.read_operation_out_of_queue_opt::<F, R>(f, Duration::from_millis(100))
.await
}

pub fn cleanup_test_store(test_name: &str) {
let store_path = env::current_dir()
.unwrap()
Expand Down

0 comments on commit 009b1d6

Please sign in to comment.