diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 8824b1cb6fa..60f752e2264 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -102,7 +102,10 @@ public class DbLedgerStorage implements LedgerStorage { (long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB; static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; + static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE = "dbStorage_readAheadCacheBatchBytesSize"; private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; + // the default value is -1. this feature(limit of read ahead bytes) is disabled + private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE = -1; private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB = (long) (0.125 * PlatformDependent.estimateMaxDirectMemory()) @@ -171,6 +174,8 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs; long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs; int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE); + long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, + DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE); ledgerStorageList = Lists.newArrayList(); for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) { @@ -237,7 +242,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le idm, entrylogger, statsLogger, perDirectoryWriteCacheSize, perDirectoryReadCacheSize, - readAheadCacheBatchSize)); + readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); @@ -276,11 +281,11 @@ public Long getSample() { protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, long writeCacheSize, long readCacheSize, - int readAheadCacheBatchSize) + int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) throws IOException { return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, writeCacheSize, readCacheSize, - readAheadCacheBatchSize); + readAheadCacheBatchSize, readAheadCacheBatchBytesSize); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 7f004c20e5a..61aebd8e1a1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -137,6 +137,7 @@ protected Thread newThread(Runnable r, String name) { private final long writeCacheMaxSize; private final long readCacheMaxSize; private final int readAheadCacheBatchSize; + private final long readAheadCacheBatchBytesSize; private final long maxThrottleTimeNanos; @@ -152,7 +153,8 @@ protected Thread newThread(Runnable r, String name) { public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, ByteBufAllocator allocator, - long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize, + long readAheadCacheBatchBytesSize) throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir"); @@ -182,6 +184,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le readCacheMaxSize = readCacheSize; this.readAheadCacheBatchSize = readAheadCacheBatchSize; + this.readAheadCacheBatchBytesSize = readAheadCacheBatchBytesSize; // Do not attempt to perform read-ahead more than half the total size of the cache maxReadAheadBytesSize = readCacheMaxSize / 2; @@ -663,9 +666,7 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi long currentEntryLogId = firstEntryLogId; long currentEntryLocation = firstEntryLocation; - while (count < readAheadCacheBatchSize - && size < maxReadAheadBytesSize - && currentEntryLogId == firstEntryLogId) { + while (chargeReadAheadCache(count, size) && currentEntryLogId == firstEntryLogId) { ByteBuf entry = entryLogger.readEntry(orginalLedgerId, firstEntryId, currentEntryLocation); @@ -703,6 +704,17 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi } } + protected boolean chargeReadAheadCache(int currentReadAheadCount, long currentReadAheadBytes) { + // compatible with old logic + boolean chargeSizeCondition = currentReadAheadCount < readAheadCacheBatchSize + && currentReadAheadBytes < maxReadAheadBytesSize; + if (chargeSizeCondition && readAheadCacheBatchBytesSize > 0) { + // exact limits limit the size and count for each batch + chargeSizeCondition = currentReadAheadBytes < readAheadCacheBatchBytesSize; + } + return chargeSizeCondition; + } + public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException { throwIfLimbo(ledgerId); @@ -1278,4 +1290,9 @@ public void clearStorageStateFlag(StorageState flag) throws IOException { } } } + + @VisibleForTesting + DbLedgerStorageStats getDbLedgerStorageStats() { + return dbLedgerStorageStats; + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java index a6bb99174e1..cd0e967b61c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.meta.NullMetadataBookieDriver; import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.DiskChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,19 @@ public TestBookieImpl(ServerConfiguration conf) throws Exception { this(new ResourceBuilder(conf).build()); } + public TestBookieImpl(Resources resources, StatsLogger statsLogger) throws Exception { + super(resources.conf, + resources.registrationManager, + resources.storage, + resources.diskChecker, + resources.ledgerDirsManager, + resources.indexDirsManager, + statsLogger, + UnpooledByteBufAllocator.DEFAULT, + new SimpleBookieServiceInfoProvider(resources.conf)); + this.resources = resources; + } + public TestBookieImpl(Resources resources) throws Exception { super(resources.conf, resources.registrationManager, @@ -157,12 +171,16 @@ public ResourceBuilder withRegistrationManager(RegistrationManager registrationM return this; } - Resources build() throws Exception { + public Resources build() throws Exception { + return build(NullStatsLogger.INSTANCE); + } + + public Resources build(StatsLogger statsLogger) throws Exception { if (metadataBookieDriver == null) { if (conf.getMetadataServiceUri() == null) { metadataBookieDriver = new NullMetadataBookieDriver(); } else { - metadataBookieDriver = BookieResources.createMetadataDriver(conf, NullStatsLogger.INSTANCE); + metadataBookieDriver = BookieResources.createMetadataDriver(conf, statsLogger); } } if (registrationManager == null) { @@ -173,13 +191,13 @@ Resources build() throws Exception { DiskChecker diskChecker = BookieResources.createDiskChecker(conf); LedgerDirsManager ledgerDirsManager = BookieResources.createLedgerDirsManager( - conf, diskChecker, NullStatsLogger.INSTANCE); + conf, diskChecker, statsLogger); LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager( - conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager); + conf, diskChecker, statsLogger, ledgerDirsManager); LedgerStorage storage = BookieResources.createLedgerStorage( conf, ledgerManager, ledgerDirsManager, indexDirsManager, - NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT); + statsLogger, UnpooledByteBufAllocator.DEFAULT); return new Resources(conf, metadataBookieDriver, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java new file mode 100644 index 00000000000..81ef7f9495c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java @@ -0,0 +1,368 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_BYTES_SIZE; +import static org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_SIZE; +import static org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.File; +import java.util.List; +import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; +import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for {@link DbLedgerStorage}. + */ +public class DbLedgerStorageReadCacheTest { + private static final Logger LOGGER = LoggerFactory.getLogger(DbLedgerStorageReadCacheTest.class); + + @Test + public void chargeReadAheadCacheRegressionTest() { + TestDB testDB = new TestDB(); + try { + long readAheadCacheMaxSizeMb = 16L; + int readAheadCacheBatchSize = 1024; + long readAheadCacheBatchBytesSize = -1; + setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize); + SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0); + /** + * case1: currentReadAheadCount < readAheadCacheBatchSize + * currentReadAheadBytes < maxReadAheadBytesSize + * result: true + */ + int currentReadAheadCount = 1; + long currentReadAheadBytes = 1; + assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + + /** + * case2: currentReadAheadCount > readAheadCacheBatchSize + * currentReadAheadBytes < maxReadAheadBytesSize + * result: false + */ + currentReadAheadCount = readAheadCacheBatchSize + 1; + currentReadAheadBytes = 1; + assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + + /** + * case3: currentReadAheadCount < readAheadCacheBatchSize + * currentReadAheadBytes > maxReadAheadBytesSize + * result: false + */ + currentReadAheadCount = 1; + currentReadAheadBytes = readAheadCacheMaxSizeMb / 2 * 1024 * 1024 + 1; + assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + } catch (Throwable e) { + LOGGER.error("readAheadCacheBatchSizeUnitTest run error", e); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + @Test + public void chargeReadAheadCacheUnitTest() { + TestDB testDB = new TestDB(); + try { + long readAheadCacheMaxSizeMb = 16L; + int readAheadCacheBatchSize = 1024; + long readAheadCacheBatchBytesSize = 2 * 1024 * 1024; + setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize); + SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0); + /** + * case1: currentReadAheadCount < readAheadCacheBatchSize + * currentReadAheadBytes < readAheadCacheBatchBytesSize + * currentReadAheadBytes < readCacheMaxSize + * result: true + */ + int currentReadAheadCount = 1; + long currentReadAheadBytes = 1; + assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + + /** + * case2: currentReadAheadCount > readAheadCacheBatchSize + * currentReadAheadBytes < readAheadCacheBatchBytesSize + * currentReadAheadBytes < readCacheMaxSize + * result: false + */ + currentReadAheadCount = readAheadCacheBatchSize + 1; + currentReadAheadBytes = 1; + assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + + /** + * case3: currentReadAheadCount < readAheadCacheBatchSize + * currentReadAheadBytes > readAheadCacheBatchBytesSize + * currentReadAheadBytes < readCacheMaxSize + * result: false + */ + currentReadAheadCount = 1; + currentReadAheadBytes = readAheadCacheBatchBytesSize + 1; + assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes)); + } catch (Throwable e) { + LOGGER.error("readAheadCacheBatchSizeUnitTest run error", e); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + @Test + public void compareDiffReadAheadPerfTest() { + /** + * case1(read ahead cache by limit bytes size): + * config: readAheadCacheMaxSizeMb = 2 * 8; + * readAheadCacheBatchSize = 1024; + * readAheadCacheBatchBytesSize = 2 * 1024 * 1024; + * case content: + * LedgerId:0, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:1, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:2, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:3, read 1024 pieces of entry,each piece of entry is 10KB + */ + CacheResult cacheBatchBytesSizeResult = readAheadCacheBatchBytesSize(); + + /** + * case2(read ahead cache by limit count): + * config: readAheadCacheMaxSizeMb = 2 * 8; + * readAheadCacheBatchSize = 1024; + * case content: + * LedgerId:0, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:1, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:2, read 1024 pieces of entry,each piece of entry is 10KB + * LedgerId:3, read 1024 pieces of entry,each piece of entry is 10KB + */ + CacheResult cacheBatchSizeResult = readAheadCacheBatchSize(); + + /** + * result: case1(read ahead cache by limit bytes size) get less cachemiss, + * it is suitable for large messages, reduce the pollution of readAhead large messages to readCache + */ + assertEquals(8, cacheBatchBytesSizeResult.getCacheMissCount()); + assertEquals(132, cacheBatchSizeResult.getCacheMissCount()); + assertTrue(cacheBatchBytesSizeResult.getCacheMissCount() < cacheBatchSizeResult.getCacheMissCount()); + assertEquals( + cacheBatchBytesSizeResult.getCacheMissCount() + cacheBatchBytesSizeResult.getCacheHitCount(), + cacheBatchSizeResult.getCacheMissCount() + cacheBatchSizeResult.getCacheHitCount()); + } + + public void setup(TestDB testDB, long readAheadCacheMaxSizeMb, + int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = BookieImpl.getCurrentDirectory(tmpDir); + BookieImpl.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[]{tmpDir.toString()}); + if (readAheadCacheMaxSizeMb > 0) { + conf.setProperty(READ_AHEAD_CACHE_MAX_SIZE_MB, readAheadCacheMaxSizeMb); + } + if (readAheadCacheBatchSize > 0) { + conf.setProperty(READ_AHEAD_CACHE_BATCH_SIZE, readAheadCacheBatchSize); + } + if (readAheadCacheBatchBytesSize > 0) { + conf.setProperty(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, readAheadCacheBatchBytesSize); + } + TestStatsProvider.TestStatsLogger statsLogger = new TestStatsProvider().getStatsLogger("test"); + BookieImpl bookie = new TestBookieImpl(new TestBookieImpl.ResourceBuilder(conf).build(statsLogger), + statsLogger); + + DbLedgerStorage storage = (DbLedgerStorage) bookie.getLedgerStorage(); + + storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { + assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger); + }); + testDB.setStorage(storage); + testDB.setTmpDir(tmpDir); + } + + public void teardown(DbLedgerStorage storage, File tmpDir) { + if (storage != null) { + try { + storage.shutdown(); + } catch (InterruptedException e) { + LOGGER.error("storage.shutdown has error", e); + } + } + if (tmpDir != null) { + tmpDir.delete(); + } + } + + private void addEntries(DbLedgerStorage storage, long minLedgerId, long maxLedgerId, + long minEntryId, long maxEntryId) throws Exception { + // Add entries + for (long lid = minLedgerId; lid < maxLedgerId; lid++) { + long lac = 0; + for (long eid = minEntryId; eid < maxEntryId; eid++) { + ByteBuf entry = Unpooled.buffer(1024); + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeLong(lac); // lac + entry.writeBytes((get4KbMsg()).getBytes()); + assertEquals(eid, storage.addEntry(entry)); + lac++; + } + } + } + + private String get4KbMsg() { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < 1024; i++) { + buffer.append("1234"); + } + assertEquals(4 * 1024, buffer.toString().length()); + return buffer.toString(); + } + + private CacheResult readAheadCacheBatchBytesSize() { + Long cacheMissCount; + TestDB testDB = new TestDB(); + try { + long readAheadCacheMaxSizeMb = 2 * 8L; + int readAheadCacheBatchSize = 1024; + long readAheadCacheBatchBytesSize = 2 * 1024 * 1024; + long minEntryId = 0; + long maxEntryId = 1024; + + setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize); + addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId); + + testDB.getStorage().flush(); + assertEquals(false, testDB.getStorage().isFlushRequired()); + // Read from db + for (long eid = minEntryId; eid < maxEntryId / 2; eid++) { + testDB.getStorage().getEntry(0, eid); + testDB.getStorage().getEntry(1, eid); + testDB.getStorage().getEntry(2, eid); + testDB.getStorage().getEntry(3, eid); + } + List ledgerStorageList = testDB.getStorage().getLedgerStorageList(); + DbLedgerStorageStats ledgerStats = ledgerStorageList.get(0).getDbLedgerStorageStats(); + cacheMissCount = ledgerStats.getReadCacheMissCounter().get(); + Long cacheHitCount = ledgerStats.getReadCacheHitCounter().get(); + LOGGER.info("simple1.cacheMissCount={},cacheHitCount={}", cacheMissCount, cacheHitCount); + return new CacheResult(cacheMissCount, cacheHitCount); + } catch (Throwable e) { + LOGGER.error("test case run error", e); + return new CacheResult(0, 0); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + public CacheResult readAheadCacheBatchSize() { + Long cacheMissCount; + TestDB testDB = new TestDB(); + try { + long readAheadCacheMaxSizeMb = 2 * 8L; + int readAheadCacheBatchSize = 1024; + long readAheadCacheBatchBytesSize = -1; + long minEntryId = 0; + long maxEntryId = 1024; + + setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize); + addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId); + + testDB.getStorage().flush(); + assertEquals(false, testDB.getStorage().isFlushRequired()); + // Read from db + for (long eid = minEntryId; eid < maxEntryId / 2; eid++) { + testDB.getStorage().getEntry(0, eid); + testDB.getStorage().getEntry(1, eid); + testDB.getStorage().getEntry(2, eid); + testDB.getStorage().getEntry(3, eid); + } + List ledgerStorageList = testDB.getStorage().getLedgerStorageList(); + DbLedgerStorageStats ledgerStats = ledgerStorageList.get(0).getDbLedgerStorageStats(); + cacheMissCount = ledgerStats.getReadCacheMissCounter().get(); + Long cacheHitCount = ledgerStats.getReadCacheHitCounter().get(); + LOGGER.info("simple2.cacheMissCount={},cacheHitCount={}", cacheMissCount, cacheHitCount); + return new CacheResult(cacheMissCount, cacheHitCount); + } catch (Throwable e) { + LOGGER.error("test case run error", e); + return new CacheResult(0, 0); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + private class TestDB { + private DbLedgerStorage storage; + private File tmpDir; + + public DbLedgerStorage getStorage() { + return storage; + } + + public void setStorage(DbLedgerStorage storage) { + this.storage = storage; + } + + public File getTmpDir() { + return tmpDir; + } + + public void setTmpDir(File tmpDir) { + this.tmpDir = tmpDir; + } + } + + private class CacheResult { + private long cacheMissCount; + private long cacheHitCount; + + private CacheResult(long cacheMissCount, long cacheHitCount) { + this.cacheMissCount = cacheMissCount; + this.cacheHitCount = cacheHitCount; + } + + public long getCacheMissCount() { + return cacheMissCount; + } + + public void setCacheMissCount(long cacheMissCount) { + this.cacheMissCount = cacheMissCount; + } + + public long getCacheHitCount() { + return cacheHitCount; + } + + public void setCacheHitCount(long cacheHitCount) { + this.cacheHitCount = cacheHitCount; + } + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java index a9bee08cb26..102f7f5addc 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -56,11 +56,11 @@ private static class MockedDbLedgerStorage extends DbLedgerStorage { protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, - long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) throws IOException { return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, writeCacheSize, - readCacheSize, readAheadCacheBatchSize); + readCacheSize, readAheadCacheBatchSize, readAheadCacheBatchBytesSize); } private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage { @@ -68,9 +68,11 @@ public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerMana LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, ByteBufAllocator allocator, long writeCacheSize, - long readCacheSize, int readAheadCacheBatchSize) throws IOException { + long readCacheSize, int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) + throws IOException { super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, - statsLogger, allocator, writeCacheSize, readCacheSize, readAheadCacheBatchSize); + statsLogger, allocator, writeCacheSize, readCacheSize, readAheadCacheBatchSize, + readAheadCacheBatchBytesSize); } @Override diff --git a/conf/bk_server.conf b/conf/bk_server.conf index f7dbf380329..c0a021418d9 100755 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -749,6 +749,9 @@ gcEntryLogMetadataCacheEnabled=false # By default it will be allocated to 25% of the available direct memory # dbStorage_readAheadCacheMaxSizeMb= +# How many entries' bytes to pre-fill in cache after a read cache miss. Default is -1. 0 or less disables this feature +# dbStorage_readAheadCacheBatchBytesSize=-1 + # How many entries to pre-fill in cache after a read cache miss # dbStorage_readAheadCacheBatchSize=100