Skip to content

Commit

Permalink
Added ability to gracefully interrupt application in both fetch and p…
Browse files Browse the repository at this point in the history
…ull modes (#47)

Added custom signal handlers for SIGINT and SIGTERM that set global flag
('std::atomic_flag global_termination_flag' ).

Added extra logic that checks for a termination request in the following locations.
* Inside receiving / processing individual binlog events loop in 'receive_binlog_events()'.
* In the 'pull' mode idle-reconnection loop.
* Inside the idle time sleep function.

From the user point of view the application can now be gracefully shut down when user presses 'Ctrl+C' or kills the application by SIGTERM (e.g. 'kill <binlog_server_pid>').
"Gracefully " means that all the connections will be properly closed, data caches written to disk / uploaded to the cloud and storage will remain in consistent / resumable state.

Because of the synchronous nature of the binlog functions from the MySQL client library, there still may be a delay between receiving the signal and reacting to it. Worst case scenario, user will have to wait '<connection.read_timeout>' (the value from the configuration) + 1 second (the granularity of sleep intervals in the 'idle' mode).
  • Loading branch information
percona-ysorokin authored May 17, 2024
1 parent f0f62ef commit 0a79d56
Showing 1 changed file with 87 additions and 20 deletions.
107 changes: 87 additions & 20 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
Expand Down Expand Up @@ -232,8 +234,9 @@ void process_binlog_event(const binsrv::event::event &current_event,
}

void receive_binlog_events(
binsrv::operation_mode_type operation_mode, binsrv::basic_logger &logger,
const easymysql::library &mysql_lib,
binsrv::operation_mode_type operation_mode,
const volatile std::atomic_flag &termination_flag,
binsrv::basic_logger &logger, const easymysql::library &mysql_lib,
const easymysql::connection_config &connection_config,
std::uint32_t server_id, binsrv::storage &storage) {
easymysql::connection connection{};
Expand Down Expand Up @@ -295,7 +298,8 @@ void receive_binlog_events(
bool skip_open_binlog{storage.is_binlog_open()};
bool fetch_result{};

while ((fetch_result = connection.fetch_binlog_event(portion)) &&
while (!termination_flag.test() &&
(fetch_result = connection.fetch_binlog_event(portion)) &&
!portion.empty()) {
if (portion[0] != expected_event_packet_prefix) {
util::exception_location().raise<std::runtime_error>(
Expand All @@ -318,20 +322,55 @@ void receive_binlog_events(

process_binlog_event(current_event, portion, storage, skip_open_binlog);
}
if (termination_flag.test()) {
logger.log(binsrv::log_severity::info,
"fetching binlog events loop terminated by signal");
return;
}
if (fetch_result) {
logger.log(binsrv::log_severity::info,
"fetched everything and disconnected");
} else {
if (operation_mode == binsrv::operation_mode_type::fetch) {
util::exception_location().raise<std::logic_error>(
"fetch operation did not reach EOF reading binlog events");
}
logger.log(binsrv::log_severity::info,
"timed out waiting for events and disconnected");
return;
}
if (operation_mode == binsrv::operation_mode_type::fetch) {
util::exception_location().raise<std::logic_error>(
"fetch operation did not reach EOF reading binlog events");
}
logger.log(binsrv::log_severity::info,
"timed out waiting for events and disconnected");
}

bool wait_for_interruptable(std::uint32_t idle_time_seconds,
const volatile std::atomic_flag &termination_flag) {
// instead of
// 'std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds))'
// we do 'std::this_thread::sleep_for(1s)' '<idle_time_seconds>' times
// in a loop also checking for termination condition

// standard pattern with declaring an instance of
// std::conditional_variable and waiting for it (for
// '<idle_time_seconds>' seconds) to be notified from the signal handler
// can be dangerous as the chances of signal handler being called on the
// same thread as this one ('main()') are pretty big.
for (std::uint32_t sleep_iteration{0U};
sleep_iteration < idle_time_seconds && !termination_flag.test();
++sleep_iteration) {
std::this_thread::sleep_for(std::chrono::seconds(1U));
}
return !termination_flag.test();
}

// since c++20 it is no longer needed to initialize std::atomic_flag with
// ATOMIC_FLAG_INIT as this flag is modified from a signal handler it is marked
// as volatile to make sure optimizer do optimizations which will be unsafe for
// this scenario
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
volatile std::atomic_flag global_termination_flag{};

} // anonymous namespace
extern "C" void custom_signal_handler(int /*signo*/) {
global_termination_flag.test_and_set();
}

int main(int argc, char *argv[]) {
using namespace std::string_literals;
Expand Down Expand Up @@ -418,6 +457,20 @@ int main(int argc, char *argv[]) {
"logging level set to \""s + std::string{log_level_label} +
'"');

// setting custom SIGINT and SIGTERM signals amd making sure that
// any other dependency library hasn't already changed them
[[maybe_unused]] auto const previous_sigterm_handler{
std::signal(SIGTERM, &custom_signal_handler)};
assert(previous_sigterm_handler == SIG_DFL);

[[maybe_unused]] auto const previous_sigint_handler{
std::signal(SIGINT, &custom_signal_handler)};
assert(previous_sigint_handler == SIG_DFL);

logger->log(binsrv::log_severity::info,
"set custom handlers for SIGINT and SIGTERM signals");
const volatile std::atomic_flag &termination_flag{global_termination_flag};

const auto &storage_config = config->root().get<"storage">();
auto storage_backend{
binsrv::storage_backend_factory::create(storage_config)};
Expand Down Expand Up @@ -445,28 +498,42 @@ int main(int argc, char *argv[]) {

log_library_info(*logger, mysql_lib);

receive_binlog_events(operation_mode, *logger, mysql_lib, connection_config,
server_id, storage);
receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib,
connection_config, server_id, storage);

if (operation_mode == binsrv::operation_mode_type::pull) {
std::size_t iteration_number{1U};
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds));

msg = "awoke after sleeping for ";
while (!termination_flag.test()) {
msg = "entering idle mode for ";
msg += std::to_string(idle_time_seconds);
msg += " seconds and trying to reconnect (iteration ";
msg += " seconds";
logger->log(binsrv::log_severity::info, msg);

if (!wait_for_interruptable(idle_time_seconds, termination_flag)) {
break;
}

msg = "awoke after sleeping and trying to reconnect (iteration ";
msg += std::to_string(iteration_number);
msg += ')';
logger->log(binsrv::log_severity::info, msg);

receive_binlog_events(operation_mode, *logger, mysql_lib,
connection_config, server_id, storage);
receive_binlog_events(operation_mode, termination_flag, *logger,
mysql_lib, connection_config, server_id, storage);
++iteration_number;
}
}

logger->log(binsrv::log_severity::info, "successfully shut down");
if (termination_flag.test()) {
logger->log(
binsrv::log_severity::info,
"successfully shut down after receiving a termination signal");
} else {
logger->log(
binsrv::log_severity::info,
"successfully shut down after finishing the requested operation");
}

exit_code = EXIT_SUCCESS;
} catch (...) {
handle_std_exception(logger);
Expand Down

0 comments on commit 0a79d56

Please sign in to comment.