Skip to content

Commit

Permalink
Introduce a recursion function to update the waiting down buffer to u…
Browse files Browse the repository at this point in the history
…p buffer
  • Loading branch information
shosseinimotlagh committed Dec 24, 2024
1 parent 91ade04 commit 60d996f
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 45 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.6.1"
version = "6.6.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
67 changes: 27 additions & 40 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,26 +582,17 @@ void IndexWBCache::recover(sisl::byte_view sb) {
}
} else if (buf->m_created_cp_id == icp_ctx->id()) {
// New node
if (was_node_committed(buf)) {
if (was_node_committed(buf->m_up_buffer)) {
// Both current and up buffer is commited, we can safely commit the current block
m_vdev->commit_blk(buf->m_blkid);
pending_bufs.push_back(buf->m_up_buffer);
} else {
// Up buffer is not committed, we need to repair it first
buf->m_up_buffer->remove_down_buffer(buf);
// buf->m_up_buffer = nullptr;
if (buf->m_up_buffer->m_wait_for_down_buffers.testz()) {
// if up buffer has upbuffer, then we need to decrement its wait_for_down_buffers
auto grand_buf = buf->m_up_buffer->m_up_buffer;
if (grand_buf) {
HS_DBG_ASSERT(!grand_buf->m_wait_for_down_buffers.testz(),
"upbuffer of upbuffer is already zero");
grand_buf->remove_down_buffer(buf->m_up_buffer);
LOGINFOMOD(wbcache, "Decrementing wait_for_down_buffers for up buffer of up buffer {}",
grand_buf->to_string());
}
}
if (was_node_committed(buf) && was_node_committed(buf->m_up_buffer)) {
// Both current and up buffer is commited, we can safely commit the current block
m_vdev->commit_blk(buf->m_blkid);
pending_bufs.push_back(buf->m_up_buffer);
} else {
// Up buffer is not committed, we need to repair it first
buf->m_up_buffer->remove_down_buffer(buf);
// buf->m_up_buffer = nullptr;
if (buf->m_up_buffer->m_wait_for_down_buffers.testz()) {
// if up buffer has upbuffer, then we need to decrement its wait_for_down_buffers
update_up_buffer_counters(buf->m_up_buffer);
}
}
}
Expand Down Expand Up @@ -630,26 +621,22 @@ void IndexWBCache::recover(sisl::byte_view sb) {
m_vdev->recovery_completed();
}

void IndexWBCache::updateUpBufferCounters(std::vector< IndexBufferPtr >& l0_bufs) {
std::unordered_set< IndexBufferPtr > allBuffers;

// First, collect all unique buffers and reset their counters
for (auto& leaf : l0_bufs) {
auto currentBuffer = leaf;
while (currentBuffer) {
if (allBuffers.insert(currentBuffer).second) { currentBuffer->m_wait_for_down_buffers.set(0); }
currentBuffer = currentBuffer->m_up_buffer;
}
}

// Now, iterate over each leaf buffer and update the count for each parent up the chain
for (auto& leaf : l0_bufs) {
auto currentBuffer = leaf;
while (currentBuffer) {
if (currentBuffer->m_up_buffer) { currentBuffer->m_up_buffer->m_wait_for_down_buffers.increment(1); }
currentBuffer = currentBuffer->m_up_buffer;
}
// if buf->m_wait_for_down_buffers.testz() is true (which means that it has no dependency on any other buffer) then we
// can decrement the wait_for_down_buffers of its up buffer. If the up buffer has up buffer, then we need to decrement
// its wait_for_down_buffers. If the up buffer of up buffer has wait_for_down_buffers as 0, then we need to decrement
// its wait_for_down_buffers. This process continues until we reach the root buffer. If the root buffer has
// wait_for_down_buffers as 0, then we need to decrement its wait_for_down_buffers.
void IndexWBCache::update_up_buffer_counters(IndexBufferPtr const& buf) {
if (buf == nullptr || !buf->m_wait_for_down_buffers.testz() || buf->m_up_buffer == nullptr) {
LOGINFOMOD(wbcache, "Finish decrementing wait_for_down_buffers");
return;
}
auto grand_buf = buf->m_up_buffer;
grand_buf->remove_down_buffer(buf);
LOGINFOMOD(wbcache,
"Decrementing wait_for_down_buffers for buffer {} due to zero dependency of child {}, Keep going up",
grand_buf->to_string(), buf->to_string());
update_up_buffer_counters(grand_buf);
}

void IndexWBCache::recover_buf(IndexBufferPtr const& buf) {
Expand All @@ -670,7 +657,7 @@ void IndexWBCache::recover_buf(IndexBufferPtr const& buf) {
if (buf->m_up_buffer && buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means old root is dirtied and may need no repair but possible of
// new root on upper level so needs to be retore the edge
LOGTRACEMOD(wbcache, "check root change for without repairing {}\n\n", buf->to_string());
LOGTRACEMOD(wbcache, "check root change for without repairing {}", buf->to_string());
index_service().update_root(buf->m_index_ordinal, buf);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/index/wb_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ class IndexWBCache : public IndexWBCacheBase {
void recover_buf(IndexBufferPtr const& buf);
bool was_node_committed(IndexBufferPtr const& buf);
void load_buf(IndexBufferPtr const& buf);
void updateUpBufferCounters(std::vector< IndexBufferPtr >& pending_bufs);
void update_up_buffer_counters(IndexBufferPtr const& buf);
};
} // namespace homestore
6 changes: 3 additions & 3 deletions src/tests/test_index_crash_recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT
}

void crash_and_recover(OperationList& operations, std::string filename = "") {
// this->print_keys("Btree prior to CP and susbsequent simulated crash: ");
// this->print_keys("Btree prior to CP and susbsequent simulated crash: ");
LOGINFO("Before Crash: {} keys in shadow map and it is actually {} keys in tree - operations size {}",
this->m_shadow_map.size(), tree_key_count(), operations.size());

Expand All @@ -461,7 +461,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT
LOGINFO("Visualize the tree file after recovery : {}", rec_filename);
this->visualize_keys(rec_filename);
}
// this->print_keys("Post crash and recovery, btree structure: ");
// this->print_keys("Post crash and recovery, btree structure: ");
sanity_check(operations);
// Added to the index service right after recovery. Not needed here
// test_common::HSTestHelper::trigger_cp(true);
Expand All @@ -473,7 +473,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT
LOGINFO("Visualize the tree after reapply {}", re_filename);
this->visualize_keys(re_filename);
}
// this->print_keys("Post reapply, btree structure: ");
// this->print_keys("Post reapply, btree structure: ");

this->get_all();
LOGINFO("After reapply: {} keys in shadow map and actually {} in tress", this->m_shadow_map.size(),
Expand Down

0 comments on commit 60d996f

Please sign in to comment.