diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 822ff4a29e1b..2bea36ad3d0c 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -501,7 +501,7 @@ mod tests { // Read metadata from write cache let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) - .cache(Some(cache_manager.clone())); + .cache(cache_manager.clone()); let reader = builder.build().await.unwrap(); // Check parquet metadata diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 2c2a8f092af8..57337d8f74ae 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -562,7 +562,7 @@ pub struct SerializedCompactionOutput { struct CompactionSstReaderBuilder<'a> { metadata: RegionMetadataRef, sst_layer: AccessLayerRef, - cache: Option, + cache: CacheManagerRef, inputs: &'a [FileHandle], append_mode: bool, filter_deleted: bool, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index bf197690cf3d..3e0228a4b2a4 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -295,7 +295,7 @@ impl Compactor for DefaultCompactor { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), sst_layer: sst_layer.clone(), - cache: Some(cache_manager.clone()), + cache: cache_manager.clone(), inputs: &output.inputs, append_mode, filter_deleted: output.filter_deleted, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index bf9777efa5f5..c60b7c4107ed 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -438,16 +438,12 @@ impl EngineInner { channel_size: self.config.parallel_scan_channel_size, }; - let scan_region = ScanRegion::new( - version, - region.access_layer.clone(), - request, - Some(cache_manager), - ) - .with_parallelism(scan_parallelism) - .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) - .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) - .with_start_time(query_start); + let scan_region = + ScanRegion::new(version, region.access_layer.clone(), request, cache_manager) + .with_parallelism(scan_parallelism) + .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) + .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) + .with_start_time(query_start); Ok(scan_region) } diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index f40172c21d3b..d97e35ac08b9 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -85,7 +85,7 @@ impl RowGroupLastRowCachedReader { pub(crate) fn new( file_id: FileId, row_group_idx: usize, - cache_manager: Option, + cache_manager: CacheManagerRef, row_group_reader: RowGroupReader, ) -> Self { let key = SelectorResultKey { @@ -94,9 +94,6 @@ impl RowGroupLastRowCachedReader { selector: TimeSeriesRowSelector::LastRow, }; - let Some(cache_manager) = cache_manager else { - return Self::new_miss(key, row_group_reader, None); - }; if let Some(value) = cache_manager.get_selector_result(&key) { let schema_matches = value.projection == row_group_reader diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 9ba5f6eccf1e..78866f0c1ba0 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -171,7 +171,7 @@ impl ProjectionMapper { pub(crate) fn convert( &self, batch: &Batch, - cache_manager: Option<&CacheManager>, + cache_manager: &CacheManager, ) -> common_recordbatch::error::Result { debug_assert_eq!(self.batch_fields.len(), batch.fields().len()); debug_assert!(self @@ -204,15 +204,12 @@ impl ProjectionMapper { match index { BatchIndex::Tag(idx) => { let value = &pk_values[*idx]; - let vector = match cache_manager { - Some(cache) => repeated_vector_with_cache( - &column_schema.data_type, - value, - num_rows, - cache, - )?, - None => new_repeated_vector(&column_schema.data_type, value, num_rows)?, - }; + let vector = repeated_vector_with_cache( + &column_schema.data_type, + value, + num_rows, + cache_manager, + )?; columns.push(vector); } BatchIndex::Timestamp => { @@ -360,7 +357,7 @@ mod tests { // With vector cache. let cache = CacheManager::builder().vector_cache_size(1024).build(); let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +---------------------+----+----+----+----+ | ts | k0 | k1 | v0 | v1 | @@ -380,7 +377,7 @@ mod tests { assert!(cache .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3)) .is_none()); - let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); assert_eq!(expect, print_record_batch(record_batch)); } @@ -401,7 +398,8 @@ mod tests { ); let batch = new_batch(0, &[1, 2], &[(4, 4)], 3); - let record_batch = mapper.convert(&batch, None).unwrap(); + let cache = CacheManager::builder().vector_cache_size(1024).build(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 677b37354d5b..4bf9314915b9 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -90,7 +90,7 @@ impl RangeMeta { Self::push_unordered_file_ranges( input.memtables.len(), &input.files, - input.cache_manager.as_deref(), + &input.cache_manager, &mut ranges, ); @@ -172,16 +172,15 @@ impl RangeMeta { fn push_unordered_file_ranges( num_memtables: usize, files: &[FileHandle], - cache: Option<&CacheManager>, + cache: &CacheManager, ranges: &mut Vec, ) { // For append mode, we can parallelize reading row groups. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; // Get parquet meta from the cache. - let parquet_meta = cache.and_then(|c| { - c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id()) - }); + let parquet_meta = + cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id()); if let Some(parquet_meta) = parquet_meta { // Scans each row group. for row_group_index in 0..file.meta_ref().num_row_groups { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 04dadf924486..0241ba72037e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -167,7 +167,7 @@ pub(crate) struct ScanRegion { /// Scan request. request: ScanRequest, /// Cache. - cache_manager: Option, + cache_manager: CacheManagerRef, /// Parallelism to scan. parallelism: ScanParallelism, /// Whether to ignore inverted index. @@ -184,7 +184,7 @@ impl ScanRegion { version: VersionRef, access_layer: AccessLayerRef, request: ScanRequest, - cache_manager: Option, + cache_manager: CacheManagerRef, ) -> ScanRegion { ScanRegion { version, @@ -381,17 +381,12 @@ impl ScanRegion { } let file_cache = || -> Option { - let cache_manager = self.cache_manager.as_ref()?; - let write_cache = cache_manager.write_cache()?; + let write_cache = self.cache_manager.write_cache()?; let file_cache = write_cache.file_cache(); Some(file_cache) }(); - let index_cache = self - .cache_manager - .as_ref() - .and_then(|c| c.index_cache()) - .cloned(); + let index_cache = self.cache_manager.index_cache().cloned(); InvertedIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), @@ -471,7 +466,7 @@ pub(crate) struct ScanInput { /// Handles to SST files to scan. pub(crate) files: Vec, /// Cache. - pub(crate) cache_manager: Option, + pub(crate) cache_manager: CacheManagerRef, /// Ignores file not found error. ignore_file_not_found: bool, /// Parallelism to scan data. @@ -502,7 +497,7 @@ impl ScanInput { predicate: None, memtables: Vec::new(), files: Vec::new(), - cache_manager: None, + cache_manager: CacheManagerRef::default(), ignore_file_not_found: false, parallelism: ScanParallelism::default(), inverted_index_applier: None, @@ -545,7 +540,7 @@ impl ScanInput { /// Sets cache for this query. #[must_use] - pub(crate) fn with_cache(mut self, cache: Option) -> Self { + pub(crate) fn with_cache(mut self, cache: CacheManagerRef) -> Self { self.cache_manager = cache; self } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9b7a71a36c51..345d1d615ba5 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -229,7 +229,7 @@ impl SeqScan { .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - let cache = stream_ctx.input.cache_manager.as_deref(); + let cache = &stream_ctx.input.cache_manager; let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); #[cfg(debug_assertions)] diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 707b7d4ba65c..7a2ce12e62ca 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -135,7 +135,7 @@ impl UnorderedScan { let stream = try_stream! { part_metrics.on_first_poll(); - let cache = stream_ctx.input.cache_manager.as_deref(); + let cache = &stream_ctx.input.cache_manager; // Scans each part. for part_range in part_ranges { let mut metrics = ScannerMetrics::default(); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index ae51a0d37c29..c94ae600735f 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -195,11 +195,11 @@ mod tests { .unwrap(); // Enable page cache. - let cache = Some(Arc::new( + let cache = Arc::new( CacheManager::builder() .page_cache_size(64 * 1024 * 1024) .build(), - )); + ); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) .cache(cache.clone()); for _ in 0..3 { @@ -219,15 +219,15 @@ mod tests { // Doesn't have compressed page cached. let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + assert!(cache.get_pages(&page_key).is_none()); // Cache 4 row groups. for i in 0..4 { let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some()); + assert!(cache.get_pages(&page_key).is_some()); } let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + assert!(cache.get_pages(&page_key).is_none()); } #[tokio::test] diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b73026a7a6e3..cd219f47ccd6 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -82,7 +82,7 @@ pub struct ParquetReaderBuilder { /// can contain columns not in the parquet file. projection: Option>, /// Manager that caches SST data. - cache_manager: Option, + cache_manager: CacheManagerRef, /// Index appliers. inverted_index_applier: Option, fulltext_index_applier: Option, @@ -106,7 +106,7 @@ impl ParquetReaderBuilder { predicate: None, time_range: None, projection: None, - cache_manager: None, + cache_manager: CacheManagerRef::default(), inverted_index_applier: None, fulltext_index_applier: None, expected_metadata: None, @@ -138,7 +138,7 @@ impl ParquetReaderBuilder { /// Attaches the cache to the builder. #[must_use] - pub fn cache(mut self, cache: Option) -> ParquetReaderBuilder { + pub fn cache(mut self, cache: CacheManagerRef) -> ParquetReaderBuilder { self.cache_manager = cache; self } @@ -313,10 +313,12 @@ impl ParquetReaderBuilder { let region_id = self.file_handle.region_id(); let file_id = self.file_handle.file_id(); // Tries to get from global cache. - if let Some(manager) = &self.cache_manager { - if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await { - return Ok(metadata); - } + if let Some(metadata) = self + .cache_manager + .get_parquet_meta_data(region_id, file_id) + .await + { + return Ok(metadata); } // Cache miss, load metadata directly. @@ -324,13 +326,11 @@ impl ParquetReaderBuilder { let metadata = metadata_loader.load().await?; let metadata = Arc::new(metadata); // Cache the metadata. - if let Some(cache) = &self.cache_manager { - cache.put_parquet_meta_data( - self.file_handle.region_id(), - self.file_handle.file_id(), - metadata.clone(), - ); - } + self.cache_manager.put_parquet_meta_data( + self.file_handle.region_id(), + self.file_handle.file_id(), + metadata.clone(), + ); Ok(metadata) } @@ -846,7 +846,7 @@ pub(crate) struct RowGroupReaderBuilder { /// Field levels to read. field_levels: FieldLevels, /// Cache. - cache_manager: Option, + cache_manager: CacheManagerRef, } impl RowGroupReaderBuilder { @@ -864,7 +864,7 @@ impl RowGroupReaderBuilder { &self.parquet_meta } - pub(crate) fn cache_manager(&self) -> &Option { + pub(crate) fn cache_manager(&self) -> &CacheManagerRef { &self.cache_manager } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 73382c06d9b3..dd572d8863f8 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -48,7 +48,7 @@ pub struct InMemoryRowGroup<'a> { region_id: RegionId, file_id: FileId, row_group_idx: usize, - cache_manager: Option, + cache_manager: CacheManagerRef, /// Row group level cached pages for each column. /// /// These pages are uncompressed pages of a row group. @@ -69,7 +69,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id: FileId, parquet_meta: &'a ParquetMetaData, row_group_idx: usize, - cache_manager: Option, + cache_manager: CacheManagerRef, file_path: &'a str, object_store: ObjectStore, ) -> Self { @@ -208,19 +208,18 @@ impl<'a> InMemoryRowGroup<'a> { }; let column = self.metadata.column(idx); - if let Some(cache) = &self.cache_manager { - if !cache_uncompressed_pages(column) { - // For columns that have multiple uncompressed pages, we only cache the compressed page - // to save memory. - let page_key = PageKey::new_compressed( - self.region_id, - self.file_id, - self.row_group_idx, - idx, - ); - cache - .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); - } + + if !cache_uncompressed_pages(column) { + // For columns that have multiple uncompressed pages, we only cache the compressed page + // to save memory. + let page_key = PageKey::new_compressed( + self.region_id, + self.file_id, + self.row_group_idx, + idx, + ); + self.cache_manager + .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); } *chunk = Some(Arc::new(ColumnChunkData::Dense { @@ -242,9 +241,6 @@ impl<'a> InMemoryRowGroup<'a> { .enumerate() .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx)) .for_each(|(idx, chunk)| { - let Some(cache) = &self.cache_manager else { - return; - }; let column = self.metadata.column(idx); if cache_uncompressed_pages(column) { // Fetches uncompressed pages for the row group. @@ -254,7 +250,7 @@ impl<'a> InMemoryRowGroup<'a> { self.row_group_idx, idx, ); - self.column_uncompressed_pages[idx] = cache.get_pages(&page_key); + self.column_uncompressed_pages[idx] = self.cache_manager.get_pages(&page_key); } else { // Fetches the compressed page from the cache. let page_key = PageKey::new_compressed( @@ -264,7 +260,7 @@ impl<'a> InMemoryRowGroup<'a> { idx, ); - *chunk = cache.get_pages(&page_key).map(|page_value| { + *chunk = self.cache_manager.get_pages(&page_key).map(|page_value| { Arc::new(ColumnChunkData::Dense { offset: column.byte_range().0 as usize, data: page_value.compressed.clone(), @@ -300,7 +296,7 @@ impl<'a> InMemoryRowGroup<'a> { key: IndexKey, ranges: &[Range], ) -> Option> { - if let Some(cache) = self.cache_manager.as_ref()?.write_cache() { + if let Some(cache) = self.cache_manager.write_cache() { return cache.file_cache().read_ranges(key, ranges).await; } None @@ -331,10 +327,6 @@ impl<'a> InMemoryRowGroup<'a> { } }; - let Some(cache) = &self.cache_manager else { - return Ok(Box::new(page_reader)); - }; - let column = self.metadata.column(i); if cache_uncompressed_pages(column) { // This column use row group level page cache. @@ -343,7 +335,7 @@ impl<'a> InMemoryRowGroup<'a> { let page_value = Arc::new(PageValue::new_row_group(pages)); let page_key = PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i); - cache.put_pages(page_key, page_value.clone()); + self.cache_manager.put_pages(page_key, page_value.clone()); return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group))); }