Skip to content

Commit

Permalink
Support low memory mode
Browse files Browse the repository at this point in the history
  • Loading branch information
james58899 committed Mar 19, 2024
1 parent 0d2f99a commit 7b8c60b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
43 changes: 33 additions & 10 deletions src/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio_stream::wrappers::ReadDirStream;

use crate::rpc::{InitSettings, Settings};

const LRU_SIZE: usize = 1048576;
const LRU_SIZE: usize = 1048576; // u16 * LRU_SIZE = 2MiB

struct CacheState {
path: PathBuf,
Expand Down Expand Up @@ -73,11 +73,11 @@ impl Eq for CacheState {}
pub struct CacheManager {
cache_dir: PathBuf,
cache_date: Mutex<BTreeSet<CacheState>>,
lru_cache: RwLock<Vec<u16>>, // 2MiB
lru_cache: RwLock<Vec<u16>>,
lru_clear_pos: Mutex<usize>,
settings: Arc<Settings>,
temp_dir: PathBuf,
total_size: Arc<AtomicU64>,
size_limit: AtomicU64,
}

impl CacheManager {
Expand All @@ -91,12 +91,13 @@ impl CacheManager {
let new = Arc::new(Self {
cache_dir: cache_dir.as_ref().to_path_buf(),
cache_date: Mutex::new(BTreeSet::new()),
lru_cache: RwLock::new(vec![0; LRU_SIZE]),
lru_cache: RwLock::new(vec![]),
lru_clear_pos: Mutex::new(0),
settings,
temp_dir: temp_dir.as_ref().to_path_buf(),
total_size: Arc::new(AtomicU64::new(0)),
size_limit: AtomicU64::new(u64::MAX),
});
new.update_settings(settings);

clean_temp_dir(temp_dir.as_ref()).await;

Expand Down Expand Up @@ -203,6 +204,24 @@ impl CacheManager {
}
}

pub fn update_settings(&self, settings: Arc<Settings>) {
let size_limit = settings.size_limit();
info!("Set size limit to {:} GiB", size_limit / 1024 / 1024 / 1024);
self.size_limit.store(size_limit, Relaxed);

let disable_lru = settings.disable_lru_cache();
let disabled = self.lru_cache.read().is_empty();
if disable_lru && !disabled {
info!("Disable LRU cache");
let mut cache = self.lru_cache.write();
cache.clear();
cache.shrink_to_fit();
} else if !disable_lru && disabled {
info!("Enable LRU cache");
self.lru_cache.write().resize(LRU_SIZE, 0);
}
}

fn start_background_task(new: Arc<Self>) {
let manager = Arc::downgrade(&new);
spawn(async move {
Expand Down Expand Up @@ -233,13 +252,15 @@ impl CacheManager {
let bitmask: u16 = 1 << (hash[4] & 0b0000_1111);

// Check if the file is already in the cache.
if self.lru_cache.read()[index] & bitmask != 0 {
if self.lru_cache.read().get(index).map(|bit| bit & bitmask != 0).unwrap_or(false) {
// Already marked, return
return;
}

// Mark the file as recently accessed.
self.lru_cache.write()[index] |= bitmask;
if let Some(bit) = self.lru_cache.write().get_mut(index) {
*bit |= bitmask;
}

if update_file {
let path = info.to_path(&self.cache_dir);
Expand Down Expand Up @@ -417,13 +438,15 @@ impl CacheManager {
let mut pos = self.lru_clear_pos.lock();
// 1048576 / (1week / 10s) =~ 17
for _ in 0..17 {
self.lru_cache.write()[*pos] = 0;
*pos = (*pos + 1) % LRU_SIZE;
if let Some(bit) = self.lru_cache.write().get_mut(*pos) {
*bit = 0;
*pos = (*pos + 1) % LRU_SIZE;
}
}
}

async fn check_cache_usage(&self) {
let mut need_free = self.total_size.load(Relaxed).saturating_sub(self.settings.size_limit());
let mut need_free = self.total_size.load(Relaxed).saturating_sub(self.size_limit.load(Relaxed));
if need_free == 0 {
return;
}
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Command listener
let client2 = client.clone();
let cache_manager2 = cache_manager.clone();
let downloader = Arc::new(Mutex::new(None));
let downloader2 = downloader.clone();
let logger_config = logger.config();
Expand All @@ -253,6 +254,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
Command::RefreshSettings => {
client2.refresh_settings().await;
cache_manager2.update_settings(client2.settings());
if !args.disable_logging {
logger_config.write_info(!client2.settings().disable_logging());
}
Expand Down
16 changes: 11 additions & 5 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct Settings {
disable_logging: AtomicBool,
max_connection: AtomicU64,
disable_ip_check: bool,
disable_lru_cache: AtomicBool,
}

pub struct InitSettings {
Expand Down Expand Up @@ -93,6 +94,10 @@ impl Settings {
self.disable_logging.load(Ordering::Relaxed)
}

pub fn disable_lru_cache(&self) -> bool {
self.disable_lru_cache.load(Ordering::Relaxed)
}

fn update(&self, settings: HashMap<&str, &str>) {
if let Some(size) = settings.get("disklimit_bytes").and_then(|s| s.parse().ok()) {
self.size_limit.store(size, Ordering::Relaxed);
Expand All @@ -102,11 +107,11 @@ impl Settings {
self.throttle_bytes.store(size, Ordering::Relaxed);
}

if let Some(disabled) = settings.get("disable_logging").and_then(|s| s.parse().ok()) {
self.disable_logging.store(disabled, Ordering::Relaxed);
} else {
self.disable_logging.store(false, Ordering::Relaxed);
}
let disable_logging = settings.get("disable_logging").and_then(|s| s.parse().ok()).unwrap_or(false);
self.disable_logging.store(disable_logging, Ordering::Relaxed);

let use_less_memory = settings.get("use_less_memory").and_then(|s| s.parse().ok()).unwrap_or(false);
self.disable_lru_cache.store(use_less_memory, Ordering::Relaxed);
}
}

Expand All @@ -133,6 +138,7 @@ impl RPCClient {
disable_logging: AtomicBool::new(false),
max_connection: AtomicU64::new(max_connection),
disable_ip_check,
disable_lru_cache: AtomicBool::new(false),
}),
}
}
Expand Down

0 comments on commit 7b8c60b

Please sign in to comment.