Skip to content

Commit

Permalink
feat: remove files from the write cache in purger (#4655)
Browse files Browse the repository at this point in the history
* feat: remove files from the write cache in purger

* chore: fix typo
  • Loading branch information
evenyag authored Aug 31, 2024
1 parent 68b59e0 commit 8eda36b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ struct SystemCatalog {
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

// system_schema_provier for default catalog
// system_schema_provider for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl RegionServerInner {
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
// The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// it here, collect the values first then use later separately.

let regions = self
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ impl FileCache {
}
}

#[allow(unused)]
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
Expand Down
12 changes: 12 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ impl WriteCache {
Ok(Some(sst_info))
}

/// Removes a file from the cache by `index_key`.
pub(crate) async fn remove(&self, index_key: IndexKey) {
self.file_cache.remove(index_key).await
}

/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
Expand Down Expand Up @@ -424,6 +429,13 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

// Removes the file from the cache.
let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache.remove(sst_index_key).await;
assert!(!write_cache.file_cache.contains_key(&sst_index_key));
write_cache.remove(index_key).await;
assert!(!write_cache.file_cache.contains_key(&index_key));
}

#[tokio::test]
Expand Down
26 changes: 25 additions & 1 deletion src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::{error, info};

use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileMeta;
Expand Down Expand Up @@ -77,16 +78,39 @@ impl FilePurger for LocalFilePurger {
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}

let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
"Successfully deleted SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id
);
}

if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
// Removes the inverted index from the cache.
if file_meta.inverted_index_available() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Puffin,
))
.await;
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Parquet,
))
.await;
}
})) {
error!(e; "Failed to schedule the file purge request");
}
Expand Down

0 comments on commit 8eda36b

Please sign in to comment.