Skip to content

Commit

Permalink
Impl cache state save
Browse files Browse the repository at this point in the history
  • Loading branch information
james58899 committed Dec 20, 2024
1 parent 3983144 commit fbb07c7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ regex-lite = "0.1.6"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "socks"] }
rustls = { version = "0.23.18", default-features = false, features = ["brotli", "ring", "std", "tls12", "zlib"] }
scopeguard = "1.2"
serde = { version = "1.0.216", features = ["serde_derive"] }
serde_json = "1.0.133"
tempfile = "3.14.0"
tokio = { version = "1.42.0", features = ["full", "parking_lot"] }
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12"] }
Expand Down
74 changes: 69 additions & 5 deletions src/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use hex::FromHex;
use log::{debug, error, info, warn};
use mime::Mime;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use tempfile::TempPath;
use tokio::{
fs::{copy, create_dir_all, metadata, read_dir, remove_dir_all, remove_file, rename, DirEntry, File},
io::AsyncReadExt,
io::{AsyncReadExt, AsyncWriteExt},
spawn,
sync::mpsc::{channel, UnboundedSender},
task::spawn_blocking,
Expand All @@ -41,10 +42,12 @@ const SIZE_100MB: u64 = 100 * 1024 * 1024;
pub struct CacheManager {
cache_dir: PathBuf,
cache_state: Mutex<HashMap<String, CacheState>>,
cache_state_path: PathBuf,
temp_dir: PathBuf,
size_limit: AtomicU64,
}

#[derive(Clone, Serialize, Deserialize)]
struct CacheState {
file_count: u64,
size: u64,
Expand Down Expand Up @@ -75,6 +78,7 @@ impl Default for CacheState {

impl CacheManager {
pub async fn new<P: AsRef<Path>>(
data_dir: P,
cache_dir: P,
temp_dir: P,
settings: Arc<Settings>,
Expand All @@ -85,11 +89,13 @@ impl CacheManager {
let new = Arc::new(Self {
cache_dir: cache_dir.as_ref().to_path_buf(),
cache_state: Default::default(),
cache_state_path: data_dir.as_ref().join("cache_state.json"),
temp_dir: temp_dir.as_ref().to_path_buf(),
size_limit: AtomicU64::new(u64::MAX),
});
new.update_settings(settings);

delete_java_cache_data(data_dir).await; // Force official version rescan cache
clean_temp_dir(temp_dir.as_ref()).await;

let manager = new.clone();
Expand All @@ -98,6 +104,20 @@ impl CacheManager {
let free = get_available_space(cache_dir.as_ref());
let low_disk = free.is_some_and(|x| x < SIZE_100MB);

// Load cache state
let old_state = match read_state(&new.cache_state_path).await {
Ok(state) => state,
Err(err) => {
error!("Load cache state error: err={}", err);
HashMap::new()
}
};
for (sr, state) in old_state {
if static_range.contains(&sr) {
new.cache_state.lock().insert(sr, state);
}
}

if low_disk || (verify_cache && !force_background_scan) {
// Low space or force cache check
if low_disk {
Expand Down Expand Up @@ -260,6 +280,26 @@ impl CacheManager {
self.size_limit.store(size_limit, Relaxed);
}

pub async fn save_state(&self) {
let mut file = match File::create(&self.cache_state_path).await {
Ok(file) => file,
Err(err) => {
error!("Create cache state file error: err={}", err);
return;
}
};
let json = match serde_json::to_string(&*self.cache_state.lock()) {
Ok(json) => json,
Err(err) => {
error!("Serialize cache state error: err={}", err);
return;
}
};
if let Err(err) = file.write_all(json.as_bytes()).await {
error!("Write cache state error: err={}", err);
}
}

fn start_background_task(new: Arc<Self>) {
let manager = Arc::downgrade(&new);
spawn(async move {
Expand All @@ -268,6 +308,7 @@ impl CacheManager {
loop {
sleep_until(next_run).await;
if let Some(manager) = manager.upgrade() {
manager.save_state().await;
manager.check_cache_usage().await;
next_run = Instant::now() + Duration::from_secs(600);
} else {
Expand Down Expand Up @@ -308,8 +349,8 @@ impl CacheManager {

let mut hash = l1.file_name().clone();
hash.push(l2.file_name());
if static_range.iter().any(|sr| hash.eq_ignore_ascii_case(sr)) {
dirs.push(l2_path);
if let Some(sr) = static_range.iter().find(|sr| hash.eq_ignore_ascii_case(sr)) {
dirs.push((sr, l2_path));
} else {
warn!("Delete not in static range dir: {}", l2_path.to_str().unwrap_or_default());
if let Err(err) = remove_dir_all(&l2_path).await {
Expand All @@ -323,9 +364,18 @@ impl CacheManager {

let mut counter = 0;
let total = dirs.len();
for dir in dirs.into_iter() {
for (sr, path) in dirs.into_iter() {
// Check old state match
let file_count = ReadDirStream::new(read_dir(&path).await?).count().await as u64;
if file_count == self.cache_state.lock().get(sr).map(|s| s.file_count).unwrap_or(0) {
counter += 1;
continue;
}

// State mismatch, scan dir
self.cache_state.lock().remove(sr); // Clear old state
let btree = Mutex::new(BTreeMap::new());
ReadDirStream::new(read_dir(&dir).await?)
ReadDirStream::new(read_dir(&path).await?)
.for_each_concurrent(parallelism, |entry| async {
if let Err(err) = entry {
error!("Read cache dir error: {}", err);
Expand Down Expand Up @@ -601,6 +651,20 @@ async fn async_filesize_fast(path: &Path, metadata: &Metadata) -> Option<u64> {
}
}

async fn delete_java_cache_data<P: AsRef<Path>>(data_dir: P) {
let base = data_dir.as_ref();
let _ = remove_file(base.join("pcache_info")).await;
let _ = remove_file(base.join("pcache_ages")).await;
let _ = remove_file(base.join("pcache_lru")).await;
}

async fn read_state(path: &Path) -> Result<HashMap<String, CacheState>, Box<dyn std::error::Error>> {
let mut file = File::open(path).await?;
let mut json = String::new();
file.read_to_string(&mut json).await?;
Ok(serde_json::from_str(&json)?)
}

#[cfg(unix)]
async fn fix_permission(path: &Path) {
use std::os::unix::prelude::PermissionsExt;
Expand Down
15 changes: 5 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use regex_lite::Regex;
use reqwest::Proxy;
use tempfile::TempPath;
use tokio::{
fs::{self, remove_file, try_exists, File},
fs::{self, try_exists, File},
io::{stderr, stdin, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
runtime::Handle,
signal,
Expand Down Expand Up @@ -185,8 +185,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel::<()>();
let settings = client.settings();
logger.config().write_info(!settings.disable_logging());
delete_java_cache_data(args.data_dir).await; // Rust cache data incompatible with Java, so we must delete it
let cache_manager = CacheManager::new(
args.data_dir,
args.cache_dir,
args.temp_dir,
settings.clone(),
Expand Down Expand Up @@ -296,6 +296,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Schedule task
let client3 = client.clone();
let cache_manager3 = cache_manager.clone();
let keepalive = tokio::spawn(async move {
let mut counter: u32 = 0;
let mut next_run = Instant::now() + Duration::from_secs(10);
Expand All @@ -315,7 +316,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if counter % 2160 == 2159 {
if let Some(list) = client3.get_purgelist(43200).await {
for info in list.iter().filter_map(CacheFileInfo::from_file_id) {
cache_manager.remove_cache(&info).await;
cache_manager3.remove_cache(&info).await;
}
}
}
Expand All @@ -337,6 +338,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Shutdown in progress - please wait");
sleep(Duration::from_secs(15)).await;
server.shutdown().await;
cache_manager.save_state().await;
logger.shutdown().await;
Ok(())
}
Expand Down Expand Up @@ -409,13 +411,6 @@ After registering, enter your ID and Key below to start your client.
Ok((id, key))
}

async fn delete_java_cache_data<P: AsRef<Path>>(data_dir: P) {
let base = data_dir.as_ref();
let _ = remove_file(base.join("pcache_info")).await;
let _ = remove_file(base.join("pcache_ages")).await;
let _ = remove_file(base.join("pcache_lru")).await;
}

#[cfg(unix)]
async fn wait_shutdown_signal(mut shutdown_channel: UnboundedReceiver<()>) {
use tokio::signal::unix::{signal, SignalKind};
Expand Down

0 comments on commit fbb07c7

Please sign in to comment.