Skip to content

Commit

Permalink
[fix][broker] Support large number of unack message store for cursor …
Browse files Browse the repository at this point in the history
…recovery

Fix test
  • Loading branch information
rdhabalia committed Sep 27, 2024
1 parent b1c5d96 commit 1b7bb32
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,10 @@ public int getMaxUnackedRangesToPersistInMetadataStore() {
return maxUnackedRangesToPersistInMetadataStore;
}

public void setMaxUnackedRangesToPersistInMetadataStore(int maxUnackedRangesToPersistInMetadataStore) {
public ManagedLedgerConfig setMaxUnackedRangesToPersistInMetadataStore(
int maxUnackedRangesToPersistInMetadataStore) {
this.maxUnackedRangesToPersistInMetadataStore = maxUnackedRangesToPersistInMetadataStore;
return this;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -91,12 +92,15 @@
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -606,9 +610,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId());
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
recoverIndividualDeletedMessages(positionInfo);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
Expand All @@ -627,6 +629,45 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}
}

public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
} else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
List<LongListMap> rangeList = positionInfo.getIndividualDeletedMessageRangesList();
try {
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
individualDeletedMessages.build(rangeMap);
} catch (Exception e) {
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
name, e);
}
}
}

private List<LongListMap> buildLongPropertiesMap(Map<Long, long[]> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
List<LongListMap> longListMap = new ArrayList<>();
MutableInt serializedSize = new MutableInt();
properties.forEach((id, ranges) -> {
if (ranges == null || ranges.length <= 0) {
return;
}
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap.Builder lmBuilder = LongListMap.newBuilder()
.setKey(id);
for (long range : ranges) {
lmBuilder.addValues(range);
}
LongListMap lm = lmBuilder.build();
longListMap.add(lm);
serializedSize.add(lm.getSerializedSize());
});
individualDeletedMessagesSerializedSize = serializedSize.toInteger();
return longListMap;
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
lock.writeLock().lock();
try {
Expand Down Expand Up @@ -3125,12 +3166,23 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
Position position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
.addAllProperties(buildPropertiesMap(mdEntry.properties));

Map<Long, long[]> internalRanges = null;
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
}
if (internalRanges != null && !internalRanges.isEmpty()) {
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
} else {
piBuilder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}
PositionInfo pi = piBuilder.build();

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
Expand Down Expand Up @@ -142,6 +143,16 @@ public Range<T> lastRange() {
return rangeSet.lastRange();
}

@Override
public Map<Long, long[]> toRanges(int maxRanges) {
return rangeSet.toRanges(maxRanges);
}

@Override
public void build(Map<Long, long[]> internalRange) {
rangeSet.build(internalRange);
}

@Override
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue);
Expand Down Expand Up @@ -176,4 +187,22 @@ public boolean isDirtyLedgers(long ledgerId) {
public String toString() {
return rangeSet.toString();
}

@Override
public int hashCode() {
return rangeSet.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof RangeSetWrapper)) {
return false;
}
if (this == obj) {
return true;
}
@SuppressWarnings("rawtypes")
RangeSetWrapper set = (RangeSetWrapper) obj;
return this.rangeSet.equals(set.rangeSet);
}
}
6 changes: 6 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,19 @@ message PositionInfo {

// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
repeated LongListMap individualDeletedMessageRanges = 6;
}

message NestedPositionInfo {
required int64 ledgerId = 1;
required int64 entryId = 2;
}

message LongListMap {
required int64 key = 1;
repeated int64 values = 2;
}

message MessageRange {
required NestedPositionInfo lowerEndpoint = 1;
required NestedPositionInfo upperEndpoint = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3223,7 +3223,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(10);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
final ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);

List<Position> addedPositions = new ArrayList<>();
for (int i = 0; i < totalAddEntries; i++) {
Expand Down Expand Up @@ -3269,7 +3269,8 @@ public void operationFailed(MetaStoreException e) {
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
positionInfo = PositionInfo.parseFrom(entry.getEntry());
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
c1.recoverIndividualDeletedMessages(positionInfo);
individualDeletedMessagesCount.set(c1.getIndividuallyDeletedMessagesSet().asRanges().size());
} catch (Exception e) {
}
latch.countDown();
Expand All @@ -3286,12 +3287,12 @@ public void operationFailed(MetaStoreException e) {
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = (ManagedLedgerImpl) factory2.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ManagedCursorImpl reopenCursor = (ManagedCursorImpl) ledger.openCursor("c1");
// verify cursor has been recovered
assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
assertEquals(reopenCursor.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);

// try to read entries which should only read non-deleted positions
List<Entry> entries = c1.readEntries(totalAddEntries);
List<Entry> entries = reopenCursor.readEntries(totalAddEntries);
assertEquals(entries.size(), totalAddEntries / 2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -34,7 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -53,9 +52,13 @@
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

public ManagedLedgerBkTest() {
Expand Down Expand Up @@ -587,4 +590,44 @@ public void testPeriodicRollover() throws Exception {
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

/**
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
*
* @throws Exception
*/
@Test
public void testUnackmessagesAndRecovery() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");

int totalEntries = 100;
for (int i = 0; i < totalEntries; i++) {
Position p = ledger.addEntry("entry".getBytes());
if (i % 2 == 0) {
cursor.delete(p);
}
}

LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();

ledger.close();

// open and recover cursor
ledger = factory.open("my_test_unack_messages", config);
cursor = (ManagedCursorImpl) ledger.openCursor("c1");

LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));

ledger.close();
factory.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
*/
package org.apache.pulsar.common.util.collections;

import static java.util.BitSet.valueOf;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.mutable.MutableInt;

/**
Expand Down Expand Up @@ -253,6 +258,42 @@ public Range<T> lastRange() {
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
}

@Override
public Map<Long, long[]> toRanges(int maxRanges) {
Map<Long, long[]> internalBitSetMap = new HashMap<>();
AtomicInteger rangeCount = new AtomicInteger();
rangeBitSetMap.forEach((id, bmap) -> {
if (rangeCount.getAndAdd(bmap.cardinality()) > maxRanges) {
return;
}
internalBitSetMap.put(id, bmap.toLongArray());
});
return internalBitSetMap;
}

@Override
public void build(Map<Long, long[]> internalRange) {
internalRange.forEach((id, ranges) -> rangeBitSetMap.put(id, valueOf(ranges)));
}

@Override
public int hashCode() {
return Objects.hashCode(rangeBitSetMap);
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ConcurrentOpenLongPairRangeSet)) {
return false;
}
if (this == obj) {
return true;
}
@SuppressWarnings("rawtypes")
ConcurrentOpenLongPairRangeSet set = (ConcurrentOpenLongPairRangeSet) obj;
return this.rangeBitSetMap.equals(set.rangeBitSetMap);
}

@Override
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -136,6 +137,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*/
Range<T> lastRange();

default Map<Long, long[]> toRanges(int maxRanges) {
throw new UnsupportedOperationException();
}

/**
* Build {@link LongPairRangeSet} using internal ranges returned by {@link #toRanges(int)} .
*
* @param ranges
*/
default void build(Map<Long, long[]> ranges) {
throw new UnsupportedOperationException();
}

/**
* Return the number bit sets to true from lower (inclusive) to upper (inclusive).
*/
Expand Down
Loading

0 comments on commit 1b7bb32

Please sign in to comment.