Skip to content

Commit

Permalink
Add TIGHTENING Ratio and FP_Rate configs support (#31)
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Mehta <[email protected]>
  • Loading branch information
nnmehta authored Dec 19, 2024
1 parent 98ccdc6 commit 9baf6fc
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 34 deletions.
33 changes: 25 additions & 8 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
Expand Down Expand Up @@ -203,8 +203,12 @@ pub fn bloom_filter_add_value(
}
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Unable to get a lock on fp_rate static");
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down Expand Up @@ -401,7 +405,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Failed to lock tightening ratio");
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
Expand Down Expand Up @@ -444,8 +450,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Unable to get a lock on fp_rate static");
let mut tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down Expand Up @@ -479,8 +489,15 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
Ok(num)
if num > BLOOM_TIGHTENING_RATIO_MIN && num < BLOOM_TIGHTENING_RATIO_MAX =>
{
num
}
Ok(num)
if !(num > BLOOM_TIGHTENING_RATIO_MIN
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
{
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
Expand Down
19 changes: 10 additions & 9 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ impl ValkeyDataType for BloomFilterType {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
return None;
Expand Down
25 changes: 16 additions & 9 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX,
TIGHTENING_RATIO_MIN,
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
},
metrics,
};
Expand Down Expand Up @@ -265,10 +265,11 @@ impl BloomFilterType {
}
// Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also
// bound within the range f64::MIN_POSITIVE <= x < 1.0.
let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_fp_rate =
match Self::calculate_fp_rate(self.fp_rate, num_filters, self.tightening_ratio) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
Expand Down Expand Up @@ -325,8 +326,12 @@ impl BloomFilterType {
}

/// Calculate the false positive rate for the Nth filter using tightening ratio.
pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
pub fn calculate_fp_rate(
fp_rate: f64,
num_filters: i32,
tightening_ratio: f64,
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
}
Expand Down Expand Up @@ -374,7 +379,9 @@ impl BloomFilterType {
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) {
if !(values.2 > BLOOM_TIGHTENING_RATIO_MIN
&& values.2 < BLOOM_TIGHTENING_RATIO_MAX)
{
return Err(BloomError::ErrorRateRange);
}
if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
Expand Down
75 changes: 69 additions & 6 deletions src/configs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::bloom::utils;
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::{
configuration::{ConfigurationContext, ConfigurationFlags},
valkey_module, ConfigurationValue, Context, InfoContext, Status, ValkeyError, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100;
Expand All @@ -10,10 +18,16 @@ pub const BLOOM_EXPANSION_DEFAULT: i64 = 2;
pub const BLOOM_EXPANSION_MIN: u32 = 1;
pub const BLOOM_EXPANSION_MAX: u32 = u32::MAX;

pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.01;
pub const BLOOM_FP_RATE_DEFAULT: &str = "0.01";
pub const BLOOM_FP_RATE_MIN: f64 = 0.0;
pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO_DEFAULT: &str = "0.5";
pub const BLOOM_TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const BLOOM_TIGHTENING_RATIO_MAX: f64 = 1.0;

pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;

pub const BLOOM_DEFRAG_DEAFULT: bool = true;
Expand All @@ -31,14 +45,23 @@ lazy_static! {
AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT);
pub static ref BLOOM_FP_RATE_F64: Mutex<f64> = Mutex::new(
BLOOM_FP_RATE_DEFAULT
.parse::<f64>()
.expect("Expected valid f64 for fp rate.")
);
pub static ref BLOOM_FP_RATE: ValkeyGILGuard<ValkeyString> =
ValkeyGILGuard::new(ValkeyString::create(None, BLOOM_FP_RATE_DEFAULT));
pub static ref BLOOM_TIGHTENING_F64: Mutex<f64> = Mutex::new(
TIGHTENING_RATIO_DEFAULT
.parse::<f64>()
.expect("Expected valid f64 for tightening ratio.")
);
pub static ref BLOOM_TIGHTENING_RATIO: ValkeyGILGuard<ValkeyString> =
ValkeyGILGuard::new(ValkeyString::create(None, TIGHTENING_RATIO_DEFAULT));
}

/// Constants
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
Expand All @@ -47,3 +70,43 @@ pub const FIXED_SEED: [u8; 32] = [
89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62,
152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131,
];

/// This is a config set handler for the False Positive Rate and Tightening Ratio configs.
pub fn on_string_config_set(
config_ctx: &ConfigurationContext,
name: &str,
val: &'static ValkeyGILGuard<ValkeyString>,
) -> Result<(), ValkeyError> {
let v = val.get(config_ctx);
let value_str = v.to_string_lossy();
let value = match value_str.parse::<f64>() {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str("Invalid floating-point value"));
}
};

match name {
"bloom-fp-rate" => {
if !(BLOOM_FP_RATE_MIN..BLOOM_FP_RATE_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
let mut fp_rate = BLOOM_FP_RATE_F64
.lock()
.expect("We expect the fp_rate static to exist.");
*fp_rate = value;
Ok(())
}
"bloom-tightening-ratio" => {
if !(BLOOM_TIGHTENING_RATIO_MIN..BLOOM_TIGHTENING_RATIO_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
let mut tightening = BLOOM_TIGHTENING_F64
.lock()
.expect("We expect the tightening_ratio static to exist.");
*tightening = value;
Ok(())
}
_ => Err(ValkeyError::Str("Unknown configuration parameter")),
}
}
8 changes: 6 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use metrics::bloom_info_handler;
use valkey_module::configuration::ConfigurationFlags;
use valkey_module::{valkey_module, Context, InfoContext, Status, ValkeyResult, ValkeyString};
use valkey_module::{
configuration::ConfigurationFlags, valkey_module, Context, InfoContext, Status, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};
pub mod bloom;
pub mod configs;
pub mod metrics;
Expand Down Expand Up @@ -106,6 +108,8 @@ valkey_module! {
["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None],
],
string: [
["bloom-fp-rate", &*configs::BLOOM_FP_RATE, configs::BLOOM_FP_RATE_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
["bloom-tightening-ratio", &*configs::BLOOM_TIGHTENING_RATIO, configs::TIGHTENING_RATIO_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
],
bool: [
["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
Expand Down
32 changes: 32 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,22 @@ def test_debug_cmd(self):
else:
madd_scenario_object_digest == madd_default_object_digest

# scenario 6 validates that digest differs on bloom objects after changing the tightening_ratio config
client.execute_command('BF.RESERVE tightening_ratio 0.001 1000')
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
client.execute_command('BF.RESERVE tightening_ratio2 0.001 1000')
scenario_tightening_ratio_object_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio')
scenario_tightening_ratio2_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio2')
assert scenario_tightening_ratio_object_digest != scenario_tightening_ratio2_digest

# scenario 7 validates that digest differs on bloom objects after changing the fp_rate config
client.execute_command('BF.INSERT fp_rate capacity 1000 items 1')
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.5') == b'OK'
client.execute_command('BF.INSERT fp_rate2 capacity 1000 items 1')
fp_rate_object_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate')
scenario_fp_rate2_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate2')
assert fp_rate_object_digest != scenario_fp_rate2_digest

def test_bloom_wrong_type(self):
# List of all bloom commands
bloom_commands = [
Expand All @@ -323,3 +339,19 @@ def test_bloom_wrong_type(self):
except Exception as e:

assert str(e) == f"WRONGTYPE Operation against a key holding the wrong kind of value"

def test_bloom_string_config_set(self):
"""
This is a test that validates the bloom string configuration set logic.
"""
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'

assert self.client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.1'
assert self.client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.75'
try:
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1') == b'ERR (0 < error rate range < 1)'\
and self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75') == b'ERR (0 < error ratio range < 1)'
except ResponseError as e:
assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-fp-rate') - ERR (0 < error rate range < 1)"\
and f"CONFIG SET failed (possibly related to argument 'bf.bloom-tightening-ratio') - ERR (0 < error rate range < 1)"
6 changes: 6 additions & 0 deletions tests/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def test_deterministic_replication(self):
# replicated with the properties below.
assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
# Test bloom object creation with every command type.
bloom_write_cmds = [
('BF.ADD', 'BF.ADD key item'),
Expand All @@ -181,3 +183,7 @@ def test_deterministic_replication(self):
assert object_digest_primary == debug_digest_replica
self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-capacity')[1] == b'100'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-expansion')[1] == b'2'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.01'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.5'

0 comments on commit 9baf6fc

Please sign in to comment.