Merge pull request #248 from koujl/resync_rebase
Implementation of Baseline Resync
koujl authored Jan 6, 2025
2 parents b66acf5 + 83ebc1b commit 38ab472
Showing 25 changed files with 1,509 additions and 403 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/version_change_check.yml
@@ -3,6 +3,8 @@ name: Check Version Update
 on:
   pull_request:
     types: [opened, edited, synchronize]
+    branches:
+      - main
 
 jobs:
   check-file:

24 changes: 12 additions & 12 deletions conanfile.py
@@ -9,7 +9,7 @@
 
 class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "2.1.20"
+    version = "2.2.0"
 
     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeReplication"
@@ -20,17 +20,17 @@ class HomeObjectConan(ConanFile):
     settings = "arch", "os", "compiler", "build_type"
 
     options = {
-        "shared": ['True', 'False'],
-        "fPIC": ['True', 'False'],
-        "coverage": ['True', 'False'],
-        "sanitize": ['True', 'False'],
-    }
+        "shared": ['True', 'False'],
+        "fPIC": ['True', 'False'],
+        "coverage": ['True', 'False'],
+        "sanitize": ['True', 'False'],
+    }
     default_options = {
-        'shared': False,
-        'fPIC': True,
-        'coverage': False,
-        'sanitize': False,
-    }
+        'shared': False,
+        'fPIC': True,
+        'coverage': False,
+        'sanitize': False,
+    }
 
     exports_sources = ("CMakeLists.txt", "cmake/*", "src/*", "LICENSE")
 
@@ -49,7 +49,7 @@ def build_requirements(self):
 
     def requirements(self):
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("homestore/[^6.5]@oss/master")
+        self.requires("homestore/[^6.6]@oss/master")
         self.requires("iomgr/[^11.3]@oss/master")
         self.requires("lz4/1.9.4", override=True)
         self.requires("openssl/3.3.1", override=True)

2 changes: 2 additions & 0 deletions src/include/homeobject/common.hpp
@@ -28,6 +28,8 @@ using blob_id_t = uint64_t;
 using peer_id_t = boost::uuids::uuid;
 using pg_id_t = uint16_t;
 using shard_id_t = uint64_t;
+using snp_batch_id_t = uint16_t;
+using snp_obj_id_t = uint64_t;
 
 template < class E >
 class Manager {

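Note: the two new aliases size the snapshot resync stream: snp_batch_id_t numbers batches within a snapshot, and snp_obj_id_t identifies a snapshot object. A minimal sketch of one plausible encoding (an assumption for illustration only; this diff does not show the actual packing):

#include <cstdint>

using snp_batch_id_t = uint16_t;
using snp_obj_id_t = uint64_t;

// Hypothetical packing: a shard cursor in the high 48 bits and the 16-bit
// batch id in the low bits, so a receiver can request the next batch of the
// same shard by bumping only the low part.
constexpr snp_obj_id_t make_snp_obj_id(uint64_t shard_cursor, snp_batch_id_t batch_id) {
    return (shard_cursor << 16) | batch_id;
}
constexpr uint64_t shard_cursor_of(snp_obj_id_t obj_id) { return obj_id >> 16; }
constexpr snp_batch_id_t batch_of(snp_obj_id_t obj_id) {
    return static_cast< snp_batch_id_t >(obj_id & 0xFFFF);
}

static_assert(batch_of(make_snp_obj_id(7, 3)) == 3);
static_assert(shard_cursor_of(make_snp_obj_id(7, 3)) == 7);
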
2 changes: 1 addition & 1 deletion src/include/homeobject/shard_manager.hpp
@@ -22,7 +22,7 @@ struct ShardInfo {
     shard_id_t id;
     pg_id_t placement_group;
     State state;
-    uint64_t lsn;
+    uint64_t lsn; // created_lsn
     uint64_t created_time;
     uint64_t last_modified_time;
     uint64_t available_capacity_bytes;

13 changes: 11 additions & 2 deletions src/lib/homestore_backend/CMakeLists.txt
@@ -23,6 +23,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
     hs_shard_manager.cpp
     hs_pg_manager.cpp
     pg_blob_iterator.cpp
+    snapshot_receive_handler.cpp
     index_kv.cpp
     heap_chunk_selector.cpp
     replication_state_machine.cpp
@@ -41,7 +42,8 @@ settings_gen_cpp(
     ${CMAKE_CURRENT_BINARY_DIR}/generated/
     "${PROJECT_NAME}_homestore"
     hs_backend_config.fbs
-    resync_pg_shard.fbs
+    resync_pg_data.fbs
+    resync_shard_data.fbs
+    resync_blob_data.fbs
 )
 
@@ -67,5 +69,12 @@ target_link_libraries(homestore_test_dynamic PUBLIC
     ${COMMON_TEST_DEPS}
 )
 
-add_test(NAME HomestoreTestDynamic COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)
+add_test(NAME HomestoreTestDynamic
+    COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
+    --override_config homestore_config.consensus.snapshot_freq_distance:0)
+
+# To test both baseline & incremental resync we use 13, which is unlikely to be a divisor of the
+# total LSN (currently 30), so a catching-up member needs both a snapshot (baseline) and the
+# trailing log entries (incremental).
+add_test(NAME HomestoreTestDynamicWithResync
+    COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
+    --override_config homestore_config.consensus.snapshot_freq_distance:13
+    --override_config homestore_config.consensus.num_reserved_log_items=13)

19 changes: 19 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -171,6 +171,25 @@ uint32_t HeapChunkSelector::get_chunk_size() const {
     return chunk->size();
 }
 
+bool HeapChunkSelector::is_chunk_available(const pg_id_t pg_id, const chunk_num_t v_chunk_id) const {
+    std::shared_lock lock_guard(m_chunk_selector_mtx);
+    auto pg_it = m_per_pg_chunks.find(pg_id);
+    if (pg_it == m_per_pg_chunks.end()) {
+        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
+        return false;
+    }
+
+    auto pg_chunk_collection = pg_it->second;
+    auto& pg_chunks = pg_chunk_collection->m_pg_chunks;
+    if (v_chunk_id >= pg_chunks.size()) {
+        LOGWARNMOD(homeobject, "No chunk found for v_chunk_id {}", v_chunk_id);
+        return false;
+    }
+    std::scoped_lock lock(pg_chunk_collection->mtx);
+    auto chunk = pg_chunks[v_chunk_id];
+    return chunk->available();
+}
+
 std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, uint64_t pg_size) {
     std::unique_lock lock_guard(m_chunk_selector_mtx);
     if (m_per_pg_chunks.find(pg_id) != m_per_pg_chunks.end()) {

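Note: the new is_chunk_available takes two lock levels: a shared lock on m_chunk_selector_mtx for the pg map, then the per-PG collection's own mutex for the chunk entries. A self-contained sketch of that pattern (all names are stand-ins, not the real HeapChunkSelector types):

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct Chunk { bool available = true; };
struct PgChunkCollection {
    std::mutex mtx;                                            // guards the chunk entries
    std::vector< std::shared_ptr< Chunk > > chunks;
};

class Selector {
    mutable std::shared_mutex map_mtx_;                        // guards per_pg_
    std::map< uint16_t, std::shared_ptr< PgChunkCollection > > per_pg_;

public:
    bool is_chunk_available(uint16_t pg_id, uint16_t v_chunk_id) const {
        std::shared_lock map_lock(map_mtx_);                   // many readers may probe concurrently
        auto it = per_pg_.find(pg_id);
        if (it == per_pg_.end()) return false;                 // unknown pg
        auto coll = it->second;
        if (v_chunk_id >= coll->chunks.size()) return false;   // virtual id out of range
        std::scoped_lock coll_lock(coll->mtx);
        return coll->chunks[v_chunk_id]->available;
    }
};

int main() {
    Selector s;
    return s.is_chunk_available(0, 0) ? 1 : 0;                 // unknown pg -> false -> exit code 0
}
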
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -177,6 +177,8 @@ class HeapChunkSelector : public homestore::ChunkSelector {
      */
     uint32_t total_disks() const { return m_per_dev_heap.size(); }
 
+    bool is_chunk_available(const pg_id_t pg_id, const chunk_num_t v_chunk_id) const;
+
 private:
     void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

6 changes: 3 additions & 3 deletions src/lib/homestore_backend/hs_backend_config.fbs
@@ -10,11 +10,11 @@ table HSBackendSettings {
     // timer thread freq in us
     backend_timer_us: uint64 = 60000000 (hotswap);
 
-    // Maximum number of blobs in a snapshot batch
-    max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap);
-
     // Maximum size of a snapshot batch
     max_snapshot_batch_size_mb: uint64 = 128 (hotswap);
+
+    // Snapshot blob load retry count
+    snapshot_blob_load_retry: uint8 = 3 (hotswap);
 }
 
 root_type HSBackendSettings;

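Note: these settings bound snapshot batch assembly (max_snapshot_batch_size_mb caps one resync batch; snapshot_blob_load_retry caps re-reads of a blob that fails to load). A rough sketch of size-capped batching, under assumed types (none of these names come from the diff):

#include <cstdint>
#include <vector>

struct BlobDesc { uint64_t size_bytes; };

// Pack blobs into one resync batch until the configured cap would be
// exceeded; always take at least one blob so an oversized blob still ships.
std::vector< BlobDesc > next_batch(std::vector< BlobDesc > const& pending, uint64_t max_batch_size_mb) {
    std::vector< BlobDesc > batch;
    uint64_t const budget = max_batch_size_mb * 1024ull * 1024ull;
    uint64_t used = 0;
    for (auto const& b : pending) {
        if (!batch.empty() && used + b.size_bytes > budget) break;  // cap reached
        batch.push_back(b);
        used += b.size_bytes;
    }
    return batch;
}
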
117 changes: 74 additions & 43 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -38,6 +38,8 @@ BlobError toBlobError(ReplServiceError const& e) {
     case ReplServiceError::RESULT_NOT_EXIST_YET:
         [[fallthrough]];
     case ReplServiceError::TERM_MISMATCH:
+        [[fallthrough]];
+    case ReplServiceError::DATA_DUPLICATED:
         return BlobError(BlobErrorCode::REPLICATION_ERROR);
     case ReplServiceError::NOT_LEADER:
         return BlobError(BlobErrorCode::NOT_LEADER);
@@ -116,6 +118,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
     req->header()->payload_crc = 0;
     req->header()->shard_id = shard.id;
     req->header()->pg_id = pg_id;
+    req->header()->blob_id = new_blob_id;
     req->header()->seal();
 
     // Serialize blob_id as Replication key
@@ -182,6 +185,51 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
     });
 }
 
+bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
+    HS_PG* hs_pg{nullptr};
+    {
+        shared_lock lock_guard(_pg_lock);
+        const auto iter = _pg_map.find(pg_id);
+        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
+        hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
+    }
+    shared< BlobIndexTable > index_table = hs_pg->index_table_;
+    RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");
+
+    // Write to index table with key {shard id, blob id} and value {pba}.
+    auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
+    LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
+                blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
+    if (status != homestore::btree_status_t::success) {
+        LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
+        return false;
+    }
+    if (!exist_already) {
+        // The PG superblock (durable entities) will be persisted as part of the HS_CLIENT checkpoint, which is
+        // always done ahead of the index checkpoint. Hence, if the index already has this entity, whatever
+        // durable counters were updated as part of the update have already been persisted in the PG superblock.
+        // If we were to increment now, it would be a duplicate increment; hence we skip the update when an index
+        // entry already exists for this blob put.
+
+        // Update the durable counters. We need to update the blob_sequence_num here only for the replay case, as
+        // the number is already updated in the put_blob call.
+        hs_pg->durable_entities_update([&blob_info](auto& de) {
+            auto existing_blob_id = de.blob_sequence_num.load();
+            auto next_blob_id = blob_info.blob_id + 1;
+            while (next_blob_id > existing_blob_id &&
+                   // we need to update the blob_sequence_num to existing_blob_id+1 so that if the leader
+                   // changes, we can still get the up-to-date blob_sequence_num
+                   !de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
+            de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
+            de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
+        });
+    } else {
+        LOGTRACEMOD(blobmgr, "blob already exists in index table, skip it. shard_id: {} blob_id: {}",
+                    blob_info.shard_id, blob_info.blob_id);
+    }
+    return true;
+}

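Note: the compare_exchange_weak loop above is a lock-free monotonic max: it advances blob_sequence_num to blob_id + 1 unless another thread has already pushed it at least that far. A standalone sketch of the same pattern:

#include <atomic>
#include <cstdint>
#include <iostream>

// Advance `seq` to at least blob_id + 1, never backwards. On CAS failure,
// `existing` is reloaded with the current value, so the loop terminates once
// the counter is already >= next (e.g. a stale replay) or our CAS wins.
void advance_sequence(std::atomic< uint64_t >& seq, uint64_t blob_id) {
    uint64_t existing = seq.load();
    uint64_t const next = blob_id + 1;
    while (next > existing && !seq.compare_exchange_weak(existing, next)) {}
}

int main() {
    std::atomic< uint64_t > seq{10};
    advance_sequence(seq, 5);              // stale replay: counter stays 10
    advance_sequence(seq, 42);             // newer blob id: counter becomes 43
    std::cout << seq.load() << '\n';       // prints 43
}
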
 void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                       homestore::MultiBlkId const& pbas,
                                       cintrusive< homestore::repl_req_ctx >& hs_ctx) {
@@ -199,54 +247,19 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
     }
 
     auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));
-    shared< BlobIndexTable > index_table;
-    HS_PG* hs_pg{nullptr};
-    {
-        std::shared_lock lock_guard(_pg_lock);
-        auto iter = _pg_map.find(msg_header->pg_id);
-        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
-        hs_pg = static_cast< HS_PG* >(iter->second.get());
-    }
-
-    index_table = hs_pg->index_table_;
-    RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");
+    auto const pg_id = msg_header->pg_id;
 
     BlobInfo blob_info;
     blob_info.shard_id = msg_header->shard_id;
     blob_info.blob_id = blob_id;
     blob_info.pbas = pbas;
 
-    // Write to index table with key {shard id, blob id} and value {pba}.
-    auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
-    LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}",
-                msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string());
-    if (!exist_already) {
-        if (status != homestore::btree_status_t::success) {
-            LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
-            if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR))); }
-            return;
-        } else {
-            // The PG superblock (durable entities) will be persisted as part of the HS_CLIENT checkpoint, which
-            // is always done ahead of the index checkpoint. Hence, if the index already has this entity, whatever
-            // durable counters were updated as part of the update have already been persisted in the PG
-            // superblock. If we were to increment now, it would be a duplicate increment; hence we skip the
-            // update when an index entry already exists for this blob put.
-
-            // Update the durable counters. We need to update the blob_sequence_num here only for the replay
-            // case, as the number is already updated in the put_blob call.
-            hs_pg->durable_entities_update([&blob_id, &pbas](auto& de) {
-                auto existing_blob_id = de.blob_sequence_num.load();
-                auto next_blob_id = blob_id + 1;
-                while ((next_blob_id > existing_blob_id) &&
-                       // we need to update the blob_sequence_num to existing_blob_id+1 so that if the leader
-                       // changes, we can still get the up-to-date blob_sequence_num
-                       !de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
-                de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
-                de.total_occupied_blk_count.fetch_add(pbas.blk_count(), std::memory_order_relaxed);
-            });
-        }
-    }
-    if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
+    bool success = local_add_blob_info(pg_id, blob_info);
+
+    if (ctx) {
+        ctx->promise_.setValue(success ? BlobManager::Result< BlobInfo >(blob_info)
+                                       : folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR)));
+    }
 }
 
 BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
@@ -357,6 +370,13 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
         return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
     }
 
+    auto pg_iter = _pg_map.find(msg_header->pg_id);
+    if (pg_iter == _pg_map.end()) {
+        LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
+             msg_header->pg_id);
+        return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
+    }
+
     std::scoped_lock lock_guard(_shard_lock);
     auto shard_iter = _shard_map.find(msg_header->shard_id);
     if (shard_iter == _shard_map.end()) {
@@ -365,11 +385,22 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
         return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
     }
 
-    homestore::blk_alloc_hints hints;
-
     auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
     BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);
 
+    homestore::blk_alloc_hints hints;
     hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
+
+    if (msg_header->blob_id != 0) {
+        // check if the blob already exists; if yes, return its blk id
+        auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
+        auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
+        if (r.hasValue()) {
+            LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
+            hints.committed_blk_id = r.value();
+        }
+    }
 
     return hints;
 }
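Note: this hint is what makes a re-sent put idempotent during resync: a non-zero blob_id in the header plus a hit in the index table means the blocks are already committed, so the engine can reuse them instead of allocating twice, presumably surfacing as the DATA_DUPLICATED error mapped to a BlobError above. A toy model of the decision, with a std::map standing in for the index table (all names assumed):

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <utility>

using BlkId = uint32_t;                                   // stand-in for homestore::MultiBlkId

struct Hints { std::optional< BlkId > committed_blk_id; };

// Toy index keyed by {shard_id, blob_id}, mirroring the real index table.
std::map< std::pair< uint64_t, uint64_t >, BlkId > g_index;

Hints blk_alloc_hints(uint64_t shard_id, uint64_t blob_id) {
    Hints h;
    if (blob_id != 0) {                                   // 0 treated as "unset" in this sketch
        auto it = g_index.find({shard_id, blob_id});
        if (it != g_index.end()) h.committed_blk_id = it->second;  // already persisted
    }
    return h;
}

int main() {
    g_index[{1, 7}] = 42;
    auto h = blk_alloc_hints(1, 7);
    std::cout << (h.committed_blk_id ? "duplicate, reuse blk " + std::to_string(*h.committed_blk_id)
                                     : std::string("fresh allocation"))
              << '\n';                                    // prints: duplicate, reuse blk 42
}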

