Skip to content

Commit

Permalink
refactor: use removable queue to simplify fifo impl
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Feb 23, 2024
1 parent 6106152 commit 9c8b68d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 185 deletions.
28 changes: 27 additions & 1 deletion foyer-common/src/removable_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ pub struct RemovableQueue<T> {
token: usize,
}

impl<T> Default for RemovableQueue<T> {
fn default() -> Self {
Self::new()
}

Check warning on line 42 in foyer-common/src/removable_queue.rs

View check run for this annotation

Codecov / codecov/patch

foyer-common/src/removable_queue.rs#L40-L42

Added lines #L40 - L42 were not covered by tests
}

impl<T> RemovableQueue<T> {
pub const DEFAULT_CAPACITY: usize = 16;

Expand Down Expand Up @@ -106,12 +112,32 @@ impl<T> RemovableQueue<T> {

/// Randonly remove the element with the given `token` from the queue.
pub fn remove(&mut self, token: Token) -> Option<T> {
debug_assert!(token.0 >= self.token);
if token.0 < self.token + self.head || token.0 >= self.token + self.tail {
return None;

Check warning on line 116 in foyer-common/src/removable_queue.rs

View check run for this annotation

Codecov / codecov/patch

foyer-common/src/removable_queue.rs#L116

Added line #L116 was not covered by tests
}
let pos = (token.0 - self.token) % self.capacity;
self.len -= 1;
self.queue[pos].take()
}

/// Remove and return all the elements from the queue.
pub fn clear(&mut self) -> Vec<T> {
let mut res = Vec::with_capacity(self.len);
for pos in self.head..self.tail {
let pos = pos % self.capacity;
if let Some(elem) = self.queue[pos].take() {
res.push(elem);
}
}

self.token += self.tail;
self.head = 0;
self.tail = 0;
self.len = 0;

res
}

/// Returns the actually element count.
#[inline(always)]
pub fn len(&self) -> usize {
Expand Down
1 change: 1 addition & 0 deletions foyer-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["foyer-workspace-hack"]
ahash = "0.8"
bitflags = "2"
crossbeam = "0.8"
foyer-common = { version = "0.3", path = "../foyer-common" }
foyer-workspace-hack = { version = "0.2", path = "../foyer-workspace-hack" }
hashbrown = "0.14"
itertools = "0.12"
Expand Down
207 changes: 24 additions & 183 deletions foyer-memory/src/eviction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::ptr::NonNull;

use foyer_common::removable_queue::{RemovableQueue, Token};

use crate::{
eviction::Eviction,
handle::{BaseHandle, Handle},
Expand All @@ -26,7 +28,7 @@ where
V: Value,
{
base: BaseHandle<K, V>,
pos: usize,
token: Option<Token>,
}

impl<K, V> Handle for FifoHandle<K, V>
Expand All @@ -40,7 +42,7 @@ where
fn new() -> Self {
Self {
base: BaseHandle::new(),
pos: usize::MAX,
token: None,
}
}

Expand All @@ -67,152 +69,7 @@ where
K: Key,
V: Value,
{
queue: Box<[Option<NonNull<FifoHandle<K, V>>>]>,
head: usize,
tail: usize,
len: usize,
capacity: usize,
}

impl<K, V> Fifo<K, V>
where
K: Key,
V: Value,
{
fn new(config: FifoConfig) -> Self {
Self {
queue: vec![None; config.default_capacity].into_boxed_slice(),
head: 0,
tail: 0,
len: 0,
capacity: config.default_capacity,
}
}

#[inline(always)]
fn usage(&self) -> usize {
self.tail - self.head
}

#[inline(always)]
fn len(&self) -> usize {
self.len
}

#[inline(always)]
fn is_empty(&self) -> bool {
self.len() == 0
}

#[inline(always)]
fn capacity(&self) -> usize {
self.capacity
}

unsafe fn push(&mut self, mut ptr: NonNull<FifoHandle<K, V>>) {
if self.usage() == self.capacity() {
self.grow();
}

debug_assert_ne!(self.usage(), self.capacity());

let pos = self.tail % self.capacity;
ptr.as_mut().pos = pos;
self.queue[pos] = Some(ptr);

self.tail += 1;
self.len += 1;
}

unsafe fn pop(&mut self) -> Option<NonNull<FifoHandle<K, V>>> {
let mut res = None;

while res.is_none() && self.usage() > 0 {
let pos = self.head % self.capacity;
res = self.queue[pos].take();
self.head += 1;
}

while self.usage() > 0
&& let pos = self.head % self.capacity
&& self.queue[pos].is_none()
{
self.head += 1;
}

if let Some(mut ptr) = res {
ptr.as_mut().pos = usize::MAX;
self.len -= 1;
}

res
}

/// Take item at `pos`. The slot will be lazily released.
///
/// # Safety
///
/// Item at `pos` must be some.
unsafe fn remove(&mut self, pos: usize) -> NonNull<FifoHandle<K, V>> {
debug_assert!(self.queue[pos].is_some());
self.len -= 1;
self.queue[pos].take().unwrap_unchecked()
}

unsafe fn grow(&mut self) {
let capacity = self.capacity * 2;
let mut queue = vec![None; capacity].into_boxed_slice();
let usage = self.usage();

if usage == 0 {
debug_assert_eq!(self.len, 0);
} else {
let phead = self.head % self.capacity;
let ptail = self.tail % self.capacity;

if phead < ptail {
// ↓ phead ↓ ptail
// * [ ][ ][x][x][x][x][x][x][ ][ ][ ][ ]
queue[0..ptail - phead].copy_from_slice(&self.queue[phead..ptail]);
} else {
// ↓ ptail ↓ phead
// * [x][x][ ][ ][ ][ ][ ][ ][x][x][x][x]
//
// ↓ ptail & phead
// * [x][x][x][x][x][x][x][x][x][x][x][x]
queue[0..self.capacity - phead].copy_from_slice(&self.queue[phead..self.capacity]);
queue[self.capacity - phead..self.capacity - phead + ptail]
.copy_from_slice(&self.queue[0..ptail]);
}
}

self.queue = queue;
self.capacity = capacity;
self.head = 0;
self.tail = usage;

for (pos, item) in self.queue.iter_mut().enumerate() {
if let Some(item) = item {
item.as_mut().pos = pos;
}
}
}

unsafe fn clear(&mut self) -> Vec<NonNull<FifoHandle<K, V>>> {
let mut res = vec![];
for pos in self.head..self.tail {
let pos = pos % self.capacity;
if let Some(item) = self.queue[pos].take() {
res.push(item);
}
}

self.len = 0;
self.head = 0;
self.tail = 0;

res
}
queue: RemovableQueue<NonNull<FifoHandle<K, V>>>,
}

impl<K, V> Eviction for Fifo<K, V>
Expand All @@ -224,31 +81,34 @@ where
type C = FifoConfig;

fn new(config: Self::C) -> Self {
Self::new(config)
Self {
queue: RemovableQueue::with_capacity(config.default_capacity),
}
}

unsafe fn push(&mut self, ptr: NonNull<Self::H>) {
self.push(ptr);
unsafe fn push(&mut self, mut ptr: NonNull<Self::H>) {
let token = self.queue.push(ptr);
ptr.as_mut().token = Some(token);
}

unsafe fn pop(&mut self) -> Option<NonNull<Self::H>> {
self.pop()
self.queue.pop()
}

unsafe fn access(&mut self, _: NonNull<Self::H>) {}

Check warning on line 98 in foyer-memory/src/eviction/fifo.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/eviction/fifo.rs#L98

Added line #L98 was not covered by tests

unsafe fn remove(&mut self, ptr: NonNull<Self::H>) {
let mut p = self.remove(ptr.as_ref().pos);
debug_assert_eq!(ptr, p);
p.as_mut().pos = usize::MAX;
unsafe fn remove(&mut self, mut ptr: NonNull<Self::H>) {
debug_assert!(ptr.as_mut().token.is_some());
let token = ptr.as_mut().token.take().unwrap_unchecked();
self.queue.remove(token);
}

unsafe fn clear(&mut self) -> Vec<NonNull<Self::H>> {
self.clear()
self.queue.clear()
}

fn is_empty(&self) -> bool {
self.is_empty()
self.queue.is_empty()
}

Check warning on line 112 in foyer-memory/src/eviction/fifo.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/eviction/fifo.rs#L110-L112

Added lines #L110 - L112 were not covered by tests
}

Expand Down Expand Up @@ -296,48 +156,29 @@ mod tests {
};

let mut fifo = TestFifo::new(config);
assert_eq!(fifo.capacity(), 4);
assert_eq!(fifo.len(), 0);
assert_eq!(fifo.usage(), 0);

fifo.push(ptrs[0]);
fifo.push(ptrs[1]);
fifo.push(ptrs[2]);
fifo.push(ptrs[3]);
assert_eq!(fifo.capacity(), 4);
assert_eq!(fifo.len(), 4);
assert_eq!(fifo.usage(), 4);

let p0 = fifo.pop().unwrap();
let p1 = fifo.pop().unwrap();
assert_eq!(ptrs[0], p0);
assert_eq!(ptrs[1], p1);
assert_eq!(fifo.capacity(), 4);
assert_eq!(fifo.len(), 2);
assert_eq!(fifo.usage(), 2);

fifo.push(ptrs[4]);
fifo.push(ptrs[5]);
fifo.push(ptrs[6]);
assert_eq!(fifo.capacity(), 8);
assert_eq!(fifo.len(), 5);
assert_eq!(fifo.usage(), 5);

let p3 = fifo.remove(ptrs[3].as_ref().pos);
let p4 = fifo.remove(ptrs[4].as_ref().pos);
let p5 = fifo.remove(ptrs[5].as_ref().pos);
assert_eq!(ptrs[3], p3);
assert_eq!(ptrs[4], p4);
assert_eq!(ptrs[5], p5);
assert_eq!(fifo.capacity(), 8);
assert_eq!(fifo.len(), 2);
assert_eq!(fifo.usage(), 5);

fifo.remove(ptrs[3]);
fifo.remove(ptrs[4]);
fifo.remove(ptrs[5]);

let p2 = fifo.pop().unwrap();
let p6 = fifo.pop().unwrap();
assert_eq!(ptrs[2], p2);
assert_eq!(fifo.capacity(), 8);
assert_eq!(fifo.len(), 1);
assert_eq!(fifo.usage(), 1);
assert_eq!(ptrs[6], p6);

for ptr in ptrs {
del_test_fifo_handle_ptr(ptr);
Expand Down
1 change: 0 additions & 1 deletion foyer-workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ futures-util = { version = "0.3", default-features = false, features = ["async-a
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
memchr = { version = "2" }
parking_lot = { version = "0.12", features = ["arc_lock", "deadlock_detection"] }
parking_lot_core = { version = "0.9", default-features = false, features = ["deadlock_detection"] }
Expand Down

0 comments on commit 9c8b68d

Please sign in to comment.