Skip to content

Commit

Permalink
[feature] [server] add dbStorage_readAheadCacheBatchBytesSize propert…
Browse files Browse the repository at this point in the history
…ies when read ahead entries (apache#3895)

* [feature] [server] add dbStorage_readAheadCacheBatchBytesSize properties when read ahead entries

---------

Co-authored-by: lushiji <[email protected]>
(cherry picked from commit f5455f0)
  • Loading branch information
StevenLuMT authored and hangc0276 committed Jul 8, 2024
1 parent 4e73837 commit 972abba
Show file tree
Hide file tree
Showing 6 changed files with 429 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public class DbLedgerStorage implements LedgerStorage {
(long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB;

static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE = "dbStorage_readAheadCacheBatchBytesSize";
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
// the default value is -1. this feature(limit of read ahead bytes) is disabled
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE = -1;

private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB =
(long) (0.125 * PlatformDependent.estimateMaxDirectMemory())
Expand Down Expand Up @@ -171,6 +174,8 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);

ledgerStorageList = Lists.newArrayList();
for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
Expand Down Expand Up @@ -237,7 +242,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize));
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
Expand Down Expand Up @@ -276,11 +281,11 @@ public Long getSample() {
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger, long writeCacheSize, long readCacheSize,
int readAheadCacheBatchSize)
int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger,
statsLogger, allocator, writeCacheSize, readCacheSize,
readAheadCacheBatchSize);
readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ protected Thread newThread(Runnable r, String name) {
private final long writeCacheMaxSize;
private final long readCacheMaxSize;
private final int readAheadCacheBatchSize;
private final long readAheadCacheBatchBytesSize;

private final long maxThrottleTimeNanos;

Expand All @@ -152,7 +153,8 @@ protected Thread newThread(Runnable r, String name) {
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger, ByteBufAllocator allocator,
long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize)
long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize,
long readAheadCacheBatchBytesSize)
throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
Expand Down Expand Up @@ -182,6 +184,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le

readCacheMaxSize = readCacheSize;
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
this.readAheadCacheBatchBytesSize = readAheadCacheBatchBytesSize;

// Do not attempt to perform read-ahead more than half the total size of the cache
maxReadAheadBytesSize = readCacheMaxSize / 2;
Expand Down Expand Up @@ -663,9 +666,7 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
long currentEntryLogId = firstEntryLogId;
long currentEntryLocation = firstEntryLocation;

while (count < readAheadCacheBatchSize
&& size < maxReadAheadBytesSize
&& currentEntryLogId == firstEntryLogId) {
while (chargeReadAheadCache(count, size) && currentEntryLogId == firstEntryLogId) {
ByteBuf entry = entryLogger.readEntry(orginalLedgerId,
firstEntryId, currentEntryLocation);

Expand Down Expand Up @@ -703,6 +704,17 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
}
}

protected boolean chargeReadAheadCache(int currentReadAheadCount, long currentReadAheadBytes) {
// compatible with old logic
boolean chargeSizeCondition = currentReadAheadCount < readAheadCacheBatchSize
&& currentReadAheadBytes < maxReadAheadBytesSize;
if (chargeSizeCondition && readAheadCacheBatchBytesSize > 0) {
// exact limits limit the size and count for each batch
chargeSizeCondition = currentReadAheadBytes < readAheadCacheBatchBytesSize;
}
return chargeSizeCondition;
}

public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException {
throwIfLimbo(ledgerId);

Expand Down Expand Up @@ -1278,4 +1290,9 @@ public void clearStorageStateFlag(StorageState flag) throws IOException {
}
}
}

@VisibleForTesting
DbLedgerStorageStats getDbLedgerStorageStats() {
return dbLedgerStorageStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.meta.NullMetadataBookieDriver;
import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +46,19 @@ public TestBookieImpl(ServerConfiguration conf) throws Exception {
this(new ResourceBuilder(conf).build());
}

public TestBookieImpl(Resources resources, StatsLogger statsLogger) throws Exception {
super(resources.conf,
resources.registrationManager,
resources.storage,
resources.diskChecker,
resources.ledgerDirsManager,
resources.indexDirsManager,
statsLogger,
UnpooledByteBufAllocator.DEFAULT,
new SimpleBookieServiceInfoProvider(resources.conf));
this.resources = resources;
}

public TestBookieImpl(Resources resources) throws Exception {
super(resources.conf,
resources.registrationManager,
Expand Down Expand Up @@ -157,12 +171,16 @@ public ResourceBuilder withRegistrationManager(RegistrationManager registrationM
return this;
}

Resources build() throws Exception {
public Resources build() throws Exception {
return build(NullStatsLogger.INSTANCE);
}

public Resources build(StatsLogger statsLogger) throws Exception {
if (metadataBookieDriver == null) {
if (conf.getMetadataServiceUri() == null) {
metadataBookieDriver = new NullMetadataBookieDriver();
} else {
metadataBookieDriver = BookieResources.createMetadataDriver(conf, NullStatsLogger.INSTANCE);
metadataBookieDriver = BookieResources.createMetadataDriver(conf, statsLogger);
}
}
if (registrationManager == null) {
Expand All @@ -173,13 +191,13 @@ Resources build() throws Exception {

DiskChecker diskChecker = BookieResources.createDiskChecker(conf);
LedgerDirsManager ledgerDirsManager = BookieResources.createLedgerDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE);
conf, diskChecker, statsLogger);
LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager);
conf, diskChecker, statsLogger, ledgerDirsManager);

LedgerStorage storage = BookieResources.createLedgerStorage(
conf, ledgerManager, ledgerDirsManager, indexDirsManager,
NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
statsLogger, UnpooledByteBufAllocator.DEFAULT);

return new Resources(conf,
metadataBookieDriver,
Expand Down
Loading

0 comments on commit 972abba

Please sign in to comment.