Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: valkey-io/valkey-bloom
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: e08ca07c34ef5643f34b68692a1dfd34abb0d700
Choose a base ref
..
head repository: valkey-io/valkey-bloom
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: f4eedcfb9b9402f4664105e433cb1898d57972ad
Choose a head ref
Showing with 26 additions and 111 deletions.
  1. +1 −1 Cargo.toml
  2. +12 −33 src/bloom/command_handler.rs
  3. +0 −5 src/bloom/data_type.rs
  4. +12 −43 src/bloom/utils.rs
  5. +0 −2 src/configs.rs
  6. +0 −1 src/wrapper/bloom_callback.rs
  7. +0 −25 tests/test_basic.py
  8. +1 −1 tests/test_bloom_metrics.py
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
valkey-module = { path = "../valkeymodule-rs", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module = { version = "0.1.3", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "3.0.1", features = ["serde"] }
45 changes: 12 additions & 33 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
@@ -87,7 +87,7 @@ pub fn bloom_filter_add_value(
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
// Skip bloom filter size validation on replicated cmds.
@@ -110,13 +110,11 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -175,7 +173,7 @@ pub fn bloom_filter_exists(
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
if !multi {
@@ -203,7 +201,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
match value {
@@ -272,7 +270,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
match value {
@@ -281,10 +279,8 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -306,7 +302,6 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke

pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
return Err(ValkeyError::WrongArity);
@@ -316,7 +311,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
@@ -337,21 +331,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"TIGHTENING" if replicated_cmd => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
}
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
@@ -404,10 +383,11 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bloom) => {
@@ -418,7 +398,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
bloom,
true,
&mut add_succeeded,
!replicated_cmd,
validate_size_limit,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
@@ -430,11 +410,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
!replicated_cmd,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
@@ -446,7 +425,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut bloom,
true,
&mut add_succeeded,
!replicated_cmd,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
@@ -472,7 +451,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
match value {
@@ -534,7 +513,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
Ok(v) => v,
Err(_) => {
// error
return Err(ValkeyError::WrongType);
return Err(ValkeyError::Str(utils::ERROR));
}
};
match filter {
5 changes: 0 additions & 5 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -71,9 +71,6 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
@@ -124,7 +121,6 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
@@ -135,7 +131,6 @@ impl ValkeyDataType for BloomFilterType {
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
Loading