Skip to content

Commit

Permalink
[WIP] dont use this version. Partially enabling the leader registry. …
Browse files Browse the repository at this point in the history
…need to talk with Edward before proceeding.
  • Loading branch information
songweijia committed Feb 6, 2024
1 parent 3e9c823 commit fca4dc0
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 21 deletions.
38 changes: 26 additions & 12 deletions include/derecho/conf/conf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ constexpr std::size_t DERECHO_MIN_RPC_RESPONSE_SIZE = 128;
/** The single configuration file for derecho **/
class Conf {
public:

//String constants for config options
static constexpr const char* DERECHO_LEADER_IP = "DERECHO/leader_ip";
static constexpr const char* DERECHO_LEADER_GMS_PORT = "DERECHO/leader_gms_port";
static constexpr const char* DERECHO_LEADER_EXTERNAL_PORT = "DERECHO/leader_external_port";
static constexpr const char* DERECHO_RESTART_LEADERS = "DERECHO/restart_leaders";
static constexpr const char* DERECHO_RESTART_LEADER_PORTS = "DERECHO/restart_leader_ports";
#ifdef ENABLE_LEADER_REGISTRY
static constexpr const char* DERECHO_LEADER_REGISTRY_IP = "DERECHO/leader_registry_ip";
static constexpr const char* DERECHO_LEADER_REGISTRY_PORT = "DERECHO/leader_registry_port";
#else
static constexpr const char* DERECHO_LEADER_IP = "DERECHO/leader_ip";
static constexpr const char* DERECHO_LEADER_GMS_PORT = "DERECHO/leader_gms_port";
static constexpr const char* DERECHO_LEADER_EXTERNAL_PORT = "DERECHO/leader_external_port";
static constexpr const char* DERECHO_RESTART_LEADERS = "DERECHO/restart_leaders";
static constexpr const char* DERECHO_RESTART_LEADER_PORTS = "DERECHO/restart_leader_ports";
#endif
static constexpr const char* DERECHO_LOCAL_ID = "DERECHO/local_id";
static constexpr const char* DERECHO_LOCAL_IP = "DERECHO/local_ip";
static constexpr const char* DERECHO_GMS_PORT = "DERECHO/gms_port";
Expand Down Expand Up @@ -80,11 +86,16 @@ class Conf {
// config name --> default value
std::map<const std::string, std::string> config = {
// [DERECHO]
#ifdef ENABLE_LEADER_REGISTRY
{DERECHO_LEADER_REGISTRY_IP, "127.0.0.1"},
{DERECHO_LEADER_REGISTRY_PORT, "50182"},
#else
{DERECHO_LEADER_IP, "127.0.0.1"},
{DERECHO_LEADER_GMS_PORT, "23580"},
{DERECHO_LEADER_EXTERNAL_PORT, "32645"},
{DERECHO_RESTART_LEADERS, "127.0.0.1"},
{DERECHO_RESTART_LEADER_PORTS, "23580"},
#endif
{DERECHO_LOCAL_ID, "0"},
{DERECHO_LOCAL_IP, "127.0.0.1"},
{DERECHO_GMS_PORT, "23580"},
Expand Down Expand Up @@ -216,6 +227,16 @@ class Conf {
*/
const std::tuple<std::string,uint16_t,uint16_t> get_leader() const;

#ifdef ENABLE_LEADER_REGISTRY
/**
* Push a leader to leader registry
* @param ip The ip address of the new leader.
* @param gms_port The GMS port of the new leader.
* @param ext_port The EXT port of the new leader.
*/
void push_leader(std::string ip,uint16_t gms_port,uint16_t ext_port);
#endif

// Initialize the singleton from the command line and the configuration file.
// The command line has higher priority than the configuration file
// The process we find the configuration file:
Expand Down Expand Up @@ -264,12 +285,5 @@ const std::string getAbsoluteFilePath(const std::string& filename);
*/
std::vector<std::string> split_string(const std::string& str, const std::string& delimiter = ",");

#ifdef ENABLE_LEADER_REGISTRY
/**
* The default port number of leader registry
*/
constexpr uint16_t LEADER_REGISTRY_PORT = 50182;
#endif

} // namespace derecho
#endif // CONF_HPP
5 changes: 5 additions & 0 deletions include/derecho/core/detail/restart_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ struct RestartState {
* replicated state.
*/
std::vector<std::vector<int64_t>> restart_shard_leaders;
#ifdef ENABLE_LEADER_REGISTRY
// Leader registry will tell who is the current leader. We don't need to test it in the order of
// "restart leaders".
#else
/**
* List of IP addresses of potential restart leaders (of the overall process)
* in descending priority order
Expand All @@ -81,6 +85,7 @@ struct RestartState {
* into restart_leader_ips.
*/
uint32_t num_leader_failures;
#endif
/**
* Reads the logs stored at this node and initializes logged_ragged_trim
*/
Expand Down
6 changes: 4 additions & 2 deletions src/conf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ if (${ENABLE_LEADER_REGISTRY})
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/include>
)
target_link_libraries(leader_registry rpclib::rpc pthread)
target_link_libraries(leader_registry rpclib::rpc derecho pthread)
add_dependencies(leader_registry derecho)

add_executable(leader_registry_client leader_registry_client.cpp)
target_include_directories(leader_registry_client PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/include>
)
target_link_libraries(leader_registry_client rpclib::rpc pthread)
target_link_libraries(leader_registry_client rpclib::rpc derecho pthread)
add_dependencies(leader_registry_client derecho)
endif()

add_executable(conftst test.cpp conf.cpp)
Expand Down
27 changes: 23 additions & 4 deletions src/conf/conf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <sys/stat.h>
#include <unistd.h>
#include <stdexcept>
#ifdef ENABLE_LEADER_REGISTRY
#include <rpc/client.h>
#endif

namespace derecho {

Expand All @@ -29,11 +32,16 @@ std::atomic<uint32_t> Conf::singleton_initialized_flag = 0;
{ x, required_argument, 0, 0 }
struct option Conf::long_options[] = {
// [DERECHO]
#ifdef ENABLE_LEADER_REGISTRY
MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_REGISTRY_IP),
MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_REGISTRY_PORT),
#else
MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_IP),
MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_GMS_PORT),
MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_EXTERNAL_PORT),
MAKE_LONG_OPT_ENTRY(DERECHO_RESTART_LEADERS),
MAKE_LONG_OPT_ENTRY(DERECHO_RESTART_LEADER_PORTS),
#endif
MAKE_LONG_OPT_ENTRY(DERECHO_LOCAL_ID),
MAKE_LONG_OPT_ENTRY(DERECHO_LOCAL_IP),
MAKE_LONG_OPT_ENTRY(DERECHO_GMS_PORT),
Expand Down Expand Up @@ -171,16 +179,27 @@ const Conf* Conf::get() noexcept(true) {
}

const std::tuple<std::string,uint16_t,uint16_t> Conf::get_leader() const {
// TODO: implement a full version of this.
std::tuple<std::string,uint16_t,uint16_t> leader =
{
#ifdef ENABLE_LEADER_REGISTRY
::rpc::client lrc(this->getString(DERECHO_LEADER_REGISTRY_IP),
this->getUInt16(DERECHO_LEADER_REGISTRY_PORT));
return lrc.call("get").as<std::tuple<std::string,uint16_t,uint16_t>>();
#else
return std::tuple<std::string,uint16_t,uint16_t> {
this->getString(DERECHO_LEADER_IP),
this->getUInt16(DERECHO_LEADER_GMS_PORT),
this->getUInt16(DERECHO_LEADER_EXTERNAL_PORT)
};
return leader;
#endif
}

#ifdef ENABLE_LEADER_REGISTRY
void Conf::push_leader(std::string ip,uint16_t gms_port,uint16_t ext_port) {
::rpc::client lrc(this->getString(DERECHO_LEADER_REGISTRY_IP),
this->getUInt16(DERECHO_LEADER_REGISTRY_PORT));
lrc.call("put",ip,gms_port,ext_port);
}
#endif

const std::string& getConfString(const std::string& key) {
return Conf::get()->getString(key);
}
Expand Down
2 changes: 1 addition & 1 deletion src/conf/leader_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void term_handler(int signum) {

int main(int argc, char** argv) {

uint16_t port = LEADER_REGISTRY_PORT;
uint16_t port = getConfUInt16(Conf::DERECHO_LEADER_REGISTRY_PORT);
std::string leader_ip = "";
uint16_t leader_gms_port = 0;
bool daemonized = false;
Expand Down
4 changes: 2 additions & 2 deletions src/conf/leader_registry_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ struct option long_options[] = {
};

int main(int argc, char** argv) {
std::string server = "localhost";
uint16_t port = LEADER_REGISTRY_PORT;
std::string server = getConfString(Conf::DERECHO_LEADER_REGISTRY_IP);
uint16_t port = getConfUInt16(Conf::DERECHO_LEADER_REGISTRY_PORT);

while(true) {
int option_index = 0;
Expand Down
11 changes: 11 additions & 0 deletions src/core/view_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ bool ViewManager::first_init() {
if(curr_view) {
dbg_debug(vm_logger, "Found view {} on disk", curr_view->vid);
restart_state = std::make_unique<RestartState>();
#ifdef ENABLE_LEADER_REGISTRY
#else
restart_state->restart_leader_ips = split_string(getConfString(Conf::DERECHO_RESTART_LEADERS));
restart_state->restart_leader_ports = [&]() {
//"Apply std::stoi over the result of split_string(getConfString(...))"
Expand All @@ -83,6 +85,7 @@ bool ViewManager::first_init() {
return ports;
}();
restart_state->num_leader_failures = 0;
#endif//ENABLE_LEADER_REGISTRY
restart_to_initial_view();
} else {
startup_to_first_view();
Expand Down Expand Up @@ -135,8 +138,13 @@ bool ViewManager::restart_to_initial_view() {
bool got_initial_view = false;
while(!got_initial_view) {
//Determine if I am the current restart leader
#ifdef ENABLE_LEADER_REGISTRY
auto leader_tuple = Conf::get()->get_leader();
if (my_ip == std::get<0>(leader_tuple) && my_gms_port == std::get<1>(leader_tuple)) {
#else
if(my_ip == restart_state->restart_leader_ips[restart_state->num_leader_failures]
&& my_gms_port == restart_state->restart_leader_ports[restart_state->num_leader_failures]) {
#endif
in_total_restart = true;
active_leader = true;
dbg_info(vm_logger, "Logged View {} found on disk. Restarting in recovery mode as the leader.", curr_view->vid);
Expand Down Expand Up @@ -245,6 +253,8 @@ bool ViewManager::receive_initial_view() {
std::vector<std::type_index>{});
// Initialize restart_state, which wasn't created in first_init() because we had no logged View
restart_state = std::make_unique<RestartState>();
#ifdef ENABLE_LEADER_REGISTRY
#else
restart_state->restart_leader_ips = split_string(getConfString(Conf::DERECHO_RESTART_LEADERS));
restart_state->restart_leader_ports = [&]() {
auto port_list = split_string(getConfString(Conf::DERECHO_RESTART_LEADER_PORTS));
Expand All @@ -255,6 +265,7 @@ bool ViewManager::receive_initial_view() {
return ports;
}();
restart_state->num_leader_failures = 0;
#endif//ENABLE_LEADER_REGISTRY
}
dbg_debug(vm_logger, "Sending view {} to leader", curr_view->vid);
auto leader_socket_write = [this](const uint8_t* bytes, std::size_t size) {
Expand Down

0 comments on commit fca4dc0

Please sign in to comment.