diff --git a/Cargo.lock b/Cargo.lock index 6fb306f..8faba17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1909,8 +1909,8 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", + "tokio-util", "tower", - "tower-http", "tray-icon", "unicode-segmentation", "vergen-gix", @@ -1996,12 +1996,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08a397c49fec283e3d6211adbe480be95aae5f304cfb923e9970e08956d5168a" - [[package]] name = "httparse" version = "1.9.5" @@ -2349,16 +2343,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "miniz_oxide" version = "0.8.0" @@ -3191,6 +3175,16 @@ dependencies = [ "cfg-if", "cpufeatures", "digest", + "sha1-asm", +] + +[[package]] +name = "sha1-asm" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "286acebaf8b67c1130aedffad26f594eff0c1292389158135327d2e23aed582b" +dependencies = [ + "cc", ] [[package]] @@ -3625,9 +3619,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", @@ -3696,31 +3690,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "tower-http" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" -dependencies = [ - "bitflags 2.6.0", - "bytes", - "futures-util", - "http", - "http-body", - "http-body-util", - "http-range-header", - "httpdate", - "mime", - "mime_guess", - "percent-encoding", - "pin-project-lite", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower-layer" version = "0.3.3" @@ -3784,12 +3753,6 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicase" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" - [[package]] name = "unicode-bom" version = "2.0.3" diff --git a/Cargo.toml b/Cargo.toml index 54e67d3..dcece99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,13 +33,13 @@ regex-lite = "0.1.6" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "socks"] } rustls = { version = "0.23.18", default-features = false, features = ["brotli", "ring", "std", "tls12", "zlib"] } scopeguard = "1.2" -sha1 = { version = "0.10.6", default-features = false, features = ["oid"] } +sha1 = { version = "0.10.6", default-features = false, features = ["asm", "oid"] } tempfile = "3.10" tokio = { version = "1", features = ["full", "parking_lot"] } tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12"] } tokio-stream = { version = "0.1", default-features = false, features = ["fs"] } -tower = { version = "0.5.1", features = ["util", "timeout"] } -tower-http = { version = "0.6.1", features = ["fs"] } +tokio-util = { version = "0.7.13", features = ["io"] } +tower = { version = "0.5.1", features = ["timeout"] } unicode-segmentation = "1.11.0" webpki-roots = "0.26.3" x509-cert = { version = "0.2.5", default-features = false } diff --git 
a/src/cache_manager.rs b/src/cache_manager.rs index 6c0b4aa..1c53781 100644 --- a/src/cache_manager.rs +++ b/src/cache_manager.rs @@ -12,36 +12,35 @@ use std::{ time::{Duration, SystemTime}, }; +use async_stream::stream; +use bytes::{Bytes, BytesMut}; use filesize::{file_real_size, file_real_size_fast}; use filetime::{set_file_mtime, FileTime}; -use futures::{stream, StreamExt, TryFutureExt}; +use futures::{stream, Stream, StreamExt, TryFutureExt}; use hex::FromHex; use log::{debug, error, info, warn}; use mime::Mime; -use parking_lot::{Mutex, RwLock}; -use rand::{rngs::SmallRng, Rng, SeedableRng}; +use parking_lot::Mutex; use sha1::{Digest, Sha1}; use tempfile::TempPath; use tokio::{ fs::{copy, create_dir_all, metadata, read_dir, remove_dir_all, remove_file, rename, DirEntry, File}, io::AsyncReadExt, spawn, - sync::mpsc::UnboundedSender, + sync::mpsc::{channel, UnboundedSender}, task::spawn_blocking, time::{sleep_until, Instant}, }; use tokio_stream::wrappers::ReadDirStream; +use tokio_util::io::ReaderStream; use crate::rpc::{InitSettings, Settings}; -const LRU_SIZE: usize = 1048576; // u16 * LRU_SIZE = 2MiB const SIZE_100MB: u64 = 100 * 1024 * 1024; pub struct CacheManager { cache_dir: PathBuf, cache_date: Mutex>, - lru_cache: RwLock>, - lru_clear_pos: Mutex, temp_dir: PathBuf, total_size: Arc, size_limit: AtomicU64, @@ -59,8 +58,6 @@ impl CacheManager { let new = Arc::new(Self { cache_dir: cache_dir.as_ref().to_path_buf(), cache_date: Mutex::new(HashMap::with_capacity(6000)), - lru_cache: RwLock::new(vec![0; LRU_SIZE]), - lru_clear_pos: Mutex::new(SmallRng::from_entropy().gen_range(0..LRU_SIZE)), temp_dir: temp_dir.as_ref().to_path_buf(), total_size: Arc::new(AtomicU64::new(0)), size_limit: AtomicU64::new(u64::MAX), @@ -120,13 +117,76 @@ impl CacheManager { .unwrap() } - pub async fn get_file(&self, info: &CacheFileInfo) -> Option { - let file = info.get_file(&self.cache_dir).await; - if file.is_some() { - self.mark_recently_accessed(info, true).await; + pub 
async fn get_file(self: &Arc, info: &CacheFileInfo) -> Option>> { + let path = info.to_path(&self.cache_dir); + let metadata = metadata(&path).await.ok()?; + + // Check file exists + if !metadata.is_file() || metadata.len() != info.size() as u64 { + warn!( + "Unexpected cache file metadata: type={:?}, size={}", + metadata.file_type(), + metadata.len() + ); + return None; + } + + let mut file = match File::open(&path).await { + Ok(file) => file, + Err(err) => { + error!("Open cache file error: path={:?}, err={}", &path, err); + return None; + } + }; + + // Skip check if file is recently accessed + let one_week_ago = SystemTime::now() - Duration::from_secs(60 * 60 * 24 * 7); + if FileTime::from_last_modification_time(&metadata) >= one_week_ago.into() { + return Some(ReaderStream::with_capacity(file, 64 * 1024).boxed()); } - file + self.mark_recently_accessed(info).await; + let cache_manager = self.clone(); + let info = info.clone(); + let (tx, mut rx) = channel::>(1); + tokio::spawn(async move { + let file_size = metadata.len(); + let mut buffer = BytesMut::with_capacity(64 * 1024); // 64 KiB + let mut read_off = 0; + let mut hasher = Sha1::new(); + + while read_off < file_size { + buffer.reserve(64 * 1024); + match file.read_buf(&mut buffer).await { + Ok(0) => break, Ok(s) => read_off += s as u64, + Err(err) => { + let _ = tx.send(Err(err)).await; return; + } + }; + hasher.update(&buffer); + let _ = tx.send(Ok(buffer.split().freeze())).await; + } + + let hash: [u8; 20] = hasher.finalize().into(); + if hash != info.hash() { + warn!( + "Delete corrupt cache file: path={:?}, hash={:x?}, actual={:x?}", + &path, + info.hash(), + hash + ); + cache_manager.remove_cache(&info).await; + } + }); + + Some( + stream!
{ + while let Some(item) = rx.recv().await { + yield item; + } + } + .boxed(), + ) } pub async fn import_cache(&self, info: &CacheFileInfo, file_path: &TempPath) { @@ -157,8 +217,6 @@ impl CacheManager { // Fix permission fix_permission(&path).await; - self.mark_recently_accessed(info, false).await; - if let Some(size) = async_filesize(&path).await { self.total_size.fetch_add(size, Relaxed); } @@ -184,37 +242,18 @@ impl CacheManager { let size_limit = settings.size_limit(); info!("Set size limit to {:} GiB", size_limit / 1024 / 1024 / 1024); self.size_limit.store(size_limit, Relaxed); - - let disable_lru = settings.disable_lru_cache(); - let disabled = self.lru_cache.read().is_empty(); - if disable_lru && !disabled { - info!("Disable LRU cache"); - let mut cache = self.lru_cache.write(); - cache.clear(); - cache.shrink_to_fit(); - } else if !disable_lru && disabled { - info!("Enable LRU cache"); - self.lru_cache.write().resize(LRU_SIZE, 0); - } } fn start_background_task(new: Arc) { let manager = Arc::downgrade(&new); spawn(async move { - let mut counter: u32 = 0; + // Check cache size every 10min let mut next_run = Instant::now(); loop { sleep_until(next_run).await; if let Some(manager) = manager.upgrade() { - // Cycle LRU cache - manager.cycle_lru_cache(); - // Check cache size every 10min - if counter % 60 == 0 { - manager.check_cache_usage().await; - } - - counter = counter.wrapping_add(1); - next_run = Instant::now() + Duration::from_secs(10); + manager.check_cache_usage().await; + next_run = Instant::now() + Duration::from_secs(600); } else { break; } @@ -222,37 +261,14 @@ impl CacheManager { }); } - async fn mark_recently_accessed(&self, info: &CacheFileInfo, update_file: bool) { - let hash = info.hash(); - let index = (u32::from_be_bytes([0, hash[2], hash[3], hash[4]]) >> 4) as usize; - let bitmask: u16 = 1 << (hash[4] & 0b0000_1111); - - // Check if the file is already in the cache. 
- if self.lru_cache.read().get(index).map(|bit| bit & bitmask != 0).unwrap_or(false) { - // Already marked, return - return; - } - - // Mark the file as recently accessed. - if let Some(bit) = self.lru_cache.write().get_mut(index) { - *bit |= bitmask; - } - - if update_file { - let path = info.to_path(&self.cache_dir); - if let Ok(metadata) = metadata(&path).await { - let one_week_ago = SystemTime::now() - Duration::from_secs(60 * 60 * 24 * 7); - if FileTime::from_last_modification_time(&metadata) < one_week_ago.into() { - // Update file modification time - let _ = spawn_blocking(move || { - if let Err(err) = set_file_mtime(&path, FileTime::now()) { - error!("Update cache file time error: path={:?}, err={}", &path, err); - } - }) - .await; - } + async fn mark_recently_accessed(&self, info: &CacheFileInfo) { + let path = info.to_path(&self.cache_dir); + let _ = spawn_blocking(move || { + if let Err(err) = set_file_mtime(&path, FileTime::now()) { + error!("Update cache file time error: path={:?}, err={}", &path, err); } - } + }) + .await; } async fn scan_cache(&self, static_range: Vec, parallelism: usize, verify_cache: bool) -> Result<(), Error> { @@ -289,7 +305,6 @@ impl CacheManager { debug!("Cache dir number: {}", &dirs.len()); - let lru_cutoff = FileTime::from_system_time(SystemTime::now() - Duration::from_secs(60 * 60 * 24 * 7)); // 1 week let mut counter = 0; let total = dirs.len(); for dir in dirs.into_iter() { @@ -347,11 +362,8 @@ impl CacheManager { self.total_size.fetch_add(size, Relaxed); } - // Add recently accessed file to the cache. + // Save oldest mtime let mtime = FileTime::from_last_modification_time(&metadata); - if mtime > lru_cutoff { - self.mark_recently_accessed(&info, false).await; - } let mut time = time.lock(); if mtime < *time { *time = mtime; @@ -413,11 +425,8 @@ impl CacheManager { self.total_size.fetch_add(size, Relaxed); } - // Add recently accessed file to the cache. 
+ // Save oldest mtime let mtime = FileTime::from_last_modification_time(&metadata); - if mtime > lru_cutoff { - self.mark_recently_accessed(&info, false).await; - } let mut time = time.lock(); if mtime < *time { *time = mtime; @@ -449,17 +458,6 @@ impl CacheManager { Ok(()) } - fn cycle_lru_cache(&self) { - let mut pos = self.lru_clear_pos.lock(); - // 1048576 / (1week / 10s) =~ 17 - for _ in 0..17 { - if let Some(bit) = self.lru_cache.write().get_mut(*pos) { - *bit = 0; - *pos = (*pos + 1) % LRU_SIZE; - } - } - } - async fn check_cache_usage(&self) { let total_size = self.total_size.load(Relaxed); let size_limit = self.size_limit.load(Relaxed); @@ -772,16 +770,6 @@ impl CacheFileInfo { path } - async fn get_file(&self, cache_dir: &Path) -> Option { - let path = self.to_path(cache_dir); - let metadata = metadata(&path).await; - if metadata.is_ok() && metadata.unwrap().is_file() { - Some(path) - } else { - None - } - } - pub fn mime_type(&self) -> Mime { self.mime_type.to_mime() } diff --git a/src/route/cache.rs b/src/route/cache.rs index 081e73a..d04d079 100644 --- a/src/route/cache.rs +++ b/src/route/cache.rs @@ -6,7 +6,7 @@ use axum::{ extract::{Path, Request, State}, http::{ header::{CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, - HeaderName, HeaderValue, + HeaderValue, }, response::{IntoResponse, Response}, }; @@ -20,8 +20,6 @@ use tokio::{ sync::watch, time::timeout, }; -use tower::util::ServiceExt; -use tower_http::services::ServeFile; use crate::{ cache_manager::CacheFileInfo, @@ -31,22 +29,18 @@ use crate::{ }; const TTL: RangeInclusive = -900..=900; // Token TTL 15 minutes -#[allow(clippy::declare_interior_mutable_const)] -const CACHE_HEADER: (HeaderName, HeaderValue) = (CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000")); +const CACHE_HEADER: HeaderValue = HeaderValue::from_static("public, max-age=31536000"); +const DEFAULT_CD: HeaderValue = HeaderValue::from_static("inline"); pub(super) async fn hath( Path((file_id, 
additional, file_name)): Path<(String, String, String)>, data: State>, - req: Request, + _req: Request, ) -> impl IntoResponse { let additional = parse_additional(&additional); let mut keystamp = additional.get("keystamp").unwrap_or(&"").split('-'); let file_index = additional.get("fileindex").unwrap_or(&""); let xres = additional.get("xres").unwrap_or(&""); - let content_disposition = ( - CONTENT_DISPOSITION, - HeaderValue::from_maybe_shared(format!("inline; filename=\"{file_name}\"")).unwrap_or_else(|_| HeaderValue::from_static("inline")), - ); // keystamp check let time = keystamp.next().unwrap_or_default(); @@ -57,22 +51,27 @@ pub(super) async fn hath( return forbidden(); }; - // Check cache hit + // Parse file info let info = match CacheFileInfo::from_file_id(&file_id) { Some(info) => info, None => return not_found(), }; - if let Some(path) = data.cache_manager.get_file(&info).await { - let mut res = ServeFile::new_with_mime(path, &info.mime_type()).oneshot(req).await.unwrap(); - let header = res.headers_mut(); - header.insert(CACHE_HEADER.0, CACHE_HEADER.1); - header.insert(content_disposition.0, content_disposition.1); - return res.map(Body::new); + let file_size = info.size() as u64; + let content_type = HeaderValue::from_maybe_shared(info.mime_type().to_string()).unwrap(); + let content_disposition = HeaderValue::from_maybe_shared(format!("inline;filename=\"{file_name}\"")).unwrap_or(DEFAULT_CD); + + // Check cache hit + if let Some(file) = data.cache_manager.get_file(&info).await { + return Response::builder() + .header(CACHE_CONTROL, CACHE_HEADER) + .header(CONTENT_LENGTH, file_size) + .header(CONTENT_TYPE, content_type) + .header(CONTENT_DISPOSITION, content_disposition) + .body(Body::from_stream(file)) + .unwrap(); } // Cache miss, proxy request - let file_size = info.size() as u64; - // Check if the file is already downloading let (temp_tx, temp_rx) = watch::channel(None); // Tempfile let tx = Arc::new(watch::channel(0).0); // Download progress @@ 
-206,10 +205,10 @@ pub(super) async fn hath( } Response::builder() + .header(CACHE_CONTROL, CACHE_HEADER) .header(CONTENT_LENGTH, file_size) - .header(CONTENT_TYPE, HeaderValue::from_maybe_shared(info.mime_type().to_string()).unwrap()) - .header(CACHE_HEADER.0, CACHE_HEADER.1) - .header(content_disposition.0, content_disposition.1) + .header(CONTENT_TYPE, content_type) + .header(CONTENT_DISPOSITION, content_disposition) .body(Body::from_stream(stream! { let mut file = File::open(temp_path.as_ref()).await.unwrap(); let mut read_off = 0; diff --git a/src/rpc.rs b/src/rpc.rs index 510b227..e6c7808 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -47,7 +47,6 @@ pub struct Settings { disable_logging: AtomicBool, max_connection: AtomicU64, disable_ip_check: bool, - disable_lru_cache: AtomicBool, } pub struct InitSettings { @@ -94,10 +93,6 @@ impl Settings { self.disable_logging.load(Ordering::Relaxed) } - pub fn disable_lru_cache(&self) -> bool { - self.disable_lru_cache.load(Ordering::Relaxed) - } - fn update(&self, settings: HashMap<&str, &str>) { if let Some(size) = settings.get("disklimit_bytes").and_then(|s| s.parse().ok()) { self.size_limit.store(size, Ordering::Relaxed); @@ -109,9 +104,6 @@ impl Settings { let disable_logging = settings.get("disable_logging").and_then(|s| s.parse().ok()).unwrap_or(false); self.disable_logging.store(disable_logging, Ordering::Relaxed); - - let use_less_memory = settings.get("use_less_memory").and_then(|s| s.parse().ok()).unwrap_or(false); - self.disable_lru_cache.store(use_less_memory, Ordering::Relaxed); } } @@ -139,7 +131,6 @@ impl RPCClient { disable_logging: AtomicBool::new(false), max_connection: AtomicU64::new(max_connection), disable_ip_check, - disable_lru_cache: AtomicBool::new(false), }), } }