Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix possible mark delete NPE when batch index ack is enabled #23833

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.bookkeeper.mledger.impl;

import javax.annotation.Nullable;

/**
* Interface to manage the ackSet state attached to a position.
* Helpers in {@link AckSetStateUtil} to create positions with
Expand All @@ -28,7 +30,7 @@ public interface AckSetState {
* Get the ackSet bitset information encoded as a long array.
* @return the ackSet
*/
long[] getAckSet();
@Nullable long[] getAckSet();

/**
* Set the ackSet bitset information as a long array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -200,7 +202,7 @@ public class ManagedCursorImpl implements ManagedCursor {

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
protected final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes;
@Nullable protected final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -709,6 +711,7 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i

private void recoverBatchDeletedIndexes (
List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
checkNotNull(batchDeletedIndexes);
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
Expand Down Expand Up @@ -1381,7 +1384,7 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null, null);
individualDeletedMessages.clear();
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> {
Expand Down Expand Up @@ -2017,47 +2020,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}

Position newPosition = position;

Optional<AckSetState> ackSetStateOptional = AckSetStateUtil.maybeGetAckSetState(newPosition);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (ackSetStateOptional.isPresent()) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(ackSetStateOptional.map(AckSetState::getAckSet).get());
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
batchDeletedIndexes.compute(newPosition,
(k, v) -> {
if (v == null) {
return givenBitSet;
}
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
bitSetRecyclable.set(v);
return givenBitSet;
} else {
bitSetRecyclable.set(givenBitSet);
return v;
}
});
if (bitSetRecyclable.get() != null) {
bitSetRecyclable.get().recycle();
}
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else {
if (ackSetStateOptional.isPresent()) {
AckSetState ackSetState = ackSetStateOptional.get();
if (ackSetState.getAckSet() != null) {
newPosition = ledger.getPreviousPosition(newPosition);
}
}
}

Position newPosition = ackBatchPosition(position);
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
Expand Down Expand Up @@ -2103,6 +2066,36 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
internalAsyncMarkDelete(newPosition, properties, callback, ctx);
}

private Position ackBatchPosition(Position position) {
return AckSetStateUtil.maybeGetAckSetState(position)
.map(AckSetState::getAckSet)
.map(ackSet -> {
if (batchDeletedIndexes == null) {
return ledger.getPreviousPosition(position);
}
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
final var givenBitSet = BitSetRecyclable.create().resetWords(ackSet);
batchDeletedIndexes.compute(position, (k, v) -> {
if (v == null) {
return givenBitSet;
}
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
v.recycle();
return givenBitSet;
} else {
return v;
}
});
final var newPosition = ledger.getPreviousPosition(position);
batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition).values()
.forEach(BitSetRecyclable::recycle);
return newPosition;
})
.orElse(position);
}

protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
ledger.mbean.addMarkDeleteOp();
Expand Down Expand Up @@ -2208,7 +2201,7 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (batchDeletedIndexes != null) {
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST,
false, PositionFactory.create(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
Expand Down Expand Up @@ -2348,7 +2341,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}

if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
Expand All @@ -2361,7 +2354,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
if (ackSet == null) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
Expand All @@ -2378,7 +2371,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
} else if (batchDeletedIndexes != null) {
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
if (givenBitSet != bitSet) {
Expand Down Expand Up @@ -3185,7 +3178,7 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
if (batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
Expand Down Expand Up @@ -3722,7 +3715,7 @@ private ManagedCursorImpl cursorImpl() {

@Override
public long[] getDeletedBatchIndexesAsLongArray(Position position) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
Expand Down Expand Up @@ -3851,6 +3844,7 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
lock.readLock().unlock();
}
if (batchDeletedIndexes != null) {
checkNotNull(newNonDurableCursor.batchDeletedIndexes);
for (Map.Entry<Position, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue());
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class MLPendingAckStoreTest extends TransactionTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
}

Expand Down Expand Up @@ -304,4 +305,4 @@ private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> positionList,
}
return indexes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,9 @@ public void recycle() {
if (recyclerHandle != null) {
this.clear();
recyclerHandle.recycle(this);
// Reset with null so that recycle() can be called many times but only the 1st time takes effect.
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
// It's used when recycle() is called in ConcurrentSkipListMap's callback
recyclerHandle = null;
}
}
}
Loading