Skip to content

Commit

Permalink
chore: Separate tiered serialization format from object values (#3310)
Browse files Browse the repository at this point in the history
Before this change, OpManager::Stash, DiskStorage::Stash and SmallBins::Stash received a
single value argument: a serialized blob of an object value.

Now they accept an additional footer argument that allows appending an arbitrary byte sequence when
preparing a disk page. We could, of course, serialize the footer into the value before calling these interfaces, but then
we would need another copy.
With two arguments we allow an optional meta-format that has a footer after the value.

This PR does not use this feature yet, but we will need it in the next PR in order to store the encoding mask of the string object.
In the future we will need to store other meta information in case we decide to support multiple DF types.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 12, 2024
1 parent d7351b3 commit bfa5399
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 80 deletions.
14 changes: 7 additions & 7 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Load all values from bin by their hashes
void Defragment(tiering::DiskSegment segment, string_view value);

void NotifyStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override {
if (ec) {
VLOG(1) << "Stash failed " << ec.message();
void NotifyStashed(EntryId id, const io::Result<tiering::DiskSegment>& segment) override {
if (!segment) {
VLOG(1) << "Stash failed " << segment.error().message();
visit([this](auto id) { ClearIoPending(id); }, id);
} else {
visit([this, segment](auto id) { SetExternal(id, segment); }, id);
visit([this, segment](auto id) { SetExternal(id, *segment); }, id);
}
}

Expand Down Expand Up @@ -299,10 +299,10 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
error_code ec;
if (OccupiesWholePages(value->Size())) { // large enough for own page
id = KeyRef(dbid, key);
ec = op_manager_->Stash(id, value_sv);
} else if (auto bin = bins_->Stash(dbid, key, value_sv); bin) {
ec = op_manager_->Stash(id, value_sv, {});
} else if (auto bin = bins_->Stash(dbid, key, value_sv, {}); bin) {
id = bin->first;
ec = op_manager_->Stash(id, bin->second);
ec = op_manager_->Stash(id, bin->second, {});
}

if (ec) {
Expand Down
49 changes: 28 additions & 21 deletions src/server/tiering/disk_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ABSL_FLAG(uint64_t, registered_buffer_size, 512_KB,

namespace dfly::tiering {

using namespace std;
using namespace ::util::fb2;

namespace {
Expand All @@ -30,13 +31,13 @@ UringBuf AllocateTmpBuf(size_t size) {
size = (size + kPageSize - 1) / kPageSize * kPageSize;
VLOG(1) << "Fallback to temporary allocation: " << size;

uint8_t* buf = new (std::align_val_t(kPageSize)) uint8_t[size];
return UringBuf{{buf, size}, std::nullopt};
uint8_t* buf = new (align_val_t(kPageSize)) uint8_t[size];
return UringBuf{{buf, size}, nullopt};
}

void DestroyTmpBuf(UringBuf buf) {
DCHECK(!buf.buf_idx);
::operator delete[](buf.bytes.data(), std::align_val_t(kPageSize));
::operator delete[](buf.bytes.data(), align_val_t(kPageSize));
}

void ReturnBuf(UringBuf buf) {
Expand All @@ -51,20 +52,20 @@ void ReturnBuf(UringBuf buf) {

constexpr off_t kInitialSize = 1UL << 28; // 256MB

template <typename... Ts> std::error_code DoFiberCall(void (SubmitEntry::*c)(Ts...), Ts... args) {
template <typename... Ts> error_code DoFiberCall(void (SubmitEntry::*c)(Ts...), Ts... args) {
auto* proactor = static_cast<UringProactor*>(ProactorBase::me());
FiberCall fc(proactor);
(fc.operator->()->*c)(std::forward<Ts>(args)...);
FiberCall::IoResult io_res = fc.Get();
return io_res < 0 ? std::error_code{-io_res, std::system_category()} : std::error_code{};
return io_res < 0 ? error_code{-io_res, system_category()} : error_code{};
}

} // anonymous namespace

DiskStorage::DiskStorage(size_t max_size) : max_size_(max_size) {
}

std::error_code DiskStorage::Open(std::string_view path) {
error_code DiskStorage::Open(string_view path) {
DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
CHECK(!backing_file_);

Expand All @@ -86,13 +87,13 @@ std::error_code DiskStorage::Open(std::string_view path) {

auto* up = static_cast<UringProactor*>(ProactorBase::me());
if (int io_res = up->RegisterBuffers(absl::GetFlag(FLAGS_registered_buffer_size)); io_res < 0)
return std::error_code{-io_res, std::system_category()};
return error_code{-io_res, system_category()};

return {};
}

void DiskStorage::Close() {
using namespace std::chrono_literals;
using namespace chrono_literals;

// TODO: to fix this polling.
while (pending_ops_ > 0 || grow_pending_)
Expand All @@ -106,12 +107,15 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
DCHECK_GT(segment.length, 0u);
DCHECK_EQ(segment.offset % kPageSize, 0u);

UringBuf buf = PrepareBuf(segment.length);
auto io_cb = [this, cb = std::move(cb), buf, segment](int io_res) {
if (io_res < 0)
cb("", std::error_code{-io_res, std::system_category()});
else
cb(std::string_view{reinterpret_cast<char*>(buf.bytes.data()), segment.length}, {});
size_t len = segment.length;
UringBuf buf = PrepareBuf(len);
// Completion callback for the read request.
// io_res < 0 is reported to cb as an error_code built from -io_res; otherwise cb receives a
// view of the first len bytes of buf.
// The cleanup below must run on BOTH paths: Close() polls until pending_ops_ drops to zero, and
// the completion callback in Stash() likewise returns its buffer and decrements the counter
// regardless of the outcome. Returning early on error would leak buf and stall Close().
auto io_cb = [this, cb = std::move(cb), buf, len](int io_res) {
  if (io_res < 0) {
    cb(nonstd::make_unexpected(error_code{-io_res, system_category()}));
  } else {
    // The view aliases buf, which is handed back right after cb returns.
    cb(string_view{reinterpret_cast<char*>(buf.bytes.data()), len});
  }
  ReturnBuf(buf);
  pending_ops_--;
};
Expand All @@ -130,31 +134,34 @@ void DiskStorage::MarkAsFree(DiskSegment segment) {
alloc_.Free(segment.offset, segment.length);
}

std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
std::error_code DiskStorage::Stash(io::Bytes bytes, io::Bytes footer, StashCb cb) {
DCHECK_GT(bytes.length(), 0u);

int64_t offset = alloc_.Malloc(bytes.size());
size_t len = bytes.size() + footer.size();
int64_t offset = alloc_.Malloc(len);

// If we've run out of space, block and grow as much as needed
if (offset < 0) {
// TODO: To introduce asynchronous call that starts resizing before we reach this step.
// Right now we do it synchronously as well (see Grow(256MB) call.)
RETURN_ON_ERR(Grow(-offset));

offset = alloc_.Malloc(bytes.size());
offset = alloc_.Malloc(len);
if (offset < 0) // we can't fit it even after resizing
return std::make_error_code(std::errc::file_too_large);
}

UringBuf buf = PrepareBuf(bytes.size());
UringBuf buf = PrepareBuf(len);
memcpy(buf.bytes.data(), bytes.data(), bytes.length());
if (!footer.empty())
memcpy(buf.bytes.data() + bytes.length(), footer.data(), footer.length());

auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) {
auto io_cb = [this, cb, offset, buf, len](int io_res) {
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({}, std::error_code{-io_res, std::system_category()});
cb(nonstd::make_unexpected(error_code{-io_res, std::system_category()}));
} else {
cb({size_t(offset), len}, {});
cb(DiskSegment{size_t(offset), len});
}
ReturnBuf(buf);
pending_ops_--;
Expand Down
8 changes: 5 additions & 3 deletions src/server/tiering/disk_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class DiskStorage {
uint64_t registered_buf_alloc_count = 0;
};

using ReadCb = std::function<void(std::string_view, std::error_code)>;
using StashCb = std::function<void(DiskSegment, std::error_code)>;
using ReadCb = std::function<void(io::Result<std::string_view>)>;
using StashCb = std::function<void(io::Result<DiskSegment>)>;

explicit DiskStorage(size_t max_size);

Expand All @@ -42,12 +42,14 @@ class DiskStorage {
// grow backing file. Returns error code if operation failed immediately (most likely it failed
// to grow the backing file) or passes an error to cb if the final write operation failed.
// Bytes are copied and can be dropped before cb is resolved
std::error_code Stash(io::Bytes bytes, StashCb cb);
std::error_code Stash(io::Bytes bytes, io::Bytes footer, StashCb cb);

Stats GetStats() const;

private:
std::error_code Grow(off_t grow_size);

// Returns a buffer with size greater or equal to len.
util::fb2::UringBuf PrepareBuf(size_t len);

off_t size_, max_size_;
Expand Down
14 changes: 7 additions & 7 deletions src/server/tiering/disk_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ struct DiskStorageTest : public PoolTestBase {
void Stash(size_t index, string value) {
pending_ops_++;
auto buf = make_shared<string>(value);
storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment, std::error_code ec) {
EXPECT_FALSE(ec);
EXPECT_GT(segment.length, 0u);
segments_[index] = segment;
storage_->Stash(io::Buffer(*buf), {}, [this, index, buf](io::Result<DiskSegment> segment) {
EXPECT_TRUE(segment);
EXPECT_GT(segment->length, 0u);
segments_[index] = *segment;
pending_ops_--;
});
}

void Read(size_t index) {
pending_ops_++;
storage_->Read(segments_[index], [this, index](string_view value, std::error_code ec) {
EXPECT_FALSE(ec);
last_reads_[index] = value;
storage_->Read(segments_[index], [this, index](io::Result<string_view> value) {
EXPECT_TRUE(value);
last_reads_[index] = *value;
pending_ops_--;
});
}
Expand Down
25 changes: 13 additions & 12 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ void OpManager::DeleteOffloaded(DiskSegment segment) {
}
}

std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) {
std::error_code OpManager::Stash(EntryId id_ref, std::string_view value, io::Bytes footer) {
auto id = ToOwned(id_ref);
unsigned version = pending_stash_ver_[id] = ++pending_stash_counter_;

io::Bytes buf_view{reinterpret_cast<const uint8_t*>(value.data()), value.length()};
auto io_cb = [this, version, id = std::move(id)](DiskSegment segment, std::error_code ec) {
ProcessStashed(Borrowed(id), version, segment, ec);
io::Bytes buf_view = io::Buffer(value);
auto io_cb = [this, version, id = std::move(id)](io::Result<DiskSegment> segment) {
ProcessStashed(Borrowed(id), version, segment);
};

// May block due to blocking call to Grow.
auto ec = storage_.Stash(buf_view, std::move(io_cb));
auto ec = storage_.Stash(buf_view, footer, std::move(io_cb));
if (ec)
pending_stash_ver_.erase(ToOwned(id_ref));
return ec;
Expand All @@ -96,23 +96,24 @@ OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) {

auto [it, inserted] = pending_reads_.try_emplace(aligned_segment.offset, aligned_segment);
if (inserted) {
auto io_cb = [this, aligned_segment](std::string_view value, std::error_code ec) {
ProcessRead(aligned_segment.offset, value);
auto io_cb = [this, aligned_segment](io::Result<std::string_view> result) {
CHECK(result) << result.error(); // TODO: to handle this gracefully.
ProcessRead(aligned_segment.offset, *result);
};
storage_.Read(aligned_segment, io_cb);
}
return it->second;
}

void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment,
std::error_code ec) {
void OpManager::ProcessStashed(EntryId id, unsigned version,
const io::Result<DiskSegment>& segment) {
if (auto it = pending_stash_ver_.find(ToOwned(id));
it != pending_stash_ver_.end() && it->second == version) {
pending_stash_ver_.erase(it);
NotifyStashed(id, segment, ec);
} else if (!ec) {
NotifyStashed(id, segment);
} else if (segment) {
// Throw away the value because it's no longer up-to-date even if no error occurred
storage_.MarkAsFree(segment);
storage_.MarkAsFree(*segment);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/server/tiering/op_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ class OpManager {
// Delete offloaded entry located at the segment.
void DeleteOffloaded(DiskSegment segment);

// Stash value to be offloaded
std::error_code Stash(EntryId id, std::string_view value);
// Stash (value, footer) to be offloaded. Both arguments are opaque to OpManager.
std::error_code Stash(EntryId id, std::string_view value, io::Bytes footer);

Stats GetStats() const;

protected:
// Notify that a stash succeeded and the entry was stored at the provided segment or failed with
// given error
virtual void NotifyStashed(EntryId id, DiskSegment segment, std::error_code ec) = 0;
virtual void NotifyStashed(EntryId id, const io::Result<DiskSegment>& segment) = 0;

// Notify that an entry was successfully fetched. Includes whether entry was modified.
// Returns true if value needs to be deleted.
Expand Down Expand Up @@ -110,7 +110,7 @@ class OpManager {
void ProcessRead(size_t offset, std::string_view value);

// Called once Stash finished
void ProcessStashed(EntryId id, unsigned version, DiskSegment segment, std::error_code ec);
void ProcessStashed(EntryId id, unsigned version, const io::Result<DiskSegment>& segment);

protected:
DiskStorage storage_;
Expand Down
18 changes: 9 additions & 9 deletions src/server/tiering/op_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ struct OpManagerTest : PoolTestBase, OpManager {
return future;
}

void NotifyStashed(EntryId id, DiskSegment segment, std::error_code ec) override {
EXPECT_FALSE(ec);
stashed_[id] = segment;
void NotifyStashed(EntryId id, const io::Result<DiskSegment>& segment) override {
ASSERT_TRUE(segment);
stashed_[id] = *segment;
}

bool NotifyFetched(EntryId id, std::string_view value, DiskSegment segment,
Expand All @@ -66,9 +66,9 @@ TEST_F(OpManagerTest, SimpleStashesWithReads) {
Open();

for (unsigned i = 0; i < 100; i++) {
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled")));
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled")));
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real")));
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"), {}));
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"), {}));
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real"), {}));
}

EXPECT_EQ(GetStats().pending_stash_cnt, 100);
Expand All @@ -93,7 +93,7 @@ TEST_F(OpManagerTest, DeleteAfterReads) {
pp_->at(0)->Await([this] {
Open();

EXPECT_FALSE(Stash(0u, absl::StrCat("DATA")));
EXPECT_FALSE(Stash(0u, absl::StrCat("DATA"), {}));
while (stashed_.empty())
util::ThisFiber::SleepFor(1ms);

Expand Down Expand Up @@ -122,7 +122,7 @@ TEST_F(OpManagerTest, ReadSamePageDifferentOffsets) {
numbers += number;
}

EXPECT_FALSE(Stash(0u, numbers));
EXPECT_FALSE(Stash(0u, numbers, {}));
while (stashed_.empty())
util::ThisFiber::SleepFor(1ms);

Expand All @@ -144,7 +144,7 @@ TEST_F(OpManagerTest, Modify) {
pp_->at(0)->Await([this] {
Open();

Stash(0u, "D");
Stash(0u, "D", {});
while (stashed_.empty())
util::ThisFiber::SleepFor(1ms);

Expand Down
20 changes: 8 additions & 12 deletions src/server/tiering/small_bins.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,24 @@ size_t StashedValueSize(string_view value) {
} // namespace

std::optional<SmallBins::FilledBin> SmallBins::Stash(DbIndex dbid, std::string_view key,
std::string_view value) {
std::string_view value, io::Bytes footer) {
DCHECK_LT(value.size(), 2_KB);

size_t value_bytes = StashedValueSize(value);
size_t value_bytes = StashedValueSize(value) + footer.size();

std::optional<FilledBin> filled_bin;
if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= kPageSize) {
filled_bin = FlushBin();
}

current_bin_bytes_ += value_bytes;
string blob;
blob.reserve(value.size() + footer.size());
blob.append(value);
blob.append(io::View(footer));
auto [it, inserted] = current_bin_.emplace(std::make_pair(dbid, key), std::move(blob));
CHECK(inserted);

auto [it, inserted] = current_bin_.emplace(std::make_pair(dbid, key), value);
if (!inserted) {
LOG(ERROR) << "Duplicate key " << key << " dbid " << dbid;
LOG(ERROR) << "Values are same: " << int(it->second == value);
for (const auto& [key, _] : current_bin_) {
LOG(ERROR) << "Existing ones: " << key.first << " " << key.second;
}

LOG(FATAL) << "Crashing!";
}
DVLOG(2) << "current_bin_bytes: " << current_bin_bytes_
<< ", current_bin_size:" << current_bin_.size();
return filled_bin;
Expand Down
3 changes: 2 additions & 1 deletion src/server/tiering/small_bins.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class SmallBins {
}

// Enqueue key/value pair for stash. Returns page to be stashed if it filled up.
std::optional<FilledBin> Stash(DbIndex dbid, std::string_view key, std::string_view value);
std::optional<FilledBin> Stash(DbIndex dbid, std::string_view key, std::string_view value,
io::Bytes footer);

// Report that a stash succeeded. Returns list of stored keys with calculated value locations.
KeySegmentList ReportStashed(BinId id, DiskSegment segment);
Expand Down
Loading

0 comments on commit bfa5399

Please sign in to comment.