Optimize cache scanning
james58899 committed Dec 23, 2024
1 parent 93861f6 commit 5e3ab40
Showing 1 changed file with 53 additions and 84 deletions.
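The scan loop is reorganized around two new helpers, open_dir and list_files, which centralize the read_dir error logging and list each static-range directory once. On Unix, list_files sorts the collected entries by inode before the concurrent per-file work starts, replacing the old Mutex<BTreeMap> keyed by get_inode(); the separate second verification pass is folded into the single concurrent loop, which is where most of the 84 deleted lines go.

A minimal standalone sketch of the listing pattern, assuming tokio, tokio-stream, and futures (entries_in_inode_order is an illustrative name, not part of the commit):

use std::path::Path;

use futures::StreamExt;
use tokio::fs::{read_dir, DirEntry};
use tokio_stream::wrappers::ReadDirStream;

// Illustrative sketch: list a directory once, then sort the entries by
// inode so the per-file work that follows touches the disk in roughly
// physical order.
async fn entries_in_inode_order(path: &Path) -> std::io::Result<Vec<DirEntry>> {
    let mut files: Vec<DirEntry> = ReadDirStream::new(read_dir(path).await?)
        .filter_map(|entry| async { entry.ok() }) // log-and-skip in the real code
        .collect()
        .await;

    #[cfg(unix)]
    files.sort_by_key(|f| f.ino());

    Ok(files)
}

Listing in inode order is a common trick for bulk scans: readdir order is essentially arbitrary, while inode order tends to track on-disk layout, so the sequential stat/read work seeks less.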
137 changes: 53 additions & 84 deletions src/cache_manager.rs
@@ -1,6 +1,5 @@
use std::{
collections::{BTreeMap, HashMap},
ffi::OsStr,
collections::HashMap,
fmt::Display,
fs::Metadata,
io::Error,
@@ -16,7 +15,7 @@ use async_stream::stream;
use bytes::{Bytes, BytesMut};
use filesize::{file_real_size, file_real_size_fast};
use filetime::{set_file_mtime, FileTime};
use futures::{stream, Stream, StreamExt, TryFutureExt};
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use hex::FromHex;
use log::{debug, error, info, warn};
use mime::Mime;
@@ -368,22 +367,25 @@ impl CacheManager {
let total = dirs.len();
for (sr, path) in dirs.into_iter() {
// Check old state match
let file_count = ReadDirStream::new(read_dir(&path).await?).count().await as u64;
let file_count = match open_dir(&path).await {
Some(stream) => stream.filter_map(|e| async { e.ok() }).count().await as u64,
None => continue,
};
if file_count == self.cache_state.lock().get(sr).map(|s| s.file_count).unwrap_or(0) {
counter += 1;
continue;
}

// State mismatch, scan dir
self.cache_state.lock().remove(sr); // Clear old state
let btree = Mutex::new(BTreeMap::new());
ReadDirStream::new(read_dir(&path).await?)
.for_each_concurrent(parallelism, |entry| async {
if let Err(err) = entry {
error!("Read cache dir error: {}", err);
return;
}
let entry = entry.unwrap();

let files = match open_dir(&path).await {
Some(mut stream) => list_files(&mut stream).await,
None => continue,
};

stream::iter(files)
.for_each_concurrent(parallelism, |entry| async move {
let path = entry.path();
let metadata = entry.metadata().await;
if let Err(err) = metadata {
@@ -415,45 +417,13 @@ impl CacheManager {
}

if verify_cache {
// We need verify cache integrity, save path to btree and delay size calculation
let mut btree = btree.lock();
let inode = get_inode(&metadata).unwrap_or(btree.len() as u64); // sort by inode or sequential
btree.insert(inode, path);
return;
}

// Update cache state
if let Some(size) = async_filesize_fast(&path, &metadata).await {
let mtime = FileTime::from_last_modification_time(&metadata).unix_seconds();
let mut cache_state = self.cache_state.lock();
let state = cache_state.entry(info.static_range()).or_default();
state.add_file(size);
if state.oldest > mtime {
state.oldest = mtime;
}
}
})
.await;

// Verify cache integrity
if verify_cache {
stream::iter(btree.into_inner().into_values())
.for_each_concurrent(parallelism, |path| async {
let path = path;
let mut file = match File::open(&path).await {
Ok(file) => file,
Err(err) => {
error!("Open cache file {:?} error: {}", path, err);
return;
}
};
let info = match path.file_name().and_then(OsStr::to_str).and_then(CacheFileInfo::from_file_id) {
Some(info) => info,
None => {
warn!("Failed to parse cache info: {:?}", path);
return;
}
};
let mut hasher = Sha1::new();
let mut buf = vec![0; 1024 * 1024]; // 1MiB
loop {
@@ -478,22 +448,20 @@
}
return;
}
}

// File is correct, update cache state
if let Ok(metadata) = file.metadata().await {
if let Some(size) = async_filesize_fast(&path, &metadata).await {
let mtime = FileTime::from_last_modification_time(&metadata).unix_seconds();
let mut cache_state = self.cache_state.lock();
let state = cache_state.entry(info.static_range()).or_default();
state.add_file(size);
if state.oldest > mtime {
state.oldest = mtime;
}
}
// Update cache state
if let Some(size) = async_filesize_fast(&path, &metadata).await {
let mtime = FileTime::from_last_modification_time(&metadata).unix_seconds();
let mut cache_state = self.cache_state.lock();
let state = cache_state.entry(info.static_range()).or_default();
state.add_file(size);
if state.oldest > mtime {
state.oldest = mtime;
}
})
.await;
}
}
})
.await;

// Scan progress
counter += 1;
@@ -559,22 +527,14 @@ impl CacheManager {
}

// List files
let files = read_dir(&target_dir).map_ok(ReadDirStream::new).await;
if let Err(err) = files {
error!("Read cache dir {:?} error: {}", target_dir, err);
break;
}
let files = match open_dir(&target_dir).await {
Some(mut stream) => list_files(&mut stream).await,
None => break,
};

// Sort by mtime
let mut files: Vec<(DirEntry, FileTime, Metadata)> = files
.unwrap()
let mut files: Vec<(DirEntry, FileTime, Metadata)> = stream::iter(files)
.filter_map(|entry| async move {
if let Err(err) = entry {
error!("Read cache file error: {}", err);
return None;
}

let entry = entry.ok()?;
let metadata = match entry.metadata().await {
Ok(metadata) => metadata,
Err(err) => {
@@ -653,6 +613,28 @@ async fn async_filesize_fast(path: &Path, metadata: &Metadata) -> Option<u64> {
}
}

async fn open_dir(path: &Path) -> Option<ReadDirStream> {
read_dir(path)
.map_ok(ReadDirStream::new)
.await
.inspect_err(|err| error!("Read cache dir error: path={:?}, err={}", path, err))
.ok()
}

async fn list_files(stream: &mut ReadDirStream) -> Vec<DirEntry> {
let mut files: Vec<DirEntry> = stream
.inspect_err(|err| error!("List cache file error: {}", err))
.filter_map(|entry| async { entry.ok() })
.collect()
.await;

if cfg!(unix) {
files.sort_by_key(|f| f.ino());
}

files
}

async fn delete_java_cache_data<P: AsRef<Path>>(data_dir: P) {
let base = data_dir.as_ref();
let _ = remove_file(base.join("pcache_info")).await;
@@ -710,19 +692,6 @@ fn get_available_space(path: &Path) -> Option<u64> {
None // Not support
}

#[cfg(unix)]
fn get_inode(metadata: &Metadata) -> Option<u64> {
use std::os::unix::fs::MetadataExt;

Some(metadata.ino())
}

#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> Option<u64> {
// Not support
None
}

async fn clean_temp_dir(path: &Path) {
info!("Deleting old temp files");

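For reference, the verify_cache branch kept by this commit hashes every cache file in 1 MiB chunks with SHA-1 before counting it toward the cache state. A minimal sketch of that read loop, assuming the sha1 crate and tokio (sha1_of_file is an illustrative helper, not part of the commit):

use std::path::Path;

use sha1::{Digest, Sha1};
use tokio::{fs::File, io::AsyncReadExt};

// Illustrative sketch: hash a file in fixed-size chunks so arbitrarily
// large cache files never need to fit in memory.
async fn sha1_of_file(path: &Path) -> std::io::Result<[u8; 20]> {
    let mut file = File::open(path).await?;
    let mut hasher = Sha1::new();
    let mut buf = vec![0u8; 1024 * 1024]; // 1MiB, matching the diff above
    loop {
        let read = file.read(&mut buf).await?;
        if read == 0 {
            break; // EOF
        }
        hasher.update(&buf[..read]);
    }
    Ok(hasher.finalize().into())
}

The fixed 1 MiB buffer keeps memory use constant regardless of file size; the loop ends when read() returns 0 at end of file.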