Skip to content

Commit

Permalink
[fix] [ml] Mark delete stuck due to switching cursor ledger fails (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored May 7, 2024
1 parent c30765e commit a7e1fcd
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2187,8 +2187,7 @@ public void operationFailed(ManagedLedgerException exception) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
persistPositionToMetaStore(mdEntry, cb);
} else {
mdEntry.callback.markDeleteFailed(new ManagedLedgerException("Create new cursor ledger failed"),
mdEntry.ctx);
cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed"));
}
} else {
persistPositionToLedger(cursorLedger, mdEntry, cb);
Expand Down Expand Up @@ -2861,9 +2860,19 @@ public void operationFailed(ManagedLedgerException exception) {
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
// Note: if the stat is NoLedger, will persist the mark deleted position to metadata store.
// Before giving up, try to persist the position in the metadata store.
flushPendingMarkDeletes();
// There are two case may cause switch ledger fails.
// 1. No enough BKs; BKs are in read-only mode...
// 2. Write ZK fails.
// Regarding the case "No enough BKs", try to persist the position in the metadata store before
// giving up.
if (!(exception instanceof MetaStoreException)) {
flushPendingMarkDeletes();
} else {
while (!pendingMarkDeleteOps.isEmpty()) {
MarkDeleteEntry entry = pendingMarkDeleteOps.poll();
entry.callback.markDeleteFailed(exception, entry.ctx);
}
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,48 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
ml.delete();
}

@Test
void testSwitchLedgerFailed() throws Exception {
final String cursorName = "c1";
final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
mlConfig.setMaxEntriesPerLedger(1);
mlConfig.setMetadataMaxEntriesPerLedger(1);
mlConfig.setThrottleMarkDelete(Double.MAX_VALUE);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursor cursor = ml.openCursor(cursorName);

List<Position> positionList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
positionList.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
}

// Inject an error when persistent at the third time.
AtomicInteger persistentCounter = new AtomicInteger();
metadataStore.failConditional(new MetadataStoreException.BadVersionException("mock error"), (op, path) -> {
if (path.equals(String.format("/managed-ledgers/%s/%s", mlName, cursorName))
&& persistentCounter.incrementAndGet() == 3) {
log.info("Trigger an error");
return true;
}
return false;
});

// Verify: the cursor can be recovered after it fails once.
int failedCount = 0;
for (Position position : positionList) {
try {
cursor.markDelete(position);
} catch (Exception ex) {
failedCount++;
}
}
assertEquals(failedCount, 1);

// cleanup.
ml.delete();
}

@Test
void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
final int entryCount = 10;
Expand Down

0 comments on commit a7e1fcd

Please sign in to comment.