Skip to content

Commit

Permalink
making Messages as references to allow borrowing
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Oct 15, 2024
1 parent e409ac5 commit aa085c3
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
10 changes: 5 additions & 5 deletions src/superstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ impl DefaultSuperStreamMetadata {

#[derive(Clone)]
pub struct RoutingKeyRoutingStrategy {
pub routing_extractor: &'static dyn Fn(Message) -> String,
pub routing_extractor: &'static dyn Fn(&Message) -> String,
}

impl RoutingKeyRoutingStrategy {
pub async fn routes(
&self,
message: Message,
message: &Message,
metadata: &mut DefaultSuperStreamMetadata,
) -> Vec<String> {
let key = (self.routing_extractor)(message);
Expand All @@ -64,19 +64,19 @@ impl RoutingKeyRoutingStrategy {

#[derive(Clone)]
pub struct HashRoutingMurmurStrategy {
pub routing_extractor: &'static dyn Fn(Message) -> String,
pub routing_extractor: &'static dyn Fn(&Message) -> String,
}

impl HashRoutingMurmurStrategy {
pub async fn routes(
&self,
message: Message,
message: &Message,
metadata: &mut DefaultSuperStreamMetadata,
) -> Vec<String> {
println!("im in routes");
let mut streams: Vec<String> = Vec::new();

let key = (self.routing_extractor)(message.clone());
let key = (self.routing_extractor)(message);
let hash_result = murmur3_32(&mut Cursor::new(key), 104729);

let number_of_partitions = metadata.partitions().await.len();
Expand Down
6 changes: 3 additions & 3 deletions src/superstream_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ impl SuperStreamProducer<NoDedup> {
{
let routes = match self.0.routing_strategy.clone() {
RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
routing_strategy.routes(message.clone(), &mut self.2).await
routing_strategy.routes(&message, &mut self.2).await
}
RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
routing_strategy.routes(message.clone(), &mut self.2).await
routing_strategy.routes(&message, &mut self.2).await
}
};

Expand All @@ -67,7 +67,7 @@ impl SuperStreamProducer<NoDedup> {
}

let producer = self.1.get(route.as_str()).unwrap();
let result = producer.send(message.clone(), cb.clone()).await?;
let _ = producer.send(message.clone(), cb.clone()).await?;
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/producer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,11 @@ async fn producer_send_after_close_error() {
);
}

fn routing_key_strategy_value_extractor(message: Message) -> String {
fn routing_key_strategy_value_extractor(message: &Message) -> String {
return "0".to_string();
}

fn hash_strategy_value_extractor(message: Message) -> String {
fn hash_strategy_value_extractor(message: &Message) -> String {
let s = String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8");
return s;
}
Expand Down

0 comments on commit aa085c3

Please sign in to comment.