diff --git a/include/derecho/conf/conf.hpp b/include/derecho/conf/conf.hpp index 63e5476e..4b104613 100644 --- a/include/derecho/conf/conf.hpp +++ b/include/derecho/conf/conf.hpp @@ -21,12 +21,18 @@ constexpr std::size_t DERECHO_MIN_RPC_RESPONSE_SIZE = 128; /** The single configuration file for derecho **/ class Conf { public: + //String constants for config options - static constexpr const char* DERECHO_LEADER_IP = "DERECHO/leader_ip"; - static constexpr const char* DERECHO_LEADER_GMS_PORT = "DERECHO/leader_gms_port"; - static constexpr const char* DERECHO_LEADER_EXTERNAL_PORT = "DERECHO/leader_external_port"; - static constexpr const char* DERECHO_RESTART_LEADERS = "DERECHO/restart_leaders"; - static constexpr const char* DERECHO_RESTART_LEADER_PORTS = "DERECHO/restart_leader_ports"; +#ifdef ENABLE_LEADER_REGISTRY + static constexpr const char* DERECHO_LEADER_REGISTRY_IP = "DERECHO/leader_registry_ip"; + static constexpr const char* DERECHO_LEADER_REGISTRY_PORT = "DERECHO/leader_registry_port"; +#else + static constexpr const char* DERECHO_LEADER_IP = "DERECHO/leader_ip"; + static constexpr const char* DERECHO_LEADER_GMS_PORT = "DERECHO/leader_gms_port"; + static constexpr const char* DERECHO_LEADER_EXTERNAL_PORT = "DERECHO/leader_external_port"; + static constexpr const char* DERECHO_RESTART_LEADERS = "DERECHO/restart_leaders"; + static constexpr const char* DERECHO_RESTART_LEADER_PORTS = "DERECHO/restart_leader_ports"; +#endif static constexpr const char* DERECHO_LOCAL_ID = "DERECHO/local_id"; static constexpr const char* DERECHO_LOCAL_IP = "DERECHO/local_ip"; static constexpr const char* DERECHO_GMS_PORT = "DERECHO/gms_port"; @@ -80,11 +86,16 @@ class Conf { // config name --> default value std::map config = { // [DERECHO] +#ifdef ENABLE_LEADER_REGISTRY + {DERECHO_LEADER_REGISTRY_IP, "127.0.0.1"}, + {DERECHO_LEADER_REGISTRY_PORT, "50182"}, +#else {DERECHO_LEADER_IP, "127.0.0.1"}, {DERECHO_LEADER_GMS_PORT, "23580"}, {DERECHO_LEADER_EXTERNAL_PORT, "32645"}, {DERECHO_RESTART_LEADERS, "127.0.0.1"}, {DERECHO_RESTART_LEADER_PORTS, "23580"}, +#endif {DERECHO_LOCAL_ID, "0"}, {DERECHO_LOCAL_IP, "127.0.0.1"}, {DERECHO_GMS_PORT, "23580"}, @@ -216,6 +227,16 @@ class Conf { */ const std::tuple get_leader() const; +#ifdef ENABLE_LEADER_REGISTRY + /** + * Push a leader to leader registry + * @param ip The ip address of the new leader. + * @param gms_port The GMS port of the new leader. + * @param ext_port The EXT port of the new leader. + */ + void push_leader(std::string ip,uint16_t gms_port,uint16_t ext_port); +#endif + // Initialize the singleton from the command line and the configuration file. // The command line has higher priority than the configuration file // The process we find the configuration file: @@ -264,12 +285,5 @@ const std::string getAbsoluteFilePath(const std::string& filename); */ std::vector split_string(const std::string& str, const std::string& delimiter = ","); -#ifdef ENABLE_LEADER_REGISTRY -/** - * The default port number of leader registry - */ -constexpr uint16_t LEADER_REGISTRY_PORT = 50182; -#endif - } // namespace derecho #endif // CONF_HPP diff --git a/include/derecho/core/detail/restart_state.hpp b/include/derecho/core/detail/restart_state.hpp index 66f80c43..d8d5c5e1 100644 --- a/include/derecho/core/detail/restart_state.hpp +++ b/include/derecho/core/detail/restart_state.hpp @@ -67,6 +67,10 @@ struct RestartState { * replicated state. */ std::vector> restart_shard_leaders; +#ifdef ENABLE_LEADER_REGISTRY + // Leader registry will tell who is the current leader. We don't need to test it in the order of + // "restart leaders". +#else /** * List of IP addresses of potential restart leaders (of the overall process) * in descending priority order @@ -81,6 +85,7 @@ struct RestartState { * into restart_leader_ips. */ uint32_t num_leader_failures; +#endif /** * Reads the logs stored at this node and initializes logged_ragged_trim */ diff --git a/src/conf/CMakeLists.txt b/src/conf/CMakeLists.txt index 09d52330..c1a0e81b 100644 --- a/src/conf/CMakeLists.txt +++ b/src/conf/CMakeLists.txt @@ -13,7 +13,8 @@ if (${ENABLE_LEADER_REGISTRY}) $ $ ) - target_link_libraries(leader_registry rpclib::rpc pthread) + target_link_libraries(leader_registry rpclib::rpc derecho pthread) + add_dependencies(leader_registry derecho) add_executable(leader_registry_client leader_registry_client.cpp) target_include_directories(leader_registry_client PRIVATE @@ -21,7 +22,8 @@ if (${ENABLE_LEADER_REGISTRY}) $ $ ) - target_link_libraries(leader_registry_client rpclib::rpc pthread) + target_link_libraries(leader_registry_client rpclib::rpc derecho pthread) + add_dependencies(leader_registry_client derecho) endif() add_executable(conftst test.cpp conf.cpp) diff --git a/src/conf/conf.cpp b/src/conf/conf.cpp index abefd982..71f37bda 100644 --- a/src/conf/conf.cpp +++ b/src/conf/conf.cpp @@ -7,6 +7,9 @@ #include #include #include +#ifdef ENABLE_LEADER_REGISTRY +#include +#endif namespace derecho { @@ -29,11 +32,16 @@ std::atomic Conf::singleton_initialized_flag = 0; { x, required_argument, 0, 0 } struct option Conf::long_options[] = { // [DERECHO] +#ifdef ENABLE_LEADER_REGISTRY + MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_REGISTRY_IP), + MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_REGISTRY_PORT), +#else MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_IP), MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_GMS_PORT), MAKE_LONG_OPT_ENTRY(DERECHO_LEADER_EXTERNAL_PORT), MAKE_LONG_OPT_ENTRY(DERECHO_RESTART_LEADERS), MAKE_LONG_OPT_ENTRY(DERECHO_RESTART_LEADER_PORTS), +#endif MAKE_LONG_OPT_ENTRY(DERECHO_LOCAL_ID), MAKE_LONG_OPT_ENTRY(DERECHO_LOCAL_IP), MAKE_LONG_OPT_ENTRY(DERECHO_GMS_PORT), @@ -171,16 +179,27 @@ const Conf* Conf::get() noexcept(true) { } const std::tuple Conf::get_leader() const { - // TODO: implement a full version of this. - std::tuple leader = - { +#ifdef ENABLE_LEADER_REGISTRY + ::rpc::client lrc(this->getString(DERECHO_LEADER_REGISTRY_IP), + this->getUInt16(DERECHO_LEADER_REGISTRY_PORT)); + return lrc.call("get").as>(); +#else + return std::tuple { this->getString(DERECHO_LEADER_IP), this->getUInt16(DERECHO_LEADER_GMS_PORT), this->getUInt16(DERECHO_LEADER_EXTERNAL_PORT) }; - return leader; +#endif } +#ifdef ENABLE_LEADER_REGISTRY +void Conf::push_leader(std::string ip,uint16_t gms_port,uint16_t ext_port) { + ::rpc::client lrc(this->getString(DERECHO_LEADER_REGISTRY_IP), + this->getUInt16(DERECHO_LEADER_REGISTRY_PORT)); + lrc.call("put",ip,gms_port,ext_port); +} +#endif + const std::string& getConfString(const std::string& key) { return Conf::get()->getString(key); } diff --git a/src/conf/leader_registry.cpp b/src/conf/leader_registry.cpp index d79218fd..d4753605 100644 --- a/src/conf/leader_registry.cpp +++ b/src/conf/leader_registry.cpp @@ -116,7 +116,7 @@ void term_handler(int signum) { int main(int argc, char** argv) { - uint16_t port = LEADER_REGISTRY_PORT; + uint16_t port = getConfUInt16(Conf::DERECHO_LEADER_REGISTRY_PORT); std::string leader_ip = ""; uint16_t leader_gms_port = 0; bool daemonized = false; diff --git a/src/conf/leader_registry_client.cpp b/src/conf/leader_registry_client.cpp index 320b09c2..f2630887 100644 --- a/src/conf/leader_registry_client.cpp +++ b/src/conf/leader_registry_client.cpp @@ -34,8 +34,8 @@ struct option long_options[] = { }; int main(int argc, char** argv) { - std::string server = "localhost"; - uint16_t port = LEADER_REGISTRY_PORT; + std::string server = getConfString(Conf::DERECHO_LEADER_REGISTRY_IP); + uint16_t port = getConfUInt16(Conf::DERECHO_LEADER_REGISTRY_PORT); while(true) { int option_index = 0; diff --git a/src/core/view_manager.cpp b/src/core/view_manager.cpp index df65ba9b..74c28335 100644 --- a/src/core/view_manager.cpp +++ b/src/core/view_manager.cpp @@ -72,6 +72,8 @@ bool ViewManager::first_init() { if(curr_view) { dbg_debug(vm_logger, "Found view {} on disk", curr_view->vid); restart_state = std::make_unique(); +#ifdef ENABLE_LEADER_REGISTRY +#else restart_state->restart_leader_ips = split_string(getConfString(Conf::DERECHO_RESTART_LEADERS)); restart_state->restart_leader_ports = [&]() { //"Apply std::stoi over the result of split_string(getConfString(...))" @@ -83,6 +85,7 @@ bool ViewManager::first_init() { return ports; }(); restart_state->num_leader_failures = 0; +#endif//ENABLE_LEADER_REGISTRY restart_to_initial_view(); } else { startup_to_first_view(); @@ -135,8 +138,13 @@ bool ViewManager::restart_to_initial_view() { bool got_initial_view = false; while(!got_initial_view) { //Determine if I am the current restart leader +#ifdef ENABLE_LEADER_REGISTRY + auto leader_tuple = Conf::get()->get_leader(); + if (my_ip == std::get<0>(leader_tuple) && my_gms_port == std::get<1>(leader_tuple)) { +#else if(my_ip == restart_state->restart_leader_ips[restart_state->num_leader_failures] && my_gms_port == restart_state->restart_leader_ports[restart_state->num_leader_failures]) { +#endif in_total_restart = true; active_leader = true; dbg_info(vm_logger, "Logged View {} found on disk. Restarting in recovery mode as the leader.", curr_view->vid); @@ -245,6 +253,8 @@ bool ViewManager::receive_initial_view() { std::vector{}); // Initialize restart_state, which wasn't created in first_init() because we had no logged View restart_state = std::make_unique(); +#ifdef ENABLE_LEADER_REGISTRY +#else restart_state->restart_leader_ips = split_string(getConfString(Conf::DERECHO_RESTART_LEADERS)); restart_state->restart_leader_ports = [&]() { auto port_list = split_string(getConfString(Conf::DERECHO_RESTART_LEADER_PORTS)); @@ -255,6 +265,7 @@ bool ViewManager::receive_initial_view() { return ports; }(); restart_state->num_leader_failures = 0; +#endif//ENABLE_LEADER_REGISTRY } dbg_debug(vm_logger, "Sending view {} to leader", curr_view->vid); auto leader_socket_write = [this](const uint8_t* bytes, std::size_t size) {