Skip to content

Commit

Permalink
Fix lost message issue due to ledger rollover. (apache#14664)
Browse files Browse the repository at this point in the history
(cherry picked from commit ad2cc2d)
  • Loading branch information
Technoboy- authored and codelipenghui committed Mar 12, 2022
1 parent c8a0dff commit 93e15d9
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,8 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
}
} else if (state == State.ClosedLedger) {
// No ledger and no pending operations. Create a new ledger
log.info("[{}] Creating a new ledger", name);
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
log.info("[{}] Creating a new ledger", name);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
Expand Down Expand Up @@ -1644,7 +1644,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {

synchronized void createLedgerAfterClosed() {
if (isNeededCreateNewLedgerAfterCloseLedger()) {
log.info("[{}] Creating a new ledger", name);
log.info("[{}] Creating a new ledger after closed", name);
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
Expand All @@ -1667,8 +1667,8 @@ boolean isNeededCreateNewLedgerAfterCloseLedger() {
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
STATE_UPDATER.set(this, State.ClosingLedger);
if (currentLedgerEntries > 0 && currentLedgerIsFull()
&& STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,9 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
// roll a new ledger
int numLedgersBefore = ledger.getLedgersInfo().size();
ledger.getConfig().setMaxEntriesPerLedger(1);
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(20, TimeUnit.SECONDS)
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,9 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
// let current ledger close
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
// let retention expire
Thread.sleep(1500);
Expand Down Expand Up @@ -2238,6 +2241,9 @@ public void testGetPositionAfterN() throws Exception {
managedCursor.markDelete(positionMarkDelete);

//trigger ledger rollover and wait for the new ledger created
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
Expand Down Expand Up @@ -3096,7 +3102,7 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
ledger.addEntry(new byte[1024 * 1024]);
}

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());

Expand All @@ -3107,9 +3113,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception {

// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
Thread.sleep(1000);
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
Assert.assertEquals(ledger.getTotalSize(), 0);
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}

@Test
Expand All @@ -3127,6 +3136,26 @@ public void testLedgerReachMaximumRolloverTime() throws Exception {
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
}

@Test
public void testLedgerNotRolloverWithoutOpenState() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);

ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
ml.addEntry("test1".getBytes()).getLedgerId();
long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
// Set state to CreatingLedger to avoid rollover
stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
ml.rollCurrentLedgerIfFull();
Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
currentLedger.setAccessible(true);
LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
Awaitility.await()
.until(() -> ledgerId2 == lh.getId());
}

@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down Expand Up @@ -3488,5 +3517,4 @@ public void testOffloadTaskCancelled() throws Exception {
Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
Expand Down Expand Up @@ -98,6 +99,9 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
});

// trigger a ledger rollover
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();

// the last ledger will be closed and removed and we have one ledger for empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
Position position = managedLedger.getLastConfirmedEntry();
if (isUseManagedLedgerProperties) {
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().until(() -> {
managedLedger.rollCurrentLedgerIfFull();
return !managedLedger.ledgerExists(position.getLedgerId());
});
}
Expand Down

0 comments on commit 93e15d9

Please sign in to comment.