From bfa5399b888f21bda284e9d62a6d57f43ce13639 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 12 Jul 2024 12:05:28 +0300 Subject: [PATCH] chore: Separate tiered serialization format from object values (#3310) Before that OpManager::Stash, DiskStorage::Stash and SmallBins::Stash received a single value argument as a serialized blob of an object value. Now they accept an additional footer argument that allows appending arbitrary byte sequence when preparing a disk page. We could, of course serialize, into value before calling these interface but then we would need another copy. With two arguments we allow an optional meta-format that has a footer after the value. This PR does not use this feature yet, but we will need it in the next PR in order to store the encoding mask of the string object. In the future we will need storing other meta information in case we decide supporting multiple DF types. Signed-off-by: Roman Gershman --- src/server/tiered_storage.cc | 14 +++---- src/server/tiering/disk_storage.cc | 49 ++++++++++++++----------- src/server/tiering/disk_storage.h | 8 ++-- src/server/tiering/disk_storage_test.cc | 14 +++---- src/server/tiering/op_manager.cc | 25 +++++++------ src/server/tiering/op_manager.h | 8 ++-- src/server/tiering/op_manager_test.cc | 18 ++++----- src/server/tiering/small_bins.cc | 20 ++++------ src/server/tiering/small_bins.h | 3 +- src/server/tiering/small_bins_test.cc | 8 ++-- 10 files changed, 87 insertions(+), 80 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index d75bd8ffa981..5b234f0d454c 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -103,12 +103,12 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Load all values from bin by their hashes void Defragment(tiering::DiskSegment segment, string_view value); - void NotifyStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override { - if (ec) { - VLOG(1) << "Stash failed " << ec.message(); + void NotifyStashed(EntryId id, const io::Result& segment) override { + if (!segment) { + VLOG(1) << "Stash failed " << segment.error().message(); visit([this](auto id) { ClearIoPending(id); }, id); } else { - visit([this, segment](auto id) { SetExternal(id, segment); }, id); + visit([this, segment](auto id) { SetExternal(id, *segment); }, id); } } @@ -299,10 +299,10 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { error_code ec; if (OccupiesWholePages(value->Size())) { // large enough for own page id = KeyRef(dbid, key); - ec = op_manager_->Stash(id, value_sv); - } else if (auto bin = bins_->Stash(dbid, key, value_sv); bin) { + ec = op_manager_->Stash(id, value_sv, {}); + } else if (auto bin = bins_->Stash(dbid, key, value_sv, {}); bin) { id = bin->first; - ec = op_manager_->Stash(id, bin->second); + ec = op_manager_->Stash(id, bin->second, {}); } if (ec) { diff --git a/src/server/tiering/disk_storage.cc b/src/server/tiering/disk_storage.cc index 68804736a3d6..e708fba9ff54 100644 --- a/src/server/tiering/disk_storage.cc +++ b/src/server/tiering/disk_storage.cc @@ -22,6 +22,7 @@ ABSL_FLAG(uint64_t, registered_buffer_size, 512_KB, namespace dfly::tiering { +using namespace std; using namespace ::util::fb2; namespace { @@ -30,13 +31,13 @@ UringBuf AllocateTmpBuf(size_t size) { size = (size + kPageSize - 1) / kPageSize * kPageSize; VLOG(1) << "Fallback to temporary allocation: " << size; - uint8_t* buf = new (std::align_val_t(kPageSize)) uint8_t[size]; - return UringBuf{{buf, size}, std::nullopt}; + uint8_t* buf = new (align_val_t(kPageSize)) uint8_t[size]; + return UringBuf{{buf, size}, nullopt}; } void DestroyTmpBuf(UringBuf buf) { DCHECK(!buf.buf_idx); - ::operator delete[](buf.bytes.data(), std::align_val_t(kPageSize)); + ::operator delete[](buf.bytes.data(), align_val_t(kPageSize)); } void ReturnBuf(UringBuf buf) { @@ -51,12 +52,12 @@ void ReturnBuf(UringBuf buf) { constexpr off_t kInitialSize = 1UL << 28; // 256MB -template std::error_code DoFiberCall(void (SubmitEntry::*c)(Ts...), Ts... args) { +template error_code DoFiberCall(void (SubmitEntry::*c)(Ts...), Ts... args) { auto* proactor = static_cast(ProactorBase::me()); FiberCall fc(proactor); (fc.operator->()->*c)(std::forward(args)...); FiberCall::IoResult io_res = fc.Get(); - return io_res < 0 ? std::error_code{-io_res, std::system_category()} : std::error_code{}; + return io_res < 0 ? error_code{-io_res, system_category()} : error_code{}; } } // anonymous namespace @@ -64,7 +65,7 @@ template std::error_code DoFiberCall(void (SubmitEntry::*c)(Ts. DiskStorage::DiskStorage(size_t max_size) : max_size_(max_size) { } -std::error_code DiskStorage::Open(std::string_view path) { +error_code DiskStorage::Open(string_view path) { DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING); CHECK(!backing_file_); @@ -86,13 +87,13 @@ std::error_code DiskStorage::Open(std::string_view path) { auto* up = static_cast(ProactorBase::me()); if (int io_res = up->RegisterBuffers(absl::GetFlag(FLAGS_registered_buffer_size)); io_res < 0) - return std::error_code{-io_res, std::system_category()}; + return error_code{-io_res, system_category()}; return {}; } void DiskStorage::Close() { - using namespace std::chrono_literals; + using namespace chrono_literals; // TODO: to fix this polling. while (pending_ops_ > 0 || grow_pending_) @@ -106,12 +107,15 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) { DCHECK_GT(segment.length, 0u); DCHECK_EQ(segment.offset % kPageSize, 0u); - UringBuf buf = PrepareBuf(segment.length); - auto io_cb = [this, cb = std::move(cb), buf, segment](int io_res) { - if (io_res < 0) - cb("", std::error_code{-io_res, std::system_category()}); - else - cb(std::string_view{reinterpret_cast(buf.bytes.data()), segment.length}, {}); + size_t len = segment.length; + UringBuf buf = PrepareBuf(len); + auto io_cb = [this, cb = std::move(cb), buf, len](int io_res) { + if (io_res < 0) { + cb(nonstd::make_unexpected(error_code{-io_res, system_category()})); + return; + } + + cb(string_view{reinterpret_cast(buf.bytes.data()), len}); ReturnBuf(buf); pending_ops_--; }; @@ -130,10 +134,11 @@ void DiskStorage::MarkAsFree(DiskSegment segment) { alloc_.Free(segment.offset, segment.length); } -std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) { +std::error_code DiskStorage::Stash(io::Bytes bytes, io::Bytes footer, StashCb cb) { DCHECK_GT(bytes.length(), 0u); - int64_t offset = alloc_.Malloc(bytes.size()); + size_t len = bytes.size() + footer.size(); + int64_t offset = alloc_.Malloc(len); // If we've run out of space, block and grow as much as needed if (offset < 0) { @@ -141,20 +146,22 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) { // Right now we do it synchronously as well (see Grow(256MB) call.) RETURN_ON_ERR(Grow(-offset)); - offset = alloc_.Malloc(bytes.size()); + offset = alloc_.Malloc(len); if (offset < 0) // we can't fit it even after resizing return std::make_error_code(std::errc::file_too_large); } - UringBuf buf = PrepareBuf(bytes.size()); + UringBuf buf = PrepareBuf(len); memcpy(buf.bytes.data(), bytes.data(), bytes.length()); + if (!footer.empty()) + memcpy(buf.bytes.data() + bytes.length(), footer.data(), footer.length()); - auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) { + auto io_cb = [this, cb, offset, buf, len](int io_res) { if (io_res < 0) { MarkAsFree({size_t(offset), len}); - cb({}, std::error_code{-io_res, std::system_category()}); + cb(nonstd::make_unexpected(error_code{-io_res, std::system_category()})); } else { - cb({size_t(offset), len}, {}); + cb(DiskSegment{size_t(offset), len}); } ReturnBuf(buf); pending_ops_--; diff --git a/src/server/tiering/disk_storage.h b/src/server/tiering/disk_storage.h index bb0edccb7350..9b142c582dec 100644 --- a/src/server/tiering/disk_storage.h +++ b/src/server/tiering/disk_storage.h @@ -24,8 +24,8 @@ class DiskStorage { uint64_t registered_buf_alloc_count = 0; }; - using ReadCb = std::function; - using StashCb = std::function; + using ReadCb = std::function)>; + using StashCb = std::function)>; explicit DiskStorage(size_t max_size); @@ -42,12 +42,14 @@ class DiskStorage { // grow backing file. Returns error code if operation failed immediately (most likely it failed // to grow the backing file) or passes an empty segment if the final write operation failed. // Bytes are copied and can be dropped before cb is resolved - std::error_code Stash(io::Bytes bytes, StashCb cb); + std::error_code Stash(io::Bytes bytes, io::Bytes footer, StashCb cb); Stats GetStats() const; private: std::error_code Grow(off_t grow_size); + + // Returns a buffer with size greater or equal to len. util::fb2::UringBuf PrepareBuf(size_t len); off_t size_, max_size_; diff --git a/src/server/tiering/disk_storage_test.cc b/src/server/tiering/disk_storage_test.cc index 21ffc1ee0ec9..7fd2ca9d8bc4 100644 --- a/src/server/tiering/disk_storage_test.cc +++ b/src/server/tiering/disk_storage_test.cc @@ -37,19 +37,19 @@ struct DiskStorageTest : public PoolTestBase { void Stash(size_t index, string value) { pending_ops_++; auto buf = make_shared(value); - storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment, std::error_code ec) { - EXPECT_FALSE(ec); - EXPECT_GT(segment.length, 0u); - segments_[index] = segment; + storage_->Stash(io::Buffer(*buf), {}, [this, index, buf](io::Result segment) { + EXPECT_TRUE(segment); + EXPECT_GT(segment->length, 0u); + segments_[index] = *segment; pending_ops_--; }); } void Read(size_t index) { pending_ops_++; - storage_->Read(segments_[index], [this, index](string_view value, std::error_code ec) { - EXPECT_FALSE(ec); - last_reads_[index] = value; + storage_->Read(segments_[index], [this, index](io::Result value) { + EXPECT_TRUE(value); + last_reads_[index] = *value; pending_ops_--; }); } diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index d7f271357583..ccd943ee2c35 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -74,17 +74,17 @@ void OpManager::DeleteOffloaded(DiskSegment segment) { } } -std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) { +std::error_code OpManager::Stash(EntryId id_ref, std::string_view value, io::Bytes footer) { auto id = ToOwned(id_ref); unsigned version = pending_stash_ver_[id] = ++pending_stash_counter_; - io::Bytes buf_view{reinterpret_cast(value.data()), value.length()}; - auto io_cb = [this, version, id = std::move(id)](DiskSegment segment, std::error_code ec) { - ProcessStashed(Borrowed(id), version, segment, ec); + io::Bytes buf_view = io::Buffer(value); + auto io_cb = [this, version, id = std::move(id)](io::Result segment) { + ProcessStashed(Borrowed(id), version, segment); }; // May block due to blocking call to Grow. - auto ec = storage_.Stash(buf_view, std::move(io_cb)); + auto ec = storage_.Stash(buf_view, footer, std::move(io_cb)); if (ec) pending_stash_ver_.erase(ToOwned(id_ref)); return ec; @@ -96,23 +96,24 @@ OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) { auto [it, inserted] = pending_reads_.try_emplace(aligned_segment.offset, aligned_segment); if (inserted) { - auto io_cb = [this, aligned_segment](std::string_view value, std::error_code ec) { - ProcessRead(aligned_segment.offset, value); + auto io_cb = [this, aligned_segment](io::Result result) { + CHECK(result) << result.error(); // TODO: to handle this gracefully. + ProcessRead(aligned_segment.offset, *result); }; storage_.Read(aligned_segment, io_cb); } return it->second; } -void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment, - std::error_code ec) { +void OpManager::ProcessStashed(EntryId id, unsigned version, + const io::Result& segment) { if (auto it = pending_stash_ver_.find(ToOwned(id)); it != pending_stash_ver_.end() && it->second == version) { pending_stash_ver_.erase(it); - NotifyStashed(id, segment, ec); - } else if (!ec) { + NotifyStashed(id, segment); + } else if (segment) { // Throw away the value because it's no longer up-to-date even if no error occured - storage_.MarkAsFree(segment); + storage_.MarkAsFree(*segment); } } diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h index 65d47d280ee9..c9b9fa46093c 100644 --- a/src/server/tiering/op_manager.h +++ b/src/server/tiering/op_manager.h @@ -57,15 +57,15 @@ class OpManager { // Delete offloaded entry located at the segment. void DeleteOffloaded(DiskSegment segment); - // Stash value to be offloaded - std::error_code Stash(EntryId id, std::string_view value); + // Stash (value, footer) to be offloaded. Both arguments are opaque to OpManager. + std::error_code Stash(EntryId id, std::string_view value, io::Bytes footer); Stats GetStats() const; protected: // Notify that a stash succeeded and the entry was stored at the provided segment or failed with // given error - virtual void NotifyStashed(EntryId id, DiskSegment segment, std::error_code ec) = 0; + virtual void NotifyStashed(EntryId id, const io::Result& segment) = 0; // Notify that an entry was successfully fetched. Includes whether entry was modified. // Returns true if value needs to be deleted. @@ -110,7 +110,7 @@ class OpManager { void ProcessRead(size_t offset, std::string_view value); // Called once Stash finished - void ProcessStashed(EntryId id, unsigned version, DiskSegment segment, std::error_code ec); + void ProcessStashed(EntryId id, unsigned version, const io::Result& segment); protected: DiskStorage storage_; diff --git a/src/server/tiering/op_manager_test.cc b/src/server/tiering/op_manager_test.cc index b20c5331c340..834b91d363bf 100644 --- a/src/server/tiering/op_manager_test.cc +++ b/src/server/tiering/op_manager_test.cc @@ -42,9 +42,9 @@ struct OpManagerTest : PoolTestBase, OpManager { return future; } - void NotifyStashed(EntryId id, DiskSegment segment, std::error_code ec) override { - EXPECT_FALSE(ec); - stashed_[id] = segment; + void NotifyStashed(EntryId id, const io::Result& segment) override { + ASSERT_TRUE(segment); + stashed_[id] = *segment; } bool NotifyFetched(EntryId id, std::string_view value, DiskSegment segment, @@ -66,9 +66,9 @@ TEST_F(OpManagerTest, SimpleStashesWithReads) { Open(); for (unsigned i = 0; i < 100; i++) { - EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"))); - EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"))); - EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real"))); + EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"), {})); + EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "cancelled"), {})); + EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real"), {})); } EXPECT_EQ(GetStats().pending_stash_cnt, 100); @@ -93,7 +93,7 @@ TEST_F(OpManagerTest, DeleteAfterReads) { pp_->at(0)->Await([this] { Open(); - EXPECT_FALSE(Stash(0u, absl::StrCat("DATA"))); + EXPECT_FALSE(Stash(0u, absl::StrCat("DATA"), {})); while (stashed_.empty()) util::ThisFiber::SleepFor(1ms); @@ -122,7 +122,7 @@ TEST_F(OpManagerTest, ReadSamePageDifferentOffsets) { numbers += number; } - EXPECT_FALSE(Stash(0u, numbers)); + EXPECT_FALSE(Stash(0u, numbers, {})); while (stashed_.empty()) util::ThisFiber::SleepFor(1ms); @@ -144,7 +144,7 @@ TEST_F(OpManagerTest, Modify) { pp_->at(0)->Await([this] { Open(); - Stash(0u, "D"); + Stash(0u, "D", {}); while (stashed_.empty()) util::ThisFiber::SleepFor(1ms); diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 8f93cb28f6c9..18009cbf1544 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -28,10 +28,10 @@ size_t StashedValueSize(string_view value) { } // namespace std::optional SmallBins::Stash(DbIndex dbid, std::string_view key, - std::string_view value) { + std::string_view value, io::Bytes footer) { DCHECK_LT(value.size(), 2_KB); - size_t value_bytes = StashedValueSize(value); + size_t value_bytes = StashedValueSize(value) + footer.size(); std::optional filled_bin; if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= kPageSize) { @@ -39,17 +39,13 @@ std::optional SmallBins::Stash(DbIndex dbid, std::string_v } current_bin_bytes_ += value_bytes; + string blob; + blob.reserve(value.size() + footer.size()); + blob.append(value); + blob.append(io::View(footer)); + auto [it, inserted] = current_bin_.emplace(std::make_pair(dbid, key), std::move(blob)); + CHECK(inserted); - auto [it, inserted] = current_bin_.emplace(std::make_pair(dbid, key), value); - if (!inserted) { - LOG(ERROR) << "Duplicate key " << key << " dbid " << dbid; - LOG(ERROR) << "Values are same: " << int(it->second == value); - for (const auto& [key, _] : current_bin_) { - LOG(ERROR) << "Existing ones: " << key.first << " " << key.second; - } - - LOG(FATAL) << "Crashing!"; - } DVLOG(2) << "current_bin_bytes: " << current_bin_bytes_ << ", current_bin_size:" << current_bin_.size(); return filled_bin; diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index 05449f698b54..1cdfb84b8471 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -49,7 +49,8 @@ class SmallBins { } // Enqueue key/value pair for stash. Returns page to be stashed if it filled up. - std::optional Stash(DbIndex dbid, std::string_view key, std::string_view value); + std::optional Stash(DbIndex dbid, std::string_view key, std::string_view value, + io::Bytes footer); // Report that a stash succeeeded. Returns list of stored keys with calculated value locations. KeySegmentList ReportStashed(BinId id, DiskSegment segment); diff --git a/src/server/tiering/small_bins_test.cc b/src/server/tiering/small_bins_test.cc index e1f3c051e907..037e4180b1c1 100644 --- a/src/server/tiering/small_bins_test.cc +++ b/src/server/tiering/small_bins_test.cc @@ -30,7 +30,7 @@ TEST_F(SmallBinsTest, SimpleStashRead) { // Fill single bin std::optional bin; for (unsigned i = 0; !bin; i++) - bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i)); + bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i), {}); // Verify cut locations point to correct values auto segments = bins_.ReportStashed(bin->first, DiskSegment{0, 4_KB}); @@ -47,7 +47,7 @@ TEST_F(SmallBinsTest, SimpleDeleteAbort) { std::optional bin; unsigned i = 0; for (; !bin; i++) - bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i)); + bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i), {}); // Delete all even values for (unsigned j = 0; j <= i; j += 2) @@ -69,7 +69,7 @@ TEST_F(SmallBinsTest, PartialStashDelete) { std::optional bin; unsigned i = 0; for (; !bin; i++) - bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i)); + bin = bins_.Stash(0, absl::StrCat("k", i), absl::StrCat("v", i), {}); // Delete all even values for (unsigned j = 0; j <= i; j += 2) @@ -103,7 +103,7 @@ TEST_F(SmallBinsTest, PartialStashDelete) { TEST_F(SmallBinsTest, UpdateStatsAfterDelete) { // caused https://github.com/dragonflydb/dragonfly/issues/3240 for (unsigned i = 0; i < 10; i++) { - auto spilled_bin = bins_.Stash(0, absl::StrCat("k", i), SmallString(128)); + auto spilled_bin = bins_.Stash(0, absl::StrCat("k", i), SmallString(128), {}); ASSERT_FALSE(spilled_bin); }