diff --git a/src/app.cpp b/src/app.cpp index dd5a5cb..89e5d83 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -14,8 +14,10 @@ // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include +#include #include #include +#include #include #include #include @@ -232,8 +234,9 @@ void process_binlog_event(const binsrv::event::event ¤t_event, } void receive_binlog_events( - binsrv::operation_mode_type operation_mode, binsrv::basic_logger &logger, - const easymysql::library &mysql_lib, + binsrv::operation_mode_type operation_mode, + const volatile std::atomic_flag &termination_flag, + binsrv::basic_logger &logger, const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, std::uint32_t server_id, binsrv::storage &storage) { easymysql::connection connection{}; @@ -295,7 +298,8 @@ void receive_binlog_events( bool skip_open_binlog{storage.is_binlog_open()}; bool fetch_result{}; - while ((fetch_result = connection.fetch_binlog_event(portion)) && + while (!termination_flag.test() && + (fetch_result = connection.fetch_binlog_event(portion)) && !portion.empty()) { if (portion[0] != expected_event_packet_prefix) { util::exception_location().raise( @@ -318,20 +322,55 @@ void receive_binlog_events( process_binlog_event(current_event, portion, storage, skip_open_binlog); } + if (termination_flag.test()) { + logger.log(binsrv::log_severity::info, + "fetching binlog events loop terminated by signal"); + return; + } if (fetch_result) { logger.log(binsrv::log_severity::info, "fetched everything and disconnected"); - } else { - if (operation_mode == binsrv::operation_mode_type::fetch) { - util::exception_location().raise( - "fetch operation did not reach EOF reading binlog events"); - } - logger.log(binsrv::log_severity::info, - "timed out waiting for events and disconnected"); + return; + } + if (operation_mode == binsrv::operation_mode_type::fetch) { + util::exception_location().raise( + "fetch operation did not reach EOF reading binlog events"); + } + logger.log(binsrv::log_severity::info, + "timed out waiting for events and disconnected"); +} + +bool wait_for_interruptable(std::uint32_t idle_time_seconds, + const volatile std::atomic_flag &termination_flag) { + // instead of + // 'std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds))' + // we do 'std::this_thread::sleep_for(1s)' '' times + // in a loop also checking for termination condition + + // standard pattern with declaring an instance of + // std::conditional_variable and waiting for it (for + // '' seconds) to be notified from the signal handler + // can be dangerous as the chances of signal handler being called on the + // same thread as this one ('main()') are pretty big. + for (std::uint32_t sleep_iteration{0U}; + sleep_iteration < idle_time_seconds && !termination_flag.test(); + ++sleep_iteration) { + std::this_thread::sleep_for(std::chrono::seconds(1U)); } + return !termination_flag.test(); } +// since c++20 it is no longer needed to initialize std::atomic_flag with +// ATOMIC_FLAG_INIT as this flag is modified from a signal handler it is marked +// as volatile to make sure optimizer do optimizations which will be unsafe for +// this scenario +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +volatile std::atomic_flag global_termination_flag{}; + } // anonymous namespace +extern "C" void custom_signal_handler(int /*signo*/) { + global_termination_flag.test_and_set(); +} int main(int argc, char *argv[]) { using namespace std::string_literals; @@ -418,6 +457,20 @@ int main(int argc, char *argv[]) { "logging level set to \""s + std::string{log_level_label} + '"'); + // setting custom SIGINT and SIGTERM signals amd making sure that + // any other dependency library hasn't already changed them + [[maybe_unused]] auto const previous_sigterm_handler{ + std::signal(SIGTERM, &custom_signal_handler)}; + assert(previous_sigterm_handler == SIG_DFL); + + [[maybe_unused]] auto const previous_sigint_handler{ + std::signal(SIGINT, &custom_signal_handler)}; + assert(previous_sigint_handler == SIG_DFL); + + logger->log(binsrv::log_severity::info, + "set custom handlers for SIGINT and SIGTERM signals"); + const volatile std::atomic_flag &termination_flag{global_termination_flag}; + const auto &storage_config = config->root().get<"storage">(); auto storage_backend{ binsrv::storage_backend_factory::create(storage_config)}; @@ -445,28 +498,42 @@ int main(int argc, char *argv[]) { log_library_info(*logger, mysql_lib); - receive_binlog_events(operation_mode, *logger, mysql_lib, connection_config, - server_id, storage); + receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, + connection_config, server_id, storage); if (operation_mode == binsrv::operation_mode_type::pull) { std::size_t iteration_number{1U}; - while (true) { - std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds)); - - msg = "awoke after sleeping for "; + while (!termination_flag.test()) { + msg = "entering idle mode for "; msg += std::to_string(idle_time_seconds); - msg += " seconds and trying to reconnect (iteration "; + msg += " seconds"; + logger->log(binsrv::log_severity::info, msg); + + if (!wait_for_interruptable(idle_time_seconds, termination_flag)) { + break; + } + + msg = "awoke after sleeping and trying to reconnect (iteration "; msg += std::to_string(iteration_number); msg += ')'; logger->log(binsrv::log_severity::info, msg); - receive_binlog_events(operation_mode, *logger, mysql_lib, - connection_config, server_id, storage); + receive_binlog_events(operation_mode, termination_flag, *logger, + mysql_lib, connection_config, server_id, storage); ++iteration_number; } } - logger->log(binsrv::log_severity::info, "successfully shut down"); + if (termination_flag.test()) { + logger->log( + binsrv::log_severity::info, + "successfully shut down after receiving a termination signal"); + } else { + logger->log( + binsrv::log_severity::info, + "successfully shut down after finishing the requested operation"); + } + exit_code = EXIT_SUCCESS; } catch (...) { handle_std_exception(logger);