Skip to content

Commit

Permalink
Merge pull request #279 from Kuadrant/resilient_cache
Browse files Browse the repository at this point in the history
Resilient cache
  • Loading branch information
alexsnaps authored Apr 3, 2024
2 parents a1ea581 + e871dbe commit cce84e4
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 82 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion limitador-server/examples/limits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
conditions:
- "req.method == 'POST'"
variables:
- user_id
- user_id
1 change: 1 addition & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub struct RedisStorageCacheConfiguration {
pub max_ttl: u64,
pub ttl_ratio: u64,
pub max_counters: usize,
pub response_timeout: u64,
}

#[derive(PartialEq, Eq, Debug)]
Expand Down
45 changes: 22 additions & 23 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use limitador::storage::disk::DiskStorage;
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
Expand Down Expand Up @@ -138,29 +138,17 @@ impl Limiter {
) -> CachedRedisStorage {
// TODO: Not all the options are configurable via ENV. Add them as needed.

let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url);
let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
.flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl))
.ttl_ratio_cached_counters(cache_cfg.ttl_ratio)
.max_cached_counters(cache_cfg.max_counters)
.response_timeout(Duration::from_millis(cache_cfg.response_timeout));

if cache_cfg.flushing_period < 0 {
cached_redis_storage = cached_redis_storage.flushing_period(None)
} else {
cached_redis_storage = cached_redis_storage.flushing_period(Some(
Duration::from_millis(cache_cfg.flushing_period as u64),
))
}

cached_redis_storage =
cached_redis_storage.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl));

cached_redis_storage = cached_redis_storage.ttl_ratio_cached_counters(cache_cfg.ttl_ratio);
cached_redis_storage = cached_redis_storage.max_cached_counters(cache_cfg.max_counters);

match cached_redis_storage.build().await {
Ok(storage) => storage,
Err(err) => {
eprintln!("Failed to connect to Redis at {redis_url}: {err}");
process::exit(1)
}
}
cached_redis_storage.build().await.unwrap_or_else(|err| {
eprintln!("Failed to connect to Redis at {redis_url}: {err}");
process::exit(1)
})
}

#[cfg(feature = "infinispan")]
Expand Down Expand Up @@ -655,6 +643,15 @@ fn create_config() -> (Configuration, &'static str) {
.default_value(leak(DEFAULT_MAX_CACHED_COUNTERS))
.display_order(5)
.help("Maximum amount of counters cached"),
)
.arg(
Arg::new("timeout")
.long("response-timeout")
.action(ArgAction::Set)
.value_parser(clap::value_parser!(u64))
.default_value(leak(DEFAULT_RESPONSE_TIMEOUT_MS))
.display_order(6)
.help("Timeout for Redis commands in milliseconds"),
),
);

Expand Down Expand Up @@ -762,6 +759,7 @@ fn create_config() -> (Configuration, &'static str) {
max_ttl: *sub.get_one("TTL").unwrap(),
ttl_ratio: *sub.get_one("ratio").unwrap(),
max_counters: *sub.get_one("max").unwrap(),
response_timeout: *sub.get_one("timeout").unwrap(),
}),
}),
#[cfg(feature = "infinispan")]
Expand Down Expand Up @@ -853,6 +851,7 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
.parse()
.expect("Expected an u64"),
max_counters: DEFAULT_MAX_CACHED_COUNTERS,
response_timeout: DEFAULT_RESPONSE_TIMEOUT_MS,
})
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ tracing = "0.1.40"

# Optional dependencies
rocksdb = { version = "0.21.0", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.23.1", optional = true, features = [
redis = { version = "0.25.1", optional = true, features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
Expand Down
1 change: 1 addition & 0 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl From<TryFromSliceError> for StorageErr {
fn from(_: TryFromSliceError) -> Self {
Self {
msg: "Corrupted byte sequence while reading 8 bytes for 64-bit integer".to_owned(),
transient: false,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions limitador/src/storage/disk/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::storage::StorageErr;
use rocksdb::ErrorKind;

mod expiring_value;
mod rocksdb_storage;
Expand All @@ -9,6 +10,7 @@ impl From<rocksdb::Error> for StorageErr {
fn from(error: rocksdb::Error) -> Self {
Self {
msg: format!("Underlying storage error: {error}"),
transient: error.kind() == ErrorKind::TimedOut || error.kind() == ErrorKind::TryAgain,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions limitador/src/storage/infinispan/dist_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub async fn lock(
if retries >= RETRIES {
return Err(StorageErr {
msg: "can't acquire lock".into(),
transient: false,
});
}

Expand Down
10 changes: 8 additions & 2 deletions limitador/src/storage/infinispan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ pub use infinispan_storage::InfinispanStorageBuilder;

impl From<reqwest::Error> for StorageErr {
fn from(e: reqwest::Error) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: false,
}
}
}

impl From<InfinispanError> for StorageErr {
fn from(e: InfinispanError) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: false,
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,15 @@ pub trait AsyncCounterStorage: Sync + Send {
#[error("error while accessing the limits storage: {msg}")]
pub struct StorageErr {
msg: String,
transient: bool,
}

impl StorageErr {
pub fn msg(&self) -> &str {
&self.msg
}

pub fn is_transient(&self) -> bool {
self.transient
}
}
4 changes: 2 additions & 2 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ttl_cache::TtlCache;

pub struct CountersCache {
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
pub ttl_ratio_cached_counters: u64,
cache: TtlCache<Counter, i64>,
}

Expand Down Expand Up @@ -204,7 +204,7 @@ mod tests {
}

#[test]
fn insert_saves_0_when_redis_val_is_none() {
fn insert_saves_zero_when_redis_val_is_none() {
let max_val = 10;
let mut values = HashMap::new();
values.insert("app_id".to_string(), "1".to_string());
Expand Down
12 changes: 5 additions & 7 deletions limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5;
pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10;
pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350;

use crate::counter::Counter;
use crate::storage::{Authorization, StorageErr};
Expand All @@ -21,13 +22,10 @@ pub use redis_sync::RedisStorage;

impl From<RedisError> for StorageErr {
fn from(e: RedisError) -> Self {
Self { msg: e.to_string() }
}
}

impl From<::r2d2::Error> for StorageErr {
fn from(e: ::r2d2::Error) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: e.is_timeout() || e.is_connection_dropped() || e.is_cluster_error(),
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ impl AsyncRedisStorage {
Self { conn_manager }
}

pub async fn is_alive(&self) -> bool {
self.conn_manager
.clone()
.incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
.await
.is_ok()
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();

Expand Down
Loading

0 comments on commit cce84e4

Please sign in to comment.