Skip to content

Commit

Permalink
feat: impl eviction-time disk cache insertion (#830)
Browse files Browse the repository at this point in the history
* feat: introduce Pipe and Piece as the connector of mem and disk cache

Signed-off-by: MrCroxx <[email protected]>

* refactor: impl eviction-time disk cache insertion

Signed-off-by: MrCroxx <[email protected]>

* fix: fix segmentation fault on Piece drop

Signed-off-by: MrCroxx <[email protected]>

* fix: fix memory leak caused by circular reference counting

Signed-off-by: MrCroxx <[email protected]>

* chore: add comments, pass ffmt check

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jan 10, 2025
1 parent 204f84b commit 170a410
Show file tree
Hide file tree
Showing 26 changed files with 548 additions and 474 deletions.
28 changes: 26 additions & 2 deletions foyer-memory/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
},
raw::{FetchMark, FetchState, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
record::CacheHint,
Result,
Piece, Pipe, Result,
};

pub type FifoCache<K, V, S = RandomState> = RawCache<Fifo<K, V>, S>;
Expand Down Expand Up @@ -226,6 +226,16 @@ where
CacheEntry::S3Fifo(entry) => entry.is_outdated(),
}
}

/// Get the piece of the entry record.
pub fn piece(&self) -> Piece<K, V> {
match self {
CacheEntry::Fifo(entry) => entry.piece(),
CacheEntry::Lru(entry) => entry.piece(),
CacheEntry::Lfu(entry) => entry.piece(),
CacheEntry::S3Fifo(entry) => entry.piece(),
}
}
}

/// Eviction algorithm config.
Expand Down Expand Up @@ -282,6 +292,7 @@ where
weighter: Arc<dyn Weighter<K, V>>,

event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
pipe: Option<Arc<dyn Pipe<Key = K, Value = V>>>,

registry: BoxedRegistry,
metrics: Option<Arc<Metrics>>,
Expand All @@ -304,6 +315,7 @@ where
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,

registry: Box::new(NoopMetricsRegistry),
metrics: None,
Expand Down Expand Up @@ -355,6 +367,7 @@ where
hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
registry: self.registry,
metrics: self.metrics,
}
Expand All @@ -372,6 +385,13 @@ where
self
}

/// Store the [`Pipe`] on the builder, replacing any pipe set earlier.
#[doc(hidden)]
pub fn with_pipe(mut self, pipe: Arc<dyn Pipe<Key = K, Value = V>>) -> Self {
    // The previously configured pipe (if any) is dropped right here.
    let _ = self.pipe.replace(pipe);
    self
}

/// Set metrics registry.
///
/// Default: [`NoopMetricsRegistry`].
Expand Down Expand Up @@ -411,6 +431,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -420,6 +441,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -429,6 +451,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -438,6 +461,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
}
Expand Down Expand Up @@ -660,7 +684,7 @@ where
}

/// Get the hash builder of the in-memory cache.
pub fn hash_builder(&self) -> &S {
pub fn hash_builder(&self) -> &Arc<S> {
match self {
Cache::Fifo(cache) => cache.hash_builder(),
Cache::S3Fifo(cache) => cache.hash_builder(),
Expand Down
1 change: 1 addition & 0 deletions foyer-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod cache;
mod error;
mod eviction;
mod indexer;
mod pipe;
mod raw;
mod record;

Expand Down
127 changes: 127 additions & 0 deletions foyer-memory/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2025 foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt::Debug, sync::Arc};

use crate::{record::Record, Eviction};

/// A piece of record that is irrelevant to the eviction algorithm.
///
/// With [`Piece`], the disk cache doesn't need to consider the eviction generic type.
///
/// The eviction type of the source record is erased: the record is kept only as an
/// untyped raw pointer together with a function pointer that knows how to release it.
pub struct Piece<K, V> {
    /// Type-erased pointer obtained from `Arc::into_raw` on the source record.
    record: *const (),
    /// Pointer to the key stored inside the record.
    key: *const K,
    /// Pointer to the value stored inside the record.
    value: *const V,
    /// Hash of the record, copied out when the piece is created.
    hash: u64,
    /// Called with `record` on drop; rebuilds the original `Arc` and drops it.
    drop_fn: fn(*const ()),
}

impl<K, V> Debug for Piece<K, V> {
    /// Formats the piece as `Piece { record, hash }`.
    ///
    /// Key and value are not printed, so `K` and `V` need no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Piece");
        out.field("record", &self.record);
        out.field("hash", &self.hash);
        out.finish()
    }
}

// The raw pointer fields make `Piece` neither `Send` nor `Sync` automatically,
// so both are asserted manually here.
//
// NOTE(review): these impls put no bounds on `K` or `V`, yet `key()` / `value()`
// hand out `&K` / `&V`. This is presumably sound only because every record's key
// and value types are themselves `Send + Sync` — TODO confirm against the bounds
// required by the `Eviction` trait.
unsafe impl<K, V> Send for Piece<K, V> {}
unsafe impl<K, V> Sync for Piece<K, V> {}

impl<K, V> Drop for Piece<K, V> {
    /// Hands the type-erased record pointer to the stored release function.
    fn drop(&mut self) {
        let release = self.drop_fn;
        release(self.record);
    }
}

impl<K, V> Piece<K, V> {
    /// Create a record piece from a record wrapped by [`Arc`].
    ///
    /// The piece takes over the strong reference carried by `record` and gives it
    /// back (dropping it) when the piece itself is dropped. The eviction type `E`
    /// is erased; only the key type, value type and hash remain visible.
    pub fn new<E>(record: Arc<Record<E>>) -> Self
    where
        E: Eviction<Key = K, Value = V>,
    {
        // Collect everything we need through a safe borrow while the `Arc` is
        // still owned here; no `unsafe` is required for this part.
        let inner: &Record<E> = &record;
        let key: *const K = inner.key();
        let value: *const V = inner.value();
        let hash = inner.hash();

        // Turn the strong reference into a raw pointer. The allocation (and thus
        // `key` / `value`) stays alive until `drop_fn` rebuilds and drops the `Arc`.
        let record = Arc::into_raw(record) as *const ();

        // A non-capturing closure so it coerces to a plain function pointer that
        // still remembers the concrete `Record<E>` type.
        let drop_fn: fn(*const ()) = |ptr: *const ()| {
            // SAFETY: `ptr` is only ever the `record` field of a `Piece`, which was
            // produced by `Arc::into_raw` on an `Arc<Record<E>>` above, and `Drop`
            // invokes this function exactly once per piece.
            drop(unsafe { Arc::from_raw(ptr.cast::<Record<E>>()) });
        };

        Self {
            record,
            key,
            value,
            hash,
            drop_fn,
        }
    }

    /// Get the key of the record.
    pub fn key(&self) -> &K {
        // SAFETY: `key` points into the record allocation, which is kept alive by
        // the strong reference this piece holds until it is dropped.
        unsafe { &*self.key }
    }

    /// Get the value of the record.
    pub fn value(&self) -> &V {
        // SAFETY: `value` points into the record allocation, which is kept alive by
        // the strong reference this piece holds until it is dropped.
        unsafe { &*self.value }
    }

    /// Get the hash of the record.
    pub fn hash(&self) -> u64 {
        self.hash
    }
}

/// Pipe is used to notify disk cache to cache entries from the in-memory cache.
///
/// Implementations must be `Send + Sync + 'static`; the in-memory cache builder
/// stores a pipe as `Arc<dyn Pipe<Key = K, Value = V>>`.
pub trait Pipe: Send + Sync + 'static {
    /// Type of the key of the record.
    type Key;
    /// Type of the value of the record.
    type Value;

    /// Send the piece to the disk cache.
    ///
    /// Ownership of `piece` moves to the implementation.
    fn send(&self, piece: Piece<Self::Key, Self::Value>);
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        eviction::fifo::{Fifo, FifoHint},
        record::Data,
    };

    /// Pieces and plain `Arc` clones of the same record are created and dropped in
    /// an interleaved order. Each piece must hold exactly one strong reference,
    /// expose the record's key/value/hash, and release its reference on drop.
    #[test]
    fn test_piece() {
        let r1 = Arc::new(Record::new(Data::<Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>>> {
            key: Arc::new(vec![b'k'; 4096]),
            value: Arc::new(vec![b'v'; 16384]),
            hint: FifoHint,
            hash: 1,
            weight: 1,
        }));
        assert_eq!(Arc::strong_count(&r1), 1);

        let p1 = Piece::new(r1.clone());
        assert_eq!(Arc::strong_count(&r1), 2);
        assert_eq!(p1.hash(), 1);
        // The piece must expose the very same key/value objects as the record.
        assert!(Arc::ptr_eq(p1.key(), r1.key()));
        assert!(Arc::ptr_eq(p1.value(), r1.value()));

        let k1 = p1.key().clone();
        assert_eq!(k1.len(), 4096);
        assert_eq!(p1.value().len(), 16384);

        let r2 = r1.clone();
        let p2 = Piece::new(r1.clone());
        assert_eq!(Arc::strong_count(&r1), 4);

        drop(p1);
        assert_eq!(Arc::strong_count(&r1), 3);
        drop(r2);
        assert_eq!(Arc::strong_count(&r1), 2);
        drop(p2);
        assert_eq!(Arc::strong_count(&r1), 1);
        drop(r1);

        // With the record gone, `k1` must be the only remaining owner of the key;
        // anything else means a piece leaked its reference.
        assert_eq!(Arc::strong_count(&k1), 1);
        drop(k1);
    }
}
1 change: 1 addition & 0 deletions foyer-memory/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use crate::{
cache::{Cache, CacheBuilder, CacheEntry, EvictionConfig, Fetch},
error::{Error, Result},
eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig, Eviction, Op},
pipe::{Piece, Pipe},
raw::{FetchMark, FetchState, Weighter},
record::CacheHint,
};
Loading

0 comments on commit 170a410

Please sign in to comment.