fix: properly clean tiered state upon flush
The bug was that IO-pending entries were not properly cleaned during flush.
This PR simplifies the logic around tiered storage handling during flush: the cleanup is now
always performed in the synchronous part of the command.

In addition, this PR improves error logging in tests when the dragonfly process exits with an error.
Finally, a test is added that makes sure pending tiered items are cleaned up during the flush call.

Fixes #3252

Signed-off-by: Roman Gershman <[email protected]>
romange committed Jul 7, 2024
1 parent ac328e1 commit 5449d4f
Showing 10 changed files with 93 additions and 55 deletions.
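Before the per-file diffs, here is a minimal, self-contained sketch of the pattern the commit message describes: offloaded and IO-pending tiered state is cleared synchronously, while the table is still owned by the flushing command, and only the release of the in-memory table is deferred. The `Entry`, `Table` and `TieredBackend` names below are illustrative assumptions for the sketch, not Dragonfly's real classes.

```cpp
// Sketch of "clean tiered state synchronously, defer only the memory release".
// Entry, Table and TieredBackend are stand-ins, not real Dragonfly types.
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

struct Entry {
  bool external = false;    // value offloaded to disk
  bool io_pending = false;  // stash to disk still in flight
};

struct Table {
  std::unordered_map<std::string, Entry> prime;
};

struct TieredBackend {
  void Delete(Entry* e) { e->external = false; }         // free the disk segment
  void CancelStash(Entry* e) { e->io_pending = false; }  // abort the in-flight write
};

// Synchronous part of the flush: no tiered state may survive this call.
void ClearOffloadedEntries(Table& table, TieredBackend& tiered) {
  for (auto& [key, entry] : table.prime) {
    (void)key;  // only the entry's tiered flags matter here
    if (entry.external)
      tiered.Delete(&entry);
    else if (entry.io_pending)
      tiered.CancelStash(&entry);
  }
}

int main() {
  TieredBackend tiered;
  auto table = std::make_unique<Table>();
  table->prime["k0"] = Entry{true, false};   // already offloaded
  table->prime["k1"] = Entry{false, true};   // stash still pending

  ClearOffloadedEntries(*table, tiered);  // done before the table is swapped out

  // Only releasing the in-memory table is deferred, mirroring the detached cleanup fiber.
  std::thread deferred([t = std::move(table)]() mutable { t.reset(); });
  deferred.join();
  std::cout << "flush complete, no pending tiered entries remain\n";
}
```

The actual change wires this pattern into DbSlice::FlushDbIndexes and the new DbSlice::ClearOffloadedEntries, as shown in the first diff below.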
36 changes: 19 additions & 17 deletions src/server/db_slice.cc
@@ -708,14 +708,10 @@ void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) {
}

void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
// Async cleanup can only be performed if no tiered entries exist
bool async_cleanup = true;
for (DbIndex index : indexes) {
async_cleanup &= db_arr_[index]->stats.tiered_entries == 0;
}
bool clear_tiered = owner_->tiered_storage() != nullptr;

if (!async_cleanup)
ClearEntriesOnFlush(indexes, db_arr_, false);
if (clear_tiered)
ClearOffloadedEntries(indexes, db_arr_);

DbTableArray flush_db_arr(db_arr_.size());
for (DbIndex index : indexes) {
@@ -729,9 +725,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

CHECK(fetched_items_.empty());
auto cb = [this, async_cleanup, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
if (async_cleanup)
ClearEntriesOnFlush(indexes, flush_db_arr, true);
auto cb = [this, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
ServerState::kGlibcmalloc);
@@ -1408,27 +1402,35 @@ void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) {
}
}

void DbSlice::ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
bool async) {
for (auto index : indices) {
void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr) {
// Currently being used only for tiered storage.
TieredStorage* tiered_storage = shard_owner()->tiered_storage();
string scratch;
for (DbIndex index : indices) {
const auto& db_ptr = db_arr[index];
if (!db_ptr || db_ptr->stats.tiered_entries == 0)
if (!db_ptr)
continue;

// Delete all tiered entries
PrimeTable::Cursor cursor;
do {
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal())
PerformDeletion(it, db_ptr.get());
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasIoPending()) {
tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
}
});
} while (cursor && db_ptr->stats.tiered_entries > 0);
} while (cursor);

CHECK_EQ(db_ptr->stats.tiered_entries, 0u);
#if 0
// Wait for delete operations to finish in sync
while (!async && db_ptr->stats.tiered_entries > 0) {
LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
ThisFiber::SleepFor(1ms);
}
#endif
}
}

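For readers unfamiliar with the cursor-driven scan used in ClearOffloadedEntries above, here is a standalone sketch of the do/while (cursor) pattern. `MiniTable` is an assumed stand-in, not the real PrimeTable/DashTable API; the point is only that the loop now terminates when the cursor wraps to zero, rather than when a stats counter drains.

```cpp
// Sketch of the cursor-driven scan pattern; MiniTable is illustrative only.
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

class MiniTable {
 public:
  using Cursor = size_t;  // 0 is both the starting value and "scan finished"

  explicit MiniTable(std::vector<std::vector<std::string>> buckets)
      : buckets_(std::move(buckets)) {}

  // Visits one bucket per call and returns the cursor for the next call,
  // or 0 once the last bucket has been visited.
  Cursor Traverse(Cursor cursor, const std::function<void(std::string&)>& cb) {
    for (auto& key : buckets_[cursor])
      cb(key);
    return (cursor + 1 < buckets_.size()) ? cursor + 1 : 0;
  }

 private:
  std::vector<std::vector<std::string>> buckets_;
};

int main() {
  MiniTable table({{"k0", "k1"}, {}, {"k2"}});
  MiniTable::Cursor cursor = 0;
  do {
    cursor = table.Traverse(cursor, [](std::string& key) {
      std::cout << "visiting " << key << "\n";  // e.g. cancel stash / delete offloaded value
    });
  } while (cursor);  // stop only when the whole table has been covered
}
```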
6 changes: 2 additions & 4 deletions src/server/db_slice.h
@@ -494,10 +494,8 @@ class DbSlice {
// Invalidate all watched keys for given slots. Used on FlushSlots.
void InvalidateSlotWatches(const cluster::SlotSet& slot_ids);

// Properly clear db_arr before deleting it. If async is set, it's called from a detached fiber
// after swapping the db.
void ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
bool async);
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);

void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table);

7 changes: 7 additions & 0 deletions src/server/dragonfly_test.cc
@@ -362,6 +362,13 @@ TEST_F(DflyEngineTest, MemcacheFlags) {
ASSERT_EQ(resp, "OK");
MCResponse resp2 = RunMC(MP::GET, "key");
EXPECT_THAT(resp2, ElementsAre("VALUE key 42 3", "bar", "END"));

ASSERT_EQ(Run("resp", {"flushdb"}), "OK");
pp_->AwaitFiberOnAll([](auto*) {
if (auto* shard = EngineShard::tlocal(); shard) {
EXPECT_EQ(shard->db_slice().GetDBTable(0)->mcflag.size(), 0u);
}
});
}

TEST_F(DflyEngineTest, LimitMemory) {
26 changes: 14 additions & 12 deletions src/server/tiered_storage.cc
@@ -61,24 +61,24 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

// Called before overriding value with segment
void RecordAdded(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
void RecordAdded(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
stats->tiered_entries++;
stats->tiered_used_bytes += segment.length;
stats->tiered_used_bytes += tiered_len;
}

// Called after setting new value in place of previous segment
void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
stats->tiered_entries--;
stats->tiered_used_bytes -= segment.length;
stats->tiered_used_bytes -= tiered_len;
}

// Find entry by key in db_slice and store external segment in place of original value.
// Update memory stats
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv) {
RecordAdded(db_slice_->MutableStats(key.first), *pv, segment);
RecordAdded(db_slice_->MutableStats(key.first), *pv, segment.length);

pv->SetIoPending(false);
pv->SetExternal(segment.offset, segment.length);
@@ -113,9 +113,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
if (!value.empty())
pv->SetString(value);

RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment);

(value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment.length);
}

// Find entry by key and store it's up-to-date value in place of external segment.
@@ -160,6 +158,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override {
++stats_.total_fetches;

if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
Defragment(segment, value);
return true; // delete
@@ -206,8 +206,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
bool cache_fetched_ = false;

struct {
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
size_t total_defrags = 0; // included in total_fetches
size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
size_t total_defrags = 0;
} stats_;

TieredStorage* ts_;
@@ -310,9 +310,11 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {

void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
DCHECK(value->IsExternal());
++stats_.total_deletes;

tiering::DiskSegment segment = value->GetExternalSlice();
op_manager_->Delete(segment);
op_manager_->SetInMemory(value, dbid, "", segment);
op_manager_->DeleteOffloaded(segment);
op_manager_->SetInMemory(value, dbid, string_view{}, segment);
}

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
1 change: 1 addition & 0 deletions src/server/tiered_storage.h
@@ -85,6 +85,7 @@ class TieredStorage {
unsigned write_depth_limit_ = 10;
struct {
uint64_t stash_overflow_cnt = 0;
uint64_t total_deletes = 0;
} stats_;
};

53 changes: 38 additions & 15 deletions src/server/tiered_storage_test.cc
@@ -30,22 +30,31 @@ ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);

namespace dfly {

using absl::GetFlag;
using absl::SetFlag;

string BuildString(size_t len, char c = 'A') {
return string(len, c);
}

class TieredStorageTest : public BaseFamilyTest {
protected:
TieredStorageTest() {
num_threads_ = 1;
}

void SetUp() override {
if (absl::GetFlag(FLAGS_force_epoll)) {
if (GetFlag(FLAGS_force_epoll)) {
LOG(WARNING) << "Can't run tiered tests on EPOLL";
exit(0);
}

absl::SetFlag(&FLAGS_tiered_storage_write_depth, 15000);
absl::SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test");
absl::SetFlag(&FLAGS_tiered_storage_cache_fetched, true);
absl::SetFlag(&FLAGS_backing_file_direct, true);
SetFlag(&FLAGS_tiered_storage_write_depth, 15000);
if (GetFlag(FLAGS_tiered_prefix).empty()) {
SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test");
}
SetFlag(&FLAGS_tiered_storage_cache_fetched, true);
SetFlag(&FLAGS_backing_file_direct, true);

BaseFamilyTest::SetUp();
}
@@ -54,13 +63,13 @@ class TieredStorageTest : public BaseFamilyTest {
// Perform simple series of SET, GETSET and GET
TEST_F(TieredStorageTest, SimpleGetSet) {
absl::FlagSaver saver;
absl::SetFlag(&FLAGS_tiered_offload_threshold, 1.1f); // disable offloading
SetFlag(&FLAGS_tiered_offload_threshold, 1.1f); // disable offloading
const int kMin = 256;
const int kMax = tiering::kPageSize + 10;

// Perform SETs
for (size_t i = kMin; i < kMax; i++) {
Run({"SET", absl::StrCat("k", i), string(i, 'A')});
Run({"SET", absl::StrCat("k", i), BuildString(i)});
}

// Make sure all entries were stashed, except the one not filling a small page
@@ -113,26 +122,26 @@ TEST_F(TieredStorageTest, SimpleAppend) {
// TODO: use pipelines to issue APPEND/GET/APPEND sequence,
// currently it's covered only for op_manager_test
for (size_t sleep : {0, 100, 500, 1000}) {
Run({"SET", "k0", string(3000, 'A')});
Run({"SET", "k0", BuildString(3000)});
if (sleep)
util::ThisFiber::SleepFor(sleep * 1us);
EXPECT_THAT(Run({"APPEND", "k0", "B"}), IntArg(3001));
EXPECT_EQ(Run({"GET", "k0"}), string(3000, 'A') + 'B');
EXPECT_EQ(Run({"GET", "k0"}), BuildString(3000) + 'B');
}
}

TEST_F(TieredStorageTest, MultiDb) {
for (size_t i = 0; i < 10; i++) {
Run({"SELECT", absl::StrCat(i)});
Run({"SET", absl::StrCat("k", i), string(3000, char('A' + i))});
Run({"SET", absl::StrCat("k", i), BuildString(3000, char('A' + i))});
}

ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 10; });

for (size_t i = 0; i < 10; i++) {
Run({"SELECT", absl::StrCat(i)});
EXPECT_EQ(GetMetrics().db_stats[i].tiered_entries, 1);
EXPECT_EQ(Run({"GET", absl::StrCat("k", i)}), string(3000, char('A' + i)));
EXPECT_EQ(Run({"GET", absl::StrCat("k", i)}), BuildString(3000, char('A' + i)));
EXPECT_EQ(GetMetrics().db_stats[i].tiered_entries, 0);
}
}
@@ -168,7 +177,7 @@ TEST_F(TieredStorageTest, Defrag) {

TEST_F(TieredStorageTest, BackgroundOffloading) {
absl::FlagSaver saver;
absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values

const int kNum = 500;

@@ -177,7 +186,7 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {

// Stash all values
for (size_t i = 0; i < kNum; i++) {
Run({"SET", absl::StrCat("k", i), string(3000, 'A')});
Run({"SET", absl::StrCat("k", i), BuildString(3000)});
}

ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });
@@ -200,11 +209,11 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {

TEST_F(TieredStorageTest, FlushAll) {
absl::FlagSaver saver;
absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values

const int kNum = 500;
for (size_t i = 0; i < kNum; i++) {
Run({"SET", absl::StrCat("k", i), string(3000, 'A')});
Run({"SET", absl::StrCat("k", i), BuildString(3000)});
}
ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });

@@ -228,4 +237,18 @@ TEST_F(TieredStorageTest, FlushAll) {
EXPECT_GT(metrics.tiered_stats.total_fetches, 2u);
}

TEST_F(TieredStorageTest, FlushPending) {
absl::FlagSaver saver;
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values

const int kNum = 10;
for (size_t i = 0; i < kNum; i++) {
Run({"SET", absl::StrCat("k", i), BuildString(256)});
}
ExpectConditionWithinTimeout(
[&] { return GetMetrics().tiered_stats.small_bins_filling_bytes > 0; });
Run({"FLUSHALL"});
EXPECT_EQ(GetMetrics().tiered_stats.small_bins_filling_bytes, 0u);
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/tiering/op_manager.cc
@@ -59,7 +59,7 @@ void OpManager::Delete(EntryId id) {
pending_stash_ver_.erase(ToOwned(id));
}

void OpManager::Delete(DiskSegment segment) {
void OpManager::DeleteOffloaded(DiskSegment segment) {
EntryOps* pending_op = nullptr;

auto base_it = pending_reads_.find(segment.ContainingPages().offset);
4 changes: 2 additions & 2 deletions src/server/tiering/op_manager.h
@@ -54,8 +54,8 @@ class OpManager {
// Delete entry with pending io
void Delete(EntryId id);

// Delete offloaded entry
void Delete(DiskSegment segment);
// Delete offloaded entry located at the segment.
void DeleteOffloaded(DiskSegment segment);

// Stash value to be offloaded
std::error_code Stash(EntryId id, std::string_view value);
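A short aside on the Delete → DeleteOffloaded rename above: the old OpManager exposed two overloads named Delete that did unrelated things (cancel a pending entry vs. free an already-offloaded segment), so call sites read ambiguously. The sketch below is a toy illustration of the naming choice, not the real OpManager API.

```cpp
// Toy illustration of the rename; OpManagerSketch is not the real class.
#include <cstdint>
#include <iostream>
#include <string_view>

struct DiskSegment {
  uint64_t offset = 0;
  uint64_t length = 0;
};

using EntryId = std::string_view;

struct OpManagerSketch {
  // Cancels a pending stash for an entry that has not reached disk yet.
  void Delete(EntryId id) { std::cout << "drop pending entry " << id << "\n"; }

  // Frees a value that already lives on disk; the distinct name makes the intent
  // visible at the call site instead of relying on overload resolution.
  void DeleteOffloaded(DiskSegment segment) {
    std::cout << "free segment @" << segment.offset << " len " << segment.length << "\n";
  }
};

int main() {
  OpManagerSketch om;
  om.Delete(EntryId{"k0"});         // entry with IO still pending
  om.DeleteOffloaded({4096, 256});  // value already offloaded
}
```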
11 changes: 7 additions & 4 deletions tests/dragonfly/conftest.py
@@ -19,7 +19,7 @@
from copy import deepcopy

from pathlib import Path
from tempfile import TemporaryDirectory, gettempdir
from tempfile import gettempdir, mkdtemp

from .instance import DflyInstance, DflyParams, DflyInstanceFactory, RedisServer
from . import PortPicker, dfly_args
@@ -37,9 +37,12 @@ def tmp_dir():
where the Dragonfly executable will be run and where all test data
should be stored. The directory will be cleaned up at the end of a session
"""
tmp = TemporaryDirectory()
yield Path(tmp.name)
tmp.cleanup()
tmp_name = mkdtemp()
yield Path(tmp_name)
if os.environ.get("DRAGONFLY_KEEP_TMP"):
logging.info(f"Keeping tmp dir {tmp_name}")
return
os.rmdir(tmp_name)


@pytest.fixture(scope="session")
2 changes: 2 additions & 0 deletions tests/dragonfly/instance.py
@@ -215,6 +215,8 @@ def _check_status(self):
if not self.params.existing_port:
return_code = self.proc.poll()
if return_code is not None:
# log stdout of the failed process
logging.error("Dragonfly process error:\n%s", self.proc.stdout.read().decode())
self.proc = None
raise DflyStartException(f"Failed to start instance, return code {return_code}")

