Skip to content

Commit

Permalink
super_stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Oct 9, 2024
1 parent 4a7cb45 commit ba13c06
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ thiserror = "1.0"
async-trait = "0.1.51"
rand = "0.8"
dashmap = "5.3.4"
murmur3 = "0.5.2"

[dev-dependencies]
tracing-subscriber = "0.3.1"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub mod error;
mod offset_specification;
mod producer;
mod stream_creator;
mod superstream;

pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;

Expand Down
98 changes: 98 additions & 0 deletions src/superstream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::{
client::Client,
};

use murmur3::murmur3_32;
use std::any::Any;
use rabbitmq_stream_protocol::message::Message;
use std::io::Cursor;

trait Metadata {

async fn partitions(&mut self) -> Vec<String>;
async fn routes(&mut self, routing_key: String) -> Vec<String>;

}

struct DefaultSuperStreamMetadata {
super_stream: String,
client: Client,
partitions: Vec<String>,
routes: Vec<String>,
}

impl Metadata for DefaultSuperStreamMetadata {

async fn partitions(&mut self) -> Vec<String> {

if self.partitions.len() == 0 {

let response = self.client.partitions(self.super_stream.clone()).await;

self.partitions = response.unwrap().streams;

}

return self.partitions.clone()

}
async fn routes(&mut self, routing_key: String) -> Vec<String> {

if self.routes.len() == 0 {

let response = self.client.route(routing_key, self.super_stream.clone()).await;

self.routes = response.unwrap().streams;

}

return self.routes.clone()

}

}

trait RoutingStrategy {
async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String>;
}

struct RoutingKeyRoutingStrategy {
routing_extractor: &'static dyn Fn(Message) -> String,
}

impl RoutingStrategy for RoutingKeyRoutingStrategy {

async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String> {

let key = (self.routing_extractor)(message);

let routes = metadata.routes(key).await;

return routes;

}
}

struct HashRoutingMurmurStrategy {
routing_extractor: &'static dyn Fn(Message) -> String,
}

impl RoutingStrategy for HashRoutingMurmurStrategy {

async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String> {

let mut streams: Vec<String> = Vec::new();

let key = (self.routing_extractor)(message);
let hash_result = murmur3_32(&mut Cursor::new(key), 104729);

let number_of_partitions = metadata.partitions().await.len();
let route = hash_result.unwrap() % number_of_partitions as u32;
let partitions: Vec<String> = metadata.partitions().await;
let stream = partitions.into_iter().nth(route as usize).unwrap();
streams.push(stream);

return streams

}
}
Empty file added src/superstream_consumer.rs
Empty file.
Empty file added src/superstream_producer.rs
Empty file.

0 comments on commit ba13c06

Please sign in to comment.