Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC skeleton #181

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.0.3"
version = "2.1.3"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
3 changes: 2 additions & 1 deletion src/include/homeobject/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class HomeObject {
virtual HomeObjectStats get_stats() const = 0;
};

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application);
extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application,
uint32_t gc_defrag_refresh_interval_second = 60ul);

} // namespace homeobject
//
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
replication_state_machine.cpp
hs_cp_callbacks.cpp
hs_http_manager.cpp
gc_manager.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore" PUBLIC
Expand Down
169 changes: 169 additions & 0 deletions src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#include "gc_manager.hpp"

#include <thread>

namespace homeobject {

void GCManager::start(uint32_t gc_defrag_refresh_interval_second) {
if (!m_heap_chunk_selector) {
LOGERRORMOD(
homeobject,
"GCManager have not been initialized!, please use initialzie_with_chunk_selector() to initialize it.");
return;
}

// gc thread function
auto gc_task_handler = [this](uint32_t pdev_id) {
RELEASE_ASSERT(m_per_dev_gc_controller.contains(pdev_id), "controller of pdev {} is not initialized!", pdev_id);
auto& controller = m_per_dev_gc_controller[pdev_id];
// the reservced chunk is a brand new chunk. if the move_to chunk of a gc task is not specified, the reserved
// chunk will be used as the move_to chunk.
// here, select chunk is called before homeobject service start, so this will make sure we can select a brand
// new chunk.
homestore::blk_alloc_hints hint;
hint.pdev_id_hint = pdev_id;
VChunk reserved_chunk(m_heap_chunk_selector->select_chunk(0, hint));

m_total_threads_num++;
while (!m_stop_gc) {
VChunk move_to_chunk(nullptr);
VChunk move_from_chunk(nullptr);

std::unique_lock< std::mutex > lock(controller.mtx);
if (!controller.high_priority_gc_task_queue.empty()) {
// TODO: add seperate new thread for high priority gc task if necessary, so that is can be processed
// ASAP.
std::tie(move_from_chunk, move_to_chunk) = controller.high_priority_gc_task_queue.front();
controller.high_priority_gc_task_queue.pop();
} else if (!controller.defrag_heap.empty()) {
move_from_chunk = controller.defrag_heap.top();
controller.defrag_heap.pop();
} else {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
lock.unlock();

RELEASE_ASSERT(move_from_chunk.get_pdev_id() == pdev_id,
"currrent pdev_id is {}, but get a move_from_chunk with chunk_id {} and pdev_id {}, which "
"is not in the same pdev!",
pdev_id, move_from_chunk.get_chunk_id(), move_from_chunk.get_pdev_id());

if (!m_heap_chunk_selector->select_specific_chunk(move_from_chunk.get_chunk_id())) {
LOGWARNMOD(homeobject, "failed to select specific chunk with chunk_id {}",
move_from_chunk.get_chunk_id());
continue;
}

if (!move_to_chunk.get_internal_chunk()) move_to_chunk = reserved_chunk;
// handle gc task
gc(move_from_chunk, move_to_chunk);
m_total_gc_chunk_count++;

// after gc, the move_from_chunk is empty, so we can use is as the new reserved chunk.
// the move_to_chunk is not empty, so we need to release it.
reserved_chunk = move_from_chunk;
m_heap_chunk_selector->release_chunk(move_to_chunk.get_chunk_id());
}
m_total_threads_num--;
};

uint32_t gc_total_thread_num = 0;
for (const auto& [pdev_id, controller] : m_per_dev_gc_controller) {
// TODO: add gc thread pool for each pdev if necessary
std::thread(gc_task_handler, pdev_id).detach();
gc_total_thread_num++;
}

// wait until all the gc threads have selected a brand new chunk as the reserved move_to_chunk.
while (m_total_threads_num < gc_total_thread_num) {
// we can also use a condition variable to replace all the while wait and notify in this file.
// we can change this if necessary.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// TODO:make the interval of refresh defrag heap and iolimiter configurable

// start thread to refresh defrag_heap for each pdev periodically
std::thread([this, gc_defrag_refresh_interval_second]() {
m_total_threads_num++;
while (!m_stop_gc) {
for (const auto& [pdev_id, _] : m_per_dev_gc_controller)
refresh_defrag_heap(pdev_id);

// Sleep for a period of time before refreshing again
std::this_thread::sleep_for(std::chrono::seconds(gc_defrag_refresh_interval_second));
}
m_total_threads_num--;
}).detach();

// start thread to refresh io_limiter for each pdev periodically
std::thread([this]() {
m_total_threads_num++;
while (!m_stop_gc) {
for (auto& [_, controller] : m_per_dev_gc_controller)
controller.io_limiter.refresh();
// Sleep for a period of time before refreshing again
std::this_thread::sleep_for(std::chrono::seconds(1));
}
m_total_threads_num--;
}).detach();

LOGINFOMOD(homeobject, "gc manager has been started with {} gc thread.", gc_total_thread_num);
}

/**
 * @brief Request all GC related threads to exit and block until the thread counter drops to zero.
 */
void GCManager::stop() {
    // raise the stop flag that every background loop polls.
    m_stop_gc.store(true);

    // theoretically, some gc thread might not have started before stop() is called.
    // TODO: make sure waiting for all gc threads to exit gracefully
    const std::chrono::milliseconds poll_interval{10};
    while (m_total_threads_num.load() != 0) {
        std::this_thread::sleep_for(poll_interval);
    }

    LOGINFOMOD(homeobject, "all GC threads have stopped.");
}

/**
 * @brief Queue a high priority GC task (move the data of @p move_from_chunk into @p move_to_chunk).
 *
 * The task is rejected (with a warning) when the two chunks live on different pdevs, or when the pdev has no
 * GC controller -- such a task could never be picked up, and creating the controller on the fly would mutate
 * the controller map while the background threads are iterating it.
 *
 * @param move_from_chunk chunk to be garbage collected.
 * @param move_to_chunk   chunk that receives the data; must be on the same pdev as @p move_from_chunk.
 */
void GCManager::add_gc_task(VChunk& move_from_chunk, VChunk& move_to_chunk) {
    const auto pdev_id = move_from_chunk.get_pdev_id();
    if (pdev_id != move_to_chunk.get_pdev_id()) {
        LOGWARNMOD(homeobject, "move_from_chunk(pdev_id {}) and move_to_chunk(pdev_id {}) should be in the same pdev!",
                   pdev_id, move_to_chunk.get_pdev_id());
        return;
    }

    // look the controller up without inserting: operator[] would silently create a controller for an unknown
    // pdev, which races with the threads walking the map and leaves the task in a queue nobody drains.
    const auto it = m_per_dev_gc_controller.find(pdev_id);
    if (it == m_per_dev_gc_controller.end()) {
        LOGWARNMOD(homeobject, "no gc controller for pdev_id {}, gc task for move_from_chunk {} is dropped!", pdev_id,
                   move_from_chunk.get_chunk_id());
        return;
    }

    LOGINFOMOD(homeobject, "high priority gc task is added, move_from_chunk: {}, move_to_chunk: {}, pdev_id: {}",
               move_from_chunk.get_chunk_id(), move_to_chunk.get_chunk_id(), pdev_id);

    auto& controller = it->second;
    std::unique_lock< std::mutex > lock(controller.mtx);
    controller.high_priority_gc_task_queue.emplace(move_from_chunk, move_to_chunk);
}

/**
 * @brief Build a GC manager on top of an already initialized chunk selector.
 *
 * Asserts that the selector is non-null and knows at least one pdev, then creates one (empty) per-pdev GC
 * controller for every pdev id the selector reports.
 *
 * @param chunk_selector chunk selector used to select/release chunks and to enumerate pdevs and chunks.
 */
GCManager::GCManager(std::shared_ptr< HeapChunkSelector > chunk_selector) :
        m_heap_chunk_selector{chunk_selector}, metrics_{*this} {
    RELEASE_ASSERT(m_heap_chunk_selector, "chunk_selector is nullptr!");

    const auto pdev_ids = m_heap_chunk_selector->get_all_pdev_ids();
    RELEASE_ASSERT(pdev_ids.size(), "chunk_selector is not initialized!");

    // default-construct a controller in place for each pdev.
    for (const auto& id : pdev_ids) {
        m_per_dev_gc_controller.try_emplace(id);
    }
}

void GCManager::refresh_defrag_heap(uint32_t pdev_id) {
if (!m_heap_chunk_selector) {
LOGERRORMOD(homeobject,
"GCManager has not been initialized, fail to refresh defrag heap for pdev {}. skip the operation",
pdev_id);
return;
}
const auto& pdev_chunks = m_heap_chunk_selector->get_all_chunks(pdev_id);
VChunkDefragHeap new_defrag_heap;
for (const auto& chunk : pdev_chunks) {
if (chunk.get_defrag_nblks()) new_defrag_heap.emplace(chunk);
}
new_defrag_heap.swap(m_per_dev_gc_controller[pdev_id].defrag_heap);
}

/**
 * @brief Garbage collect one chunk. Currently a skeleton that only logs the task.
 *
 * @param move_from_chunk chunk whose data is to be moved away.
 * @param move_to_chunk   chunk that receives the data.
 */
void GCManager::gc(VChunk move_from_chunk, VChunk move_to_chunk) {
    const auto from_chunk_id = move_from_chunk.get_chunk_id();
    const auto to_chunk_id = move_to_chunk.get_chunk_id();
    const auto pdev_id = move_from_chunk.get_pdev_id();

    LOGINFOMOD(homeobject, "start gc chunk {} , move to chunk {}, pdev_id {}.", from_chunk_id, to_chunk_id, pdev_id);
    // TODO: implement gc logic
}

} // namespace homeobject
113 changes: 113 additions & 0 deletions src/lib/homestore_backend/gc_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once

#include "heap_chunk_selector.h"

#include <sisl/utility/enum.hpp>

namespace homeobject {

/**
 * @brief Background garbage collector for chunks managed by a HeapChunkSelector.
 *
 * One controller is kept per pdev; it holds the defrag heap (candidate chunks ordered by their number of
 * defrag blocks), a queue of high priority tasks and an io limiter. start() spawns detached threads that
 * capture `this`, so stop() has to be called, and has to return, before a GCManager is destroyed.
 */
class GCManager {
public:
    GCManager(std::shared_ptr< HeapChunkSelector >);

    /// Starts the GC threads; the argument is the defrag heap refresh interval in seconds.
    void start(uint32_t);

    /// Asks all GC threads to exit and waits until they have done so.
    void stop();

    /**
     * @brief this is used for emergency or recovered GC task when restarting
     */
    void add_gc_task(VChunk&, VChunk&);

    ~GCManager() = default;

private:
    /// Orders chunks by their number of defrag blocks, so the heap's top is the chunk with the most.
    class VChunkDefragComparator {
    public:
        bool operator()(VChunk& lhs, VChunk& rhs) { return lhs.get_defrag_nblks() < rhs.get_defrag_nblks(); }
    };

    using VChunkDefragHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkDefragComparator >;

    /// Everything the GC needs to keep for a single pdev.
    struct PerDevGCController {
        /**
         * @brief Token bucket used to bound the IO issued by GC on one pdev.
         *
         * The bucket starts full, is refilled by refresh() proportionally to the elapsed time and is capped
         * at kMaxTokens. All accesses are serialized by an internal mutex.
         */
        class IOLimiter {
        public:
            IOLimiter() : m_lastRefreshTime(std::chrono::steady_clock::now()) {}

            /// Adds the tokens earned since the previous refresh, never exceeding kMaxTokens.
            void refresh() {
                std::lock_guard< std::mutex > lock(m_mutex);
                const auto currentTime = std::chrono::steady_clock::now();
                const auto elapsedMs =
                    std::chrono::duration_cast< std::chrono::milliseconds >(currentTime - m_lastRefreshTime).count();
                if (elapsedMs > 0) {
                    // Saturating refill: clamp BEFORE narrowing back to 32 bits. Adding the raw 64-bit
                    // product to the 32-bit bucket first could wrap around after a long gap between two
                    // refreshes and leave the bucket with an arbitrary small value.
                    const uint64_t missing = kMaxTokens - m_tokenBucket;
                    const uint64_t elapsed = static_cast< uint64_t >(elapsedMs);
                    // the rate is at least 1 token/ms, so `elapsed >= missing` already fills the bucket;
                    // otherwise the product is small enough that it cannot overflow.
                    const uint64_t refill =
                        (elapsed >= missing) ? missing : std::min< uint64_t >(missing, elapsed * kTokensPerMillisecond);
                    m_tokenBucket += static_cast< uint32_t >(refill);
                }
                m_lastRefreshTime = currentTime;
            }

            // gc thread will consume tokens before doing IO to limit the IO resource occupied by gc for each pdev
            /// @return true if @p tokens were available and have been taken, false (bucket untouched) otherwise.
            bool tryConsumeTokens(uint32_t tokens) {
                std::lock_guard< std::mutex > lock(m_mutex);
                if (m_tokenBucket >= tokens) {
                    m_tokenBucket -= tokens;
                    return true;
                }
                return false;
            }

        private:
            // TODO: make the capacity and the refill rate configurable, and set them as optimized values after
            // testing
            static constexpr uint32_t kMaxTokens = 1000;           // Maximum number of tokens in the bucket
            static constexpr uint32_t kTokensPerMillisecond = 100; // Number of tokens added per millisecond
            static_assert(kTokensPerMillisecond >= 1, "refresh() relies on a refill rate of at least 1 token/ms");

            std::mutex m_mutex;
            uint32_t m_tokenBucket{kMaxTokens};
            std::chrono::steady_clock::time_point m_lastRefreshTime;
        };

        std::mutex mtx; // guards defrag_heap and high_priority_gc_task_queue
        IOLimiter io_limiter;
        VChunkDefragHeap defrag_heap;
        // emergency and recovered GC tasks will be put in this queue
        std::queue< std::pair< VChunk, VChunk > > high_priority_gc_task_queue;
    };

    void refresh_defrag_heap(uint32_t pdev_id);

    void gc(VChunk move_from_chunk, VChunk move_to_chunk);

    // metrics
    struct GCMetrics : public sisl::MetricsGroup {
    public:
        GCMetrics(GCManager const& gc_manager) : sisl::MetricsGroup{"GC", "Singelton"}, gc_manager{gc_manager} {
            // TODO:: add more metrics
            REGISTER_GAUGE(total_gc_chunk_count, "Number of chunks which has been GC");

            register_me_to_farm();
            attach_gather_cb(std::bind(&GCMetrics::on_gather, this));
        }
        ~GCMetrics() { deregister_me_from_farm(); }
        GCMetrics(const GCMetrics&) = delete;
        GCMetrics(GCMetrics&&) noexcept = delete;
        GCMetrics& operator=(const GCMetrics&) = delete;
        GCMetrics& operator=(GCMetrics&&) noexcept = delete;

        /// Gather callback: publishes the manager's current GC'ed chunk count.
        void on_gather() { GAUGE_UPDATE(*this, total_gc_chunk_count, gc_manager.total_gc_chunk_count()); }

    private:
        GCManager const& gc_manager;
    };

    uint64_t total_gc_chunk_count() const { return m_total_gc_chunk_count; }

private:
    GCManager(const GCManager&) = delete;
    GCManager& operator=(const GCManager&) = delete;

    std::shared_ptr< HeapChunkSelector > m_heap_chunk_selector;
    std::unordered_map< uint32_t, PerDevGCController > m_per_dev_gc_controller;
    std::atomic_ushort m_total_threads_num{0};
    std::atomic_bool m_stop_gc{false};

    // 64-bit to match total_gc_chunk_count(); a 16-bit counter would wrap after 65535 collected chunks.
    std::atomic_uint64_t m_total_gc_chunk_count{0};

    // metrics
    // declared last: it is constructed after, and destroyed before, the counter its gather callback reads.
    GCMetrics metrics_;
};

} // namespace homeobject
Loading
Loading