Skip to content

Commit

Permalink
[improve][broker] Make read compacted entries support maxReadSizeByte…
Browse files Browse the repository at this point in the history
…s limitation (apache#21065)
  • Loading branch information
coderzc authored Sep 1, 2023
1 parent 0956def commit 835e9b6
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3490,7 +3490,7 @@ public ManagedCursorMXBean getStats() {
return this.mbean;
}

void updateReadStats(int readEntriesCount, long readEntriesSize) {
public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
}
Expand Down Expand Up @@ -3522,7 +3522,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public interface CompactedTopic {
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
int maxEntries,
long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
Expand Down Expand Up @@ -93,7 +94,8 @@ public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
@Override
@Deprecated
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
int maxEntries,
long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
Expand All @@ -110,8 +112,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,

if (currentCompactionHorizon == null
|| currentCompactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);

compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
.thenCompose((startPoint) -> {
Expand All @@ -126,6 +131,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
startPoint + (numberOfEntriesToRead - 1));
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
long entriesSize = 0;
for (Entry entry : entries) {
entriesSize += entry.getLength();
}
managedCursor.updateReadStats(entries.size(), entriesSize);

Entry lastEntry = entries.get(entries.size() - 1);
// The compaction task depends on the last snapshot and the incremental
// entries to build the new snapshot. So for the compaction cursor, we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.service.Consumer;
Expand All @@ -40,13 +41,13 @@ public class CompactedTopicUtils {

@Beta
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
ManagedCursor cursor, int numberOfEntriesToRead,
ManagedCursor cursor, int maxEntries,
long bytesToRead, boolean readFromEarliest,
AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
checkArgument(numberOfEntriesToRead > 0);
checkArgument(maxEntries > 0);
Objects.requireNonNull(callback);

final PositionImpl readPosition;
Expand All @@ -67,15 +68,18 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
if (wait) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, bytesToRead, callback, readEntriesCtx,
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
} else {
cursor.asyncReadEntries(numberOfEntriesToRead, bytesToRead, callback, readEntriesCtx,
cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
}
return CompletableFuture.completedFuture(null);
}

ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);

return topicCompactionService.readCompactedEntries(readPosition, numberOfEntriesToRead)
.thenAccept(entries -> {
if (CollectionUtils.isEmpty(entries)) {
Expand All @@ -88,6 +92,12 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
return;
}

long entriesSize = 0;
for (Entry entry : entries) {
entriesSize += entry.getLength();
}
managedCursor.updateReadStats(entries.size(), entriesSize);

Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext(), true);
callback.readEntriesComplete(entries, readEntriesCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.mockito.Mockito;
import org.testng.Assert;
Expand All @@ -46,8 +46,9 @@ public void testReadCompactedEntriesWithEmptyEntries() throws ExecutionException

PositionImpl initPosition = PositionImpl.get(1, 90);
AtomicReference<PositionImpl> readPositionRef = new AtomicReference<>(initPosition.getNext());
ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
ManagedCursorImpl cursor = Mockito.mock(ManagedCursorImpl.class);
Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
Mockito.doReturn(1).when(cursor).applyMaxSizeCap(Mockito.anyInt(), Mockito.anyLong());
Mockito.doAnswer(invocation -> {
readPositionRef.set(invocation.getArgument(0));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1877,4 +1879,51 @@ public void testReceiverQueueSize() throws Exception {
consumer.close();
producer.close();
}

@Test
public void testDispatcherMaxReadSizeBytes() throws Exception {
final String topicName =
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID();
final String subName = "my-sub";
final int receiveQueueSize = 1;
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName).create();

for (int i = 0; i < 10; i+=2) {
producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

admin.topics().unload(topicName);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();


PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService());
FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true);

Awaitility.await().untilAsserted(() -> {
assertEquals(consumer.getStats().getMsgNumInReceiverQueue(),
1);
});

consumer.increaseAvailablePermits(2);

Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,5 +148,24 @@ public void testNumericOrderCompaction() throws Exception {
Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
}

@Override
public void testCompactCompressedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactCompressedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}

@Override
public void testCompactEncryptedAndCompressedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactEncryptedAndCompressedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}

@Override
public void testCompactEncryptedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactEncryptedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}
}

0 comments on commit 835e9b6

Please sign in to comment.