Skip to content

Commit

Permalink
Change handling of unordered acks
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed May 27, 2024
1 parent 2dfa84c commit fcc144e
Showing 1 changed file with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1182,23 +1182,28 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
if (multiple) {
inflightRequests.headMap(deliveryTag, true).clear();
} else {
var head = inflightRequests.headMap(deliveryTag, true);
// received the confirmation for oldest publication
// check all earlier confirmation that were confirmed but not removed
if (head.size() == 1) {
head.clear();
long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag);
if (oldestPublication == deliveryTag) {
// received the confirmation for oldest publication
// check all earlier confirmation that were confirmed but not removed
Iterator<Map.Entry<Long, PublicationHolder>> tailIterator =
inflightRequests.tailMap(deliveryTag, false).entrySet().iterator();
inflightRequests.tailMap(deliveryTag, true).entrySet().iterator();
while (tailIterator.hasNext()) {
if (!tailIterator.next().getValue().isConfirmed()) {
Map.Entry<Long, PublicationHolder> next = tailIterator.next();
long key = next.getKey();
PublicationHolder holder = next.getValue();
if (key > deliveryTag && !holder.isConfirmed()) {
break;
}
tailIterator.remove();
}
} else if (!head.isEmpty()) {
} else {
// this is not the oldest publication
// mark as confirm but wait for oldest to be confirmed
head.lastEntry().getValue().confirmed();
var holder = inflightRequests.get(deliveryTag);
if (holder != null) {
holder.confirmed();
}
}
}
if (inflightRequests.isEmpty()) {
Expand Down

0 comments on commit fcc144e

Please sign in to comment.