Skip to content

Commit

Permalink
Delay publication removal until all older publications are confirmed
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed May 27, 2024
1 parent 9369297 commit 2dfa84c
Showing 1 changed file with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1091,12 +1091,25 @@ public void publishUnconfirmedMessages() throws IOException {
private static class PublicationHolder {
private final ChannelPublisher publisher;
private final byte[] payload;
private volatile boolean confirmed;

private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
this.publisher = publisher;
this.payload = payload;
}

boolean isConfirmed() {
return confirmed;
}

void confirmed() {
confirmed = true;
}

void reset() {
confirmed = false;
}

public void publish(Channel channel) throws IOException {
publisher.publish(channel, payload);
}
Expand Down Expand Up @@ -1165,15 +1178,35 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
lock.writeLock().lock();
try {
int initialSize = inflightRequests.size();
if (multiple) {
inflightRequests.headMap(deliveryTag, true).clear();
} else {
inflightRequests.remove(deliveryTag);
var head = inflightRequests.headMap(deliveryTag, true);
// received the confirmation for oldest publication
// check all earlier confirmation that were confirmed but not removed
if (head.size() == 1) {
head.clear();
Iterator<Map.Entry<Long, PublicationHolder>> tailIterator =
inflightRequests.tailMap(deliveryTag, false).entrySet().iterator();
while (tailIterator.hasNext()) {
if (!tailIterator.next().getValue().isConfirmed()) {
break;
}
tailIterator.remove();
}
} else if (!head.isEmpty()) {
// this is not the oldest publication
// mark as confirm but wait for oldest to be confirmed
head.lastEntry().getValue().confirmed();
}
}
if (inflightRequests.isEmpty()) {
allMessagesConfirmed.signalAll();
}
hasSpaceToWriteCondition.signalAll();
if (inflightRequests.size() != initialSize) {
hasSpaceToWriteCondition.signalAll();
}
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1202,6 +1235,7 @@ public void transferUnconfirmedTo(Deque<PublicationHolder> redelivery) {
lock.writeLock().lock();
try {
for (PublicationHolder payload : inflightRequests.descendingMap().values()) {
payload.reset();
redelivery.addFirst(payload);
}
inflightRequests.clear();
Expand Down

0 comments on commit 2dfa84c

Please sign in to comment.