Skip to content

Commit

Permalink
[ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe authored Oct 15, 2024
1 parent 2355a5f commit 2fa7513
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ public void run() {
}
}

@Override
public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
// todo
return 0;
}

@Override
public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,47 @@ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, Message

@Override
public long estimateMessageCount(long from, long to, MessageFilter filter) {
// todo
return 0;
// Check from and to offset validity
Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
if (fromUnit == null) {
return -1;
}

if (from >= to) {
return -1;
}

if (to > getMaxOffsetInQueue()) {
to = getMaxOffsetInQueue();
}

int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from);

int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold();
int matchSize = 0;

for (int i = 0; i < sampleSize; i++) {
long index = from + i;
Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
if (pair == null) {
continue;
}
CqUnit cqUnit = pair.getObject1();
if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
matchSize++;
// if matchSize is plenty, early exit estimate
if (matchSize > matchThreshold) {
sampleSize = i;
break;
}
}
}
// Make sure the second half is a floating point number, otherwise it will be truncated to 0
return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0)));
}


@Override
public long getMinOffsetInQueue() {
return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageDecoder;
Expand All @@ -31,6 +32,7 @@
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
Expand Down Expand Up @@ -84,7 +86,26 @@ messageStoreConfig, new BrokerStatsManager(brokerConfig),
return master;
}

protected void putMsg(DefaultMessageStore messageStore) throws Exception {
protected RocksDBMessageStore genRocksdbMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = buildStoreConfig(
COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE
);

BrokerConfig brokerConfig = new BrokerConfig();

RocksDBMessageStore master = new RocksDBMessageStore(
messageStoreConfig, new BrokerStatsManager(brokerConfig),
(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, brokerConfig, new ConcurrentHashMap<>());

assertThat(master.load()).isTrue();

master.start();

return master;
}

protected void putMsg(MessageStore messageStore) {
int totalMsgs = 200;
for (int i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner message = buildMessage();
Expand Down Expand Up @@ -184,9 +205,33 @@ public void testIterator() throws Exception {

@Test
public void testEstimateMessageCountInEmptyConsumeQueue() {
DefaultMessageStore master = null;
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

@Test
public void testEstimateRocksdbMessageCountInEmptyConsumeQueue() {
if (notExecuted()) {
return;
}
DefaultMessageStore messageStore = null;
try {
messageStore = genRocksdbMessageStore();
doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

public void doTestEstimateMessageCountInEmptyConsumeQueue(MessageStore master) {
try {
master = gen();
ConsumeQueueInterface consumeQueue = master.findConsumeQueue(TOPIC, QUEUE_ID);
MessageFilter filter = new MessageFilter() {
@Override
Expand Down Expand Up @@ -219,16 +264,34 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> pr
}
}

@Test
public void testEstimateRocksdbMessageCount() {
if (notExecuted()) {
return;
}
DefaultMessageStore messageStore = null;
try {
messageStore = genRocksdbMessageStore();
doTestEstimateMessageCount(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

@Test
public void testEstimateMessageCount() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
doTestEstimateMessageCount(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

public void doTestEstimateMessageCount(MessageStore messageStore) {
try {
try {
putMsg(messageStore);
Expand Down Expand Up @@ -265,15 +328,34 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> pr
}
}

@Test
public void testEstimateRocksdbMessageCountSample() {
if (notExecuted()) {
return;
}
DefaultMessageStore messageStore = null;
try {
messageStore = genRocksdbMessageStore();
doTestEstimateMessageCountSample(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

@Test
public void testEstimateMessageCountSample() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
doTestEstimateMessageCountSample(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}

public void doTestEstimateMessageCountSample(MessageStore messageStore) {

try {
try {
Expand Down Expand Up @@ -303,4 +385,8 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> pr
UtilAll.deleteFile(new File(STORE_PATH));
}
}

private boolean notExecuted() {
return MixAll.isMac();
}
}

0 comments on commit 2fa7513

Please sign in to comment.