Skip to content

Commit 5e1eab0

Browse files
committed
lru using ptrs + lazy purging + max_evicted + nits
1 parent bba089a commit 5e1eab0

File tree

2 files changed

+145
-101
lines changed

2 files changed

+145
-101
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ zerocopy = { version = "0.8.24", features = ["derive"] }
1818
parking_lot = { version = "0.12.4", features = ["send_guard"] }
1919
fxhash = "0.2.1"
2020
static_assertions = "1.1.0"
21-
lru = "0.16.0"
2221

2322
[dev-dependencies]
2423
criterion = "0.6.0"

src/cache.rs

Lines changed: 145 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ use alloy_trie::Nibbles;
33
use std::{
44
collections::HashMap,
55
num::NonZeroUsize,
6-
sync::{Arc, RwLock},
6+
sync::{Arc, Mutex, RwLock, Weak},
77
};
88

9-
/// An entry in the versioned LRU cache with doubly-linked list indices.
9+
/// An entry in the versioned LRU cache.
1010
#[derive(Debug, Clone)]
1111
struct Entry {
1212
snapshot_id: SnapshotId,
1313
key: Nibbles,
1414
value: Option<(PageId, u8)>,
15-
lru_prev: Option<usize>,
16-
lru_next: Option<usize>,
15+
lru_prev: Option<Weak<Mutex<Entry>>>,
16+
lru_next: Option<Arc<Mutex<Entry>>>,
1717
}
1818

1919
impl Entry {
@@ -32,48 +32,46 @@ struct VersionedLru {
3232
// Sorted by snapshot_id
3333
entries: HashMap<Nibbles, Vec<Entry>>,
3434

35-
// Doubly-linked list of entries
36-
lru: Vec<Entry>,
37-
head: Option<usize>,
38-
tail: Option<usize>,
35+
// Keep track of the head and tail
36+
head: Option<Arc<Mutex<Entry>>>,
37+
tail: Option<Weak<Mutex<Entry>>>,
3938
capacity: usize,
39+
size: usize,
4040

4141
// Proactively purge obsolete entries and free up cache space
4242
min_snapshot_id: Option<SnapshotId>,
43+
44+
// Track highest snapshot_id that was ever evicted to maintain temporal coherence
45+
max_evicted_version: SnapshotId,
4346
}
4447

4548
impl VersionedLru {
4649
fn new(capacity: usize) -> Self {
4750
Self {
4851
entries: HashMap::new(),
49-
lru: Vec::new(),
5052
head: None,
5153
tail: None,
5254
capacity,
55+
size: 0,
5356
min_snapshot_id: None,
57+
max_evicted_version: 0,
5458
}
5559
}
5660

5761
/// Finds the entry matching the key via `entries` with the largest snapshot_id <=
5862
/// target_snapshot_id. If the entry is found, it is moved to the front of the LRU list.
5963
fn get(&mut self, key: &Nibbles, target_snapshot_id: SnapshotId) -> Option<(PageId, u8)> {
60-
let versions = self.entries.get(key)?;
64+
self.purge(key);
6165

66+
// Get entry
67+
let versions = self.entries.get(key)?;
6268
let entry_idx =
6369
versions.iter().rposition(|entry| entry.snapshot_id <= target_snapshot_id)?;
64-
6570
let entry = &versions[entry_idx];
71+
72+
// Find the entry in LRU list and move to front
6673
if let Some(value) = entry.value {
67-
// Find the entry and move to front
68-
let snapshot_id = entry.snapshot_id;
69-
if let Some(lru_idx) = self
70-
.lru
71-
.iter()
72-
.position(|entry| &entry.key == key && entry.snapshot_id == snapshot_id)
73-
{
74-
self.remove(lru_idx);
75-
self.add_to_front(lru_idx);
76-
}
74+
self.update_position(key, entry.snapshot_id);
7775
Some(value)
7876
} else {
7977
None
@@ -84,47 +82,51 @@ impl VersionedLru {
8482
/// If the cache is full, evicts the tail entry and removes it from `entries`, and pops it from
8583
/// the linked list.
8684
fn set(&mut self, key: Nibbles, snapshot_id: SnapshotId, value: Option<(PageId, u8)>) {
87-
let new_entry = Entry::new(snapshot_id, key.clone(), value);
85+
// Prevent insertion of entries older than max_evicted_version to maintain temporal
86+
// coherence
87+
if snapshot_id < self.max_evicted_version {
88+
return;
89+
}
8890

89-
// Add to `entries`
91+
// Make entry and find appropriate position
9092
let versions = self.entries.entry(key.clone()).or_default();
91-
versions.push(new_entry.clone());
92-
versions.sort_by_key(|e| e.snapshot_id);
93-
94-
// Add to linked list
95-
let lru_idx = self.lru.len();
96-
self.lru.push(new_entry);
97-
self.add_to_front(lru_idx);
98-
99-
// Cache full - evict smallest snapshot_id entry
100-
if self.lru.len() > self.capacity && self.tail.is_some() {
101-
let tail_idx = self.tail.unwrap();
102-
let tail_key = self.lru[tail_idx].key.clone();
103-
104-
// Find smallest snapshot_id for this key
105-
let smallest = if let Some(versions) = self.entries.get(&tail_key) {
106-
versions.iter().map(|e| e.snapshot_id).min()
107-
} else {
108-
None
109-
};
110-
111-
// Find the LRU index of the smallest snapshot_id
112-
if let Some(smallest) = smallest {
113-
let smallest_idx = self
114-
.lru
115-
.iter()
116-
.position(|entry| entry.key == tail_key && entry.snapshot_id == smallest);
117-
118-
// Remove from `entries` hashmap
119-
if let Some(evict_idx) = smallest_idx {
120-
if let Some(versions) = self.entries.get_mut(&tail_key) {
121-
versions.retain(|e| e.snapshot_id != smallest);
93+
let entry = Entry::new(snapshot_id, key.clone(), value);
94+
let pos = versions
95+
.binary_search_by_key(&snapshot_id, |e| e.snapshot_id)
96+
.unwrap_or_else(|pos| pos);
97+
98+
if pos < versions.len() && versions[pos].snapshot_id == snapshot_id {
99+
// existing entry, update it and move to front
100+
versions[pos] = entry;
101+
self.update_position(&key, snapshot_id);
102+
} else {
103+
// new entry
104+
versions.insert(pos, entry.clone());
105+
self.add_to_front(Arc::new(Mutex::new(entry.clone())));
106+
self.size += 1;
107+
}
108+
self.purge(&key);
109+
110+
// Cache full - evict oldest entry (tail)
111+
if self.size > self.capacity && self.tail.is_some() {
112+
if let Some(weak) = &self.tail {
113+
if let Some(entry) = weak.upgrade() {
114+
let key = entry.lock().unwrap().key.clone();
115+
let snapshot = entry.lock().unwrap().snapshot_id;
116+
117+
// Track max evicted version for temporal coherence
118+
self.max_evicted_version = self.max_evicted_version.max(snapshot);
119+
120+
// Remove from `entries` hashmap
121+
if let Some(versions) = self.entries.get_mut(&key) {
122+
versions.retain(|e| e.snapshot_id != snapshot);
122123
if versions.is_empty() {
123-
self.entries.remove(&tail_key);
124+
self.entries.remove(&key);
124125
}
125126
}
126127

127-
self.remove(evict_idx);
128+
self.remove(entry);
129+
self.size -= 1;
128130
}
129131
}
130132
}
@@ -133,73 +135,87 @@ impl VersionedLru {
133135
//////////////////////////////
134136
//// Helpers
135137
//////////////////////////////
136-
fn add_to_front(&mut self, lru_idx: usize) {
137-
self.lru[lru_idx].lru_prev = None;
138-
self.lru[lru_idx].lru_next = self.head;
138+
fn get_entry(&self, key: &Nibbles, snapshot_id: SnapshotId) -> Option<Arc<Mutex<Entry>>> {
139+
let mut current = self.head.clone();
140+
while let Some(entry) = current {
141+
let guard = entry.lock().unwrap();
142+
if &guard.key == key && guard.snapshot_id == snapshot_id {
143+
drop(guard);
144+
return Some(entry);
145+
}
146+
current = guard.lru_next.clone();
147+
}
148+
None
149+
}
150+
151+
/// Update head pointer and `Entry`'s pointers
152+
fn add_to_front(&mut self, entry: Arc<Mutex<Entry>>) {
153+
let mut guard = entry.lock().unwrap();
154+
guard.lru_prev = None;
155+
guard.lru_next = self.head.clone();
156+
drop(guard);
139157

140-
if let Some(old_head_idx) = self.head {
141-
self.lru[old_head_idx].lru_prev = Some(lru_idx);
158+
if let Some(old_head) = &self.head {
159+
old_head.lock().unwrap().lru_prev = Some(Arc::downgrade(&entry));
142160
} else {
143-
self.tail = Some(lru_idx);
161+
self.tail = Some(Arc::downgrade(&entry));
144162
}
145163

146-
self.head = Some(lru_idx);
164+
self.head = Some(entry);
147165
}
148166

149-
fn remove(&mut self, lru_idx: usize) {
150-
let prev_idx = self.lru[lru_idx].lru_prev;
151-
let next_idx = self.lru[lru_idx].lru_next;
167+
/// Remove an entry from LRU
168+
fn remove(&mut self, entry: Arc<Mutex<Entry>>) {
169+
let (prev, next) = {
170+
let entry_guard = entry.lock().unwrap();
171+
(entry_guard.lru_prev.clone(), entry_guard.lru_next.clone())
172+
};
152173

153-
if let Some(prev) = prev_idx {
154-
self.lru[prev].lru_next = next_idx;
174+
if let Some(weak) = &prev {
175+
if let Some(prev_entry) = weak.upgrade() {
176+
prev_entry.lock().unwrap().lru_next = next.clone();
177+
}
155178
} else {
156-
self.head = next_idx;
179+
self.head = next.clone();
157180
}
158181

159-
if let Some(next) = next_idx {
160-
self.lru[next].lru_prev = prev_idx;
182+
if let Some(next_entry) = &next {
183+
next_entry.lock().unwrap().lru_prev = prev.clone();
161184
} else {
162-
self.tail = prev_idx;
185+
self.tail = prev;
163186
}
164187

165-
// Mark as removed
166-
self.lru[lru_idx].lru_prev = None;
167-
self.lru[lru_idx].lru_next = None;
188+
let mut guard = entry.lock().unwrap();
189+
guard.lru_prev = None;
190+
guard.lru_next = None;
168191
}
169192

193+
/// Purging is done lazily in `get` and `set` methods
170194
fn set_min_snapshot_id(&mut self, min_snapshot_id: SnapshotId) {
171195
self.min_snapshot_id = Some(min_snapshot_id);
196+
}
172197

173-
// Purge obsolete entries
198+
/// Removes from `entries` every version of `key` that precedes the first one with snapshot id >= min_id
199+
fn purge(&mut self, key: &Nibbles) {
174200
if let Some(min_id) = self.min_snapshot_id {
175-
let keys_to_update: Vec<Nibbles> = self.entries.keys().cloned().collect();
176-
177-
for key in keys_to_update {
178-
if let Some(versions) = self.entries.get_mut(&key) {
179-
versions.retain(|entry| entry.snapshot_id >= min_id);
180-
181-
if versions.is_empty() {
182-
self.entries.remove(&key);
183-
}
201+
if let Some(versions) = self.entries.get_mut(key) {
202+
if let Some(idx) = versions.iter().position(|e| e.snapshot_id >= min_id) {
203+
versions.drain(0..idx);
184204
}
185205
}
206+
}
207+
}
186208

187-
// Remove from LRU list
188-
self.lru.retain(|entry| entry.snapshot_id >= min_id);
189-
190-
// Rebuild LRU pointers after retention
191-
self.head = None;
192-
self.tail = None;
193-
194-
for i in 0..self.lru.len() {
195-
self.lru[i].lru_prev = if i > 0 { Some(i - 1) } else { None };
196-
self.lru[i].lru_next = if i < self.lru.len() - 1 { Some(i + 1) } else { None };
197-
}
198-
199-
if !self.lru.is_empty() {
200-
self.head = Some(0);
201-
self.tail = Some(self.lru.len() - 1);
209+
/// Updates the position of an entry in the LRU
210+
fn update_position(&mut self, key: &Nibbles, snapshot_id: SnapshotId) {
211+
if let Some(lru_entry) = self.get_entry(key, snapshot_id) {
212+
if let Some(head) = &self.head {
213+
if Arc::ptr_eq(head, &lru_entry) {
214+
return;
215+
}
202216
}
217+
self.remove(lru_entry.clone());
218+
self.add_to_front(lru_entry);
203219
}
204220
}
205221
}
@@ -429,4 +445,33 @@ mod tests {
429445
Some((PageId::new(40).unwrap(), 41))
430446
);
431447
}
448+
449+
#[test]
450+
fn test_temporal_coherence() {
451+
let cache = CacheManager::new(NonZeroUsize::new(2).unwrap());
452+
let shared_cache = Arc::new(cache);
453+
454+
// insert entries
455+
shared_cache.insert(100, Nibbles::from_nibbles([1]), Some((PageId::new(10).unwrap(), 11)));
456+
shared_cache.insert(200, Nibbles::from_nibbles([2]), Some((PageId::new(20).unwrap(), 21)));
457+
458+
// this should evict snapshot 100, setting max_evicted_version to 100
459+
shared_cache.insert(300, Nibbles::from_nibbles([3]), Some((PageId::new(30).unwrap(), 31)));
460+
461+
// this should be rejected since it's older than max_evicted_version
462+
shared_cache.insert(50, Nibbles::from_nibbles([4]), Some((PageId::new(5).unwrap(), 6)));
463+
464+
// should not be retrievable
465+
assert_eq!(shared_cache.get(50, &Nibbles::from_nibbles([4])), None);
466+
467+
// rest should still work
468+
assert_eq!(
469+
shared_cache.get(200, &Nibbles::from_nibbles([2])),
470+
Some((PageId::new(20).unwrap(), 21))
471+
);
472+
assert_eq!(
473+
shared_cache.get(300, &Nibbles::from_nibbles([3])),
474+
Some((PageId::new(30).unwrap(), 31))
475+
);
476+
}
432477
}

0 commit comments

Comments
 (0)