BucketListDB in-memory Buckets (#4630)
# Description

Partially resolves #3696

This change refactors the `BucketIndex` parts of BucketListDB to be more
friendly towards the Hot Archive BucketList. This includes cleanups to
metrics, which previously tracked only main-thread access of the live
BucketList. Now both BucketList types and background threads record
metrics properly.

Additionally, this change removes the `IndividualIndex` and instead
caches small Buckets entirely in memory so we never read them from disk.
`RangeIndex` is largely unchanged but has been renamed to `DiskIndex`.

A follow-up PR will add a random eviction cache to the `DiskIndex`. I
tried to break this up as much as possible, but it was easiest to do the
refactor and the in-memory Buckets at the same time so I didn't have to
refactor `IndividualIndex`.

The `BUCKETLIST_DB_INDEX_CUTOFF` config setting determines the maximum
size at which we keep a Bucket in memory. I've set this to 250 MB, which
covers approximately the first 4-5 levels of the BucketList. This increases
total memory consumption of stellar-core from 2.2 GB to 3 GB. This seems
reasonable, and we could probably go even higher, but I'm holding off
for now since the random eviction cache will further increase memory
requirements.
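The cutoff rule described above can be sketched as follows. This is an
illustrative model only, not the actual stellar-core API: `IndexKind` and
`selectIndexKind` are hypothetical names standing in for the choice between
the in-memory cache and the `DiskIndex`.

```cpp
#include <cstddef>

// Hypothetical sketch of the index-selection rule; these names are
// illustrative and do not exist in stellar-core.
enum class IndexKind
{
    InMemory, // small Bucket: cached entirely in memory, no disk reads
    Disk      // large Bucket: page-based DiskIndex (formerly RangeIndex)
};

// cutoffMB plays the role of BUCKETLIST_DB_INDEX_CUTOFF (250 MB in this PR):
// Buckets at or below the cutoff are kept fully in memory, larger Buckets
// fall back to the DiskIndex. Here a cutoff of 0 means no in-memory caching.
inline IndexKind
selectIndexKind(std::size_t bucketSizeBytes, std::size_t cutoffMB)
{
    std::size_t const cutoffBytes = cutoffMB * 1024 * 1024;
    if (cutoffBytes != 0 && bucketSizeBytes <= cutoffBytes)
    {
        return IndexKind::InMemory;
    }
    return IndexKind::Disk;
}
```

With the 250 MB default, roughly the first 4-5 BucketList levels would take
the in-memory branch, which is where the 2.2 GB to 3 GB memory growth comes
from.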

# Checklist
- [x] Reviewed the
[contributing](https://github.com/stellar/stellar-core/blob/master/CONTRIBUTING.md#submitting-changes)
document
- [x] Rebased on top of master (no merge commits)
- [x] Ran `clang-format` v8.0.0 (via `make format` or the Visual Studio
extension)
- [x] Compiles
- [x] Ran all tests
- [ ] If change impacts performance, include supporting evidence per the
[performance
document](https://github.com/stellar/stellar-core/blob/master/performance-eval/performance-eval.md)
SirTyson authored Jan 31, 2025
2 parents 8d8266f + 3fc04c1 commit 96a822e
Showing 62 changed files with 2,320 additions and 1,555 deletions.
14 changes: 7 additions & 7 deletions docs/metrics.md
@@ -38,13 +38,13 @@ bucketlist-archive.size.bytes | counter | total size of the hot ar
bucketlist.size.bytes | counter | total size of the BucketList in bytes
bucketlist.entryCounts.-<X> | counter | number of entries of type <X> in the BucketList
bucketlist.entrySizes.-<X> | counter | size of entries of type <X> in the BucketList
bucketlistDB.bloom.lookups | meter | number of bloom filter lookups
bucketlistDB.bloom.misses | meter | number of bloom filter false positives
bucketlistDB.bulk.loads | meter | number of entries BucketListDB queried to prefetch
bucketlistDB.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB.bulk.prefetch | timer | time to prefetch
bucketlistDB.point.<X> | timer | time to load single entry of type <X> (if no bloom miss occurred)
bucketlistDB-<X>.bloom.lookups | meter | number of bloom filter lookups on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bloom.misses | meter | number of bloom filter false positives on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bulk.loads | meter | number of entries BucketListDB queried to prefetch on BucketList <X> (live/hotArchive)
bucketlistDB-live.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB-live.bulk.prefetch | timer | time to prefetch
bucketlistDB-<X>.point.<Y> | timer | time to load single entry of type <Y> on BucketList <X> (live/hotArchive)
crypto.verify.hit | meter | number of signature cache hits
crypto.verify.miss | meter | number of signature cache misses
crypto.verify.total | meter | sum of both hits and misses
4 changes: 2 additions & 2 deletions docs/stellar-core_example.cfg
@@ -235,13 +235,13 @@ MAX_DEX_TX_OPERATIONS_IN_TX_SET = 0
# 0, individual index is always used. Default page size 16 kb.
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14

# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 20
# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 250
# Size, in MB, determining whether a bucket should have an individual
# key index or a key range index. If bucket size is below this value,
# individual key index will be used. If set to 0, all buckets are range
# indexed. If BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT == 0, value ignored and all
# buckets have individual key index.
BUCKETLIST_DB_INDEX_CUTOFF = 20
BUCKETLIST_DB_INDEX_CUTOFF = 250

# BUCKETLIST_DB_PERSIST_INDEX (bool) default true
# Determines whether BucketListDB indexes are saved to disk for faster
4 changes: 2 additions & 2 deletions src/bucket/BucketApplicator.cpp
@@ -82,15 +82,15 @@ shouldApplyEntry(BucketEntry const& e)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
return BucketIndex::typeNotSupported(e.liveEntry().data.type());
return LiveBucketIndex::typeNotSupported(e.liveEntry().data.type());
}

if (e.type() != DEADENTRY)
{
throw std::runtime_error(
"Malformed bucket: unexpected non-INIT/LIVE/DEAD entry.");
}
return BucketIndex::typeNotSupported(e.deadEntry().type());
return LiveBucketIndex::typeNotSupported(e.deadEntry().type());
}

size_t
103 changes: 49 additions & 54 deletions src/bucket/BucketBase.cpp
@@ -7,7 +7,6 @@
// else.
#include "util/asio.h" // IWYU pragma: keep
#include "bucket/BucketBase.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketOutputIterator.h"
@@ -30,36 +29,35 @@
namespace stellar
{

BucketIndex const&
BucketBase::getIndex() const
template <class BucketT, class IndexT>
IndexT const&
BucketBase<BucketT, IndexT>::getIndex() const
{
ZoneScoped;
releaseAssertOrThrow(!mFilename.empty());
releaseAssertOrThrow(mIndex);
return *mIndex;
}

template <class BucketT, class IndexT>
bool
BucketBase::isIndexed() const
BucketBase<BucketT, IndexT>::isIndexed() const
{
return static_cast<bool>(mIndex);
}

std::optional<std::pair<std::streamoff, std::streamoff>>
BucketBase::getOfferRange() const
{
return getIndex().getOfferRange();
}

template <class BucketT, class IndexT>
void
BucketBase::setIndex(std::unique_ptr<BucketIndex const>&& index)
BucketBase<BucketT, IndexT>::setIndex(std::unique_ptr<IndexT const>&& index)
{
releaseAssertOrThrow(!mIndex);
mIndex = std::move(index);
}

BucketBase::BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index)
template <class BucketT, class IndexT>
BucketBase<BucketT, IndexT>::BucketBase(std::string const& filename,
Hash const& hash,
std::unique_ptr<IndexT const>&& index)
: mFilename(filename), mHash(hash), mIndex(std::move(index))
{
releaseAssert(filename.empty() || fs::exists(filename));
@@ -71,30 +69,34 @@ BucketBase::BucketBase(std::string const& filename, Hash const& hash,
}
}

BucketBase::BucketBase()
template <class BucketT, class IndexT> BucketBase<BucketT, IndexT>::BucketBase()
{
}

template <class BucketT, class IndexT>
Hash const&
BucketBase::getHash() const
BucketBase<BucketT, IndexT>::getHash() const
{
return mHash;
}

template <class BucketT, class IndexT>
std::filesystem::path const&
BucketBase::getFilename() const
BucketBase<BucketT, IndexT>::getFilename() const
{
return mFilename;
}

template <class BucketT, class IndexT>
size_t
BucketBase::getSize() const
BucketBase<BucketT, IndexT>::getSize() const
{
return mSize;
}

template <class BucketT, class IndexT>
bool
BucketBase::isEmpty() const
BucketBase<BucketT, IndexT>::isEmpty() const
{
if (mFilename.empty() || isZero(mHash))
{
@@ -105,14 +107,17 @@ BucketBase::isEmpty() const
return false;
}

template <class BucketT, class IndexT>
void
BucketBase::freeIndex()
BucketBase<BucketT, IndexT>::freeIndex()
{
mIndex.reset(nullptr);
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
BucketBase<BucketT, IndexT>::randomFileName(std::string const& tmpDir,
std::string ext)
{
ZoneScoped;
for (;;)
@@ -127,14 +132,16 @@ BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
}
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".xdr");
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketIndexName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketIndexName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".index");
}
@@ -172,7 +179,7 @@ BucketBase::randomBucketIndexName(std::string const& tmpDir)
// and shadowing protocol simultaneously, the moment the first new-protocol
// bucket enters the youngest level. At least one new bucket is in every merge's
// shadows from then on in, so they all upgrade (and preserve lifecycle events).
template <class BucketT>
template <class BucketT, class IndexT>
static void
calculateMergeProtocolVersion(
MergeCounters& mc, uint32_t maxProtocolVersion,
@@ -253,7 +260,7 @@ calculateMergeProtocolVersion(
// side, or entries that compare non-equal. In all these cases we just
// take the lesser (or existing) entry and advance only one iterator,
// not scrutinizing the entry type further.
template <class BucketT>
template <class BucketT, class IndexT>
static bool
mergeCasesWithDefaultAcceptance(
BucketEntryIdCmp<BucketT> const& cmp, MergeCounters& mc,
@@ -299,14 +306,15 @@ mergeCasesWithDefaultAcceptance(
return false;
}

template <class BucketT>
template <class BucketT, class IndexT>
std::shared_ptr<BucketT>
BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync)
BucketBase<BucketT, IndexT>::merge(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync)
{
BUCKET_TYPE_ASSERT(BucketT);

@@ -326,9 +334,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,

uint32_t protocolVersion;
bool keepShadowedLifecycleEntries;
calculateMergeProtocolVersion<BucketT>(mc, maxProtocolVersion, oi, ni,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);
calculateMergeProtocolVersion<BucketT, IndexT>(
mc, maxProtocolVersion, oi, ni, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);

auto timer = bucketManager.getMergeTimer().TimeScope();
BucketMetadata meta;
@@ -340,14 +348,14 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = ni.getMetadata().ext;
}
else if (oi.getMetadata().ext.v() == 1)
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = oi.getMetadata().ext;
}

@@ -374,9 +382,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
}
}

if (!mergeCasesWithDefaultAcceptance(cmp, mc, oi, ni, out,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
if (!mergeCasesWithDefaultAcceptance<BucketT, IndexT>(
cmp, mc, oi, ni, out, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
{
BucketT::mergeCasesWithEqualKeys(mc, oi, ni, out, shadowIterators,
protocolVersion,
@@ -400,19 +408,6 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
return out.getBucket(bucketManager, &mk);
}

template std::shared_ptr<LiveBucket> BucketBase::merge<LiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<LiveBucket> const& oldBucket,
std::shared_ptr<LiveBucket> const& newBucket,
std::vector<std::shared_ptr<LiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);

template std::shared_ptr<HotArchiveBucket> BucketBase::merge<HotArchiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<HotArchiveBucket> const& oldBucket,
std::shared_ptr<HotArchiveBucket> const& newBucket,
std::vector<std::shared_ptr<HotArchiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
template class BucketBase<LiveBucket, LiveBucket::IndexT>;
template class BucketBase<HotArchiveBucket, HotArchiveBucket::IndexT>;
}
42 changes: 27 additions & 15 deletions src/bucket/BucketBase.h
@@ -4,7 +4,7 @@
// under the apache license, version 2.0. see the copying file at the root
// of this distribution or at http://www.apache.org/licenses/license-2.0

#include "bucket/BucketIndex.h"
#include "bucket/BucketUtils.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-types.h"
@@ -47,17 +47,37 @@ enum class Loop
INCOMPLETE
};

class HotArchiveBucket;
class LiveBucket;
class LiveBucketIndex;
class HotArchiveBucketIndex;

template <class BucketT, class IndexT>
class BucketBase : public NonMovableOrCopyable
{
BUCKET_TYPE_ASSERT(BucketT);

// Because of the CRTP design with derived Bucket classes, this base class
// does not have direct access to BucketT::IndexT, so we take two template
// parameters and assert that they agree.
static_assert(
std::is_same_v<
IndexT,
std::conditional_t<
std::is_same_v<BucketT, LiveBucket>, LiveBucketIndex,
std::conditional_t<std::is_same_v<BucketT, HotArchiveBucket>,
HotArchiveBucketIndex, void>>>,
"IndexT must match BucketT::IndexT");

protected:
std::filesystem::path const mFilename;
Hash const mHash;
size_t mSize{0};

std::unique_ptr<BucketIndex const> mIndex{};
std::unique_ptr<IndexT const> mIndex{};

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;
IndexT const& getIndex() const;

static std::string randomFileName(std::string const& tmpDir,
std::string ext);
@@ -74,7 +94,7 @@ class BucketBase : public NonMovableOrCopyable
// exists, but does not check that the hash is the bucket's hash. Caller
// needs to ensure that.
BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index);
std::unique_ptr<IndexT const>&& index);

Hash const& getHash() const;
std::filesystem::path const& getFilename() const;
@@ -88,13 +108,8 @@ class BucketBase : public NonMovableOrCopyable
// Returns true if bucket is indexed, false otherwise
bool isIndexed() const;

// Returns [lowerBound, upperBound) of file offsets for all offers in the
// bucket, or std::nullopt if no offers exist
std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const;

// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);
void setIndex(std::unique_ptr<IndexT const>&& index);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
@@ -107,7 +122,6 @@ class BucketBase : public NonMovableOrCopyable
// `maxProtocolVersion` bounds this (for error checking) and should usually
// be the protocol of the ledger header at which the merge is starting. An
// exception will be thrown if any provided bucket versions exceed it.
template <class BucketT>
static std::shared_ptr<BucketT>
merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
@@ -120,16 +134,14 @@ class BucketBase : public NonMovableOrCopyable
static std::string randomBucketIndexName(std::string const& tmpDir);

#ifdef BUILD_TESTS
BucketIndex const&
IndexT const&
getIndexForTesting() const
{
return getIndex();
}

#endif // BUILD_TESTS

virtual uint32_t getBucketVersion() const = 0;

template <class BucketT> friend class BucketSnapshotBase;
template <class T> friend class BucketSnapshotBase;
};
}