Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unify puffin name passed to stager #5564

Merged
merged 12 commits into from
Feb 21, 2025
14 changes: 7 additions & 7 deletions src/index/src/fulltext_index/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCr
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};

async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
Expand Down Expand Up @@ -68,13 +68,13 @@ async fn test_search(
let file_accessor = Arc::new(MockFileAccessor::new(prefix));
let puffin_manager = FsPuffinManager::new(stager, file_accessor);

let file_name = "fulltext_index";
let blob_key = "fulltext_index";
let mut writer = puffin_manager.writer(file_name).await.unwrap();
create_index(prefix, &mut writer, blob_key, texts, config).await;
let file_name = "fulltext_index".to_string();
let blob_key = "fulltext_index".to_string();
let mut writer = puffin_manager.writer(&file_name).await.unwrap();
create_index(prefix, &mut writer, &blob_key, texts, config).await;

let reader = puffin_manager.reader(file_name).await.unwrap();
let index_dir = reader.dir(blob_key).await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
let results = searcher.search(query).await.unwrap();

Expand Down
32 changes: 25 additions & 7 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,14 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
Expand All @@ -161,9 +164,7 @@ impl AccessLayer {
self.object_store.clone(),
request.metadata,
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
path_provider,
)
.await;
writer
Expand Down Expand Up @@ -248,8 +249,18 @@ pub trait FilePathProvider: Send + Sync {
/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
pub(crate) region_id: RegionId,
pub(crate) file_cache: FileCacheRef,
region_id: RegionId,
file_cache: FileCacheRef,
}

impl WriteCachePathProvider {
/// Creates a new `WriteCachePathProvider` instance.
pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
Self {
region_id,
file_cache,
}
}
}

impl FilePathProvider for WriteCachePathProvider {
Expand All @@ -267,7 +278,14 @@ impl FilePathProvider for WriteCachePathProvider {
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
pub(crate) region_dir: String,
region_dir: String,
}

impl RegionFilePathFactory {
/// Creates a new `RegionFilePathFactory` instance.
pub fn new(region_dir: String) -> Self {
Self { region_dir }
}
}

impl FilePathProvider for RegionFilePathFactory {
Expand Down
17 changes: 6 additions & 11 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,14 @@ impl WriteCache {
let region_id = write_request.metadata.region_id;

let store = self.file_cache.local_store();
let path_provider = WriteCachePathProvider {
file_cache: self.file_cache.clone(),
region_id,
};
let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
Expand Down Expand Up @@ -355,9 +354,7 @@ mod tests {
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let path_provider = RegionFilePathFactory::new("test".to_string());

let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
Expand Down Expand Up @@ -488,9 +485,7 @@ mod tests {
..Default::default()
};
let upload_request = SstUploadRequest {
dest_path_provider: RegionFilePathFactory {
region_dir: data_home.clone(),
},
dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
remote_store: mock_store.clone(),
};

Expand Down
11 changes: 10 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ impl ScanRegion {
}();

let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();

let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

BloomFilterIndexApplierBuilder::new(
Expand All @@ -499,12 +498,22 @@ impl ScanRegion {
return None;
}

let file_cache = || -> Option<FileCacheRef> {
let write_cache = self.cache_strategy.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

FulltextIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.version.metadata.region_id,
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
self.version.metadata.as_ref(),
)
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
.ok()
Expand Down
27 changes: 2 additions & 25 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,31 +174,8 @@ impl FileMeta {
.contains(&IndexType::BloomFilterIndex)
}

/// Returns the size of the inverted index file
pub fn inverted_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.inverted_index_available() {
Some(self.index_file_size)
} else {
None
}
}

/// Returns the size of the fulltext index file
pub fn fulltext_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.fulltext_index_available() {
Some(self.index_file_size)
} else {
None
}
}

/// Returns the size of the bloom filter index file
pub fn bloom_filter_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.bloom_filter_index_available() {
Some(self.index_file_size)
} else {
None
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,9 @@ impl FilePurger for LocalFilePurger {
}

// Purges index content in the stager.
let puffin_file_name =
crate::sst::location::index_file_path(sst_layer.region_dir(), file_meta.file_id);
if let Err(e) = sst_layer
.puffin_manager_factory()
.purge_stager(&puffin_file_name)
.purge_stager(file_meta.file_id)
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
Expand Down
Loading
Loading