From fbb07c718aae686a514a9bb9634d51882513dcec Mon Sep 17 00:00:00 2001 From: james58899 Date: Fri, 20 Dec 2024 07:38:06 +0000 Subject: [PATCH] Impl cache state save --- Cargo.lock | 2 ++ Cargo.toml | 2 ++ src/cache_manager.rs | 74 +++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 15 +++------ 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10348a5..e6d5d01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1902,6 +1902,8 @@ dependencies = [ "rustix", "rustls", "scopeguard", + "serde", + "serde_json", "sha1", "tao", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 71a1473..621cddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,8 @@ regex-lite = "0.1.6" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "socks"] } rustls = { version = "0.23.18", default-features = false, features = ["brotli", "ring", "std", "tls12", "zlib"] } scopeguard = "1.2" +serde = { version = "1.0.216", features = ["serde_derive"] } +serde_json = "1.0.133" tempfile = "3.14.0" tokio = { version = "1.42.0", features = ["full", "parking_lot"] } tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12"] } diff --git a/src/cache_manager.rs b/src/cache_manager.rs index 3384ba5..10c8f8a 100644 --- a/src/cache_manager.rs +++ b/src/cache_manager.rs @@ -21,11 +21,12 @@ use hex::FromHex; use log::{debug, error, info, warn}; use mime::Mime; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use sha1::{Digest, Sha1}; use tempfile::TempPath; use tokio::{ fs::{copy, create_dir_all, metadata, read_dir, remove_dir_all, remove_file, rename, DirEntry, File}, - io::AsyncReadExt, + io::{AsyncReadExt, AsyncWriteExt}, spawn, sync::mpsc::{channel, UnboundedSender}, task::spawn_blocking, @@ -41,10 +42,12 @@ const SIZE_100MB: u64 = 100 * 1024 * 1024; pub struct CacheManager { cache_dir: PathBuf, cache_state: Mutex>, + cache_state_path: PathBuf, temp_dir: PathBuf, size_limit: AtomicU64, } +#[derive(Clone, Serialize, Deserialize)] struct CacheState { file_count: u64, size: u64, @@ -75,6 +78,7 @@ impl Default for CacheState { impl CacheManager { pub async fn new>( + data_dir: P, cache_dir: P, temp_dir: P, settings: Arc, @@ -85,11 +89,13 @@ impl CacheManager { let new = Arc::new(Self { cache_dir: cache_dir.as_ref().to_path_buf(), cache_state: Default::default(), + cache_state_path: data_dir.as_ref().join("cache_state.json"), temp_dir: temp_dir.as_ref().to_path_buf(), size_limit: AtomicU64::new(u64::MAX), }); new.update_settings(settings); + delete_java_cache_data(data_dir).await; // Force official version rescan cache clean_temp_dir(temp_dir.as_ref()).await; let manager = new.clone(); @@ -98,6 +104,20 @@ impl CacheManager { let free = get_available_space(cache_dir.as_ref()); let low_disk = free.is_some_and(|x| x < SIZE_100MB); + // Load cache state + let old_state = match read_state(&new.cache_state_path).await { + Ok(state) => state, + Err(err) => { + error!("Load cache state error: err={}", err); + HashMap::new() + } + }; + for (sr, state) in old_state { + if static_range.contains(&sr) { + new.cache_state.lock().insert(sr, state); + } + } + if low_disk || (verify_cache && !force_background_scan) { // Low space or force cache check if low_disk { @@ -260,6 +280,26 @@ impl CacheManager { self.size_limit.store(size_limit, Relaxed); } + pub async fn save_state(&self) { + let mut file = match File::create(&self.cache_state_path).await { + Ok(file) => file, + Err(err) => { + error!("Create cache state file error: err={}", err); + return; + } + }; + let json = match serde_json::to_string(&*self.cache_state.lock()) { + Ok(json) => json, + Err(err) => { + error!("Serialize cache state error: err={}", err); + return; + } + }; + if let Err(err) = file.write_all(json.as_bytes()).await { + error!("Write cache state error: err={}", err); + } + } + fn start_background_task(new: Arc) { let manager = Arc::downgrade(&new); spawn(async move { @@ -268,6 +308,7 @@ impl CacheManager { loop { sleep_until(next_run).await; if let Some(manager) = manager.upgrade() { + manager.save_state().await; manager.check_cache_usage().await; next_run = Instant::now() + Duration::from_secs(600); } else { @@ -308,8 +349,8 @@ impl CacheManager { let mut hash = l1.file_name().clone(); hash.push(l2.file_name()); - if static_range.iter().any(|sr| hash.eq_ignore_ascii_case(sr)) { - dirs.push(l2_path); + if let Some(sr) = static_range.iter().find(|sr| hash.eq_ignore_ascii_case(sr)) { + dirs.push((sr, l2_path)); } else { warn!("Delete not in static range dir: {}", l2_path.to_str().unwrap_or_default()); if let Err(err) = remove_dir_all(&l2_path).await { @@ -323,9 +364,18 @@ impl CacheManager { let mut counter = 0; let total = dirs.len(); - for dir in dirs.into_iter() { + for (sr, path) in dirs.into_iter() { + // Check old state match + let file_count = ReadDirStream::new(read_dir(&path).await?).count().await as u64; + if file_count == self.cache_state.lock().get(sr).map(|s| s.file_count).unwrap_or(0) { + counter += 1; + continue; + } + + // State mismatch, scan dir + self.cache_state.lock().remove(sr); // Clear old state let btree = Mutex::new(BTreeMap::new()); - ReadDirStream::new(read_dir(&dir).await?) + ReadDirStream::new(read_dir(&path).await?) .for_each_concurrent(parallelism, |entry| async { if let Err(err) = entry { error!("Read cache dir error: {}", err); @@ -601,6 +651,20 @@ async fn async_filesize_fast(path: &Path, metadata: &Metadata) -> Option { } } +async fn delete_java_cache_data>(data_dir: P) { + let base = data_dir.as_ref(); + let _ = remove_file(base.join("pcache_info")).await; + let _ = remove_file(base.join("pcache_ages")).await; + let _ = remove_file(base.join("pcache_lru")).await; +} + +async fn read_state(path: &Path) -> Result, Box> { + let mut file = File::open(path).await?; + let mut json = String::new(); + file.read_to_string(&mut json).await?; + Ok(serde_json::from_str(&json)?) +} + #[cfg(unix)] async fn fix_permission(path: &Path) { use std::os::unix::prelude::PermissionsExt; diff --git a/src/main.rs b/src/main.rs index 2ed3fb0..48ddec6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use regex_lite::Regex; use reqwest::Proxy; use tempfile::TempPath; use tokio::{ - fs::{self, remove_file, try_exists, File}, + fs::{self, try_exists, File}, io::{stderr, stdin, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, runtime::Handle, signal, @@ -185,8 +185,8 @@ async fn main() -> Result<(), Box> { let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel::<()>(); let settings = client.settings(); logger.config().write_info(!settings.disable_logging()); - delete_java_cache_data(args.data_dir).await; // Rust cache data incompatible with Java, so we must delete it let cache_manager = CacheManager::new( + args.data_dir, args.cache_dir, args.temp_dir, settings.clone(), @@ -296,6 +296,7 @@ async fn main() -> Result<(), Box> { // Schedule task let client3 = client.clone(); + let cache_manager3 = cache_manager.clone(); let keepalive = tokio::spawn(async move { let mut counter: u32 = 0; let mut next_run = Instant::now() + Duration::from_secs(10); @@ -315,7 +316,7 @@ async fn main() -> Result<(), Box> { if counter % 2160 == 2159 { if let Some(list) = client3.get_purgelist(43200).await { for info in list.iter().filter_map(CacheFileInfo::from_file_id) { - cache_manager.remove_cache(&info).await; + cache_manager3.remove_cache(&info).await; } } } @@ -337,6 +338,7 @@ async fn main() -> Result<(), Box> { info!("Shutdown in progress - please wait"); sleep(Duration::from_secs(15)).await; server.shutdown().await; + cache_manager.save_state().await; logger.shutdown().await; Ok(()) } @@ -409,13 +411,6 @@ After registering, enter your ID and Key below to start your client. Ok((id, key)) } -async fn delete_java_cache_data>(data_dir: P) { - let base = data_dir.as_ref(); - let _ = remove_file(base.join("pcache_info")).await; - let _ = remove_file(base.join("pcache_ages")).await; - let _ = remove_file(base.join("pcache_lru")).await; -} - #[cfg(unix)] async fn wait_shutdown_signal(mut shutdown_channel: UnboundedReceiver<()>) { use tokio::signal::unix::{signal, SignalKind};