diff --git a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs index 9039423369932..b530f09f30d1c 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs @@ -22,6 +22,7 @@ use std::fmt; use std::str::FromStr; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::RwLockWriteGuard; @@ -508,7 +509,7 @@ impl CacheEvictionManager { } else { log::trace!("Nothing to evict"); - self.check_compaction_trigger(&store); + self.check_compaction_trigger(&store).await; return Ok(EvictionResult::Finished(EvictionFinishedResult { total_keys_removed: 0, @@ -522,7 +523,7 @@ impl CacheEvictionManager { let result = eviction_fut.await?; - self.check_compaction_trigger(&store); + self.check_compaction_trigger(&store).await; log::debug!( "Eviction finished, total_keys: {}, total_size: {}", @@ -1101,7 +1102,7 @@ impl CacheEvictionManager { } } - fn check_compaction_trigger(&self, store: &Arc) { + async fn check_compaction_trigger(&self, store: &Arc) { let default_cf_metadata = store.db.get_column_family_metadata(); log::trace!( @@ -1110,16 +1111,25 @@ impl CacheEvictionManager { ); if default_cf_metadata.size > self.compaction_trigger_size { - let start: Option<&[u8]> = None; - let end: Option<&[u8]> = None; - log::debug!( "Triggering compaction, CF default size: {} > {}", humansize::format_size(default_cf_metadata.size, humansize::DECIMAL), humansize::format_size(self.compaction_trigger_size, humansize::DECIMAL) ); - store.db.compact_range(start, end) + let _ = store + .read_operation_out_of_queue_opt( + |db_ref| { + let start: Option<&[u8]> = None; + let end: Option<&[u8]> = None; + + db_ref.db.compact_range(start, end); + + Ok(()) + }, + Duration::from_secs(60), + ) + .await; } } } diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs index cbc5d1b9029f1..ae94f513d0d51 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs @@ -1244,7 +1244,7 @@ impl RocksStore { let (tx, rx) = oneshot::channel::>(); let res = rw_loop_sender.send(Box::new(move || { - let db_span = warn_long("metastore read operation", Duration::from_millis(100)); + let db_span = warn_long("store read operation", Duration::from_millis(100)); let snapshot = db_to_send.snapshot(); let res = f(DbTableRef { @@ -1279,7 +1279,11 @@ impl RocksStore { })? } - pub async fn read_operation_out_of_queue(&self, f: F) -> Result + pub async fn read_operation_out_of_queue_opt( + &self, + f: F, + timeout: Duration, + ) -> Result where F: for<'a> FnOnce(DbTableRef<'a>) -> Result + Send + Sync + 'static, R: Send + Sync + 'static, @@ -1288,11 +1292,8 @@ impl RocksStore { let db_to_send = self.db.clone(); cube_ext::spawn_blocking(move || { - let db_span = warn_long( - "metastore read operation out of queue", - Duration::from_millis(100), - ); - let span = tracing::trace_span!("metastore read operation out of queue"); + let db_span = warn_long("store read operation out of queue", timeout); + let span = tracing::trace_span!("store read operation out of queue"); let span_holder = span.enter(); let snapshot = db_to_send.snapshot(); @@ -1311,6 +1312,15 @@ impl RocksStore { .await? } + pub async fn read_operation_out_of_queue(&self, f: F) -> Result + where + F: for<'a> FnOnce(DbTableRef<'a>) -> Result + Send + Sync + 'static, + R: Send + Sync + 'static, + { + self.read_operation_out_of_queue_opt::(f, Duration::from_millis(100)) + .await + } + pub fn cleanup_test_store(test_name: &str) { let store_path = env::current_dir() .unwrap()