Skip to content

Commit

Permalink
Add tightening_ratio support in BloomFilterType structure
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Mehta <[email protected]>
  • Loading branch information
nnmehta committed Nov 28, 2024
1 parent dc303c1 commit 6bec972
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 42 deletions.
29 changes: 28 additions & 1 deletion src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
Expand Down Expand Up @@ -110,10 +110,12 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand Down Expand Up @@ -228,6 +230,16 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
}
};
curr_cmd_idx += 1;
let tightening_ratio = match input_args[curr_cmd_idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Expand Down Expand Up @@ -278,6 +290,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand Down Expand Up @@ -307,6 +320,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
Expand All @@ -327,6 +341,18 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"RATIO" => {
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
}
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
Expand Down Expand Up @@ -405,6 +431,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand Down
5 changes: 5 additions & 0 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand Down Expand Up @@ -124,6 +127,7 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
filters,
};
Some(item)
Expand All @@ -133,6 +137,7 @@ impl ValkeyDataType for BloomFilterType {
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
// dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
for &(key1, key2) in &filter.sip_keys() {
Expand Down
110 changes: 69 additions & 41 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
},
metrics,
};
Expand All @@ -24,6 +25,8 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion";
pub const BAD_CAPACITY: &str = "ERR bad capacity";
pub const BAD_ERROR_RATE: &str = "ERR bad error rate";
pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio";
pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
Expand Down Expand Up @@ -69,13 +72,15 @@ impl BloomError {
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
pub tightening_ratio: f64,
pub filters: Vec<BloomFilter>,
}

impl BloomFilterType {
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
tightening_ratio: f64,
capacity: u32,
expansion: u32,
validate_size_limit: bool,
Expand All @@ -97,6 +102,7 @@ impl BloomFilterType {
let bloom = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
filters,
};
Ok(bloom)
Expand All @@ -117,6 +123,7 @@ impl BloomFilterType {
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
tightening_ratio: from_bf.tightening_ratio,
filters,
}
}
Expand Down Expand Up @@ -245,39 +252,47 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
{
Ok(values) => {
if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
return Err(BloomError::BadExpansion);
}
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
if values.2.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
return Err(BloomError::MaxNumScalingFilters);
}
for _filter in values.2.iter() {
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
if validate_size_limit
&& _filter.number_of_bytes()
> configs::BLOOM_MEMORY_LIMIT_PER_FILTER
.load(Ordering::Relaxed)
as usize
{
return Err(BloomError::ExceedsMaxBloomSize);
}
}
values
let (expansion, fp_rate, tightening_ratio, filters): (
u32,
f64,
f64,
Vec<BloomFilter>,
) = match bincode::deserialize::<(u32, f64, f64, Vec<BloomFilter>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
return Err(BloomError::BadExpansion);
}
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
Err(_) => {
return Err(BloomError::DecodeBloomFilterFailed);
if !(values.2 > TIGHTENING_RATIO_MIN && values.1 < TIGHTENING_RATIO_MAX) {
return Err(BloomError::ErrorRateRange);
}
};
if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
return Err(BloomError::MaxNumScalingFilters);
}
for _filter in values.3.iter() {
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
if validate_size_limit
&& _filter.number_of_bytes()
> configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed)
as usize
{
return Err(BloomError::ExceedsMaxBloomSize);
}
}
values
}
Err(_) => {
return Err(BloomError::DecodeBloomFilterFailed);
}
};
let item = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
filters,
};
// add bloom filter type metrics.
Expand Down Expand Up @@ -613,13 +628,19 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
let mut bf =
BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
.expect("Expect bloom creation to succeed");
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
expected_tightening_ratio,
initial_capacity,
expansion,
true,
)
.expect("Expect bloom creation to succeed");
let (error_count, add_operation_idx) =
add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
assert_eq!(
Expand Down Expand Up @@ -671,12 +692,18 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
let mut bf =
BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
.expect("Expect bloom creation to succeed");
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
expected_tightening_ratio,
initial_capacity,
expansion,
true,
)
.expect("Expect bloom creation to succeed");
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), 0);
let mut total_error_count = 0;
Expand Down Expand Up @@ -749,18 +776,18 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true);
let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true);
let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[test]
fn test_bf_encode_and_decode() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand All @@ -779,6 +806,7 @@ mod tests {

// verify new_bf and bf
assert_eq!(bf.fp_rate, new_bf.fp_rate);
assert_eq!(bf.tightening_ratio, new_bf.tightening_ratio);
assert_eq!(bf.expansion, new_bf.expansion);
assert_eq!(bf.capacity(), new_bf.capacity());

Expand All @@ -789,7 +817,7 @@ mod tests {
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

Expand All @@ -811,7 +839,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand All @@ -831,7 +859,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
Expand Down Expand Up @@ -873,7 +901,7 @@ mod tests {

// 3. build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap();
BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
Expand Down
2 changes: 2 additions & 0 deletions src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ lazy_static! {
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
pub const TIGHTENING_RATIO_MIN: f64 = 0.001;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
Expand Down

0 comments on commit 6bec972

Please sign in to comment.