Skip to content

Commit

Permalink
Add support to disable record cache
Browse files Browse the repository at this point in the history
Summary:
Since record cache will be used in multiple places. It's possible
to disable it by setting the memory budget to None. This way code
can still use the cache normally but no records will be cached.

All calls to Get will always return None if cache is disabled
  • Loading branch information
muhamadazmy committed Oct 7, 2024
1 parent 060a5ed commit 86e6966
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
9 changes: 7 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
active_loglets: Default::default(),
_metadata_store_client: metadata_store_client,
networking,
// todo(asoli): read memory budget from ReplicatedLogletOptions
record_cache: RecordCache::new(20_000_000), // 20MB
record_cache: RecordCache::new(
Configuration::pinned()
.bifrost
.replicated_loglet
.record_cache_memory_size
.as_usize(),
),
logserver_rpc_routers,
sequencer_rpc_routers,
}
Expand Down
44 changes: 31 additions & 13 deletions crates/bifrost/src/providers/replicated_loglet/record_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,39 @@ type RecordKey = (ReplicatedLogletId, LogletOffset);
/// RemoteSequencers
#[derive(Clone)]
pub struct RecordCache {
inner: Cache<RecordKey, Record>,
inner: Option<Cache<RecordKey, Record>>,
}

impl RecordCache {
/// Creates a new instance of RecordCache. If memory budget is None
/// cache will be disabled
pub fn new(memory_budget_bytes: usize) -> Self {
let inner: Cache<RecordKey, Record> = CacheBuilder::default()
.weigher(|_, record: &Record| {
(size_of::<RecordKey>() + record.estimated_encode_size())
.try_into()
.unwrap_or(u32::MAX)
})
.max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX))
.eviction_policy(EvictionPolicy::lru())
.build();
let inner = if memory_budget_bytes > 0 {
Some(
CacheBuilder::default()
.weigher(|_, record: &Record| {
(size_of::<RecordKey>() + record.estimated_encode_size())
.try_into()
.unwrap_or(u32::MAX)
})
.max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX))
.eviction_policy(EvictionPolicy::lru())
.build(),
)
} else {
None
};

Self { inner }
}

/// Writes a record to cache externally
pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) {
self.inner.insert((loglet_id, offset), record);
let Some(ref inner) = self.inner else {
return;
};

inner.insert((loglet_id, offset), record);
}

/// Extend cache with records
Expand All @@ -57,14 +69,20 @@ impl RecordCache {
mut first_offset: LogletOffset,
records: I,
) {
let Some(ref inner) = self.inner else {
return;
};

for record in records.as_ref() {
self.inner.insert((loglet_id, first_offset), record.clone());
inner.insert((loglet_id, first_offset), record.clone());
first_offset = first_offset.next();
}
}

/// Get a for given loglet id and offset.
pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option<Record> {
self.inner.get(&(loglet_id, offset))
let inner = self.inner.as_ref()?;

inner.get(&(loglet_id, offset))
}
}
11 changes: 10 additions & 1 deletion crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use restate_serde_util::NonZeroByteCount;
use restate_serde_util::{ByteCount, NonZeroByteCount};
use tracing::warn;

use crate::logs::metadata::ProviderKind;
Expand Down Expand Up @@ -237,6 +237,14 @@ pub struct ReplicatedLogletOptions {
///
/// Retry policy for log server RPCs
pub log_server_retry_policy: RetryPolicy,

/// # In-memory RecordCache memory limit
///
/// Optional size of record cache in bytes.
/// If set to 0, record cache will be disabled.
/// Defaults: 20M
#[cfg_attr(feature = "schemars", schemars(with = "ByteCount"))]
pub record_cache_memory_size: ByteCount,
}

impl Default for ReplicatedLogletOptions {
Expand All @@ -257,6 +265,7 @@ impl Default for ReplicatedLogletOptions {
Some(10),
Some(Duration::from_millis(2000)),
),
record_cache_memory_size: 20_000_000u64.into(), // 20MB
}
}
}

0 comments on commit 86e6966

Please sign in to comment.