Skip to content

Commit

Permalink
Use io tags for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 3, 2024
1 parent 82a549d commit f0fdd83
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 110 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.8] - 2024-01-03

* Use io tags for logging

## [0.8.7] - 2023-12-04

* Fix overflow in Configuration::idle_timeout()
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.7"
version = "0.8.8"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,8 +24,8 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.12"
ntex-amqp-codec = "0.9.0"
ntex = "0.7.16"
ntex-amqp-codec = "0.9.1"

bitflags = "2.4"
derive_more = "0.99"
Expand Down
10 changes: 5 additions & 5 deletions codec/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
[package]
name = "ntex-amqp-codec"
version = "0.9.0"
version = "0.9.1"
description = "AMQP 1.0 Protocol Codec"
authors = ["Nikolay Kim <[email protected]>", "Max Gortman <[email protected]>", "Mike Yagley <[email protected]>"]
license = "MIT/Apache-2.0"
edition = "2018"

[dependencies]
ntex-bytes = "0.1.14"
ntex-bytes = "0.1.21"
ntex-codec = "0.6.2"
byteorder = "1.4"
byteorder = "1.5"
fxhash = "0.2.1"
chrono = { version = "0.4", default-features = false }
derive_more = "0.99"
ordered-float = "2.5"
uuid = { version = "1.2", features = ["v4"] }
ordered-float = "4.2"
uuid = { version = "1", features = ["v4"] }

[build-dependencies]
handlebars = { version = "0.27", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion codec/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Message {
/// Add property
pub fn set_properties<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&mut Properties),
F: FnOnce(&mut Properties),
{
if let Some(ref mut props) = self.0.properties {
f(props);
Expand Down
6 changes: 6 additions & 0 deletions codec/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ impl From<String> for Str {
}
}

impl<'a> From<&'a ByteString> for Str {
fn from(s: &'a ByteString) -> Str {
Str::ByteStr(s.clone())
}
}

impl hash::Hash for Str {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
match self {
Expand Down
20 changes: 10 additions & 10 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<A> Connector<A> {
/// Create new amqp connector
pub fn new() -> Connector<A, connect::Connector<A>> {
Connector {
connector: Pipeline::new(connect::Connector::default()),
connector: Pipeline::new(connect::Connector::default().tag("AMQP-CLIENT")),
config: Configuration::default(),
pool: PoolId::P6.pool_ref(),
_t: PhantomData,
Expand Down Expand Up @@ -149,7 +149,7 @@ where

/// Negotiate amqp protocol over opened socket
pub fn negotiate(&self, io: IoBoxed) -> impl Future<Output = Result<Client, ConnectError>> {
trace!("Negotiation client protocol id: Amqp");
log::trace!("{}: Negotiation client protocol id: Amqp", io.tag());

io.set_memory_pool(self.pool);
_connect_plain(io, self.config.clone())
Expand Down Expand Up @@ -184,7 +184,7 @@ where
io: IoBoxed,
auth: SaslAuth,
) -> impl Future<Output = Result<Client, ConnectError>> {
trace!("Negotiation client protocol id: Amqp");
log::trace!("{}: Negotiation client protocol id: Amqp", io.tag());

let config = self.config.clone();
io.set_memory_pool(self.pool);
Expand All @@ -209,12 +209,12 @@ async fn _connect_sasl(
auth: SaslAuth,
config: Configuration,
) -> Result<Client, ConnectError> {
trace!("Negotiation client protocol id: AmqpSasl");
log::trace!("{}: Negotiation client protocol id: AmqpSasl", io.tag());

io.send(ProtocolId::AmqpSasl, &ProtocolIdCodec).await?;

let proto = io.recv(&ProtocolIdCodec).await?.ok_or_else(|| {
log::trace!("Amqp server is disconnected during handshake");
log::trace!("{}: Amqp server is disconnected during handshake", io.tag());
ConnectError::Disconnected
})?;

Expand Down Expand Up @@ -259,7 +259,7 @@ async fn _connect_sasl(
}

async fn _connect_plain(io: IoBoxed, config: Configuration) -> Result<Client, ConnectError> {
trace!("Negotiation client protocol id: Amqp");
log::trace!("{}: Negotiation client protocol id: Amqp", io.tag());

io.send(ProtocolId::Amqp, &ProtocolIdCodec).await?;

Expand All @@ -268,7 +268,7 @@ async fn _connect_plain(io: IoBoxed, config: Configuration) -> Result<Client, Co
.await
.map_err(ConnectError::from)?
.ok_or_else(|| {
log::trace!("Amqp server is disconnected during handshake");
log::trace!("{}: Amqp server is disconnected during handshake", io.tag());
ConnectError::Disconnected
})?;

Expand All @@ -282,17 +282,17 @@ async fn _connect_plain(io: IoBoxed, config: Configuration) -> Result<Client, Co
let open = config.to_open();
let codec = AmqpCodec::<AmqpFrame>::new().max_size(config.max_frame_size as usize);

trace!("Open client amqp connection: {:?}", open);
log::trace!("{}: Open client amqp connection: {:?}", io.tag(), open);
io.send(AmqpFrame::new(0, Frame::Open(open)), &codec)
.await?;

let frame = io.recv(&codec).await?.ok_or_else(|| {
log::trace!("Amqp server is disconnected during handshake");
log::trace!("{}: Amqp server is disconnected during handshake", io.tag());
ConnectError::Disconnected
})?;

if let Frame::Open(open) = frame.performative() {
trace!("Open confirmed: {:?}", open);
log::trace!("{}: Open confirmed: {:?}", io.tag(), open);
let remote_config = config.from_remote(open);
let connection = Connection::new(io.get_ref(), &config, &remote_config);
let client = Client::new(io, codec, connection, remote_config);
Expand Down
73 changes: 50 additions & 23 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ impl Connection {
}))
}

#[inline]
/// Get io tag for current connection
pub fn tag(&self) -> &'static str {
self.0.get_ref().io.tag()
}

#[inline]
/// Force close connection
pub fn force_close(&self) {
Expand Down Expand Up @@ -139,7 +145,7 @@ impl Connection {
let inner = inner.get_mut();

if let Some(ref e) = inner.error {
log::error!("Connection is in error state: {:?}", e);
log::error!("{}: Connection is in error state: {:?}", inner.io.tag(), e);
Err(e.clone())
} else {
let (tx, rx) = oneshot::channel();
Expand All @@ -148,7 +154,7 @@ impl Connection {
let token = entry.key();

if token >= inner.channel_max as usize {
log::trace!("Too many channels: {:?}", token);
log::trace!("{}: Too many channels: {:?}", inner.io.tag(), token);
Err(AmqpProtocolError::TooManyChannels)
} else {
entry.insert(SessionState::Opening(Some(tx), cell));
Expand Down Expand Up @@ -180,10 +186,11 @@ impl Connection {
}

pub(crate) fn post_frame(&self, frame: AmqpFrame) {
let inner = self.0.get_mut();

#[cfg(feature = "frame-trace")]
log::trace!("outgoing: {:#?}", frame);
log::trace!("{}: outgoing: {:#?}", inner.io.tag(), frame);

let inner = self.0.get_mut();
if let Err(e) = inner.io.encode(frame, &inner.codec) {
inner.set_error(e.into())
}
Expand All @@ -208,7 +215,7 @@ impl ConnectionInner {
}

pub(crate) fn set_error(&mut self, err: AmqpProtocolError) {
log::trace!("Set connection error: {:?}", err);
log::trace!("{}: Set connection error: {:?}", self.io.tag(), err);
for (_, channel) in self.sessions.iter_mut() {
match channel {
SessionState::Opening(_, _) | SessionState::Closing(_) => (),
Expand All @@ -227,7 +234,7 @@ impl ConnectionInner {

pub(crate) fn post_frame(&mut self, frame: AmqpFrame) {
#[cfg(feature = "frame-trace")]
log::trace!("outgoing: {:#?}", frame);
log::trace!("{}: outgoing: {:#?}", self.io.tag(), frame);

if let Err(e) = self.io.encode(frame, &self.codec) {
self.set_error(e.into())
Expand All @@ -240,7 +247,11 @@ impl ConnectionInner {
begin: &Begin,
cell: &Cell<ConnectionInner>,
) -> Result<(), AmqpProtocolError> {
trace!("Remote session opened: {:?}", remote_channel_id);
log::trace!(
"{}: Remote session opened: {:?}",
self.io.tag(),
remote_channel_id
);

let entry = self.sessions.vacant_entry();
let local_token = entry.key();
Expand Down Expand Up @@ -283,8 +294,9 @@ impl ConnectionInner {
remote_channel_id: u16,
begin: &Begin,
) {
trace!(
"Begin response received: local {:?} remote {:?}",
log::trace!(
"{}: Begin response received: local {:?} remote {:?}",
self.io.tag(),
local_channel_id,
remote_channel_id,
);
Expand All @@ -310,21 +322,24 @@ impl ConnectionInner {
.and_then(|tx| tx.send(Session::new(session.clone())).err());
*channel = SessionState::Established(session);

trace!(
"Session established: local {:?} remote {:?}",
log::trace!(
"{}: Session established: local {:?} remote {:?}",
self.io.tag(),
local_channel_id,
remote_channel_id,
);
}
} else {
// TODO: send error response
error!("Begin received for channel not in opening state. local channel: {} (remote channel: {})", local_channel_id, remote_channel_id);
log::error!("{}: Begin received for channel not in opening state. local channel: {} (remote channel: {})", self.io.tag(), local_channel_id, remote_channel_id);
}
} else {
// TODO: rogue begin right now - do nothing. in future might indicate incoming attach
error!(
"Begin received for unknown local channel: {} (remote channel: {})",
local_channel_id, remote_channel_id
log::error!(
"{}: Begin received for unknown local channel: {} (remote channel: {})",
self.io.tag(),
local_channel_id,
remote_channel_id
);
}
}
Expand All @@ -342,11 +357,11 @@ impl ConnectionInner {
}
Frame::Close(close) => {
if self.state == ConnectionState::Closing {
log::trace!("Connection closed: {:?}", close);
log::trace!("{}: Connection closed: {:?}", self.io.tag(), close);
self.set_error(AmqpProtocolError::Disconnected);
return Ok(Action::None);
} else {
log::trace!("Connection closed remotely: {:?}", close);
log::trace!("{}: Connection closed remotely: {:?}", self.io.tag(), close);
let err = AmqpProtocolError::Closed(close.error);
self.set_error(err.clone());
let close = Close { error: None };
Expand All @@ -370,7 +385,11 @@ impl ConnectionInner {
}

if self.error.is_some() {
error!("Connection closed but new framed is received: {:?}", frame);
log::error!(
"{}: Connection closed but new framed is received: {:?}",
self.io.tag(),
frame
);
return Ok(Action::None);
}

Expand All @@ -379,7 +398,7 @@ impl ConnectionInner {
if let Some(state) = self.sessions.get_mut(*token) {
state
} else {
log::error!("Inconsistent internal state");
log::error!("{}: Inconsistent internal state", self.io.tag());
return Err(AmqpProtocolError::UnknownSession(frame));
}
} else {
Expand All @@ -389,7 +408,11 @@ impl ConnectionInner {
// handle session frames
match state {
SessionState::Opening(_, _) => {
error!("Unexpected opening state: {}", channel_id);
log::error!(
"{}: Unexpected opening state: {}",
self.io.tag(),
channel_id
);
Err(AmqpProtocolError::UnexpectedOpeningState(frame))
}
SessionState::Established(ref mut session) => match frame {
Expand Down Expand Up @@ -420,7 +443,7 @@ impl ConnectionInner {
}
}
Frame::End(remote_end) => {
trace!("Remote session end: {}", channel_id);
log::trace!("{}: Remote session end: {}", self.io.tag(), channel_id);
let id = session.get_mut().id();
let action = session
.get_mut()
Expand All @@ -435,7 +458,7 @@ impl ConnectionInner {
},
SessionState::Closing(ref mut session) => match frame {
Frame::End(frm) => {
trace!("Session end is confirmed: {:?}", frm);
log::trace!("{}: Session end is confirmed: {:?}", self.io.tag(), frm);
let _ = session
.get_mut()
.end(AmqpProtocolError::SessionEnded(frm.error));
Expand All @@ -445,7 +468,11 @@ impl ConnectionInner {
Ok(Action::None)
}
frm => {
trace!("Got frame after initiated session end: {:?}", frm);
log::trace!(
"{}: Got frame after initiated session end: {:?}",
self.io.tag(),
frm
);
Ok(Action::None)
}
},
Expand Down
Loading

0 comments on commit f0fdd83

Please sign in to comment.