Refactor cache scan
Scan entries within the same directory in parallel to improve efficiency
james58899 committed May 30, 2024
1 parent d955b92 commit 07bd74b
Showing 1 changed file with 53 additions and 58 deletions.
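Before the file diff, here is a minimal sketch of the scanning pattern this commit switches to: directories are still visited one at a time, but the entries inside each directory are now processed concurrently through ReadDirStream and for_each_concurrent, with a shared total updated through an atomic and per-entry errors handled by an early return instead of the old loop's continue. This is only an illustration, not code from the repository: it assumes tokio, tokio-stream (with the "fs" feature), and futures as dependencies, and the names scan_dirs and limit as well as the metadata.len() size accounting are hypothetical stand-ins for the crate's own helpers such as file_real_size_fast.

// Illustrative sketch only (not the crate's actual API): walk directories
// one at a time, but process the entries of each directory concurrently.
// Assumed dependencies: tokio, tokio-stream ("fs" feature), futures.
use std::{
    io,
    path::PathBuf,
    sync::atomic::{AtomicU64, Ordering::Relaxed},
};

use futures::StreamExt;
use tokio::fs::read_dir;
use tokio_stream::wrappers::ReadDirStream;

// `scan_dirs` and `limit` are hypothetical names for this sketch.
async fn scan_dirs(dirs: Vec<PathBuf>, limit: usize) -> io::Result<u64> {
    let total_size = AtomicU64::new(0);

    for dir in dirs {
        // Up to `limit` directory entries are in flight at once; a failure on
        // a single entry is logged and skipped with `return`, the closure's
        // equivalent of the old loop's `continue`.
        ReadDirStream::new(read_dir(&dir).await?)
            .for_each_concurrent(limit, |entry| async {
                let entry = match entry {
                    Ok(entry) => entry,
                    Err(err) => {
                        eprintln!("Read cache dir error: {}", err);
                        return;
                    }
                };
                if let Ok(metadata) = entry.metadata().await {
                    if metadata.is_file() {
                        // Shared state is updated through an atomic because
                        // several entry futures may run interleaved.
                        total_size.fetch_add(metadata.len(), Relaxed);
                    }
                }
            })
            .await;
    }

    Ok(total_size.load(Relaxed))
}

The previous implementation did the opposite: it scanned whole directories in parallel via buffer_unordered while reading each directory's entries sequentially; after this change, concurrency is applied to the entries within a single directory.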
111 changes: 53 additions & 58 deletions src/cache_manager.rs
@@ -6,7 +6,7 @@ use std::{
io::Error,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering::Relaxed},
atomic::{AtomicU64, Ordering::Relaxed},
Arc,
},
time::{Duration, SystemTime},
@@ -282,31 +282,35 @@ impl CacheManager {
debug!("Cache dir number: {}", &dirs.len());

let lru_cutoff = FileTime::from_system_time(SystemTime::now() - Duration::from_secs(60 * 60 * 24 * 7)); // 1 week
let counter = &AtomicUsize::new(0);
let mut counter = 0;
let total = dirs.len();
let scan_task = stream::iter(dirs)
.map(|dir| async move {
let mut time = FileTime::now();
let mut stream = read_dir(&dir).await?;
let mut btree = BTreeMap::new();
while let Some(entry) = stream.next_entry().await? {
for dir in dirs.into_iter() {
let time = Mutex::new(FileTime::now());
let btree = Mutex::new(BTreeMap::new());
ReadDirStream::new(read_dir(&dir).await?)
.for_each_concurrent(parallelism, |entry| async {
if let Err(err) = entry {
error!("Read cache dir error: {}", err);
return;
}
let entry = entry.unwrap();
let path = entry.path();
let metadata = entry.metadata().await;
if let Err(err) = metadata {
error!("Read cache file metadata error: path={:?}, err={}", path, err);
continue;
return;
}
let metadata = metadata.unwrap();
if metadata.is_dir() {
warn!("Found unexpected dir in cache dir: {:?}", path);
continue;
return;
}

// Parse info
let info = entry.file_name().to_str().and_then(CacheFileInfo::from_file_id);
if info.is_none() {
warn!("Invalid cache file: {:?}", path);
continue;
return;
}
let info = info.unwrap();

@@ -322,41 +326,49 @@
if let Err(err) = remove_file(&path).await {
error!("Delete corrupt cache file error: path={:?}, err={}", &path, err);
}
continue;
return;
}

if verify_cache {
// We need verify cache integrity, save path to btree and delay size calculation
let mut btree = btree.lock();
let inode = get_inode(&metadata).unwrap_or(btree.len() as u64); // sort by inode or sequential
btree.insert(inode, path);
} else {
self.total_size.fetch_add(file_real_size_fast(&path, &metadata)?, Relaxed);
if let Ok(size) = file_real_size_fast(&path, &metadata) {
self.total_size.fetch_add(size, Relaxed);
}

// Add recently accessed file to the cache.
let mtime = FileTime::from_last_modification_time(&metadata);
if mtime > lru_cutoff {
self.mark_recently_accessed(&info, false).await;
}
if mtime < time {
time = mtime;
let mut time = time.lock();
if mtime < *time {
*time = mtime;
}
}
}
})
.await;

if verify_cache {
while let Some((_, path)) = btree.pop_first() {
// Verify cache integrity
if verify_cache {
stream::iter(btree.into_inner().into_values())
.for_each_concurrent(parallelism, |path| async {
let path = path;
let mut file = match File::open(&path).await {
Ok(file) => file,
Err(err) => {
error!("Open cache file {} error: {}", path.display(), err);
continue;
return;
}
};
let info = match path.file_name().and_then(OsStr::to_str).and_then(CacheFileInfo::from_file_id) {
Some(info) => info,
None => {
warn!("Failed to parse cache info: {:?}", path);
continue;
return;
}
};
let mut hasher = Sha1::new();
@@ -370,7 +382,7 @@
hasher.update(&buf[0..n]);
}
Err(e) => {
error!("Read cache file {} error: {}", &path.display(), e);
error!("Read cache file {} error: {}", path.display(), e);
break;
}
}
@@ -384,55 +396,46 @@
if let Err(err) = remove_file(&path).await {
error!("Delete corrupt cache file error: path={:?}, err={}", path, err);
}
continue;
return;
}

// File is correct, calculate size
if let Ok(metadata) = file.metadata().await {
self.total_size.fetch_add(file_real_size_fast(&path, &metadata)?, Relaxed);
if let Ok(size) = file_real_size_fast(&path, &metadata) {
self.total_size.fetch_add(size, Relaxed);
}

// Add recently accessed file to the cache.
let mtime = FileTime::from_last_modification_time(&metadata);
if mtime > lru_cutoff {
self.mark_recently_accessed(&info, false).await;
}
if mtime < time {
time = mtime;
let mut time = time.lock();
if mtime < *time {
*time = mtime;
}
}
}
}
})
.await;
}

let count = counter.fetch_add(1, Relaxed) + 1;
if count % 100 == 0 || count == total {
info!("Scanned {}/{} static ranges.", count, total);
}
// Scan progress
counter += 1;
if counter % 100 == 0 || counter == total {
info!("Scanned {}/{} static ranges.", counter, total);
}

Ok::<(PathBuf, FileTime), Error>((dir, time))
})
.buffer_unordered(parallelism)
.collect::<Vec<_>>()
.await;
// Save mtime
self.cache_date.lock().insert(dir, time.into_inner());
}

if counter.load(Relaxed) == 0 && static_range.len() > 20 {
if counter == 0 && static_range.len() > 20 {
error!(
"This client has static ranges assigned to it, but the cache is empty. Check file permissions and file system integrity."
);
return Err(Error::new(std::io::ErrorKind::NotFound, "Cache is empty."));
}

// Save oldest mtime
let mut map = self.cache_date.lock();
map.clear();
for task in scan_task {
match task {
Ok((dir, time)) => {
map.insert(dir, time);
}
Err(err) => error!("Scan cache dir error: {}", err),
}
}

info!("Finished cache scan. Cache size: {}", self.total_size.load(Relaxed));

Ok(())
@@ -597,15 +600,7 @@ fn get_inode(metadata: &Metadata) -> Option<u64> {
Some(metadata.ino())
}

#[cfg(windows)]
fn get_inode(metadata: &Metadata) -> Option<u64> {
use std::os::windows::fs::MetadataExt;

// On windows file_index not stable yet, use ctime instead
Some(metadata.creation_time())
}

#[cfg(not(any(unix, windows)))]
#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> Option<u64> {
// Not support
None
