Skip to content

Commit

Permalink
Moving producers/consumers client connection creation to Environment (#…
Browse files Browse the repository at this point in the history
…259)

* moving connection logic inside environment

* minor udpate / making create_client public at crate level
  • Loading branch information
DanielePalaia authored Nov 25, 2024
1 parent 32bfe53 commit 1de1526
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 95 deletions.
54 changes: 4 additions & 50 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ use crate::error::ConsumerStoreOffsetError;
use crate::{
client::{MessageHandler, MessageResult},
error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError},
Client, ClientOptions, Environment, MetricsCollector,
Client, Environment, MetricsCollector,
};
use futures::{future::BoxFuture, task::AtomicWaker, Stream};
use rand::rngs::StdRng;
use rand::{seq::SliceRandom, SeedableRng};

type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;

Expand Down Expand Up @@ -132,56 +130,12 @@ impl ConsumerBuilder {
return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
}

// Connect to the user specified node first, then look for a random replica to connect to instead.
// This is recommended for load balancing purposes
let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
let collector = self.environment.options.client_options.collector.clone();

let mut client = self
let client = self
.environment
.create_client_with_options(opt_with_client_provided_name)
.create_consumer_client(stream, self.client_provided_name.clone())
.await?;
let collector = self.environment.options.client_options.collector.clone();
if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
// If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
// This is desired behavior in case there is only one node in the cluster.
if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
tracing::debug!(
"Picked replica {:?} out of possible candidates {:?} for stream {}",
replica,
metadata.replicas,
stream
);
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
let options = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == replica.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.environment.options.client_options
})
.await?;
}
}
} else {
return Err(ConsumerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

let subscription_id = 1;
let (tx, rx) = channel(10000);
Expand Down
113 changes: 112 additions & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use std::time::Duration;

use crate::producer::NoDedup;
use crate::types::OffsetSpecification;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::collections::HashMap;

use crate::{
client::{Client, ClientOptions, MetricsCollector},
consumer::ConsumerBuilder,
error::StreamDeleteError,
error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
producer::ProducerBuilder,
stream_creator::StreamCreator,
superstream::RoutingStrategy,
Expand All @@ -36,6 +39,114 @@ impl Environment {
Ok(Environment { options })
}

pub(crate) async fn create_producer_client(
self,
stream: &str,
client_provided_name: String,
) -> Result<Client, ProducerCreateError> {
let mut opt_with_client_provided_name = self.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = client_provided_name.clone();

let mut client = self
.create_client_with_options(opt_with_client_provided_name.clone())
.await?;

if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
tracing::debug!(
"Connecting to leader node {:?} of stream {}",
metadata.leader,
stream
);
let load_balancer_mode = self.options.client_options.load_balancer_mode;
if load_balancer_mode {
// Producer must connect to leader node
let options: ClientOptions = self.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == metadata.leader.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..opt_with_client_provided_name.clone()
})
.await?
};
} else {
return Err(ProducerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

Ok(client)
}

pub(crate) async fn create_consumer_client(
self,
stream: &str,
client_provided_name: String,
) -> Result<Client, ConsumerCreateError> {
let mut opt_with_client_provided_name = self.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = client_provided_name.clone();

let mut client = self
.create_client_with_options(opt_with_client_provided_name.clone())
.await?;

if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
// If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
// This is desired behavior in case there is only one node in the cluster.
if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
tracing::debug!(
"Picked replica {:?} out of possible candidates {:?} for stream {}",
replica,
metadata.replicas,
stream
);
let load_balancer_mode = self.options.client_options.load_balancer_mode;
if load_balancer_mode {
let options = self.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == replica.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.options.client_options
})
.await?;
}
}
} else {
return Err(ConsumerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

Ok(client)
}

/// Returns a builder for creating a stream with a specific configuration
pub fn stream_creator(&self) -> StreamCreator {
StreamCreator::new(self.clone())
Expand Down
48 changes: 4 additions & 44 deletions src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::future::Future;
use std::vec;
use std::{
marker::PhantomData,
sync::{
Expand All @@ -19,7 +18,7 @@ use rabbitmq_stream_protocol::{message::Message, ResponseCode, ResponseKind};

use crate::client::ClientMessage;
use crate::MetricsCollector;
use crate::{client::MessageHandler, ClientOptions, RabbitMQStreamResult};
use crate::{client::MessageHandler, RabbitMQStreamResult};
use crate::{
client::{Client, MessageResult},
environment::Environment,
Expand Down Expand Up @@ -119,12 +118,11 @@ impl<T> ProducerBuilder<T> {
// The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
// to the leader anyway - it is the only one capable of writing.

let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
let metrics_collector = self.environment.options.client_options.collector.clone();

let mut client = self
let client = self
.environment
.create_client_with_options(opt_with_client_provided_name.clone())
.create_producer_client(stream, self.client_provided_name.clone())
.await?;

let mut publish_version = 1;
Expand All @@ -137,44 +135,6 @@ impl<T> ProducerBuilder<T> {
}
}

let metrics_collector = self.environment.options.client_options.collector.clone();
if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
tracing::debug!(
"Connecting to leader node {:?} of stream {}",
metadata.leader,
stream
);
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
// Producer must connect to leader node
let options: ClientOptions = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == metadata.leader.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..opt_with_client_provided_name.clone()
})
.await?
};
} else {
return Err(ProducerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

let waiting_confirmations: WaiterMap = Arc::new(DashMap::new());

let confirm_handler = ProducerConfirmHandler {
Expand Down

0 comments on commit 1de1526

Please sign in to comment.