diff --git a/src/common/cache/src/cache.rs b/src/common/cache/src/cache.rs index 4328287536274..db06084088269 100644 --- a/src/common/cache/src/cache.rs +++ b/src/common/cache/src/cache.rs @@ -28,20 +28,6 @@ pub trait Cache { K: Borrow, Q: Hash + Eq + ?Sized; - /// Returns a reference to the value corresponding to the key in the cache or `None` if it is - /// not present in the cache. Unlike `get`, `peek` does not update the Cache state so the key's - /// position will be unchanged. - fn peek(&self, k: &Q) -> Option<&V> - where - K: Borrow, - Q: Hash + Eq + ?Sized; - - /// Returns the value corresponding to the item by policy or `None` if the - /// cache is empty. Like `peek`, `peek_by_policy` does not update the Cache state so the item's - /// position will be unchanged. - // TODO: change to fn peek_by_policy<'a>(&self) -> Option<(&'a K, &'a V)>; - fn peek_by_policy(&self) -> Option<(&K, &V)>; - /// Inserts a key-value pair into the cache. If the key already existed, the old value is /// returned. fn insert(&mut self, k: K, v: V) -> Option; diff --git a/src/common/cache/src/cache/lru.rs b/src/common/cache/src/cache/lru.rs index 29eefaa078819..f5f4fcacf62f7 100644 --- a/src/common/cache/src/cache/lru.rs +++ b/src/common/cache/src/cache/lru.rs @@ -128,49 +128,6 @@ impl Cache for LruCache { } } - /// Returns a reference to the value corresponding to the key in the cache or `None` if it is - /// not present in the cache. Unlike `get`, `peek` does not update the LRU list so the key's - /// position will be unchanged. - /// - /// # Example - /// - /// ```rust,ignore - /// use databend_common_cache::{Cache, LruCache}; - /// let mut cache = LruCache::new(2); - /// - /// cache.put(1, "a"); - /// cache.put(2, "b"); - /// - /// assert_eq!(cache.peek(&1), Some(&"a")); - /// assert_eq!(cache.peek(&2), Some(&"b")); - /// ``` - fn peek(&self, k: &Q) -> Option<&V> - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - self.map.get(k) - } - - /// Returns the value corresponding to the least recently used item or `None` if the - /// cache is empty. Like `peek`, `peek_by_policy` does not update the LRU list so the item's - /// position will be unchanged. - /// - /// # Example - /// - /// ```rust,ignore - /// use databend_common_cache::{Cache, LruCache}; - /// let mut cache = LruCache::new(2); - /// - /// cache.put(1, "a"); - /// cache.put(2, "b"); - /// - /// assert_eq!(cache.peek_by_policy(), Some((&1, &"a"))); - /// ``` - fn peek_by_policy(&self) -> Option<(&K, &V)> { - self.map.front() - } - /// Inserts a key-value pair into the cache. If the key already existed, the old value is /// returned. /// diff --git a/src/query/storages/common/cache/src/cache.rs b/src/query/storages/common/cache/src/cache.rs index 863125d248443..81d47f4c33de9 100644 --- a/src/query/storages/common/cache/src/cache.rs +++ b/src/query/storages/common/cache/src/cache.rs @@ -32,15 +32,16 @@ impl Display for Unit { } // The cache accessor, crate users usually working on this interface while manipulating caches -pub trait CacheAccessor { - type V; +pub trait CacheAccessor: Send + Sync { + type V: Send + Sync; - fn get>(&self, k: Q) -> Option>; - fn get_sized>(&self, k: Q, len: u64) -> Option>; + fn get(&self, k: &String) -> Option>; + fn get_sized(&self, k: &String, len: u64) -> Option>; fn insert(&self, key: String, value: Self::V) -> Arc; - fn evict(&self, k: &str) -> bool; - fn contains_key(&self, k: &str) -> bool; + fn evict(&self, k: &String) -> bool; + + fn contains_key(&self, k: &String) -> bool; fn bytes_size(&self) -> u64; fn items_capacity(&self) -> u64; fn bytes_capacity(&self) -> u64; diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs index f344a436bb3b7..948fc12b1e323 100644 --- a/src/query/storages/common/cache/src/caches.rs +++ b/src/query/storages/common/cache/src/caches.rs @@ -70,82 +70,82 @@ pub type SizedColumnArray = ( // - and implement `CacheAccessor` properly pub trait CachedObject { type Cache: CacheAccessor; - fn cache() -> Option; + fn cache() -> Arc>; } impl CachedObject for CompactSegmentInfo { - type Cache = CompactSegmentInfoCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_table_segment_cache() } } impl CachedObject for SegmentInfo { - type Cache = CompactSegmentInfoCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_table_segment_cache() } } impl CachedObject for TableSnapshot { - type Cache = TableSnapshotCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_table_snapshot_cache() } } impl CachedObject>> for Vec> { - type Cache = BlockMetaCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc>>> { CacheManager::instance().get_block_meta_cache() } } impl CachedObject for TableSnapshotStatistics { - type Cache = TableSnapshotStatisticCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_table_snapshot_statistics_cache() } } impl CachedObject for BloomIndexMeta { - type Cache = BloomIndexMetaCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_bloom_index_meta_cache() } } impl CachedObject<(PartStatistics, Partitions)> for (PartStatistics, Partitions) { - type Cache = PrunePartitionsCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_prune_partitions_cache() } } impl CachedObject for Xor8Filter { - type Cache = BloomIndexFilterCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_bloom_index_filter_cache() } } impl CachedObject for FileMetaData { - type Cache = FileMetaDataCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_file_meta_data_cache() } } impl CachedObject for InvertedIndexFile { - type Cache = InvertedIndexFileCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_inverted_index_file_cache() } } impl CachedObject for InvertedIndexMeta { - type Cache = InvertedIndexMetaCache; - fn cache() -> Option { + type Cache = Option; + fn cache() -> Arc> { CacheManager::instance().get_inverted_index_meta_cache() } } diff --git a/src/query/storages/common/cache/src/manager.rs b/src/query/storages/common/cache/src/manager.rs index 6553f64f30fdb..4408882973dd1 100644 --- a/src/query/storages/common/cache/src/manager.rs +++ b/src/query/storages/common/cache/src/manager.rs @@ -15,11 +15,22 @@ use std::path::PathBuf; use std::sync::Arc; +use databend_common_arrow::parquet::metadata::FileMetaData; use databend_common_base::base::GlobalInstance; +use databend_common_catalog::plan::PartStatistics; +use databend_common_catalog::plan::Partitions; use databend_common_config::CacheConfig; use databend_common_config::CacheStorageTypeInnerConfig; use databend_common_config::DiskCacheKeyReloadPolicy; use databend_common_exception::Result; +use databend_storages_common_index::filters::Xor8Filter; +use databend_storages_common_index::BloomIndexMeta; +use databend_storages_common_index::InvertedIndexFile; +use databend_storages_common_index::InvertedIndexMeta; +use databend_storages_common_table_meta::meta::BlockMeta; +use databend_storages_common_table_meta::meta::CompactSegmentInfo; +use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::meta::TableSnapshotStatistics; use log::info; use crate::caches::BlockMetaCache; @@ -34,7 +45,9 @@ use crate::caches::InvertedIndexMetaCache; use crate::caches::PrunePartitionsCache; use crate::caches::TableSnapshotCache; use crate::caches::TableSnapshotStatisticCache; +use crate::CacheAccessor; use crate::InMemoryLruCache; +use crate::SizedColumnArray; use crate::TableDataCache; use crate::TableDataCacheBuilder; @@ -204,52 +217,56 @@ impl CacheManager { GlobalInstance::get() } - pub fn get_table_snapshot_cache(&self) -> Option { - self.table_snapshot_cache.clone() + pub fn get_table_snapshot_cache(&self) -> Arc> { + Arc::new(self.table_snapshot_cache.clone()) } - pub fn get_block_meta_cache(&self) -> Option { - self.block_meta_cache.clone() + pub fn get_block_meta_cache(&self) -> Arc>>> { + Arc::new(self.block_meta_cache.clone()) } - pub fn get_table_snapshot_statistics_cache(&self) -> Option { - self.table_statistic_cache.clone() + pub fn get_table_snapshot_statistics_cache( + &self, + ) -> Arc> { + Arc::new(self.table_statistic_cache.clone()) } - pub fn get_table_segment_cache(&self) -> Option { - self.compact_segment_info_cache.clone() + pub fn get_table_segment_cache(&self) -> Arc> { + Arc::new(self.compact_segment_info_cache.clone()) } - pub fn get_bloom_index_filter_cache(&self) -> Option { - self.bloom_index_filter_cache.clone() + pub fn get_bloom_index_filter_cache(&self) -> Arc> { + Arc::new(self.bloom_index_filter_cache.clone()) } - pub fn get_bloom_index_meta_cache(&self) -> Option { - self.bloom_index_meta_cache.clone() + pub fn get_bloom_index_meta_cache(&self) -> Arc> { + Arc::new(self.bloom_index_meta_cache.clone()) } - pub fn get_inverted_index_meta_cache(&self) -> Option { - self.inverted_index_meta_cache.clone() + pub fn get_inverted_index_meta_cache(&self) -> Arc> { + Arc::new(self.inverted_index_meta_cache.clone()) } - pub fn get_inverted_index_file_cache(&self) -> Option { - self.inverted_index_file_cache.clone() + pub fn get_inverted_index_file_cache(&self) -> Arc> { + Arc::new(self.inverted_index_file_cache.clone()) } - pub fn get_prune_partitions_cache(&self) -> Option { - self.prune_partitions_cache.clone() + pub fn get_prune_partitions_cache( + &self, + ) -> Arc> { + Arc::new(self.prune_partitions_cache.clone()) } - pub fn get_file_meta_data_cache(&self) -> Option { - self.parquet_file_meta_data_cache.clone() + pub fn get_file_meta_data_cache(&self) -> Arc> { + Arc::new(self.parquet_file_meta_data_cache.clone()) } pub fn get_table_data_cache(&self) -> Option { self.table_data_cache.clone() } - pub fn get_table_data_array_cache(&self) -> Option { - self.in_memory_table_data_cache.clone() + pub fn get_table_data_array_cache(&self) -> Arc> { + Arc::new(self.in_memory_table_data_cache.clone()) } pub fn new_named_items_cache>>( diff --git a/src/query/storages/common/cache/src/providers/disk_cache/disk_cache.rs b/src/query/storages/common/cache/src/providers/disk_cache/disk_cache.rs index d9a7618c9b81c..bf4cace11377a 100644 --- a/src/query/storages/common/cache/src/providers/disk_cache/disk_cache.rs +++ b/src/query/storages/common/cache/src/providers/disk_cache/disk_cache.rs @@ -321,7 +321,7 @@ impl DiskCache { self.cache.contains(&cache_key.0) } - pub fn get_cache_path(&mut self, key: &str) -> Option { + pub fn get_cache_path(&mut self, key: &String) -> Option { let cache_key = self.cache_key(key); self.cache .get(&cache_key.0) @@ -330,7 +330,7 @@ impl DiskCache { } /// Remove the given key from the cache. - pub fn remove(&mut self, key: &str) -> Result<()> { + pub fn remove(&mut self, key: &String) -> Result<()> { let cache_key = self.cache_key(key); match self.cache.pop(&cache_key.0) { Some(_) => { diff --git a/src/query/storages/common/cache/src/providers/disk_cache/disk_cache_lru.rs b/src/query/storages/common/cache/src/providers/disk_cache/disk_cache_lru.rs index 8654cafc3f5ce..8ec13a11d3f26 100644 --- a/src/query/storages/common/cache/src/providers/disk_cache/disk_cache_lru.rs +++ b/src/query/storages/common/cache/src/providers/disk_cache/disk_cache_lru.rs @@ -35,8 +35,7 @@ impl CacheAccessor for LruDiskCacheHolder { "LruDiskCacheHolder" } - fn get>(&self, k: Q) -> Option> { - let k = k.as_ref(); + fn get(&self, k: &String) -> Option> { { let mut cache = self.write(); cache.get_cache_path(k) @@ -82,7 +81,7 @@ impl CacheAccessor for LruDiskCacheHolder { }) } - fn get_sized>(&self, k: Q, len: u64) -> Option> { + fn get_sized(&self, k: &String, len: u64) -> Option> { let Some(cached_value) = self.get(k) else { metrics_inc_cache_miss_bytes(len, self.name()); return None; @@ -101,7 +100,7 @@ impl CacheAccessor for LruDiskCacheHolder { Arc::new(value) } - fn evict(&self, k: &str) -> bool { + fn evict(&self, k: &String) -> bool { if let Err(e) = { let mut cache = self.write(); cache.remove(k) @@ -113,7 +112,7 @@ impl CacheAccessor for LruDiskCacheHolder { } } - fn contains_key(&self, k: &str) -> bool { + fn contains_key(&self, k: &String) -> bool { let cache = self.read(); cache.contains_key(k) } diff --git a/src/query/storages/common/cache/src/providers/memory_cache.rs b/src/query/storages/common/cache/src/providers/memory_cache.rs index c0856a34c690d..eb320e50ac72f 100644 --- a/src/query/storages/common/cache/src/providers/memory_cache.rs +++ b/src/query/storages/common/cache/src/providers/memory_cache.rs @@ -76,13 +76,13 @@ mod impls { use crate::cache::CacheAccessor; // Wrap a Cache with RwLock, and impl CacheAccessor for it - impl>> CacheAccessor for InMemoryLruCache { + impl>> CacheAccessor for InMemoryLruCache { type V = V; - fn get>(&self, k: Q) -> Option> { + fn get(&self, k: &String) -> Option> { metrics_inc_cache_access_count(1, self.name()); let mut guard = self.inner.write(); - match guard.get(k.as_ref()) { + match guard.get(k) { None => { metrics_inc_cache_miss_count(1, &self.name); None @@ -94,7 +94,7 @@ mod impls { } } - fn get_sized>(&self, k: Q, len: u64) -> Option> { + fn get_sized(&self, k: &String, len: u64) -> Option> { let Some(cached_value) = self.get(k) else { metrics_inc_cache_miss_bytes(len, &self.name); return None; @@ -111,16 +111,21 @@ mod impls { res } - fn evict(&self, k: &str) -> bool { + fn evict(&self, k: &String) -> bool { let mut guard = self.inner.write(); guard.pop(k).is_some() } - fn contains_key(&self, k: &str) -> bool { + fn contains_key(&self, k: &String) -> bool { let guard = self.inner.read(); guard.contains(k) } + fn bytes_size(&self) -> u64 { + let guard = self.inner.read(); + guard.bytes_size() + } + fn items_capacity(&self) -> u64 { let guard = self.inner.read(); guard.items_capacity() @@ -136,11 +141,6 @@ mod impls { guard.len() } - fn bytes_size(&self) -> u64 { - let guard = self.inner.read(); - guard.bytes_size() - } - fn name(&self) -> &str { &self.name } @@ -151,14 +151,7 @@ mod impls { impl CacheAccessor for Option { type V = T::V; - fn name(&self) -> &str { - match self.as_ref() { - None => "Unknown", - Some(v) => v.name(), - } - } - - fn get>(&self, k: Q) -> Option> { + fn get(&self, k: &String) -> Option> { let Some(inner_cache) = self.as_ref() else { metrics_inc_cache_access_count(1, self.name()); metrics_inc_cache_miss_count(1, self.name()); @@ -168,7 +161,7 @@ mod impls { inner_cache.get(k) } - fn get_sized>(&self, k: Q, len: u64) -> Option> { + fn get_sized(&self, k: &String, len: u64) -> Option> { let Some(inner_cache) = self.as_ref() else { metrics_inc_cache_access_count(1, self.name()); metrics_inc_cache_miss_count(1, self.name()); @@ -186,7 +179,7 @@ mod impls { } } - fn evict(&self, k: &str) -> bool { + fn evict(&self, k: &String) -> bool { if let Some(cache) = self { cache.evict(k) } else { @@ -194,7 +187,7 @@ mod impls { } } - fn contains_key(&self, k: &str) -> bool { + fn contains_key(&self, k: &String) -> bool { if let Some(cache) = self { cache.contains_key(k) } else { @@ -217,6 +210,13 @@ mod impls { } } + fn bytes_capacity(&self) -> u64 { + match self.as_ref() { + None => 0, + Some(cache) => cache.bytes_capacity(), + } + } + fn len(&self) -> usize { match self.as_ref() { None => 0, @@ -224,10 +224,10 @@ mod impls { } } - fn bytes_capacity(&self) -> u64 { + fn name(&self) -> &str { match self.as_ref() { - None => 0, - Some(cache) => cache.bytes_capacity(), + None => "Unknown", + Some(v) => v.name(), } } } diff --git a/src/query/storages/common/cache/src/providers/table_data_cache.rs b/src/query/storages/common/cache/src/providers/table_data_cache.rs index 72fac2d6d8908..e57cbfecb02b2 100644 --- a/src/query/storages/common/cache/src/providers/table_data_cache.rs +++ b/src/query/storages/common/cache/src/providers/table_data_cache.rs @@ -38,7 +38,7 @@ struct CacheItem { #[derive(Clone)] pub struct TableDataCacheKey { - cache_key: String, + pub cache_key: String, } impl TableDataCacheKey { @@ -103,9 +103,8 @@ impl CacheAccessor for TableDataCache { DISK_TABLE_DATA_CACHE_NAME } - fn get>(&self, k: Q) -> Option> { + fn get(&self, k: &String) -> Option> { metrics_inc_cache_access_count(1, DISK_TABLE_DATA_CACHE_NAME); - let k = k.as_ref(); if let Some(item) = self.external_cache.get(k) { Profile::record_usize_profile(ProfileStatisticsName::ScanCacheBytes, item.len()); metrics_inc_cache_hit_count(1, DISK_TABLE_DATA_CACHE_NAME); @@ -116,7 +115,7 @@ impl CacheAccessor for TableDataCache { } } - fn get_sized>(&self, k: Q, len: u64) -> Option> { + fn get_sized(&self, k: &String, len: u64) -> Option> { let Some(cached_value) = self.get(k) else { metrics_inc_cache_miss_bytes(len, DISK_TABLE_DATA_CACHE_NAME); return None; @@ -149,11 +148,11 @@ impl CacheAccessor for TableDataCache { Arc::new(v) } - fn evict(&self, k: &str) -> bool { + fn evict(&self, k: &String) -> bool { self.external_cache.evict(k) } - fn contains_key(&self, k: &str) -> bool { + fn contains_key(&self, k: &String) -> bool { self.external_cache.contains_key(k) } diff --git a/src/query/storages/common/cache/src/read/cached_reader.rs b/src/query/storages/common/cache/src/read/cached_reader.rs index 9dde7aa46ef61..e4d2210327e2b 100644 --- a/src/query/storages/common/cache/src/read/cached_reader.rs +++ b/src/query/storages/common/cache/src/read/cached_reader.rs @@ -21,55 +21,48 @@ use databend_common_metrics::cache::*; use super::loader::LoadParams; use crate::caches::CacheValue; use crate::CacheAccessor; -use crate::InMemoryLruCache; use crate::Loader; /// A cache-aware reader -pub struct CachedReader { - cache: Option, +pub struct CachedReader { + cache: Arc>, loader: L, } -impl>, L> CachedReader> +impl>, L> CachedReader where L: Loader + Sync { - pub fn new(cache: Option>, loader: L) -> Self { + pub fn new(cache: Arc>, loader: L) -> Self { Self { cache, loader } } /// Load the object at `location`, uses/populates the cache if possible/necessary. #[async_backtrace::framed] pub async fn read(&self, params: &LoadParams) -> Result> { - match &self.cache { - None => Ok(Arc::new(self.loader.load(params).await?)), - Some(cache) => { - let cache_key = self.loader.cache_key(params); - match cache.get(cache_key.as_str()) { - Some(item) => Ok(item), - None => { - let start = Instant::now(); + let cache_key = self.loader.cache_key(params); - let v = self.loader.load(params).await?; + if let Some(item) = self.cache.get(&cache_key) { + return Ok(item); + } + + let start = Instant::now(); + let v = self.loader.load(params).await?; - // Perf. - { - metrics_inc_cache_miss_load_millisecond( - start.elapsed().as_millis() as u64, - cache.name(), - ); - } + // Perf. + { + metrics_inc_cache_miss_load_millisecond( + start.elapsed().as_millis() as u64, + self.cache.name(), + ); + } - match params.put_cache { - true => Ok(cache.insert(cache_key, v)), - false => Ok(Arc::new(v)), - } - } - } - } + match params.put_cache { + true => Ok(self.cache.insert(cache_key, v)), + false => Ok(Arc::new(v)), } } pub fn name(&self) -> &str { - self.cache.as_ref().map(|c| c.name()).unwrap_or("") + self.cache.name() } } diff --git a/src/query/storages/common/cache/src/read/readers.rs b/src/query/storages/common/cache/src/read/readers.rs index a160e68098b12..dc6ce7f725f91 100644 --- a/src/query/storages/common/cache/src/read/readers.rs +++ b/src/query/storages/common/cache/src/read/readers.rs @@ -13,7 +13,6 @@ // limitations under the License. use crate::read::cached_reader::CachedReader; -use crate::InMemoryLruCache; -pub type InMemoryItemCacheReader = CachedReader>; -pub type InMemoryCacheReader = CachedReader>; +pub type InMemoryItemCacheReader = CachedReader; +pub type InMemoryCacheReader = CachedReader; diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs index 2e12bc18b797e..2eb7e86ae1b88 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs @@ -162,14 +162,16 @@ impl BlockReader { // first, check in memory table data cache // column_array_cache - if let Some(cache_array) = column_array_cache.get_sized(&column_cache_key, len) { + if let Some(cache_array) = + column_array_cache.get_sized(&column_cache_key.cache_key, len) + { cached_column_array.push((*column_id, cache_array)); continue; } // and then, check on disk table data cache if let Some(cached_column_raw_data) = - column_data_cache.get_sized(&column_cache_key, len) + column_data_cache.get_sized(&column_cache_key.cache_key, len) { cached_column_data.push((*column_id, cached_column_raw_data)); continue; diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs index ebf4fc32b335c..a518f0f5cd179 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs @@ -23,7 +23,6 @@ use databend_common_exception::Result; use databend_common_expression::ColumnId; use databend_common_storage::parquet_rs::infer_schema_with_extension; use databend_common_storage::parquet_rs::read_metadata_sync; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::TableDataCacheKey; use opendal::Operator; @@ -120,7 +119,7 @@ impl BlockReader { // first, check column array object cache let (offset, len) = column_meta.offset_length(); let column_cache_key = TableDataCacheKey::new(block_path, *column_id, offset, len); - if let Some(cache_array) = column_array_cache.get(&column_cache_key) { + if let Some(cache_array) = column_array_cache.get(&column_cache_key.cache_key) { cached_column_array.push((*column_id, cache_array)); continue; } diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs index ca9fcc0672457..8a0ed7f599e70 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs @@ -37,7 +37,6 @@ use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; use databend_common_metrics::storage::*; use databend_common_storage::ColumnNode; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::TableDataCacheKey; use databend_storages_common_table_meta::meta::ColumnMeta; @@ -160,15 +159,14 @@ impl BlockReader { // populate cache if necessary if self.put_cache { - if let Some(cache) = CacheManager::instance().get_table_data_array_cache() { - // populate array cache items - for item in deserialized_column_arrays.into_iter() { - if let DeserializedArray::Deserialized((column_id, array, size)) = item { - let meta = column_metas.get(&column_id).unwrap(); - let (offset, len) = meta.offset_length(); - let key = TableDataCacheKey::new(block_path, column_id, offset, len); - cache.insert(key.into(), (array, size)); - } + let cache = CacheManager::instance().get_table_data_array_cache(); + // populate array cache items + for item in deserialized_column_arrays.into_iter() { + if let DeserializedArray::Deserialized((column_id, array, size)) = item { + let meta = column_metas.get(&column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = TableDataCacheKey::new(block_path, column_id, offset, len); + cache.insert(key.into(), (array, size)); } } } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index c4b212b4e0f41..d0cd66a09d28b 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -26,7 +26,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; use databend_common_expression::Value; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::TableDataCacheKey; use databend_storages_common_table_meta::meta::ColumnMeta; @@ -62,11 +61,7 @@ impl BlockReader { let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); let name_paths = column_name_paths(&self.projection, &self.original_schema); - let array_cache = if self.put_cache { - CacheManager::instance().get_table_data_array_cache() - } else { - None - }; + let array_cache = CacheManager::instance().get_table_data_array_cache(); for ((i, field), column_node) in self .projected_schema @@ -97,14 +92,11 @@ impl BlockReader { let arrow_array = column_by_name(&record_batch, &name_paths[i]); let arrow2_array: Box = arrow_array.into(); - if !column_node.is_nested { - if let Some(cache) = &array_cache { - let meta = column_metas.get(&field.column_id).unwrap(); - let (offset, len) = meta.offset_length(); - let key = - TableDataCacheKey::new(block_path, field.column_id, offset, len); - cache.insert(key.into(), (arrow2_array.clone(), data.len())); - } + if !column_node.is_nested && self.put_cache { + let meta = column_metas.get(&field.column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = TableDataCacheKey::new(block_path, field.column_id, offset, len); + array_cache.insert(key.into(), (arrow2_array.clone(), data.len())); } Value::Column(Column::from_arrow(arrow2_array.as_ref(), &data_type)?) } diff --git a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs index 9945fffe17959..8ba6db6153cb2 100644 --- a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs +++ b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs @@ -14,6 +14,7 @@ use std::io::Read; use std::io::SeekFrom; +use std::sync::Arc; use bytes::Buf; use databend_common_exception::ErrorCode; @@ -21,6 +22,7 @@ use databend_common_exception::Result; use databend_common_expression::TableSchemaRef; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::InMemoryItemCacheReader; +use databend_storages_common_cache::InMemoryLruCache; use databend_storages_common_cache::LoadParams; use databend_storages_common_cache::Loader; use databend_storages_common_index::BloomIndexMeta; @@ -65,7 +67,10 @@ impl MetaReaders { dal: Operator, schema: TableSchemaRef, ) -> CompactSegmentInfoReader { - CompactSegmentInfoReader::new(None, LoaderWrapper((dal, schema))) + CompactSegmentInfoReader::new( + Arc::new(None::>), + LoaderWrapper((dal, schema)), + ) } pub fn table_snapshot_reader(dal: Operator) -> TableSnapshotReader { @@ -76,7 +81,10 @@ impl MetaReaders { } pub fn table_snapshot_reader_without_cache(dal: Operator) -> TableSnapshotReader { - TableSnapshotReader::new(None, LoaderWrapper(dal)) + TableSnapshotReader::new( + Arc::new(None::>), + LoaderWrapper(dal), + ) } pub fn table_snapshot_statistics_reader(dal: Operator) -> TableSnapshotStatisticsReader { diff --git a/src/query/storages/fuse/src/io/segments.rs b/src/query/storages/fuse/src/io/segments.rs index 1ad24231047b2..899479b38a659 100644 --- a/src/query/storages/fuse/src/io/segments.rs +++ b/src/query/storages/fuse/src/io/segments.rs @@ -19,7 +19,6 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::TableSchemaRef; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::LoadParams; use databend_storages_common_table_meta::meta::CompactSegmentInfo; @@ -126,9 +125,8 @@ impl SegmentsIO { let raw_bytes = serialized_segment.segment.to_bytes()?; let compact_segment_info = CompactSegmentInfo::from_slice(&raw_bytes)?; dal.write(&serialized_segment.path, raw_bytes).await?; - if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() { - segment_cache.insert(serialized_segment.path, compact_segment_info); - } + let segment_cache = CacheManager::instance().get_table_segment_cache(); + segment_cache.insert(serialized_segment.path, compact_segment_info); Ok(()) } } diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 9f31a3f84e279..eb617cb7ecdfa 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -13,7 +13,6 @@ // limitations under the License. use databend_common_exception::Result; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::SegmentInfo; @@ -58,9 +57,8 @@ impl CachedMetaWriter for SegmentInfo { ) -> Result<()> { let bytes = self.marshal()?; data_accessor.write(location, bytes.clone()).await?; - if let Some(cache) = CompactSegmentInfo::cache() { - cache.insert(location.to_owned(), CompactSegmentInfo::try_from(&self)?); - } + let cache = CompactSegmentInfo::cache(); + cache.insert(location.to_owned(), CompactSegmentInfo::try_from(&self)?); Ok(()) } } diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 618366ad3f80c..4e78219aeb548 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -38,7 +38,6 @@ use databend_common_metrics::storage::*; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::executor::physical_plans::MutationKind; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_segment.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_segment.rs index 9c392eccc3609..78f14fbf6ccf5 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_segment.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_segment.rs @@ -27,7 +27,6 @@ use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::PipeItem; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::SegmentInfo; @@ -191,9 +190,8 @@ impl Processor for TransformSerializeSegment { } } State::PreCommitSegment { location, segment } => { - if let Some(segment_cache) = SegmentInfo::cache() { - segment_cache.insert(location.clone(), segment.as_ref().try_into()?); - } + let segment_cache = SegmentInfo::cache(); + segment_cache.insert(location.clone(), segment.as_ref().try_into()?); let format_version = SegmentInfo::VERSION; diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index e451ea26a113b..d3abe83bf1813 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -23,7 +23,6 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::schema::ListIndexesByIdReq; use databend_common_meta_app::schema::TableIndex; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_cache::LoadParams; use databend_storages_common_index::BloomIndexMeta; @@ -581,14 +580,13 @@ impl FuseTable { counter.inverted_indexes += inverted_index_count; // if there is inverted index file cache, evict the cached items - if let Some(inverted_index_cache) = InvertedIndexFile::cache() { - for index_path in &inverted_indexes_to_be_purged { - InvertedIndexReader::cache_key_of_index_columns(index_path) - .iter() - .for_each(|cache_key| { - inverted_index_cache.evict(cache_key); - }) - } + let inverted_index_cache = InvertedIndexFile::cache(); + for index_path in &inverted_indexes_to_be_purged { + InvertedIndexReader::cache_key_of_index_columns(index_path) + .iter() + .for_each(|cache_key| { + inverted_index_cache.evict(cache_key); + }) } self.try_purge_location_files_and_cache::( @@ -686,13 +684,14 @@ impl FuseTable { locations_to_be_purged: HashSet, ) -> Result<()> where - T: CachedObject, + T: Send + Sync + CachedObject, { - if let Some(cache) = T::cache() { - for loc in locations_to_be_purged.iter() { - cache.evict(loc); - } + let object_cache = T::cache(); + + for purged in locations_to_be_purged.iter() { + object_cache.evict(purged); } + self.try_purge_location_files(ctx, locations_to_be_purged) .await } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 5a00dd102b0a5..3351749dbd24d 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -31,7 +31,6 @@ use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_sql::field_default_value; use databend_common_storage::ColumnNodes; -use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_index::BloomIndex; use databend_storages_common_pruner::BlockMetaIndex; @@ -154,15 +153,13 @@ impl FuseTable { }); if let Some(cache_key) = derterministic_cache_key.as_ref() { - if let Some(cache) = CacheItem::cache() { - if let Some(data) = cache.get(cache_key) { - info!( - "prune snapshot block from cache, final block numbers:{}, cost:{:?}", - data.1.len(), - start.elapsed() - ); - return Ok((data.0.clone(), data.1.clone())); - } + if let Some(data) = CacheItem::cache().get(cache_key) { + info!( + "prune snapshot block from cache, final block numbers:{}, cost:{:?}", + data.1.len(), + start.elapsed() + ); + return Ok((data.0.clone(), data.1.clone())); } } @@ -235,9 +232,7 @@ impl FuseTable { )?; if let Some(cache_key) = derterministic_cache_key { - if let Some(cache) = CacheItem::cache() { - cache.insert(cache_key, result.clone()); - } + CacheItem::cache().insert(cache_key, result.clone()); } Ok(result) } diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index e2394fc2d5e13..19792596b88ca 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -27,7 +27,6 @@ use databend_common_expression::SEGMENT_NAME_COL_NAME; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::field_default_value; use databend_common_sql::BloomIndexColumns; -use databend_storages_common_cache::BlockMetaCache; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_index::RangeIndex; @@ -196,7 +195,7 @@ pub struct FusePruner { pub push_down: Option, pub inverse_range_index: Option, pub deleted_segments: Vec, - pub block_meta_cache: Option, + pub block_meta_cache: Arc>>>, } impl FusePruner { @@ -403,21 +402,18 @@ impl FusePruner { } fn extract_block_metas( - segment_path: &str, + segment_path: &String, segment: &CompactSegmentInfo, populate_cache: bool, ) -> Result>>> { - if let Some(cache) = CacheManager::instance().get_block_meta_cache() { - if let Some(metas) = cache.get(segment_path) { - Ok(metas) - } else { - match populate_cache { - true => Ok(cache.insert(segment_path.to_string(), segment.block_metas()?)), - false => Ok(Arc::new(segment.block_metas()?)), - } - } + let cache = CacheManager::instance().get_block_meta_cache(); + if let Some(metas) = cache.get(segment_path) { + Ok(metas) } else { - Ok(Arc::new(segment.block_metas()?)) + match populate_cache { + true => Ok(cache.insert(segment_path.to_string(), segment.block_metas()?)), + false => Ok(Arc::new(segment.block_metas()?)), + } } } diff --git a/src/query/storages/system/src/caches_table.rs b/src/query/storages/system/src/caches_table.rs index 12dd78bc64de8..dd56d47667f06 100644 --- a/src/query/storages/system/src/caches_table.rs +++ b/src/query/storages/system/src/caches_table.rs @@ -34,8 +34,6 @@ use databend_common_storages_fuse::TableContext; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::CacheValue; -use databend_storages_common_cache::InMemoryLruCache; -use databend_storages_common_cache::Unit; use databend_storages_common_cache::DISK_TABLE_DATA_CACHE_NAME; use crate::SyncOneBlockSystemTable; @@ -51,8 +49,8 @@ struct CachesTableColumns { names: Vec, num_items: Vec, size: Vec, - capacity: Vec, - unit: Vec, + items_capacity: Vec, + bytes_capacity: Vec, access: Vec, hit: Vec, miss: Vec, @@ -86,53 +84,28 @@ impl SyncSystemTable for CachesTable { let mut columns = CachesTableColumns::default(); - if let Some(table_snapshot_cache) = table_snapshot_cache { - Self::append_row(&table_snapshot_cache, &local_node, &mut columns); - } - if let Some(table_snapshot_statistic_cache) = table_snapshot_statistic_cache { - Self::append_row(&table_snapshot_statistic_cache, &local_node, &mut columns); - } - - if let Some(segment_info_cache) = segment_info_cache { - Self::append_row(&segment_info_cache, &local_node, &mut columns); - } - - if let Some(bloom_index_filter_cache) = bloom_index_filter_cache { - Self::append_row(&bloom_index_filter_cache, &local_node, &mut columns); - } - - if let Some(bloom_index_meta_cache) = bloom_index_meta_cache { - Self::append_row(&bloom_index_meta_cache, &local_node, &mut columns); - } - - if let Some(block_meta_cache) = block_meta_cache { - Self::append_row(&block_meta_cache, &local_node, &mut columns); - } - - if let Some(inverted_index_meta_cache) = inverted_index_meta_cache { - Self::append_row(&inverted_index_meta_cache, &local_node, &mut columns); - } - - if let Some(inverted_index_file_cache) = inverted_index_file_cache { - Self::append_row(&inverted_index_file_cache, &local_node, &mut columns); - } - - if let Some(prune_partitions_cache) = prune_partitions_cache { - Self::append_row(&prune_partitions_cache, &local_node, &mut columns); - } - - if let Some(file_meta_data_cache) = file_meta_data_cache { - Self::append_row(&file_meta_data_cache, &local_node, &mut columns); - } - + // In memory cache + Self::append_row(&table_snapshot_cache, &local_node, &mut columns); + Self::append_row(&table_snapshot_statistic_cache, &local_node, &mut columns); + Self::append_row(&segment_info_cache, &local_node, &mut columns); + Self::append_row(&bloom_index_filter_cache, &local_node, &mut columns); + Self::append_row(&bloom_index_meta_cache, &local_node, &mut columns); + Self::append_row(&block_meta_cache, &local_node, &mut columns); + Self::append_row(&inverted_index_meta_cache, &local_node, &mut columns); + Self::append_row(&inverted_index_file_cache, &local_node, &mut columns); + Self::append_row(&prune_partitions_cache, &local_node, &mut columns); + Self::append_row(&file_meta_data_cache, &local_node, &mut columns); + Self::append_row(&table_column_array_cache, &local_node, &mut columns); + + // In disk cache if let Some(cache) = table_data_cache { // table data cache is not a named cache yet columns.nodes.push(local_node.clone()); columns.names.push(DISK_TABLE_DATA_CACHE_NAME.to_string()); columns.num_items.push(cache.len() as u64); columns.size.push(cache.bytes_size()); - columns.capacity.push(cache.bytes_capacity()); - columns.unit.push(Unit::Bytes.to_string()); + columns.items_capacity.push(cache.items_capacity()); + columns.bytes_capacity.push(cache.bytes_capacity()); let access = get_cache_access_count(DISK_TABLE_DATA_CACHE_NAME); let hit = get_cache_hit_count(DISK_TABLE_DATA_CACHE_NAME); let miss = get_cache_miss_count(DISK_TABLE_DATA_CACHE_NAME); @@ -141,17 +114,13 @@ impl SyncSystemTable for CachesTable { columns.miss.push(miss); } - if let Some(table_column_array_cache) = table_column_array_cache { - Self::append_row(&table_column_array_cache, &local_node, &mut columns); - } - Ok(DataBlock::new_from_columns(vec![ StringType::from_data(columns.nodes), StringType::from_data(columns.names), UInt64Type::from_data(columns.num_items), UInt64Type::from_data(columns.size), - UInt64Type::from_data(columns.capacity), - StringType::from_data(columns.unit), + UInt64Type::from_data(columns.items_capacity), + UInt64Type::from_data(columns.bytes_capacity), UInt64Type::from_data(columns.access), UInt64Type::from_data(columns.hit), UInt64Type::from_data(columns.miss), @@ -166,8 +135,14 @@ impl CachesTable { TableField::new("name", TableDataType::String), TableField::new("num_items", TableDataType::Number(NumberDataType::UInt64)), TableField::new("size", TableDataType::Number(NumberDataType::UInt64)), - TableField::new("capacity", TableDataType::Number(NumberDataType::UInt64)), - TableField::new("unit", TableDataType::String), + TableField::new( + "items_capacity", + TableDataType::Number(NumberDataType::UInt64), + ), + TableField::new( + "bytes_capacity", + TableDataType::Number(NumberDataType::UInt64), + ), TableField::new("access", TableDataType::Number(NumberDataType::UInt64)), TableField::new("hit", TableDataType::Number(NumberDataType::UInt64)), TableField::new("miss", TableDataType::Number(NumberDataType::UInt64)), @@ -188,8 +163,8 @@ impl CachesTable { SyncOneBlockSystemTable::create(Self { table_info }) } - fn append_row>>( - cache: &InMemoryLruCache, + fn append_row>>( + cache: &Arc>, local_node: &str, columns: &mut CachesTableColumns, ) { @@ -197,17 +172,8 @@ impl CachesTable { columns.names.push(cache.name().to_string()); columns.num_items.push(cache.len() as u64); columns.size.push(cache.bytes_size()); - - match cache.unit() { - Unit::Bytes => { - columns.unit.push(cache.unit().to_string()); - columns.capacity.push(cache.bytes_capacity()); - } - Unit::Count => { - columns.unit.push(cache.unit().to_string()); - columns.capacity.push(cache.items_capacity()); - } - } + columns.bytes_capacity.push(cache.bytes_capacity()); + columns.items_capacity.push(cache.items_capacity()); let access = get_cache_access_count(cache.name()); let hit = get_cache_hit_count(cache.name());