Skip to content

Commit

Permalink
Merge pull request #76 from spiralover/refactor/app
Browse files Browse the repository at this point in the history
feat(rabbitmq): "close" method to close connection
  • Loading branch information
Ahmard authored Oct 3, 2024
2 parents eabab34 + 09b79f5 commit 06c4302
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Medullah Changelog
medullah-web changelog file

## 0.20.7 (2024-10-03)
* feat(rabbitmq): "close" method to close connection
* feat(rabbitmq): acquire connection pool in use by this instance

## 0.20.6 (2024-09-25)
* bump(crates): to their respective latest versions

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "medullah-web"
version = "0.20.6"
version = "0.20.7"
edition = "2021"
license = "MIT"
description = "Micro-Framework Base on Ntex"
Expand Down
31 changes: 28 additions & 3 deletions src/rabbitmq/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures_util::StreamExt;
use lapin::types::FieldTable;
use lapin::{BasicProperties, Channel, ChannelState};
use lapin::types::{FieldTable, ReplyCode};
use lapin::{BasicProperties, Channel, ChannelState, ConnectionState};
use log::{error, info, warn};
use std::future::Future;
use std::time::Duration;
Expand All @@ -13,7 +13,7 @@ pub use {
lapin::{options::*, ExchangeKind},
};

use crate::prelude::{AppResult, OnceLockHelper};
use crate::prelude::{AppMessage, AppResult, OnceLockHelper};
pub use crate::rabbitmq::message::Message;
use crate::MEDULLAH;

Expand All @@ -25,6 +25,8 @@ pub struct RabbitMQ {
conn_pool: deadpool_lapin::Pool,
publish_channel: Channel,
consume_channel: Channel,
/// helps determine if the connection can be reconnected
can_reconnect: bool,
/// automatically nack a message if the handler returns an error.
nack_on_failure: bool,
/// whether to requeue a message if the handler returns an error.
Expand Down Expand Up @@ -79,6 +81,7 @@ impl RabbitMQ {
conn_pool: pool,
publish_channel,
consume_channel,
can_reconnect: true,
nack_on_failure: opt.nack_on_failure,
requeue_on_failure: opt.requeue_on_failure,
execute_handler_asynchronously: opt.execute_handler_asynchronously,
Expand Down Expand Up @@ -277,6 +280,22 @@ impl RabbitMQ {
Ok(())
}

/// Request a connection close.
///
/// This method is only successful if the connection is in the connected state,
/// otherwise an [`InvalidConnectionState`] error is returned.
///
pub async fn close(&mut self, reply_code: ReplyCode, reply_text: &str) -> AppResult<()> {
let connection = self.conn_pool.get().await?;
self.can_reconnect = false;
Ok(connection.close(reply_code, reply_text).await?)
}

/// Acquire connection pool in use by this instance
pub fn connection_pool(&self) -> deadpool_lapin::Pool {
self.conn_pool.clone()
}

async fn ensure_channel_is_usable(&mut self, is_publish_channel: bool) -> AppResult<()> {
loop {
let channel = match is_publish_channel {
Expand Down Expand Up @@ -305,6 +324,12 @@ impl RabbitMQ {
}

async fn recreate_channel(&mut self, is_publish_channel: bool) -> AppResult<()> {
if !self.can_reconnect {
return Err(AppMessage::RabbitmqError(
lapin::Error::InvalidConnectionState(ConnectionState::Closed),
));
}

let connection = self.conn_pool.get().await?;

sleep(Self::RETRY_DELAY).await;
Expand Down

0 comments on commit 06c4302

Please sign in to comment.