Skip to content

Commit

Permalink
Filter out deleted entries before read entries from bookie.
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Dec 16, 2023
1 parent 4d42fd5 commit 81de6c1
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2125,7 +2125,12 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
public void asyncReadEntry(ReadHandle ledger, Set<Long> entryIds, OpReadEntry opReadEntry,
Object ctx) {
if (entryIds.isEmpty()) {
opReadEntry.readEntriesComplete(Collections.emptyList(), ctx);
// If the entryIds is empty, should not move the `readPosition` of `cursor`.
// OpReadEntry#internalReadEntriesComplete will move the `readPosition` of `cursor`
// to the next position of `lastEntry`, so here uses the previous position of `readPosition`
// to offset the impact of OpReadEntry#internalReadEntriesComplete.
PositionImpl previous = this.getPreviousPosition(opReadEntry.readPosition);
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), ctx, previous);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -69,6 +71,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -123,7 +126,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {

@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
}


Expand All @@ -142,7 +145,7 @@ public void testCloseCursor() throws Exception {
ledger.addEntry(new byte[]{5});
// Persistent cursor info to ledger.
c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
Awaitility.await().until(() -> c1.getStats().getPersistLedgerSucceed() > 0);
// Make cursor ledger can not work.
closeCursorLedger(c1);
c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2));
Expand Down Expand Up @@ -302,7 +305,7 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
bkc.failNow(BKException.Code.NoBookieAvailableException);
// Verify the cursor status will be persistent to ZK even if the cursor ledger creation always fails.
// This time ZK will be written due to catch up.
Position lastEntry = positions.get(entryCount -1);
Position lastEntry = positions.get(entryCount - 1);
cursor.markDelete(lastEntry);
long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed();
assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
Expand Down Expand Up @@ -769,7 +772,7 @@ void testResetCursor() throws Exception {
@Test(timeOut = 20000)
void testResetCursor1() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("trc1");
PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Expand Down Expand Up @@ -1601,12 +1604,17 @@ void testFilteringReadEntries() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
ManagedCursor cursor = ledger.openCursor("c1");

/* Position p1 = */ledger.addEntry("entry1".getBytes());
/* Position p2 = */ledger.addEntry("entry2".getBytes());
/* Position p3 = */ledger.addEntry("entry3".getBytes());
/* Position p4 = */ledger.addEntry("entry4".getBytes());
/* Position p1 = */
ledger.addEntry("entry1".getBytes());
/* Position p2 = */
ledger.addEntry("entry2".getBytes());
/* Position p3 = */
ledger.addEntry("entry3".getBytes());
/* Position p4 = */
ledger.addEntry("entry4".getBytes());
Position p5 = ledger.addEntry("entry5".getBytes());
/* Position p6 = */ledger.addEntry("entry6".getBytes());
/* Position p6 = */
ledger.addEntry("entry6".getBytes());

assertEquals(cursor.getNumberOfEntries(), 6);
assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6);
Expand Down Expand Up @@ -1660,9 +1668,12 @@ void testCountingWithDeletedEntries() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry1".getBytes());
/* Position p2 = */ledger.addEntry("entry2".getBytes());
/* Position p3 = */ledger.addEntry("entry3".getBytes());
/* Position p4 = */ledger.addEntry("entry4".getBytes());
/* Position p2 = */
ledger.addEntry("entry2".getBytes());
/* Position p3 = */
ledger.addEntry("entry3".getBytes());
/* Position p4 = */
ledger.addEntry("entry4".getBytes());
Position p5 = ledger.addEntry("entry5".getBytes());
Position p6 = ledger.addEntry("entry6".getBytes());
Position p7 = ledger.addEntry("entry7".getBytes());
Expand Down Expand Up @@ -2017,19 +2028,19 @@ void testFindNewestMatching() throws Exception {

@DataProvider(name = "testScanValues")
public static Object[][] testScanValues() {
return new Object[][] {
{ 10, 1 }, // single entry
{ 10, 3 }, // batches with remainder
{ 10, 5 }, // batches, half
{ 10, 1000 }, // big batch size, scan whole ledger in one round
{ 0, 10 } // empty ledger
return new Object[][]{
{10, 1}, // single entry
{10, 3}, // batches with remainder
{10, 5}, // batches, half
{10, 1000}, // big batch size, scan whole ledger in one round
{0, 10} // empty ledger
};
}

@Test(dataProvider = "testScanValues", timeOut = 30000)
void testScan(int numEntries, int batchSize) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_scan_" + numEntries
+ "_" +batchSize);
+ "_" + batchSize);

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
List<Position> positions = new ArrayList<>();
Expand Down Expand Up @@ -2130,7 +2141,7 @@ void testScan(int numEntries, int batchSize) throws Exception {
positionsFinal.add(entry.getPosition());
return true;
}), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
assertEquals(0,positionsFinal.size());
assertEquals(0, positionsFinal.size());

}

Expand Down Expand Up @@ -2289,7 +2300,7 @@ void testFindNewestMatchingEdgeCase1() throws Exception {

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
assertNull(c1.findNewestMatching(
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -2598,7 +2609,7 @@ public void findEntryComplete(Position position, Object ctx) {

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
Object ctx) {
result.exception = exception;
counter.countDown();
}
Expand All @@ -2624,7 +2635,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
}

void internalTestFindNewestMatchingAllEntries(final String name, final int entriesPerLedger,
final int expectedEntryId) throws Exception {
final int expectedEntryId) throws Exception {
final String ledgerAndCursorName = name;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
Expand Down Expand Up @@ -2718,7 +2729,7 @@ void testReplayEntries() throws Exception {
assertTrue((Arrays.equals(entries.get(0).getData(), "entry1".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry3".getBytes(Encoding)))
|| (Arrays.equals(entries.get(0).getData(), "entry3".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
entries.forEach(Entry::release);

// 3. Fail on reading non-existing position
Expand Down Expand Up @@ -2750,7 +2761,7 @@ void testGetLastIndividualDeletedRange() throws Exception {

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
for(int i = 0; i < 10; i++) {
for (int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
Expand All @@ -2777,7 +2788,7 @@ void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedExceptio

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
for(int i = 0; i < 10; i++) {
for (int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
Expand Down Expand Up @@ -3097,9 +3108,10 @@ public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception {
* Verifies that {@link ManagedCursorImpl#createNewMetadataLedger()} cleans up orphan ledgers if fails to switch new
* ledger
* </pre>
*
* @throws Exception
*/
@Test(timeOut=5000)
@Test(timeOut = 5000)
public void testLeakFailedLedgerOfManageCursor() throws Exception {

ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
Expand Down Expand Up @@ -3145,7 +3157,7 @@ public void operationFailed(ManagedLedgerException exception) {

try {
bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()),
mlConfig.getPassword());
mlConfig.getPassword());
fail("ledger should have deleted due to update-cursor failure");
} catch (BKException e) {
// ok
Expand Down Expand Up @@ -3368,9 +3380,13 @@ public void testEstimatedUnackedSize() throws Exception {
byte[] entryData = new byte[5];

// write 15 entries, saving position of 5th
for (int i = 0; i < 4; i++) { ledger.addEntry(entryData); }
for (int i = 0; i < 4; i++) {
ledger.addEntry(entryData);
}
Position deleteAt = ledger.addEntry(entryData);
for (int i = 0; i < 10; i++) { ledger.addEntry(entryData); }
for (int i = 0; i < 10; i++) {
ledger.addEntry(entryData);
}

assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 15 * entryData.length);

Expand Down Expand Up @@ -3552,6 +3568,7 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}

@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
Expand Down Expand Up @@ -3764,17 +3781,17 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch
pos.ackSet = bitSet.toLongArray();

cursor.asyncDelete(pos,
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();
pos.ackSet = null;
}
Expand Down Expand Up @@ -4008,7 +4025,6 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
}



@Test
public void testFlushCursorAfterError() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down Expand Up @@ -4081,7 +4097,7 @@ public void testConsistencyOfIndividualMessages() throws Exception {
}

assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
}

@Test
Expand Down Expand Up @@ -4589,7 +4605,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
@Test
public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries");
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
ledger = Mockito.spy(ledger);

List<Long> actualReadEntryIds = new ArrayList<>();
Expand Down Expand Up @@ -4674,6 +4691,47 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}


@Test
public void testReadEmptyEntryIds() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");

int entries = 20;
Position maxPosition = PositionImpl.EARLIEST;
for (int i = 0; i < entries; i++) {
maxPosition = ledger.addEntry(new byte[1024]);
}

CompletableFuture<Void> future = new CompletableFuture<>();
Position cursorPosition = cursor.getReadPosition();
ReadHandle handle = ledger.getLedgerHandle(maxPosition.getLedgerId()).get();
OpReadEntry opReadEntry = OpReadEntry.create(
(ManagedCursorImpl) cursor,
PositionImpl.get(cursorPosition.getLedgerId(), cursorPosition.getEntryId()),
0, // Set count = 0.
new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
future.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxPosition.getLedgerId(), maxPosition.getEntryId()), null);
ledger.asyncReadEntry(handle, Collections.emptySet(), opReadEntry, null);
future.get();

// `readPosition` should not be moved.
Position newCursorReadPosition = cursor.getReadPosition();
assertTrue(newCursorReadPosition.getLedgerId() == cursorPosition.getLedgerId()
&& newCursorReadPosition.getEntryId() == cursorPosition.getEntryId());
}


private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}

0 comments on commit 81de6c1

Please sign in to comment.