diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index dad87a0..d32c3fb 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -110,12 +110,10 @@ pub fn bloom_filter_add_value( None => { // Instantiate empty bloom filter. let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; - let tightening_ratio = configs::TIGHTENING_RATIO; let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let mut bloom = match BloomFilterType::new_reserved( fp_rate, - tightening_ratio, capacity, expansion, validate_size_limit, @@ -229,17 +227,6 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke return Err(ValkeyError::Str(utils::BAD_ERROR_RATE)); } }; - curr_cmd_idx += 1; - let tightening_ratio = match input_args[curr_cmd_idx].to_string_lossy().parse::() { - Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num, - Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => { - return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); - } - _ => { - return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO)); - } - }; - curr_cmd_idx += 1; // Parse the capacity let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num, @@ -290,7 +277,6 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let bloom = match BloomFilterType::new_reserved( fp_rate, - tightening_ratio, capacity, expansion, validate_size_limit, @@ -312,7 +298,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); // At the very least, we need: BF.INSERT ITEMS - if argc < 4 { + if argc < 5 { return Err(ValkeyError::WrongArity); } let mut idx = 1; @@ -341,7 +327,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } - "RATIO" => { + "TIGHTENING" => { + if !ctx.get_flags().contains(ContextFlags::REPLICATED) { + return Err(ValkeyError::Str(utils::ERROR)); + } idx += 1; tightening_ratio = match input_args[idx].to_string_lossy().parse::() { Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num, @@ -431,7 +420,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } let mut bloom = match BloomFilterType::new_reserved( fp_rate, - tightening_ratio, capacity, expansion, validate_size_limit, diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 0de5cbd..f2887bf 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -137,7 +137,7 @@ impl ValkeyDataType for BloomFilterType { fn debug_digest(&self, mut dig: Digest) { dig.add_long_long(self.expansion.into()); dig.add_string_buffer(&self.fp_rate.to_le_bytes()); - // dig.add_string_buffer(&self.tightening_ratio.to_le_bytes()); + dig.add_string_buffer(&self.tightening_ratio.to_le_bytes()); for filter in &self.filters { dig.add_string_buffer(&filter.bloom.bitmap()); for &(key1, key2) in &filter.sip_keys() { diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 0a4e1d9..583aec3 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -80,7 +80,6 @@ impl BloomFilterType { /// Create a new BloomFilterType object. pub fn new_reserved( fp_rate: f64, - tightening_ratio: f64, capacity: u32, expansion: u32, validate_size_limit: bool, @@ -99,6 +98,7 @@ impl BloomFilterType { // Create the bloom filter and add to the main BloomFilter object. let bloom = BloomFilter::new(fp_rate, capacity); let filters = vec![bloom]; + let tightening_ratio = 0.5; let bloom = BloomFilterType { expansion, fp_rate, @@ -628,19 +628,13 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; - let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; // Expansion of 0 indicates non scaling. let expansion = 0; // Validate the non scaling behavior of the bloom filter. - let mut bf = BloomFilterType::new_reserved( - expected_fp_rate, - expected_tightening_ratio, - initial_capacity, - expansion, - true, - ) - .expect("Expect bloom creation to succeed"); + let mut bf = + BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) + .expect("Expect bloom creation to succeed"); let (error_count, add_operation_idx) = add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); assert_eq!( @@ -692,18 +686,12 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; - let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; let expansion = 2; let num_filters_to_scale = 5; - let mut bf = BloomFilterType::new_reserved( - expected_fp_rate, - expected_tightening_ratio, - initial_capacity, - expansion, - true, - ) - .expect("Expect bloom creation to succeed"); + let mut bf = + BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) + .expect("Expect bloom creation to succeed"); assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), 0); let mut total_error_count = 0; @@ -776,18 +764,18 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. - let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true); + let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true); + let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[test] fn test_bf_encode_and_decode() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -817,7 +805,7 @@ mod tests { #[test] fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -839,7 +827,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -859,7 +847,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; @@ -901,7 +889,7 @@ mod tests { // 3. build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/src/configs.rs b/src/configs.rs index f2e6d00..cf10579 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -32,7 +32,7 @@ lazy_static! { // Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to // maintain the bloom object's overall fp_rate to the configured value. pub const TIGHTENING_RATIO: f64 = 0.5; -pub const TIGHTENING_RATIO_MIN: f64 = 0.001; +pub const TIGHTENING_RATIO_MIN: f64 = 0.0; pub const TIGHTENING_RATIO_MAX: f64 = 1.0; // Max number of filters allowed within a bloom object. pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index e2bde8e..7333c80 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -21,6 +21,7 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu raw::save_unsigned(rdb, v.filters.len() as u64); raw::save_unsigned(rdb, v.expansion as u64); raw::save_double(rdb, v.fp_rate); + raw::save_double(rdb, v.tightening_ratio); let filter_list = &v.filters; let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = filter_list_iter.next() {