Skip to content

Commit

Permalink
Start sending heartbeat from client after tune command received.
Browse files Browse the repository at this point in the history
  • Loading branch information
akloboucnik committed Oct 6, 2022
1 parent 02db3cf commit fb44ccc
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ use self::{
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use std::{future::Future, sync::atomic::Ordering};
use tokio::sync::RwLock;
use tokio::{net::TcpStream, sync::Notify};
use tokio::{net::TcpStream, sync::Notify, task::JoinHandle};
use tokio_util::codec::Framed;

type SinkConnection = SplitSink<Framed<TcpStream, RabbitMqStreamCodec>, Request>;
Expand All @@ -70,6 +71,7 @@ pub struct ClientState {
handler: Option<Arc<dyn MessageHandler>>,
heartbeat: u32,
max_frame_size: u32,
heartbeat_task: Option<Arc<JoinHandle<()>>>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -134,6 +136,7 @@ impl Client {
handler: None,
heartbeat: broker.heartbeat,
max_frame_size: broker.max_frame_size,
heartbeat_task: None,
};
let mut client = Client {
dispatcher,
Expand Down Expand Up @@ -166,6 +169,12 @@ impl Client {
}

pub async fn close(&self) -> RabbitMQStreamResult<()> {
let mut state = self.state.write().await;
if let Some(ref heartbeat_task) = state.heartbeat_task {
heartbeat_task.abort();
state.heartbeat_task = None
}

if self.channel.is_closed() {
return Err(ClientError::AlreadyClosed);
}
Expand Down Expand Up @@ -472,18 +481,37 @@ impl Client {

async fn handle_tune_command(&self, tunes: &TunesCommand) {
let mut state = self.state.write().await;
let old_heartbeat = state.heartbeat;
state.heartbeat = self.max_value(self.opts.heartbeat, tunes.heartbeat);
state.max_frame_size = self.max_value(self.opts.max_frame_size, tunes.max_frame_size);

let heart_beat = state.heartbeat;
let max_frame_size = state.max_frame_size;
drop(state);

let _ = self
.channel
.send(TunesCommand::new(max_frame_size, heart_beat).into())
.await;

// if hearbeat interval changed, abort the old heartbeat task and start a new one
if old_heartbeat != heart_beat {
if let Some(ref heartbeat_task) = state.heartbeat_task {
heartbeat_task.abort();
state.heartbeat_task = None
}
let c = self.channel.clone();
let heartbeat_interval = (heart_beat / 2).max(1);
let handle = tokio::task::spawn(async move {
loop {
trace!("sending heartbeat");
let _ = c.send(HeartBeatCommand::default().into()).await;
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
}
});
state.heartbeat_task = Some(Arc::new(handle));
}
drop(state);

self.tune_notifier.notify_one();
}

Expand Down

0 comments on commit fb44ccc

Please sign in to comment.