Skip to content

Commit

Permalink
In-memory index
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Jan 31, 2025
1 parent 36adbfd commit 3fc04c1
Show file tree
Hide file tree
Showing 27 changed files with 786 additions and 324 deletions.
4 changes: 2 additions & 2 deletions docs/stellar-core_example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ MAX_DEX_TX_OPERATIONS_IN_TX_SET = 0
# 0, individual index is always used. Default page size 16 kb.
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14

# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 20
# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 250
# Size, in MB, determining whether a bucket should have an individual
# key index or a key range index. If bucket size is below this value, individual
# key index will be used. If set to 0, all buckets are range indexed. If
# BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT == 0, value is ignored and all
# buckets have individual key index.
BUCKETLIST_DB_INDEX_CUTOFF = 20
BUCKETLIST_DB_INDEX_CUTOFF = 250

# BUCKETLIST_DB_PERSIST_INDEX (bool) default true
# Determines whether BucketListDB indexes are saved to disk for faster
Expand Down
104 changes: 104 additions & 0 deletions src/bucket/BucketIndexUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketIndexUtils.h"
#include "bucket/BucketManager.h"
#include "bucket/DiskIndex.h"
#include "bucket/HotArchiveBucket.h"
#include "bucket/HotArchiveBucketIndex.h"
#include "bucket/LiveBucket.h"
#include "bucket/LiveBucketIndex.h"
#include "main/Config.h"
#include "util/Fs.h"
#include <fmt/format.h>

namespace stellar
{

// Translates the BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT config value into a
// page size in bytes.
//
// Returns 0 when the exponent is 0, otherwise 2^exponent.
std::streamoff
getPageSizeFromConfig(Config const& cfg)
{
    auto const exponent = cfg.BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT;
    if (exponent == 0)
    {
        return 0;
    }

    // Shift in the return type instead of `unsigned long`: on LLP64 targets
    // `unsigned long` is only 32 bits wide, so `1UL << exponent` would be
    // undefined for exponents >= 32 even though std::streamoff could hold
    // the result.
    return std::streamoff{1} << exponent;
}

// Constructs a new BucketT::IndexT for the bucket file at `filename`.
//
// `filename` must be non-empty. If index construction throws a
// std::runtime_error, the error is swallowed and a null pointer is returned.
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
            Hash const& hash, asio::io_context& ctx)
{
    BUCKET_TYPE_ASSERT(BucketT);

    ZoneScoped;
    releaseAssertOrThrow(!filename.empty());

    using IndexType = typename BucketT::IndexT;

    // Stays null unless construction below completes.
    std::unique_ptr<IndexType const> index;
    try
    {
        index.reset(new IndexType(bm, filename, hash, ctx));
    }
    catch (std::runtime_error const&)
    {
        // BucketIndex throws if BucketManager shuts down before the index
        // finishes building. Hand back an empty pointer rather than a
        // partially built index.
    }
    return index;
}

// Deserializes a previously saved BucketT::IndexT from `filename`.
//
// `fileSize` is forwarded to BucketT::IndexT::getPageSize together with the
// current config to determine the page size the index is expected to have.
//
// Throws std::runtime_error if `filename` cannot be opened. Returns a null
// pointer if the saved index's version or page size does not match what the
// current build and config expect.
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
loadIndex(BucketManager const& bm, std::filesystem::path const& filename,
          std::size_t fileSize)
{
    using IndexType = typename BucketT::IndexT;

    std::ifstream in(filename, std::ios::binary);
    if (!in)
    {
        throw std::runtime_error(fmt::format(
            FMT_STRING("Error opening file {}"), filename.string()));
    }

    // Read only the header fields first so that a stale index can be
    // rejected without deserializing the whole file.
    cereal::BinaryInputArchive ar(in);
    uint32_t version;
    std::streamoff pageSize;
    DiskIndex<BucketT>::preLoad(ar, version, pageSize);

    // The config may have changed since the index was written, so compute
    // the page size from the settings in effect now.
    auto expectedPageSize = IndexType::getPageSize(bm.getConfig(), fileSize);

    bool const compatible = version == IndexType::BUCKET_INDEX_VERSION &&
                            pageSize == expectedPageSize;
    if (!compatible)
    {
        return nullptr;
    }

    return std::unique_ptr<IndexType const>(new IndexType(bm, ar, pageSize));
}

// Explicit instantiations. createIndex and loadIndex are defined in this
// translation unit, so they are instantiated here for the two bucket types
// they are used with: LiveBucket and HotArchiveBucket.
template std::unique_ptr<typename LiveBucket::IndexT const>
createIndex<LiveBucket>(BucketManager& bm,
std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx);
template std::unique_ptr<typename HotArchiveBucket::IndexT const>
createIndex<HotArchiveBucket>(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);

template std::unique_ptr<typename LiveBucket::IndexT const>
loadIndex<LiveBucket>(BucketManager const& bm,
std::filesystem::path const& filename,
std::size_t fileSize);
template std::unique_ptr<typename HotArchiveBucket::IndexT const>
loadIndex<HotArchiveBucket>(BucketManager const& bm,
std::filesystem::path const& filename,
std::size_t fileSize);
}
112 changes: 80 additions & 32 deletions src/bucket/BucketIndexUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,108 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketUtils.h"
#include "util/GlobalChecks.h"
#include "util/RandomEvictionCache.h"
#include "util/XDROperators.h" // IWYU pragma: keep
#include "xdr/Stellar-ledger-entries.h"
#include <filesystem>
#include <map>
#include <optional>
#include <variant>
#include <vector>

#include "util/XDRCereal.h"
#include <cereal/archives/binary.hpp>
#include <cereal/types/utility.hpp>
#include <cereal/types/vector.hpp>

namespace asio
{
class io_context;
}

namespace stellar
{
// maps smallest and largest LedgerKey on a given page inclusively
// [lowerBound, upperbound]
struct RangeEntry

class BucketManager;
class Config;

using AssetPoolIDMap = std::map<Asset, std::vector<PoolID>>;
using IndexPtrT = std::shared_ptr<BucketEntry const>;

// Querying a BucketIndex can return one of three states:
// 1. CACHE_HIT: The entry is in the cache. Can either be a live or dead entry.
// 2. FILE_OFFSET: The entry is not in the cache, but the entry potentially
// exists at the given offset.
// 3. NOT_FOUND: The entry does not exist in the bucket.
// Outcome of querying a BucketIndex for a key.
enum IndexReturnState
{
CACHE_HIT,   // entry is in the cache; may be a live or a dead entry
FILE_OFFSET, // entry is not cached but potentially exists at a file offset
NOT_FOUND    // entry does not exist in the bucket
};

class IndexReturnT
{
LedgerKey lowerBound;
LedgerKey upperBound;
private:
// Payload maps to the possible return states:
// CACHE_HIT: IndexPtrT
// FILE_OFFSET: std::streamoff
// NOT_FOUND: std::monostate
using PayloadT = std::variant<IndexPtrT, std::streamoff, std::monostate>;
PayloadT mPayload;
IndexReturnState mState;

RangeEntry() = default;
RangeEntry(LedgerKey low, LedgerKey high)
: lowerBound(low), upperBound(high)
public:
IndexReturnT(IndexPtrT entry)
: mPayload(entry), mState(IndexReturnState::CACHE_HIT)
{
releaseAssert(low < high || low == high);
releaseAssertOrThrow(entry);
}

inline bool
operator==(RangeEntry const& in) const
IndexReturnT(std::streamoff offset)
: mPayload(offset), mState(IndexReturnState::FILE_OFFSET)
{
}
IndexReturnT()
: mPayload(std::monostate{}), mState(IndexReturnState::NOT_FOUND)
{
return lowerBound == in.lowerBound && upperBound == in.upperBound;
}

template <class Archive>
void
serialize(Archive& ar)
IndexReturnState
getState() const
{
ar(lowerBound, upperBound);
return mState;
}
IndexPtrT
cacheHit() const
{
releaseAssertOrThrow(mState == IndexReturnState::CACHE_HIT);
releaseAssertOrThrow(std::holds_alternative<IndexPtrT>(mPayload));
return std::get<IndexPtrT>(mPayload);
}
std::streamoff
fileOffset() const
{
releaseAssertOrThrow(mState == IndexReturnState::FILE_OFFSET);
releaseAssertOrThrow(std::holds_alternative<std::streamoff>(mPayload));
return std::get<std::streamoff>(mPayload);
}
};

using RangeIndex = std::vector<std::pair<RangeEntry, std::streamoff>>;
using BucketEvictionCache =
RandomEvictionCache<LedgerKey, std::shared_ptr<BucketEntry const>>;
// Returns page size, in bytes, derived from the
// BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT config param (0 if the exponent is 0)
std::streamoff getPageSizeFromConfig(Config const& cfg);

using AssetPoolIDMap = std::map<Asset, std::vector<PoolID>>;
// Builds index for given bucketfile. This is expensive (> 20 seconds
// for the largest buckets) and should only be called once. Constructs a
// DiskIndex or InMemoryIndex depending on config and Bucket size.
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);

// For small Buckets, we can cache all contents in memory. Because we cache all
// entries, the index is just as large as the Bucket itself, so we never persist
// this index type. It is always recreated on startup.
struct InMemoryIndex
{
std::map<LedgerKey, std::shared_ptr<BucketEntry>> inMemoryMap;
BucketEntryCounters counters{};
};
}
// Loads index from given file. Throws std::runtime_error if the file cannot be
// opened. If saved index does not have expected version or pageSize, return null
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
loadIndex(BucketManager const& bm, std::filesystem::path const& filename,
std::size_t fileSize);
}
4 changes: 2 additions & 2 deletions src/bucket/BucketListSnapshotBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ SearchableBucketListSnapshotBase<BucketT>::loopAllBuckets(
}

template <class BucketT>
std::shared_ptr<typename BucketT::LoadT>
std::shared_ptr<typename BucketT::LoadT const>
SearchableBucketListSnapshotBase<BucketT>::load(LedgerKey const& k) const
{
ZoneScoped;

std::shared_ptr<typename BucketT::LoadT> result{};
std::shared_ptr<typename BucketT::LoadT const> result{};
auto timerIter = mPointTimers.find(k.type());
releaseAssert(timerIter != mPointTimers.end());
auto timer = timerIter->second.TimeScope();
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketListSnapshotBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class SearchableBucketListSnapshotBase : public NonMovableOrCopyable
loadKeysFromLedger(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const;

std::shared_ptr<typename BucketT::LoadT> load(LedgerKey const& k) const;
std::shared_ptr<typename BucketT::LoadT const>
load(LedgerKey const& k) const;
};
}
4 changes: 2 additions & 2 deletions src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketOutputIterator.h"
#include "bucket/BucketIndexUtils.h"
#include "bucket/BucketManager.h"
#include "bucket/HotArchiveBucket.h"
#include "bucket/LiveBucket.h"
Expand Down Expand Up @@ -197,8 +198,7 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists<BucketT>(hash);
!b || !b->isIndexed())
{
index = createIndex<BucketT>(bucketManager, mFilename,
hash, mCtx);
index = createIndex<BucketT>(bucketManager, mFilename, hash, mCtx);
}

return bucketManager.adoptFileAsBucket<BucketT>(mFilename.string(), hash,
Expand Down
Loading

0 comments on commit 3fc04c1

Please sign in to comment.