diff --git a/CHANGES.md b/CHANGES.md index cb2f3a0..f96e8ae 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.8.8] - 2024-01-03 + +* Use io tags for logging + ## [0.8.7] - 2023-12-04 * Fix overflow in Configuration::idle_timeout() diff --git a/Cargo.toml b/Cargo.toml index 9526877..20e336b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "0.8.7" +version = "0.8.8" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -24,8 +24,8 @@ default = [] frame-trace = [] [dependencies] -ntex = "0.7.12" -ntex-amqp-codec = "0.9.0" +ntex = "0.7.16" +ntex-amqp-codec = "0.9.1" bitflags = "2.4" derive_more = "0.99" diff --git a/codec/Cargo.toml b/codec/Cargo.toml index d8447c8..af64af0 100644 --- a/codec/Cargo.toml +++ b/codec/Cargo.toml @@ -1,20 +1,20 @@ [package] name = "ntex-amqp-codec" -version = "0.9.0" +version = "0.9.1" description = "AMQP 1.0 Protocol Codec" authors = ["Nikolay Kim ", "Max Gortman ", "Mike Yagley "] license = "MIT/Apache-2.0" edition = "2018" [dependencies] -ntex-bytes = "0.1.14" +ntex-bytes = "0.1.21" ntex-codec = "0.6.2" -byteorder = "1.4" +byteorder = "1.5" fxhash = "0.2.1" chrono = { version = "0.4", default-features = false } derive_more = "0.99" -ordered-float = "2.5" -uuid = { version = "1.2", features = ["v4"] } +ordered-float = "4.2" +uuid = { version = "1", features = ["v4"] } [build-dependencies] handlebars = { version = "0.27", optional = true } diff --git a/codec/src/message/message.rs b/codec/src/message/message.rs index 29db334..a9bf6b1 100644 --- a/codec/src/message/message.rs +++ b/codec/src/message/message.rs @@ -87,7 +87,7 @@ impl Message { /// Add property pub fn set_properties(&mut self, f: F) -> &mut Self where - F: Fn(&mut Properties), + F: FnOnce(&mut Properties), { if let Some(ref mut props) = self.0.properties { f(props); diff --git a/codec/src/types/mod.rs b/codec/src/types/mod.rs index 03a23dc..4ad7503 100644 --- a/codec/src/types/mod.rs +++ b/codec/src/types/mod.rs @@ -136,6 +136,12 @@ impl From for Str { } } +impl<'a> From<&'a ByteString> for Str { + fn from(s: &'a ByteString) -> Str { + Str::ByteStr(s.clone()) + } +} + impl hash::Hash for Str { fn hash(&self, state: &mut H) { match self { diff --git a/src/client/connector.rs b/src/client/connector.rs index 416e50f..fa4c5ef 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -25,7 +25,7 @@ impl Connector { /// Create new amqp connector pub fn new() -> Connector> { Connector { - connector: Pipeline::new(connect::Connector::default()), + connector: Pipeline::new(connect::Connector::default().tag("AMQP-CLIENT")), config: Configuration::default(), pool: PoolId::P6.pool_ref(), _t: PhantomData, @@ -149,7 +149,7 @@ where /// Negotiate amqp protocol over opened socket pub fn negotiate(&self, io: IoBoxed) -> impl Future> { - trace!("Negotiation client protocol id: Amqp"); + log::trace!("{}: Negotiation client protocol id: Amqp", io.tag()); io.set_memory_pool(self.pool); _connect_plain(io, self.config.clone()) @@ -184,7 +184,7 @@ where io: IoBoxed, auth: SaslAuth, ) -> impl Future> { - trace!("Negotiation client protocol id: Amqp"); + log::trace!("{}: Negotiation client protocol id: Amqp", io.tag()); let config = self.config.clone(); io.set_memory_pool(self.pool); @@ -209,12 +209,12 @@ async fn _connect_sasl( auth: SaslAuth, config: Configuration, ) -> Result { - trace!("Negotiation client protocol id: AmqpSasl"); + log::trace!("{}: Negotiation client protocol id: AmqpSasl", io.tag()); io.send(ProtocolId::AmqpSasl, &ProtocolIdCodec).await?; let proto = io.recv(&ProtocolIdCodec).await?.ok_or_else(|| { - log::trace!("Amqp server is disconnected during handshake"); + log::trace!("{}: Amqp server is disconnected during handshake", io.tag()); ConnectError::Disconnected })?; @@ -259,7 +259,7 @@ async fn _connect_sasl( } async fn _connect_plain(io: IoBoxed, config: Configuration) -> Result { - trace!("Negotiation client protocol id: Amqp"); + log::trace!("{}: Negotiation client protocol id: Amqp", io.tag()); io.send(ProtocolId::Amqp, &ProtocolIdCodec).await?; @@ -268,7 +268,7 @@ async fn _connect_plain(io: IoBoxed, config: Configuration) -> Result Result::new().max_size(config.max_frame_size as usize); - trace!("Open client amqp connection: {:?}", open); + log::trace!("{}: Open client amqp connection: {:?}", io.tag(), open); io.send(AmqpFrame::new(0, Frame::Open(open)), &codec) .await?; let frame = io.recv(&codec).await?.ok_or_else(|| { - log::trace!("Amqp server is disconnected during handshake"); + log::trace!("{}: Amqp server is disconnected during handshake", io.tag()); ConnectError::Disconnected })?; if let Frame::Open(open) = frame.performative() { - trace!("Open confirmed: {:?}", open); + log::trace!("{}: Open confirmed: {:?}", io.tag(), open); let remote_config = config.from_remote(open); let connection = Connection::new(io.get_ref(), &config, &remote_config); let client = Client::new(io, codec, connection, remote_config); diff --git a/src/connection.rs b/src/connection.rs index a02c712..9aa19ce 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -67,6 +67,12 @@ impl Connection { })) } + #[inline] + /// Get io tag for current connection + pub fn tag(&self) -> &'static str { + self.0.get_ref().io.tag() + } + #[inline] /// Force close connection pub fn force_close(&self) { @@ -139,7 +145,7 @@ impl Connection { let inner = inner.get_mut(); if let Some(ref e) = inner.error { - log::error!("Connection is in error state: {:?}", e); + log::error!("{}: Connection is in error state: {:?}", inner.io.tag(), e); Err(e.clone()) } else { let (tx, rx) = oneshot::channel(); @@ -148,7 +154,7 @@ impl Connection { let token = entry.key(); if token >= inner.channel_max as usize { - log::trace!("Too many channels: {:?}", token); + log::trace!("{}: Too many channels: {:?}", inner.io.tag(), token); Err(AmqpProtocolError::TooManyChannels) } else { entry.insert(SessionState::Opening(Some(tx), cell)); @@ -180,10 +186,11 @@ impl Connection { } pub(crate) fn post_frame(&self, frame: AmqpFrame) { + let inner = self.0.get_mut(); + #[cfg(feature = "frame-trace")] - log::trace!("outgoing: {:#?}", frame); + log::trace!("{}: outgoing: {:#?}", inner.io.tag(), frame); - let inner = self.0.get_mut(); if let Err(e) = inner.io.encode(frame, &inner.codec) { inner.set_error(e.into()) } @@ -208,7 +215,7 @@ impl ConnectionInner { } pub(crate) fn set_error(&mut self, err: AmqpProtocolError) { - log::trace!("Set connection error: {:?}", err); + log::trace!("{}: Set connection error: {:?}", self.io.tag(), err); for (_, channel) in self.sessions.iter_mut() { match channel { SessionState::Opening(_, _) | SessionState::Closing(_) => (), @@ -227,7 +234,7 @@ impl ConnectionInner { pub(crate) fn post_frame(&mut self, frame: AmqpFrame) { #[cfg(feature = "frame-trace")] - log::trace!("outgoing: {:#?}", frame); + log::trace!("{}: outgoing: {:#?}", self.io.tag(), frame); if let Err(e) = self.io.encode(frame, &self.codec) { self.set_error(e.into()) @@ -240,7 +247,11 @@ impl ConnectionInner { begin: &Begin, cell: &Cell, ) -> Result<(), AmqpProtocolError> { - trace!("Remote session opened: {:?}", remote_channel_id); + log::trace!( + "{}: Remote session opened: {:?}", + self.io.tag(), + remote_channel_id + ); let entry = self.sessions.vacant_entry(); let local_token = entry.key(); @@ -283,8 +294,9 @@ impl ConnectionInner { remote_channel_id: u16, begin: &Begin, ) { - trace!( - "Begin response received: local {:?} remote {:?}", + log::trace!( + "{}: Begin response received: local {:?} remote {:?}", + self.io.tag(), local_channel_id, remote_channel_id, ); @@ -310,21 +322,24 @@ impl ConnectionInner { .and_then(|tx| tx.send(Session::new(session.clone())).err()); *channel = SessionState::Established(session); - trace!( - "Session established: local {:?} remote {:?}", + log::trace!( + "{}: Session established: local {:?} remote {:?}", + self.io.tag(), local_channel_id, remote_channel_id, ); } } else { // TODO: send error response - error!("Begin received for channel not in opening state. local channel: {} (remote channel: {})", local_channel_id, remote_channel_id); + log::error!("{}: Begin received for channel not in opening state. local channel: {} (remote channel: {})", self.io.tag(), local_channel_id, remote_channel_id); } } else { // TODO: rogue begin right now - do nothing. in future might indicate incoming attach - error!( - "Begin received for unknown local channel: {} (remote channel: {})", - local_channel_id, remote_channel_id + log::error!( + "{}: Begin received for unknown local channel: {} (remote channel: {})", + self.io.tag(), + local_channel_id, + remote_channel_id ); } } @@ -342,11 +357,11 @@ impl ConnectionInner { } Frame::Close(close) => { if self.state == ConnectionState::Closing { - log::trace!("Connection closed: {:?}", close); + log::trace!("{}: Connection closed: {:?}", self.io.tag(), close); self.set_error(AmqpProtocolError::Disconnected); return Ok(Action::None); } else { - log::trace!("Connection closed remotely: {:?}", close); + log::trace!("{}: Connection closed remotely: {:?}", self.io.tag(), close); let err = AmqpProtocolError::Closed(close.error); self.set_error(err.clone()); let close = Close { error: None }; @@ -370,7 +385,11 @@ impl ConnectionInner { } if self.error.is_some() { - error!("Connection closed but new framed is received: {:?}", frame); + log::error!( + "{}: Connection closed but new framed is received: {:?}", + self.io.tag(), + frame + ); return Ok(Action::None); } @@ -379,7 +398,7 @@ impl ConnectionInner { if let Some(state) = self.sessions.get_mut(*token) { state } else { - log::error!("Inconsistent internal state"); + log::error!("{}: Inconsistent internal state", self.io.tag()); return Err(AmqpProtocolError::UnknownSession(frame)); } } else { @@ -389,7 +408,11 @@ impl ConnectionInner { // handle session frames match state { SessionState::Opening(_, _) => { - error!("Unexpected opening state: {}", channel_id); + log::error!( + "{}: Unexpected opening state: {}", + self.io.tag(), + channel_id + ); Err(AmqpProtocolError::UnexpectedOpeningState(frame)) } SessionState::Established(ref mut session) => match frame { @@ -420,7 +443,7 @@ impl ConnectionInner { } } Frame::End(remote_end) => { - trace!("Remote session end: {}", channel_id); + log::trace!("{}: Remote session end: {}", self.io.tag(), channel_id); let id = session.get_mut().id(); let action = session .get_mut() @@ -435,7 +458,7 @@ impl ConnectionInner { }, SessionState::Closing(ref mut session) => match frame { Frame::End(frm) => { - trace!("Session end is confirmed: {:?}", frm); + log::trace!("{}: Session end is confirmed: {:?}", self.io.tag(), frm); let _ = session .get_mut() .end(AmqpProtocolError::SessionEnded(frm.error)); @@ -445,7 +468,11 @@ impl ConnectionInner { Ok(Action::None) } frm => { - trace!("Got frame after initiated session end: {:?}", frm); + log::trace!( + "{}: Got frame after initiated session end: {:?}", + self.io.tag(), + frm + ); Ok(Action::None) } }, diff --git a/src/dispatcher.rs b/src/dispatcher.rs index b012af1..3f16c6e 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -71,7 +71,8 @@ where fn handle_idle_timeout(&self, cx: &mut Context<'_>) { if self.idle_timeout.non_zero() && self.expire.poll_elapsed(cx).is_ready() { log::trace!( - "Send keep-alive ping, timeout: {:?} secs", + "{}: Send keep-alive ping, timeout: {:?} secs", + self.sink.tag(), self.idle_timeout ); self.sink.post_frame(AmqpFrame::new(0, Frame::Empty)); @@ -217,14 +218,22 @@ where // check readiness let service_poll = self.service.poll_ready(cx).map_err(|err| { let err = Error::from(err); - error!("Publish service readiness check failed: {:?}", err); + log::error!( + "{}: Publish service readiness check failed: {:?}", + self.sink.tag(), + err + ); let _ = self.sink.close_with_error(err); AmqpDispatcherError::Service })?; let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| { let err = Error::from(err); - error!("Control service readiness check failed: {:?}", err); + log::error!( + "{}: Control service readiness check failed: {:?}", + self.sink.tag(), + err + ); let _ = self.sink.close_with_error(err); AmqpDispatcherError::Service })?; @@ -280,7 +289,7 @@ where match request { DispatchItem::Item(frame) => { #[cfg(feature = "frame-trace")] - log::trace!("incoming: {:#?}", frame); + log::trace!("{}: incoming: {:#?}", self.sink.tag(), frame); let action = match self .sink diff --git a/src/lib.rs b/src/lib.rs index 1e0f242..3fbd9dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,8 +3,6 @@ #[macro_use] extern crate derive_more; -#[macro_use] -extern crate log; use ntex::{io::DispatcherConfig, time::Seconds, util::ByteString}; use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner}; diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 222f04a..9da75f0 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -136,8 +136,9 @@ impl ReceiverLink { pub(crate) fn remote_closed(&self, error: Option) { let inner = self.inner.get_mut(); - trace!( - "Receiver link has been closed remotely handle: {:?} name: {:?}", + log::trace!( + "{}: Receiver link has been closed remotely handle: {:?} name: {:?}", + inner.session.tag(), inner.remote_handle, inner.name ); @@ -360,7 +361,7 @@ impl ReceiverLinkInner { inner: inner.clone(), })) } else { - log::error!("Inconsistent state, bug"); + log::error!("{}: Inconsistent state, bug", self.session.tag()); let err = Error(Box::new(codec::ErrorInner { condition: LinkError::DetachForced.into(), description: Some(ByteString::from_static("Internal error")), diff --git a/src/router.rs b/src/router.rs index fb171c0..579452f 100644 --- a/src/router.rs +++ b/src/router.rs @@ -95,7 +95,7 @@ impl Service for RouterService { let inner = self.0.get_mut(); let mut link = Link::new(frm, link, inner.state.clone(), path); if let Some((hnd, _info)) = inner.router.recognize(link.path_mut()) { - trace!("Create handler service for {}", link.path().get_ref()); + log::trace!("Create handler service for {}", link.path().get_ref()); inner.handlers.insert(link.receiver().clone(), None); let rcv_link = link.link.clone(); let fut = hnd.create(link); @@ -105,7 +105,7 @@ impl Service for RouterService { state: RouterServiceResponseState::NewService(fut), }) } else { - trace!( + log::trace!( "Target address is not recognized: {}", link.path().get_ref() ); @@ -128,11 +128,11 @@ impl Service for RouterService { } Message::Detached(link) => { if let Some(Some(srv)) = self.0.get_mut().handlers.remove(&link) { - trace!("Releasing handler service for {}", link.name()); + log::trace!("Releasing handler service for {}", link.name()); let name = link.name().clone(); ntex::rt::spawn(async move { ntex::util::poll_fn(move |cx| srv.poll_shutdown(cx)).await; - trace!("Handler service for {} has shutdown", name); + log::trace!("Handler service for {} has shutdown", name); }); } Either::Left(Ready::Ok(())) @@ -143,7 +143,7 @@ impl Service for RouterService { .filter_map(|link| { self.0.get_mut().handlers.remove(&link).and_then(|srv| { srv.map(|srv| { - trace!( + log::trace!( "Releasing handler service for {} (session ended)", link.name() ); @@ -153,7 +153,7 @@ impl Service for RouterService { }) .collect(); - trace!( + log::trace!( "Shutting down {} handler services (session ended)", futs.len() ); @@ -161,7 +161,7 @@ impl Service for RouterService { ntex::rt::spawn(async move { let len = futs.len(); let _ = join_all(futs).await; - trace!( + log::trace!( "Handler services for {} links have shutdown (session ended)", len ); diff --git a/src/server/handshake.rs b/src/server/handshake.rs index 5c91c3e..0b031ad 100644 --- a/src/server/handshake.rs +++ b/src/server/handshake.rs @@ -54,14 +54,17 @@ impl HandshakeAmqp { let codec = AmqpCodec::::new(); let frame = state.recv(&codec).await?.ok_or_else(|| { - log::trace!("Server amqp is disconnected during open frame"); + log::trace!( + "{}: Server amqp is disconnected during open frame", + state.tag() + ); HandshakeError::Disconnected(None) })?; let frame = frame.into_parts().1; match frame { Frame::Open(frame) => { - trace!("Got open frame: {:?}", frame); + log::trace!("{}: Got open frame: {:?}", state.tag(), frame); let remote_config = local_config.from_remote(&frame); let sink = Connection::new(state.get_ref(), &local_config, &remote_config); Ok(HandshakeAmqpOpened { diff --git a/src/server/sasl.rs b/src/server/sasl.rs index d5b46fc..25486c9 100644 --- a/src/server/sasl.rs +++ b/src/server/sasl.rs @@ -266,7 +266,7 @@ impl SaslSuccess { let frame = frame.into_parts().1; match frame { protocol::Frame::Open(frame) => { - trace!("Got open frame: {:?}", frame); + log::trace!("{}: Got open frame: {:?}", state.tag(), frame); let local_config = self.local_config; let remote_config = local_config.from_remote(&frame); diff --git a/src/server/service.rs b/src/server/service.rs index c700d5f..8057a03 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -207,13 +207,13 @@ where // create publish service let pb_srv = inner.publish.pipeline(st.clone()).await.map_err(|e| { - error!("Publish service init error: {:?}", e); + log::error!("Publish service init error: {:?}", e); ServerError::PublishServiceError })?; // create control service let ctl_srv = inner.control.pipeline(st.clone()).await.map_err(|e| { - error!("Control service init error: {:?}", e); + log::error!("Control service init error: {:?}", e); ServerError::ControlServiceError })?; @@ -290,7 +290,10 @@ where .await .map_err(HandshakeError::from)? .ok_or_else(|| { - log::trace!("Server amqp is disconnected during handshake"); + log::trace!( + "{}: Server amqp is disconnected during handshake", + state.tag() + ); HandshakeError::Disconnected(None) })?; diff --git a/src/session.rs b/src/session.rs index 4ffb5ac..15897dc 100644 --- a/src/session.rs +++ b/src/session.rs @@ -35,6 +35,12 @@ impl Session { Session { inner } } + #[inline] + /// Get io tag for current connection + pub fn tag(&self) -> &'static str { + self.inner.get_ref().sink.tag() + } + #[inline] pub fn connection(&self) -> &Connection { &self.inner.get_ref().sink @@ -159,7 +165,7 @@ impl Session { Err(e) } Err(_) => { - log::trace!("Cannot complete detach receiver link, connection is gone"); + log::trace!("Cannot complete detach receiver link, connection is gone",); Err(AmqpProtocolError::Disconnected) } } @@ -327,13 +333,21 @@ impl SessionInner { self.id as u16 } + pub(crate) fn tag(&self) -> &'static str { + self.sink.tag() + } + pub(crate) fn memory_pool(&self) -> PoolRef { self.sink.0.memory_pool() } /// Set error. New operations will return error. pub(crate) fn set_error(&mut self, err: AmqpProtocolError) { - log::trace!("Connection is failed, dropping state: {:?}", err); + log::trace!( + "{}: Connection is failed, dropping state: {:?}", + self.tag(), + err + ); // drop pending transfers for tr in self.pending_transfers.drain(..) { @@ -377,7 +391,7 @@ impl SessionInner { /// End session. pub(crate) fn end(&mut self, err: AmqpProtocolError) -> Action { - log::trace!("Session is ended: {:?}", err); + log::trace!("{}: Session is ended: {:?}", self.tag(), err); let links = self.get_all_links(); self.set_error(err); @@ -429,8 +443,9 @@ impl SessionInner { let entry = self.links.vacant_entry(); let token = entry.key(); entry.insert(Either::Left(SenderLinkState::Opening(Some(tx)))); - trace!( - "Local sender link opening: {:?} hnd:{:?}", + log::trace!( + "{}: Local sender link opening: {:?} hnd:{:?}", + self.tag(), frame.name(), token ); @@ -448,7 +463,11 @@ impl SessionInner { attach: &Attach, link: Cell, ) -> SenderLink { - trace!("Remote sender link attached: {:?}", attach.name()); + log::trace!( + "{}: Remote sender link attached: {:?}", + self.tag(), + attach.name() + ); let token = link.id; if let Some(source) = attach.source() { @@ -521,16 +540,28 @@ impl SessionInner { } SenderLinkState::OpeningRemote => { let _ = tx.send(Ok(())); - error!("Unexpected sender link state: opening remote - {}", id); + log::error!( + "{}: Unexpected sender link state: opening remote - {}", + self.tag(), + id + ); } SenderLinkState::Closing(_) => { let _ = tx.send(Ok(())); - error!("Unexpected sender link state: closing - {}", id); + log::error!( + "{}: Unexpected sender link state: closing - {}", + self.tag(), + id + ); } } } else { let _ = tx.send(Ok(())); - error!("Sender link does not exist while detaching: {}", id); + log::error!( + "{}: Sender link does not exist while detaching: {}", + self.tag(), + id + ); } } @@ -669,7 +700,7 @@ impl SessionInner { } } // TODO: close session - error!("Unexpected receiver link state"); + log::error!("{}: Unexpected receiver link state", self.tag()); } /// Detach receiver link @@ -729,13 +760,21 @@ impl SessionInner { ReceiverLinkState::Closing(_) => { let _ = tx.send(Ok(())); let _ = self.links.remove(id as usize); - error!("Unexpected receiver link state: closing - {}", id); + log::error!( + "{}: Unexpected receiver link state: closing - {}", + self.tag(), + id + ); } ReceiverLinkState::OpeningLocal(_inner) => unimplemented!(), } } else { let _ = tx.send(Ok(())); - error!("Receiver link does not exist while detaching: {}", id); + log::error!( + "{}: Receiver link does not exist while detaching: {}", + self.tag(), + id + ); } } @@ -773,7 +812,7 @@ impl SessionInner { if let SenderLinkState::Established(ref link) = link { return Ok(Action::Flow(link.clone(), flow)); } - warn!("Received flow frame"); + log::warn!("{}: Received flow frame", self.tag()); } self.handle_flow(&flow, None); Ok(Action::None) @@ -790,20 +829,25 @@ impl SessionInner { let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) { *idx } else { - error!("Transfer's link {:?} is unknown", transfer.handle()); + log::error!( + "{}: Transfer's link {:?} is unknown", + self.tag(), + transfer.handle() + ); return Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer))); }; if let Some(link) = self.links.get_mut(idx) { match link { Either::Left(_) => { - error!("Got trasfer from sender link"); + log::error!("{}: Got trasfer from sender link", self.tag()); Err(AmqpProtocolError::Unexpected(Frame::Transfer(transfer))) } Either::Right(link) => match link { ReceiverLinkState::Opening(_) => { - error!( - "Got transfer for opening link: {} -> {}", + log::error!( + "{}: Got transfer for opening link: {} -> {}", + self.tag(), transfer.handle(), idx ); @@ -812,8 +856,9 @@ impl SessionInner { ))) } ReceiverLinkState::OpeningLocal(_) => { - error!( - "Got transfer for opening link: {} -> {}", + log::error!( + "{}: Got transfer for opening link: {} -> {}", + self.tag(), transfer.handle(), idx ); @@ -835,7 +880,7 @@ impl SessionInner { } Frame::Detach(detach) => Ok(self.handle_detach(detach)), frame => { - error!("Unexpected frame: {:?}", frame); + log::error!("{}: Unexpected frame: {:?}", self.tag(), frame); Ok(Action::None) } } @@ -852,8 +897,9 @@ impl SessionInner { match self.links.get_mut(*index) { Some(Either::Left(item)) => { if item.is_opening() { - trace!( - "Local sender link attached: {:?} {} -> {}, {:?}", + log::trace!( + "{}: Local sender link attached: {:?} {} -> {}, {:?}", + self.sink.tag(), name, index, attach.handle(), @@ -882,8 +928,9 @@ impl SessionInner { } Some(Either::Right(item)) => { if item.is_opening() { - trace!( - "Local receiver link attached: {:?} {} -> {}", + log::trace!( + "{}: Local receiver link attached: {:?} {} -> {}", + self.sink.tag(), name, index, attach.handle() @@ -897,7 +944,7 @@ impl SessionInner { let _ = tx.send(Ok(ReceiverLink::new(link))); } else { // TODO: close session - error!("Inconsistent session state, bug"); + log::error!("{}: Inconsistent session state, bug", self.tag()); } } } @@ -922,7 +969,7 @@ impl SessionInner { frame.handle() as usize } else { // should not happen, error - log::info!("Detaching unknown link: {:?}", frame); + log::info!("{}: Detaching unknown link: {:?}", self.tag(), frame); return Action::None; }; @@ -975,8 +1022,9 @@ impl SessionInner { true } SenderLinkState::OpeningRemote => { - error!( - "Detach frame received for unconfirmed sender link: {:?}", + log::error!( + "{}: Detach frame received for unconfirmed sender link: {:?}", + self.tag(), frame ); true @@ -1003,7 +1051,7 @@ impl SessionInner { let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None))); } } else { - error!("Inconsistent session state, bug"); + log::error!("{}: Inconsistent session state, bug", self.tag()); } true @@ -1053,10 +1101,11 @@ impl SessionInner { let to = disposition.last().unwrap_or(from); if cfg!(feature = "frame-trace") { - trace!("Settle delivery: {:#?}", disposition); + log::trace!("{}: Settle delivery: {:#?}", self.tag(), disposition); } else { - trace!( - "Settle delivery from {} - {}, state {:?} settled: {:?}", + log::trace!( + "{}: Settle delivery from {} - {}, state {:?} settled: {:?}", + self.tag(), from, to, disposition.state(), @@ -1077,7 +1126,8 @@ impl SessionInner { val.ready(Ok(disposition.clone())); } else { log::info!( - "Could not find handler for {:?}, no: {:?}, unsettled: {:?}", + "{}: Could not find handler for {:?}, no: {:?}, unsettled: {:?}", + self.tag(), disposition, no, self.unsettled_deliveries.len(), @@ -1097,8 +1147,9 @@ impl SessionInner { .wrapping_add(flow.incoming_window()) .wrapping_sub(self.next_outgoing_id); - trace!( - "Session received credit {:?}. window: {}, pending: {}", + log::trace!( + "{}: Session received credit {:?}. window: {}, pending: {}", + self.tag(), flow.link_credit(), self.remote_incoming_window, self.pending_transfers.len(), @@ -1170,7 +1221,8 @@ impl SessionInner { ) { if self.remote_incoming_window == 0 { log::trace!( - "Remote window is 0, push to pending queue, hnd:{:?}", + "{}: Remote window is 0, push to pending queue, hnd:{:?}", + self.sink.tag(), link_handle ); self.pending_transfers.push_back(PendingTransfer { @@ -1231,7 +1283,7 @@ impl SessionInner { } log::trace!( - "Sending transfer over handle {}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}", + "{}: Sending transfer over handle {}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}", self.sink.tag(), link_handle, self.remote_incoming_window, transfer.delivery_id(), diff --git a/src/sndlink.rs b/src/sndlink.rs index a10e524..832c54e 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -294,7 +294,12 @@ impl SenderLinkInner { } pub(crate) fn detached(&mut self, err: AmqpProtocolError) { - trace!("Detaching sender link {:?} with error {:?}", self.name, err); + log::trace!( + "{}: Detaching sender link {:?} with error {:?}", + self.session.tag(), + self.name, + err + ); // drop pending transfers for tr in self.pending_transfers.drain(..) { @@ -344,8 +349,9 @@ impl SenderLinkInner { .wrapping_add(credit) .wrapping_sub(self.delivery_count); - trace!( - "Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}", + log::trace!( + "{}: Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}", + self.session.tag(), self.name, credit, delta, @@ -489,7 +495,8 @@ impl SenderLinkInner { let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); let tag = self.get_tag(tag); log::trace!( - "Body size if larger than max size, sending multiple tranfers for {:?}", + "{}: Body size if larger than max size, sending multiple tranfers for {:?}", + self.session.tag(), tag ); @@ -504,18 +511,26 @@ impl SenderLinkInner { // last chunk if body.is_empty() { - log::trace!("Sending last tranfer for {:?}", tag); + log::trace!("{}: Sending last tranfer for {:?}", self.session.tag(), tag); self.send_inner(chunk.into(), message_format, TransferState::Last); break; } - log::trace!("Sending chunk tranfer for {:?}", tag); + log::trace!( + "{}: Sending chunk tranfer for {:?}", + self.session.tag(), + tag + ); self.send_inner(chunk.into(), message_format, TransferState::Continue); } Ok(tag) } else { let tag = self.get_tag(tag); - log::trace!("Sending non-blocking tranfer for {:?}", tag); + log::trace!( + "{}: Sending non-blocking tranfer for {:?}", + self.session.tag(), + tag + ); let st = TransferState::Only(DeliveryPromise::new_link(link, tag.clone()), tag.clone()); self.send_inner(body, message_format, st); @@ -532,7 +547,7 @@ impl SenderLinkInner { ) { if self.link_credit == 0 || !self.pending_transfers.is_empty() { log::trace!( - "Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}) {:?}, queue size: {}", + "{}: Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}) {:?}, queue size: {}", self.session.tag(), self.link_credit, self.name, self.id, @@ -667,7 +682,7 @@ impl Future for Delivery { return match ready!(Pin::new(receiver).poll(cx)) { Ok(r) => Poll::Ready(r), Err(e) => { - trace!("delivery oneshot is gone: {:?}", e); + log::trace!("Delivery oneshot is gone: {:?}", e); Poll::Ready(Err(AmqpProtocolError::Disconnected)) } };