Skip to content

Commit

Permalink
chore: Tiered fixes (#3401)
Browse files Browse the repository at this point in the history
1. Add background offloading stats.
2. Remove the direct_fd override — helio has already been updated with default=false, so the override is no longer needed.
3. Remove the redundant tiered_storage_memory_margin flag.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Aug 1, 2024
1 parent 71b8615 commit 0ad3107
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -693,10 +693,10 @@ void Connection::HandleRequests() {

socket_->CancelOnErrorCb(); // noop if nothing is registered.
}
VLOG(1) << "Closed connection for peer "
<< GetClientInfo(fb2::ProactorBase::me()->GetPoolIndex());
cc_.reset();
}

VLOG(1) << "Closed connection for peer " << remote_ep;
}

void Connection::RegisterBreakHook(BreakerCb breaker_cb) {
Expand Down
5 changes: 3 additions & 2 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x

TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 128);
static_assert(sizeof(TieredStats) == 144);

ADD(total_stashes);
ADD(total_fetches);
Expand All @@ -254,7 +254,8 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
ADD(small_bins_filling_bytes);
ADD(total_stash_overflows);
ADD(cold_storage_bytes);

ADD(total_offloading_steps);
ADD(total_offloading_stashes);
return *this;
}

Expand Down
2 changes: 2 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct TieredStats {

// How many times the system did not perform Stash call (disjoint with total_stashes).
uint64_t total_stash_overflows = 0;
uint64_t total_offloading_steps = 0;
uint64_t total_offloading_stashes = 0;

size_t allocated_bytes = 0;
size_t capacity_bytes = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
// attribute of the entry in case of value overrides.
res.it->first.SetTouched(true);

db.top_keys.Touch(key);
// We do not use TopKey feature, so disable it until we redesign it.
// db.top_keys.Touch(key);

return res;
}
Expand Down
8 changes: 0 additions & 8 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ ABSL_FLAG(bool, version_check, true,

ABSL_FLAG(uint16_t, tcp_backlog, 128, "TCP listen(2) backlog parameter.");

#ifdef __linux__
ABSL_DECLARE_FLAG(bool, enable_direct_fd);
#endif

using namespace util;
using namespace facade;
using namespace io;
Expand Down Expand Up @@ -720,10 +716,6 @@ Usage: dragonfly [FLAGS]
unique_ptr<util::ProactorPool> pool;

#ifdef __linux__
// NOTE: Seems that enable_direct_fd causes sockets leakage.
// Until it is fixed in helio, we disable it.
absl::SetFlag(&FLAGS_enable_direct_fd, false);

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

Expand Down
8 changes: 6 additions & 2 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,11 @@ void EngineShard::Heartbeat() {
eviction_redline - db_slice.memory_budget());
}

if (UsedMemory() > tiering_offload_threshold) {
size_t used_memory = UsedMemory();
if (used_memory > tiering_offload_threshold) {
VLOG(1) << "Running Offloading, memory=" << used_memory
<< " tiering_threshold: " << tiering_offload_threshold
<< ", cool memory: " << tiered_storage_->CoolMemoryUsage();
tiered_storage_->RunOffloading(i);
}
}
Expand Down Expand Up @@ -683,7 +687,7 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {

int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
VLOG(1) << "This heartbeat took " << now_ms - last_heartbeat_ms << "ms";
VLOG(1) << "This heartbeat-sleep took " << now_ms - last_heartbeat_ms << "ms";
}
Heartbeat();
last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2192,6 +2192,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
append("tiered_cold_storage_bytes", m.tiered_stats.cold_storage_bytes);
append("tiered_offloading_steps", m.tiered_stats.total_offloading_steps);
append("tiered_offloading_stashes", m.tiered_stats.total_offloading_stashes);
append("tiered_ram_hits", m.events.ram_hits);
append("tiered_ram_cool_hits", m.events.ram_cool_hits);
append("tiered_ram_misses", m.events.ram_misses);
Expand Down
21 changes: 14 additions & 7 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,11 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
return array<int64_t, 5>{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms};
}

SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool fetch_mcflag,
bool fetch_mcver, const Transaction* t, EngineShard* shard) {
// fetch_mask values
constexpr uint8_t FETCH_MCFLAG = 0x1;
constexpr uint8_t FETCH_MCVER = 0x2;
SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask,
const Transaction* t, EngineShard* shard) {
ShardArgs keys = t->GetShardArgs(shard->shard_id());
DCHECK(!keys.Empty());

Expand All @@ -448,7 +451,8 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool f
// Allocate enough for all values
response.storage_list = SinkReplyBuilder::AllocMGetStorage(total_size);
char* next = response.storage_list->data;

bool fetch_mcflag = fetch_mask & FETCH_MCFLAG;
bool fetch_mcver = fetch_mask & FETCH_MCVER;
for (size_t i = 0; i < iters.size(); ++i) {
auto it = iters[i];
if (it.is_done())
Expand Down Expand Up @@ -1122,14 +1126,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
std::vector<SinkReplyBuilder::MGetResponse> mget_resp(shard_set->size());

ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
bool fetch_mcflag = cntx->protocol() == Protocol::MEMCACHE;
bool fetch_mcver =
fetch_mcflag && (dfly_cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER);
uint8_t fetch_mask = 0;
if (cntx->protocol() == Protocol::MEMCACHE) {
fetch_mask |= FETCH_MCFLAG;
if (dfly_cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER)
fetch_mask |= FETCH_MCVER;
}

// Count of pending tiered reads
util::fb2::BlockingCounter tiering_bc{0};
auto cb = [&](Transaction* t, EngineShard* shard) {
mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mcflag, fetch_mcver, t, shard);
mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard);
return OpStatus::OK;
};

Expand Down
4 changes: 4 additions & 0 deletions src/server/string_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ struct StringValue {
static StringValue Read(DbIndex dbid, std::string_view key, const PrimeValue& pv,
EngineShard* es);

// Returns true when the held value is still a pending asynchronous read
// (v_ holds a util::fb2::Future<std::string>) rather than an already
// materialized string or an empty (monostate) value.
bool IsFuturized() const {
return std::holds_alternative<util::fb2::Future<std::string>>(v_);
}

private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_;
};
Expand Down
28 changes: 13 additions & 15 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@

using namespace facade;

ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
"In bytes. If memory budget on a shard goes below this limit, tiering stops "
"hot-loading values into ram.");

// Gates the in-RAM "cool" layer: when enabled, values pass through it
// before being offloaded to disk storage.
// Fix: corrected the typo "intermidate" -> "intermediate" in the
// user-visible flag description.
ABSL_FLAG(bool, tiered_experimental_cooling, true,
          "If true, uses intermediate cooling layer "
          "when offloading values to storage");
Expand Down Expand Up @@ -86,7 +82,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
public:
ShardOpManager(TieredStorage* ts, DbSlice* db_slice, size_t max_size)
: tiering::OpManager{max_size}, ts_{ts}, db_slice_{*db_slice} {
memory_margin_ = absl::GetFlag(FLAGS_tiered_storage_memory_margin);
}

// Clear IO pending flag for entry
Expand Down Expand Up @@ -176,10 +171,10 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

bool HasEnoughMemoryMargin(int64_t value_len) {
return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
return db_slice_.memory_budget() - value_len > ssize_t(memory_low_limit_);
}

int64_t memory_margin_ = 0; // TODO: get rid of memory_margin_
// being compared to db_slice_.memory_budget().
size_t memory_low_limit_;

struct {
Expand Down Expand Up @@ -486,7 +481,9 @@ TieredStats TieredStorage::GetStats() const {

{ // Own stats
stats.total_stash_overflows = stats_.stash_overflow_cnt;
stats.cold_storage_bytes = cool_memory_used_;
stats.cold_storage_bytes = stats_.cool_memory_used;
stats.total_offloading_steps = stats_.offloading_steps;
stats.total_offloading_stashes = stats_.offloading_stashes;
}
return stats;
}
Expand All @@ -505,10 +502,12 @@ void TieredStorage::RunOffloading(DbIndex dbid) {

string tmp;
auto cb = [this, dbid, &tmp](PrimeIterator it) mutable {
stats_.offloading_steps++;
if (ShouldStash(it->second)) {
if (it->first.WasTouched()) {
it->first.SetTouched(false);
} else {
stats_.offloading_stashes++;
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
}
}
Expand All @@ -530,12 +529,12 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
size_t TieredStorage::ReclaimMemory(size_t goal) {
size_t gained = 0;
do {
size_t memory_before = cool_memory_used_;
size_t memory_before = stats_.cool_memory_used;
detail::TieredColdRecord* record = PopCool();
if (record == nullptr) // nothing to pull anymore
break;

gained += memory_before - cool_memory_used_;
gained += memory_before - stats_.cool_memory_used;

// Find the entry that points to the cool item and externalize it.
auto predicate = [record](const PrimeKey& key, const PrimeValue& probe) {
Expand All @@ -560,7 +559,7 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
}

bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
auto disk_stats = op_manager_->GetStats().disk_stats;
const auto& disk_stats = op_manager_->GetStats().disk_stats;
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
pv.Size() >= kMinValueSize &&
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
Expand All @@ -570,7 +569,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
const tiering::DiskSegment& segment, PrimeValue* pv) {
detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
cool_queue_.push_front(*record);
cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());

record->key_hash = CompactObj::HashCode(str);
record->db_index = db_ind;
Expand All @@ -590,7 +589,6 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {

// Bring it back to the PrimeTable.
DCHECK(hot.ObjType() == OBJ_STRING);
hot.SetTouched(true);

return hot;
}
Expand All @@ -600,7 +598,7 @@ PrimeValue TieredStorage::DeleteCool(detail::TieredColdRecord* record) {
cool_queue_.erase(it);

PrimeValue hot = std::move(record->value);
cool_memory_used_ -= (sizeof(detail::TieredColdRecord) + hot.MallocUsed());
stats_.cool_memory_used -= (sizeof(detail::TieredColdRecord) + hot.MallocUsed());
CompactObj::DeleteMR<detail::TieredColdRecord>(record);
return hot;
}
Expand All @@ -611,7 +609,7 @@ detail::TieredColdRecord* TieredStorage::PopCool() {

detail::TieredColdRecord& res = cool_queue_.back();
cool_queue_.pop_back();
cool_memory_used_ -= (sizeof(detail::TieredColdRecord) + res.value.MallocUsed());
stats_.cool_memory_used -= (sizeof(detail::TieredColdRecord) + res.value.MallocUsed());
return &res;
}

Expand Down
7 changes: 5 additions & 2 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class TieredStorage {
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item);

// Returns the number of bytes of RAM currently accounted to entries in the
// cool queue (maintained by CoolDown/DeleteCool/PopCool as records are
// added and removed).
size_t CoolMemoryUsage() const {
return stats_.cool_memory_used;
}

private:
Expand All @@ -106,11 +106,14 @@ class TieredStorage {
typedef ::boost::intrusive::list<detail::TieredColdRecord> CoolQueue;

CoolQueue cool_queue_;
size_t cool_memory_used_ = 0;

unsigned write_depth_limit_ = 10;
// Shard-local counters, surfaced to callers via GetStats().
struct {
uint64_t stash_overflow_cnt = 0;  // reported as total_stash_overflows
uint64_t total_deletes = 0;
uint64_t offloading_steps = 0;  // entries visited by RunOffloading's traversal callback
uint64_t offloading_stashes = 0;  // stash attempts initiated by RunOffloading
size_t cool_memory_used = 0;  // bytes held by cool-queue records (see CoolMemoryUsage())
} stats_;
};

Expand Down
1 change: 0 additions & 1 deletion tests/dragonfly/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ def __init__(self, params: DflyParams, args):
def create(self, existing_port=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
args.setdefault("enable_direct_fd", None) # Testing iouring with direct_fd enabled.
args.setdefault("noversion_check", None)
# MacOs does not set it automatically, so we need to set it manually
args.setdefault("maxmemory", "8G")
Expand Down

0 comments on commit 0ad3107

Please sign in to comment.