Skip to content

Commit

Permalink
Fix cradit handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 9, 2023
1 parent 1c6bb68 commit 114f1dc
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.4] - 2023-10-09

* Fix cradit handling

## [0.8.2] - 2023-08-10

* Update ntex deps
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.3"
version = "0.8.4"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.3"
ntex = "0.7.4"
ntex-amqp-codec = "0.9.0"

bitflags = "2.4"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.10"
ntex = { version = "0.7.3", features = ["tokio"] }
ntex = { version = "0.7.4", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![deny(rust_2018_idioms, warnings, unreachable_pub)]
#![allow(clippy::type_complexity)]
#![allow(clippy::type_complexity, clippy::let_underscore_future)]

#[macro_use]
extern crate derive_more;
Expand Down
13 changes: 8 additions & 5 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ pub(crate) enum TransferState {
}

impl TransferState {
fn more(&self) -> bool {
pub(super) fn more(&self) -> bool {
!matches!(self, TransferState::Only(_, _) | TransferState::Last)
}
}
Expand Down Expand Up @@ -1094,8 +1094,8 @@ impl SessionInner {
self.remote_incoming_window = flow
.next_incoming_id()
.unwrap_or(INITIAL_OUTGOING_ID)
.saturating_add(flow.incoming_window())
.saturating_sub(self.next_outgoing_id);
.wrapping_add(flow.incoming_window())
.wrapping_sub(self.next_outgoing_id);

trace!(
"Session received credit {:?}. window: {}, pending: {}",
Expand Down Expand Up @@ -1181,7 +1181,10 @@ impl SessionInner {
message_format,
});
} else {
self.remote_incoming_window -= 1;
let more = state.more();
if !more {
self.remote_incoming_window -= 1;
}

let settled2 = settled.unwrap_or(false);
let tr_settled = if settled2 {
Expand Down Expand Up @@ -1210,7 +1213,7 @@ impl SessionInner {
TransferState::First(promise, delivery_tag)
| TransferState::Only(promise, delivery_tag) => {
let delivery_id = self.next_outgoing_id;
self.next_outgoing_id += 1;
self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1);

transfer.0.more = more;
transfer.0.batchable = more;
Expand Down
19 changes: 12 additions & 7 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ impl SenderLinkInner {
let delta = flow
.delivery_count()
.unwrap_or(0)
.saturating_add(credit)
.saturating_sub(self.delivery_count);
.wrapping_add(credit)
.wrapping_sub(self.delivery_count);

trace!(
"Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}",
Expand All @@ -361,8 +361,10 @@ impl SenderLinkInner {
// credit became available => drain pending_transfers
while self.link_credit > 0 {
if let Some(transfer) = self.pending_transfers.pop_front() {
self.link_credit -= 1;
self.delivery_count = self.delivery_count.saturating_add(1);
if !transfer.state.more() {
self.link_credit -= 1;
}
self.delivery_count = self.delivery_count.wrapping_add(1);
session.send_transfer(
self.id as u32,
transfer.body,
Expand Down Expand Up @@ -545,8 +547,11 @@ impl SenderLinkInner {
body: Some(body),
});
} else {
self.link_credit -= 1;
self.delivery_count = self.delivery_count.saturating_add(1);
// reduce link credit only if transfer is last
if !state.more() {
self.link_credit -= 1;
}
self.delivery_count = self.delivery_count.wrapping_add(1);
self.session.inner.get_mut().send_transfer(
self.id as u32,
Some(body),
Expand All @@ -572,7 +577,7 @@ impl SenderLinkInner {
fn get_tag(&mut self, tag: Option<Bytes>) -> Bytes {
tag.unwrap_or_else(|| {
let delivery_tag = self.delivery_tag;
self.delivery_tag = delivery_tag.saturating_add(1);
self.delivery_tag = delivery_tag.wrapping_add(1);

let mut buf = self.pool.buf_with_capacity(16);
buf.put_u32(delivery_tag);
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ async fn test_session_end() -> std::io::Result<()> {
.await
.unwrap();
link.send(Bytes::from(b"test".as_ref())).await.unwrap();
session.end().await.unwrap();
sleep(Millis(150)).await;

session.end().await.unwrap();
assert_eq!(link_names.lock().unwrap()[0], "test");
assert!(sink.is_opened());

Expand Down

0 comments on commit 114f1dc

Please sign in to comment.