Skip to content

Commit

Permalink
Implements #95 and fixes #162
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Feb 21, 2023
1 parent 634adb6 commit dad1581
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
57 changes: 51 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use rabbitmq_stream_protocol::{
delete::Delete,
delete_publisher::DeletePublisherCommand,
generic::GenericResponse,
heart_beat::HeartBeatCommand,
metadata::MetadataCommand,
open::{OpenCommand, OpenResponse},
peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse},
Expand All @@ -41,6 +42,7 @@ use rabbitmq_stream_protocol::{
types::PublishedMessage,
FromResponse, Request, Response, ResponseCode, ResponseKind,
};
use tokio_native_tls::TlsStream;
use tracing::trace;

pub use self::handler::{MessageHandler, MessageResult};
Expand All @@ -58,14 +60,14 @@ use std::{
pin::Pin,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
use std::{future::Future, sync::atomic::Ordering};
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::sync::RwLock;
use tokio::{net::TcpStream, sync::Notify};
use tokio_native_tls::TlsStream;
use tokio::{sync::RwLock, task::JoinHandle};
use tokio_util::codec::Framed;

#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stream")))]
Expand Down Expand Up @@ -125,6 +127,8 @@ pub struct ClientState {
handler: Option<Arc<dyn MessageHandler>>,
heartbeat: u32,
max_frame_size: u32,
last_heatbeat: Instant,
heartbeat_task: Option<JoinHandle<()>>,
}

#[async_trait::async_trait]
Expand All @@ -133,6 +137,7 @@ impl MessageHandler for Client {
match &item {
Some(Ok(response)) => match response.kind_ref() {
ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await,
ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await,
_ => {
if let Some(handler) = self.state.read().await.handler.as_ref() {
let handler = handler.clone();
Expand Down Expand Up @@ -188,6 +193,8 @@ impl Client {
handler: None,
heartbeat: broker.heartbeat,
max_frame_size: broker.max_frame_size,
last_heatbeat: Instant::now(),
heartbeat_task: None,
};
let mut client = Client {
dispatcher,
Expand Down Expand Up @@ -228,6 +235,14 @@ impl Client {
CloseRequest::new(correlation_id, ResponseCode::Ok, "Ok".to_owned())
})
.await?;

let mut state = self.state.write().await;

if let Some(heartbeat_task) = state.heartbeat_task.take() {
heartbeat_task.abort();
}

drop(state);
self.channel.close().await
}
pub async fn subscribe(
Expand Down Expand Up @@ -451,10 +466,10 @@ impl Client {
Ok(())
}

fn max_value(&self, client: u32, server: u32) -> u32 {
fn negotiate_value(&self, client: u32, server: u32) -> u32 {
match (client, server) {
(client, server) if client == 0 || server == 0 => client.max(server),
(client, server) => client.max(server),
(client, server) => client.min(server),
}
}

Expand Down Expand Up @@ -543,11 +558,35 @@ impl Client {

async fn handle_tune_command(&self, tunes: &TunesCommand) {
let mut state = self.state.write().await;
state.heartbeat = self.max_value(self.opts.heartbeat, tunes.heartbeat);
state.max_frame_size = self.max_value(self.opts.max_frame_size, tunes.max_frame_size);
state.heartbeat = self.negotiate_value(self.opts.heartbeat, tunes.heartbeat);
state.max_frame_size = self.negotiate_value(self.opts.max_frame_size, tunes.max_frame_size);

let heart_beat = state.heartbeat;
let max_frame_size = state.max_frame_size;

trace!(
"Handling tune with frame size {} and heartbeat {}",
max_frame_size,
heart_beat
);

if let Some(task) = state.heartbeat_task.take() {
task.abort();
}

if heart_beat != 0 {
let heartbeat_interval = (heart_beat / 2).max(1);
let channel = self.channel.clone();
let heartbeat_task = tokio::spawn(async move {
loop {
trace!("Sending heartbeat");
let _ = channel.send(HeartBeatCommand::default().into()).await;
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
}
});
state.heartbeat_task = Some(heartbeat_task);
}

drop(state);

let _ = self
Expand All @@ -557,4 +596,10 @@ impl Client {

self.tune_notifier.notify_one();
}

async fn handle_heart_beat_command(&self) {
trace!("Received heartbeat");
let mut state = self.state.write().await;
state.last_heatbeat = Instant::now();
}
}
8 changes: 7 additions & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ impl Environment {

/// Delete a stream
pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
let response = self.create_client().await?.delete_stream(stream).await?;
let client = self.create_client().await?;
let response = client.delete_stream(stream).await?;
client.close().await?;

if response.is_ok() {
Ok(())
Expand Down Expand Up @@ -122,6 +124,10 @@ impl EnvironmentBuilder {
self
}

pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
self.0.client_options.heartbeat = heartbeat;
self
}
pub fn metrics_collector(
mut self,
collector: impl MetricsCollector + Send + Sync + 'static,
Expand Down
6 changes: 5 additions & 1 deletion src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl<T> ProducerBuilder<T> {
metadata.leader,
stream
);
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
Expand Down Expand Up @@ -553,7 +554,10 @@ impl MessageHandler for ProducerConfirmHandler {
trace!(?error);
// TODO clean all waiting for confirm
}
None => todo!(),
None => {
trace!("Connection closed");
// TODO connection close clean all waiting
}
}
Ok(())
}
Expand Down

0 comments on commit dad1581

Please sign in to comment.