From a47fdc96c5c76ea8eb256df064950efc22e991cd Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Thu, 16 May 2024 01:09:43 +0200 Subject: [PATCH] Implemented fetch and pull modes for the main application MySQL connection configuration structure ('easymysql::connection_config') extended with three additional parameters: * 'connect_timeout' ('MYSQL_OPT_CONNECT_TIMEOUT') * 'read_timeout' ('MYSQL_OPT_READ_TIMEOUT') * 'write_timeout' ('MYSQL_OPT_WRITE_TIMEOUT') that are used to set corresponding MySQL client options via 'mysql_options()'. 'main_config.json' sample configuration file and config file generation in 'binlog_streaming.binsrv' MTR test case updated correspondingly. Main application extended with one more command line argument (the very first one) called ''' with two allowed values ('fetch' and 'pull'). In 'fetch' mode the application now connects to the remote MySQL server and reads all events from all binary logs till the very last one (till 'mysql_binlog_fetch()' returns an empty buffer) and exits. The replication channel is opened in non-blocking mode. In 'pull' mode the application also reads all available events till the very last one but instead of exiting immediately continues to wait for other events to come later. The replication channel is opened in blocking mode. In order to be able to gracefully terminate this process, we use 'read_timeout' configuration parameter ('MYSQL_OPT_READ_TIMEOUT' MySQL client option) after which blocking 'mysql_binlog_fetch()' call will terminate with a timeout error. After this, the replication connection is closed and program enters the 'idle' state doing nothing. Currently the idle time is hardcoded to 5 seconds. When this 'idle' time passes, the program attempts to re-establish MySQL server connection and continue replication. Not being able to do so, will not terminate the program. Instead, after another 'idle' period the next attempt to re-establish connection will be made. This model will allow us to react to termination requests (most probably to posix 'SIGINT' / 'SIGTERM' signals) not only in the binlog event processing loop but during the 'idle' time as well. Introduced 'binsrv::operation_mode_type' smart enum with two values ('fetch' and 'pull') that is used to specify main application operation mode. 'easymysql::connection' now includes all the functionality available before in the 'easymysql::binlog' class. Established connection can now be switched to 'replication' mode. 'easymysql::binlog' class removed completely. The method that switches connection to replication mode ('easymysql::connection::switch_to_replication()') accepts an extra parameter that specifies blocking / non-blocking replication mode. Added explicit 'flush()' calls at the end of 'do_write_data_to_stream()' for both 'binsrv::filesystem_storage_backend' and 'binsrv::s3_storage_backend' to make sure that buffered data is immediately written to the disk. 'mysql-server' project GitHub links in the comments updated to version '8.0.37'. --- CMakeLists.txt | 8 +- main_config.json | 5 +- mtr/binlog_streaming/t/binsrv.test | 9 +- src/app.cpp | 310 +++++++++++++----- src/binsrv/event/checksum_algorithm_type.hpp | 2 +- src/binsrv/event/code_type.hpp | 2 +- src/binsrv/event/common_header.cpp | 2 +- src/binsrv/event/flag_type.hpp | 4 +- .../format_description_post_header_impl.cpp | 2 +- src/binsrv/event/reader_context.cpp | 4 +- src/binsrv/event/rotate_post_header_impl.cpp | 2 +- src/binsrv/filesystem_storage_backend.cpp | 4 + src/binsrv/operation_mode_type.hpp | 96 ++++++ .../operation_mode_type_fwd.hpp} | 28 +- src/binsrv/s3_storage_backend.cpp | 6 +- src/binsrv/storage.cpp | 4 + src/binsrv/storage.hpp | 11 +- src/easymysql/binlog.cpp | 98 ------ src/easymysql/binlog.hpp | 65 ---- src/easymysql/binlog_fwd.hpp | 27 -- src/easymysql/connection.cpp | 199 ++++++++++- src/easymysql/connection.hpp | 44 ++- src/easymysql/connection_config.hpp | 11 +- src/easymysql/connection_deimpl_private.hpp | 3 +- src/easymysql/connection_fwd.hpp | 2 + src/easymysql/core_error_helpers_private.cpp | 8 +- 26 files changed, 620 insertions(+), 336 deletions(-) create mode 100644 src/binsrv/operation_mode_type.hpp rename src/{easymysql/binlog_deimpl_private.hpp => binsrv/operation_mode_type_fwd.hpp} (51%) delete mode 100644 src/easymysql/binlog.cpp delete mode 100644 src/easymysql/binlog.hpp delete mode 100644 src/easymysql/binlog_fwd.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index fbc4059..a9d8887 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,9 @@ set(source_files src/binsrv/main_config.hpp src/binsrv/main_config.cpp + src/binsrv/operation_mode_type_fwd.hpp + src/binsrv/operation_mode_type.hpp + src/binsrv/s3_error_helpers_private.hpp src/binsrv/s3_error_helpers_private.cpp src/binsrv/s3_error.hpp @@ -241,11 +244,6 @@ set(source_files src/easymysql/core_error.hpp src/easymysql/core_error.cpp - src/easymysql/binlog_deimpl_private.hpp - src/easymysql/binlog_fwd.hpp - src/easymysql/binlog.hpp - src/easymysql/binlog.cpp - src/easymysql/connection_deimpl_private.hpp src/easymysql/connection_fwd.hpp src/easymysql/connection.hpp diff --git a/main_config.json b/main_config.json index b6d07d9..28d4452 100644 --- a/main_config.json +++ b/main_config.json @@ -7,7 +7,10 @@ "host": "127.0.0.1", "port": 3306, "user": "root", - "password": "" + "password": "", + "connect_timeout": 20, + "read_timeout": 60, + "write_timeout": 60 }, "storage": { "uri": "file:///home/user/vault" diff --git a/mtr/binlog_streaming/t/binsrv.test b/mtr/binlog_streaming/t/binsrv.test index 2441b36..62ac0d4 100644 --- a/mtr/binlog_streaming/t/binsrv.test +++ b/mtr/binlog_streaming/t/binsrv.test @@ -105,7 +105,10 @@ eval SET @binsrv_config_json = JSON_OBJECT( 'host', @connection_host, 'port', @@global.port, 'user', @connection_user, - 'password', '' + 'password', '', + 'connect_timeout', 20, + 'read_timeout', 60, + 'write_timeout', 60 ), 'storage', JSON_OBJECT( 'uri', @storage_uri @@ -144,7 +147,7 @@ if ($storage_backend == file) --echo *** Executing the Binlog Server utility to download all binlog data --echo *** from the server to the directory (second --echo *** binlog is still open / in use). ---exec $BINSRV $binsrv_config_file_path > /dev/null +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null --echo --echo *** Checking that the Binlog Server utility detected an empty storage @@ -220,7 +223,7 @@ FLUSH BINARY LOGS; --echo *** Executing the Binlog Server utility one more time (the second --echo *** binlog is no longer open / in use). Here we should also continue --echo *** streaming binlog events from the last saved position. ---exec $BINSRV $binsrv_config_file_path > /dev/null +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null --echo --echo *** Checking that the Binlog Server utility detected a previously diff --git a/src/app.cpp b/src/app.cpp index 17391e0..1f46672 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -26,9 +27,11 @@ #include #include #include +#include #include #include +#include #include "binsrv/basic_logger.hpp" #include "binsrv/basic_storage_backend.hpp" @@ -36,6 +39,7 @@ #include "binsrv/log_severity.hpp" #include "binsrv/logger_factory.hpp" #include "binsrv/main_config.hpp" +#include "binsrv/operation_mode_type.hpp" #include "binsrv/storage.hpp" #include "binsrv/storage_backend_factory.hpp" @@ -45,9 +49,9 @@ #include "binsrv/event/protocol_traits_fwd.hpp" #include "binsrv/event/reader_context.hpp" -#include "easymysql/binlog.hpp" #include "easymysql/connection.hpp" #include "easymysql/connection_config.hpp" +#include "easymysql/core_error.hpp" #include "easymysql/library.hpp" #include "util/byte_span_fwd.hpp" @@ -57,6 +61,67 @@ namespace { +void log_storage_info(binsrv::basic_logger &logger, + const binsrv::storage &storage) { + std::string msg{}; + if (storage.has_current_binlog_name()) { + msg = "binlog storage initialized at \""; + msg += storage.get_current_binlog_name(); + msg += "\":"; + msg += std::to_string(storage.get_current_position()); + } else { + msg = "binlog storage initialized on an empty directory"; + } + logger.log(binsrv::log_severity::info, msg); +} + +void log_library_info(binsrv::basic_logger &logger, + const easymysql::library &mysql_lib) { + std::string msg{}; + msg = "mysql client version: "; + msg += mysql_lib.get_readable_client_version(); + logger.log(binsrv::log_severity::info, msg); +} + +void log_connection_info(binsrv::basic_logger &logger, + const easymysql::connection &connection) { + std::string msg{}; + msg = "mysql server version: "; + msg += connection.get_readable_server_version(); + logger.log(binsrv::log_severity::info, msg); + + logger.log(binsrv::log_severity::info, + "mysql protocol version: " + + std::to_string(connection.get_protocol_version())); + + msg = "mysql server connection info: "; + msg += connection.get_server_connection_info(); + logger.log(binsrv::log_severity::info, msg); + + msg = "mysql connection character set: "; + msg += connection.get_character_set_name(); + logger.log(binsrv::log_severity::info, msg); +} + +void log_replication_info( + binsrv::basic_logger &logger, std::uint32_t server_id, + std::string_view current_binlog_name, std::uint64_t current_binlog_position, + easymysql::connection_replication_mode_type blocking_mode) { + std::string msg{}; + msg = "replication info (server id "; + msg += std::to_string(server_id); + msg += ", "; + msg += (current_binlog_name.empty() ? "" : current_binlog_name); + msg += ":"; + msg += std::to_string(current_binlog_position); + msg += ", "; + msg += (blocking_mode == easymysql::connection_replication_mode_type::blocking + ? "blocking" + : "non-blocking"); + msg += ")"; + logger.log(binsrv::log_severity::info, msg); +} + void log_span_dump(binsrv::basic_logger &logger, util::const_byte_span portion) { static constexpr std::size_t bytes_per_dump_line{16U}; @@ -91,9 +156,101 @@ void log_span_dump(binsrv::basic_logger &logger, } } -void receive_binlog_events(binsrv::basic_logger &logger, - easymysql::binlog &binlog, - binsrv::storage &storage) { +void process_binlog_event(const binsrv::event::event ¤t_event, + util::const_byte_span portion, + binsrv::storage &storage, bool &skip_open_binlog) { + const auto ¤t_common_header = current_event.get_common_header(); + const auto code = current_common_header.get_type_code(); + + const auto is_artificial{current_common_header.get_flags().has_element( + binsrv::event::flag_type::artificial)}; + const auto is_pseudo{current_common_header.get_next_event_position_raw() == + 0U}; + + if (code == binsrv::event::code_type::rotate && is_artificial) { + const auto ¤t_rotate_body = + current_event.get_body(); + if (skip_open_binlog) { + // we are supposed to get here after reconnection, so doing + // basic integrity checks + if (current_rotate_body.get_binlog() != + storage.get_current_binlog_name()) { + util::exception_location().raise( + "unexpected binlog name in artificial rotate event after " + "reconnection"); + } + const auto ¤t_rotate_post_header = + current_event.get_post_header(); + if (current_rotate_post_header.get_position_raw() != + storage.get_current_position()) { + util::exception_location().raise( + "unexpected binlog position in artificial rotate event after " + "reconnection"); + } + + skip_open_binlog = false; + } else { + storage.open_binlog(current_rotate_body.get_binlog()); + } + } + if (!is_artificial && !is_pseudo) { + storage.write_event(portion); + } + if (code == binsrv::event::code_type::rotate && !is_artificial) { + storage.close_binlog(); + } +} + +void receive_binlog_events( + binsrv::operation_mode_type operation_mode, binsrv::basic_logger &logger, + const easymysql::library &mysql_lib, + const easymysql::connection_config &connection_config, + std::uint32_t server_id, binsrv::storage &storage) { + easymysql::connection connection{}; + try { + connection = mysql_lib.create_connection(connection_config); + } catch (const easymysql::core_error &) { + if (operation_mode == binsrv::operation_mode_type::fetch) { + throw; + } + logger.log(binsrv::log_severity::info, + "unable to establish connection to mysql server"); + return; + } + + logger.log(binsrv::log_severity::info, + "established connection to mysql server"); + + log_connection_info(logger, connection); + + const auto current_binlog_name{storage.get_current_binlog_name()}; + // if storage binlog name is detected to be empty (empty data directory), we + // start streaming from the position 'magic_binlog_offset' (4) + const auto current_binlog_position{current_binlog_name.empty() + ? binsrv::event::magic_binlog_offset + : storage.get_current_position()}; + + const auto blocking_mode{ + operation_mode == binsrv::operation_mode_type::fetch + ? easymysql::connection_replication_mode_type::non_blocking + : easymysql::connection_replication_mode_type::blocking}; + + try { + connection.switch_to_replication(server_id, current_binlog_name, + current_binlog_position, blocking_mode); + } catch (const easymysql::core_error &) { + if (operation_mode == binsrv::operation_mode_type::fetch) { + throw; + } + logger.log(binsrv::log_severity::info, "unable to switch to replication"); + return; + } + + logger.log(binsrv::log_severity::info, "switched to replication"); + + log_replication_info(logger, server_id, current_binlog_name, + current_binlog_position, blocking_mode); + // Network streams are requested with COM_BINLOG_DUMP and // each Binlog Event response is prepended with 00 OK-byte. static constexpr std::byte expected_event_packet_prefix{'\0'}; @@ -102,7 +259,14 @@ void receive_binlog_events(binsrv::basic_logger &logger, binsrv::event::reader_context context{}; - while (!(portion = binlog.fetch()).empty()) { + // if binlog is still open, there is no sense to close it and re-open + // instead, we will just instruct this loop to process the + // very first artificial rotate event in a special way + bool skip_open_binlog{storage.is_binlog_open()}; + bool fetch_result{}; + + while ((fetch_result = connection.fetch_binlog_event(portion)) && + !portion.empty()) { if (portion[0] != expected_event_packet_prefix) { util::exception_location().raise( "unexpected event prefix"); @@ -120,28 +284,20 @@ void receive_binlog_events(binsrv::basic_logger &logger, logger.log(binsrv::log_severity::debug, "Parsed event:\n" + boost::lexical_cast(current_event)); - - const auto ¤t_common_header = current_event.get_common_header(); - const auto code = current_common_header.get_type_code(); log_span_dump(logger, portion); - const auto is_artificial{current_common_header.get_flags().has_element( - binsrv::event::flag_type::artificial)}; - const auto is_pseudo{current_common_header.get_next_event_position_raw() == - 0U}; - - if (code == binsrv::event::code_type::rotate && is_artificial) { - const auto ¤t_rotate_body = - current_event.get_body(); - - storage.open_binlog(current_rotate_body.get_binlog()); - } - if (!is_artificial && !is_pseudo) { - storage.write_event(portion); - } - if (code == binsrv::event::code_type::rotate && !is_artificial) { - storage.close_binlog(); + process_binlog_event(current_event, portion, storage, skip_open_binlog); + } + if (fetch_result) { + logger.log(binsrv::log_severity::info, + "fetched everything and disconnected"); + } else { + if (operation_mode == binsrv::operation_mode_type::fetch) { + util::exception_location().raise( + "fetch operation did not reach EOF reading binlog events"); } + logger.log(binsrv::log_severity::info, + "timed out waiting for events and disconnected"); } } @@ -156,19 +312,28 @@ int main(int argc, char *argv[]) { const auto number_of_cmd_args = std::size(cmd_args); const auto executable_name = util::extract_executable_name(cmd_args); - if (number_of_cmd_args != binsrv::main_config::flattened_size + 1 && - number_of_cmd_args != 2) { - std::cerr << "usage: " << executable_name + binsrv::operation_mode_type operation_mode{ + binsrv::operation_mode_type::delimiter}; + auto cmd_args_validated{ + (number_of_cmd_args == binsrv::main_config::flattened_size + 2U || + number_of_cmd_args == 3U) && + boost::conversion::try_lexical_convert(cmd_args[1], operation_mode) && + operation_mode != binsrv::operation_mode_type::delimiter}; + + if (!cmd_args_validated) { + std::cerr << "usage: " << executable_name << " (fetch|pull))" << " " " " + " " " \n" - << " " << executable_name << " \n"; + << " " << executable_name + << " (fetch|pull)) \n"; return exit_code; } binsrv::basic_logger_ptr logger; try { - const auto default_log_level = binsrv::log_severity::trace; + static constexpr auto default_log_level = binsrv::log_severity::trace; const binsrv::logger_config initial_logger_config{ {{default_log_level}, {""}}}; @@ -181,16 +346,26 @@ int main(int argc, char *argv[]) { logger->log(binsrv::log_severity::delimiter, util::get_readable_command_line_arguments(cmd_args)); + if (operation_mode == binsrv::operation_mode_type::fetch) { + logger->log(binsrv::log_severity::delimiter, + "'fetch' operation mode specified"); + } else if (operation_mode == binsrv::operation_mode_type::pull) { + logger->log(binsrv::log_severity::delimiter, + "'pull' operation mode specified"); + } else { + assert(false); + } + binsrv::main_config_ptr config; - if (number_of_cmd_args == 2U) { + if (number_of_cmd_args == 3U) { logger->log(binsrv::log_severity::delimiter, - "Reading connection configuration from the JSON file."); - config = std::make_shared(cmd_args[1]); - } else if (number_of_cmd_args == binsrv::main_config::flattened_size + 1) { + "reading connection configuration from the JSON file."); + config = std::make_shared(cmd_args[2]); + } else if (number_of_cmd_args == binsrv::main_config::flattened_size + 2U) { logger->log(binsrv::log_severity::delimiter, - "Reading connection configuration from the command line " + "reading connection configuration from the command line " "arguments."); - config = std::make_shared(cmd_args.subspan(1U)); + config = std::make_shared(cmd_args.subspan(2U)); } else { assert(false); } @@ -201,7 +376,7 @@ int main(int argc, char *argv[]) { logger->set_min_level(logger_config.get<"level">()); } else { logger->log(binsrv::log_severity::delimiter, - "Redirecting logging to \"" + logger_config.get<"file">() + + "redirecting logging to \"" + logger_config.get<"file">() + "\""); auto new_logger = binsrv::logger_factory::create(logger_config); std::swap(logger, new_logger); @@ -226,58 +401,41 @@ int main(int argc, char *argv[]) { logger->log(binsrv::log_severity::info, "created binlog storage with the provided backend"); - const auto last_binlog_name{storage.get_binlog_name()}; - // if storage position is detected to be 0 (empty data directory), we - // start streaming from the position magic_binlog_offset (4) - const auto last_binlog_position{ - std::max(binsrv::event::magic_binlog_offset, storage.get_position())}; - if (last_binlog_name.empty()) { - msg = "binlog storage initialized on an empty directory"; - } else { - msg = "binlog storage initialized at \""; - msg += last_binlog_name; - msg += "\":"; - msg += std::to_string(last_binlog_position); - } - logger->log(binsrv::log_severity::info, msg); + log_storage_info(*logger, storage); const auto &connection_config = config->root().get<"connection">(); logger->log(binsrv::log_severity::info, "mysql connection string: " + connection_config.get_connection_string()); - const easymysql::library mysql_lib; - logger->log(binsrv::log_severity::info, "initialized mysql client library"); + // TODO: put these parameters into configuration + static constexpr std::uint32_t default_server_id{42U}; + static constexpr std::uint32_t default_idle_time_seconds{5U}; - msg = "mysql client version: "; - msg += mysql_lib.get_readable_client_version(); - logger->log(binsrv::log_severity::info, msg); + const auto server_id{default_server_id}; + const auto idle_time_seconds{default_idle_time_seconds}; - auto connection = mysql_lib.create_connection(connection_config); - logger->log(binsrv::log_severity::info, - "established connection to mysql server"); - msg = "mysql server version: "; - msg += connection.get_readable_server_version(); - logger->log(binsrv::log_severity::info, msg); - - logger->log(binsrv::log_severity::info, - "mysql protocol version: " + - std::to_string(connection.get_protocol_version())); + const easymysql::library mysql_lib; + logger->log(binsrv::log_severity::info, "initialized mysql client library"); - msg = "mysql server connection info: "; - msg += connection.get_server_connection_info(); - logger->log(binsrv::log_severity::info, msg); + log_library_info(*logger, mysql_lib); - msg = "mysql connection character set: "; - msg += connection.get_character_set_name(); - logger->log(binsrv::log_severity::info, msg); + receive_binlog_events(operation_mode, *logger, mysql_lib, connection_config, + server_id, storage); - static constexpr std::uint32_t default_server_id{0U}; - auto binlog = connection.create_binlog(default_server_id, last_binlog_name, - last_binlog_position); - logger->log(binsrv::log_severity::info, "opened binary log connection"); + if (operation_mode == binsrv::operation_mode_type::pull) { + std::size_t iteration_number{1U}; + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds)); - receive_binlog_events(*logger, binlog, storage); + logger->log(binsrv::log_severity::info, + "awoke after sleep and trying to reconnect (" + + std::to_string(iteration_number) + ")"); + receive_binlog_events(operation_mode, *logger, mysql_lib, + connection_config, server_id, storage); + ++iteration_number; + } + } logger->log(binsrv::log_severity::info, "successfully shut down"); exit_code = EXIT_SUCCESS; diff --git a/src/binsrv/event/checksum_algorithm_type.hpp b/src/binsrv/event/checksum_algorithm_type.hpp index a02f1ec..242ef64 100644 --- a/src/binsrv/event/checksum_algorithm_type.hpp +++ b/src/binsrv/event/checksum_algorithm_type.hpp @@ -27,7 +27,7 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Checksum algorithm type codes copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L425 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L426 // clang-format off #define BINSRV_CHECKSUM_ALGORITHM_TYPE_XY_SEQUENCE() \ BINSRV_CHECKSUM_ALGORITHM_TYPE_XY_MACRO(off , 0), \ diff --git a/src/binsrv/event/code_type.hpp b/src/binsrv/event/code_type.hpp index a13c26e..adc03f1 100644 --- a/src/binsrv/event/code_type.hpp +++ b/src/binsrv/event/code_type.hpp @@ -27,7 +27,7 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Event type codes copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L274 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L275 // clang-format off #define BINSRV_EVENT_CODE_TYPE_XY_SEQUENCE() \ BINSRV_EVENT_CODE_TYPE_XY_MACRO(unknown , 0), \ diff --git a/src/binsrv/event/common_header.cpp b/src/binsrv/event/common_header.cpp index 4c53f7a..e00ce55 100644 --- a/src/binsrv/event/common_header.cpp +++ b/src/binsrv/event/common_header.cpp @@ -40,7 +40,7 @@ common_header::common_header(util::const_byte_span portion) { // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/src/binlog_event.cpp#L197 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/src/binlog_event.cpp#L198 The first 19 bytes in the header is as follows: +============================================+ diff --git a/src/binsrv/event/flag_type.hpp b/src/binsrv/event/flag_type.hpp index 3cb56c8..ea5458d 100644 --- a/src/binsrv/event/flag_type.hpp +++ b/src/binsrv/event/flag_type.hpp @@ -29,9 +29,9 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Event flags copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/sql/log_event.h#L246 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/log_event.h#L247 // 'binlog_in_use' flag is copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L269 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L270 // This flag is used as a marker in the common header section of the very // first format description event that this particular binlog is currently in // use. It us cleared(rewritten) by the server when the binary log is diff --git a/src/binsrv/event/format_description_post_header_impl.cpp b/src/binsrv/event/format_description_post_header_impl.cpp index 865ea6d..07842b8 100644 --- a/src/binsrv/event/format_description_post_header_impl.cpp +++ b/src/binsrv/event/format_description_post_header_impl.cpp @@ -40,7 +40,7 @@ generic_post_header_impl:: // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/control_events.h#L286 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/control_events.h#L287 +=====================================+ | event | binlog_version 19 : 2 | = 4 diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 2373b2e..439e2ec 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -39,8 +39,8 @@ void reader_context::process_event(const event ¤t_event) { const auto is_artificial{ common_header.get_flags().has_element(flag_type::artificial)}; const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; - // artificial rotate events must always have enext event position and - // timestamp setto 0 + // artificial rotate events must always have next event position and + // timestamp set to 0 if (code == code_type::rotate && is_artificial) { if (common_header.get_timestamp_raw() != 0U) { util::exception_location().raise( diff --git a/src/binsrv/event/rotate_post_header_impl.cpp b/src/binsrv/event/rotate_post_header_impl.cpp index 5a70a05..e15eedc 100644 --- a/src/binsrv/event/rotate_post_header_impl.cpp +++ b/src/binsrv/event/rotate_post_header_impl.cpp @@ -34,7 +34,7 @@ generic_post_header_impl::generic_post_header_impl( // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/control_events.h#L53 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/control_events.h#L54 diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index b1c9661..c420f25 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -167,6 +167,10 @@ void filesystem_storage_backend::do_write_data_to_stream( util::exception_location().raise( "cannot write data to the underlying stream file"); } + if (!ofs_.flush()) { + util::exception_location().raise( + "cannot flush the underlying stream file"); + } // TODO: make sure that the data is properly written to the disk // use fsync() system call here } diff --git a/src/binsrv/operation_mode_type.hpp b/src/binsrv/operation_mode_type.hpp new file mode 100644 index 0000000..1648974 --- /dev/null +++ b/src/binsrv/operation_mode_type.hpp @@ -0,0 +1,96 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_OPERATION_MODE_TYPE_HPP +#define BINSRV_OPERATION_MODE_TYPE_HPP + +#include "binsrv/operation_mode_type_fwd.hpp" // IWYU pragma: export + +#include +#include +#include +#include +#include +#include + +#include "util/conversion_helpers.hpp" + +namespace binsrv { + +// NOLINTBEGIN(cppcoreguidelines-macro-usage) +// clang-format off +#define BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE() \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(fetch), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(pull ) +// clang-format on + +#define BINSRV_OPERATION_MODE_TYPE_X_MACRO(X) X +enum class operation_mode_type : std::uint8_t { + BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE(), + delimiter +}; +#undef BINSRV_OPERATION_MODE_TYPE_X_MACRO + +inline std::string_view +to_string_view(operation_mode_type operation_mode) noexcept { + using namespace std::string_view_literals; +#define BINSRV_OPERATION_MODE_TYPE_X_MACRO(X) #X##sv + static constexpr std::array labels{BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE(), + ""sv}; +#undef BINSRV_OPERATION_MODE_TYPE_X_MACRO + const auto index{util::enum_to_index( + std::min(operation_mode_type::delimiter, operation_mode))}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) + return labels[index]; +} +#undef BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE +// NOLINTEND(cppcoreguidelines-macro-usage) + +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + operation_mode_type operation_mode) { + return output << to_string_view(operation_mode); +} + +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + operation_mode_type &operation_mode) { + std::string operation_mode_str; + input >> operation_mode_str; + if (!input) { + return input; + } + std::size_t index{0U}; + const auto max_index = util::enum_to_index(operation_mode_type::delimiter); + while (index < max_index && + to_string_view(util::index_to_enum(index)) != + operation_mode_str) { + ++index; + } + if (index < max_index) { + operation_mode = util::index_to_enum(index); + } else { + input.setstate(std::ios_base::failbit); + } + return input; +} + +} // namespace binsrv + +#endif // BINSRV_OPERATION_MODE_TYPE_HPP diff --git a/src/easymysql/binlog_deimpl_private.hpp b/src/binsrv/operation_mode_type_fwd.hpp similarity index 51% rename from src/easymysql/binlog_deimpl_private.hpp rename to src/binsrv/operation_mode_type_fwd.hpp index 6b65ac3..b69f1fe 100644 --- a/src/easymysql/binlog_deimpl_private.hpp +++ b/src/binsrv/operation_mode_type_fwd.hpp @@ -13,17 +13,29 @@ // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -#ifndef EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP -#define EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP +#ifndef BINSRV_OPERATION_MODE_TYPE_FWD_HPP +#define BINSRV_OPERATION_MODE_TYPE_FWD_HPP -#include +#include +#include +#include -#include "util/impl_helpers.hpp" +namespace binsrv { -namespace easymysql { +enum class operation_mode_type : std::uint8_t; -using binlog_deimpl = util::deimpl; +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + operation_mode_type operation_mode); -} // namespace easymysql +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + operation_mode_type &operation_mode); -#endif // EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP +} // namespace binsrv + +#endif // BINSRV_OPERATION_MODE_TYPE_FWD_HPP diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index 9b91f6a..3332657 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -521,7 +521,11 @@ void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) { if (!tmp_fstream_.write(std::data(data_sv), static_cast(std::size(data_sv)))) { util::exception_location().raise( - "cannot write data to the underlying stream file"); + "cannot write data to the temporary file for S3 object"); + } + if (!tmp_fstream_.flush()) { + util::exception_location().raise( + "cannot flush the temporary file for S3 object"); } } diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index a45f97e..233df6e 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -78,6 +78,10 @@ storage::check_binlog_name(std::string_view binlog_name) noexcept { std::string_view::npos; } +[[nodiscard]] bool storage::is_binlog_open() const noexcept { + return backend_->is_stream_open(); +} + void storage::open_binlog(std::string_view binlog_name) { if (!check_binlog_name(binlog_name)) { util::exception_location().raise( diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 545570e..6a6f6bc 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -44,16 +44,21 @@ class [[nodiscard]] storage { // desctuctor is explicitly defined as default here to complete the rule of 5 ~storage(); - [[nodiscard]] std::string_view get_binlog_name() const noexcept { - return binlog_names_.empty() ? std::string_view{} : binlog_names_.back(); + [[nodiscard]] bool has_current_binlog_name() const noexcept { + return !binlog_names_.empty(); } - [[nodiscard]] std::uint64_t get_position() const noexcept { + [[nodiscard]] std::string_view get_current_binlog_name() const noexcept { + return has_current_binlog_name() ? binlog_names_.back() + : std::string_view{}; + } + [[nodiscard]] std::uint64_t get_current_position() const noexcept { return position_; } [[nodiscard]] static bool check_binlog_name(std::string_view binlog_name) noexcept; + [[nodiscard]] bool is_binlog_open() const noexcept; void open_binlog(std::string_view binlog_name); void write_event(util::const_byte_span event_data); void close_binlog(); diff --git a/src/easymysql/binlog.cpp b/src/easymysql/binlog.cpp deleted file mode 100644 index 73231f5..0000000 --- a/src/easymysql/binlog.cpp +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#include "easymysql/binlog.hpp" - -#include -#include -#include -#include -#include - -#include - -#include "easymysql/binlog_deimpl_private.hpp" -#include "easymysql/connection.hpp" -#include "easymysql/connection_deimpl_private.hpp" -#include "easymysql/core_error_helpers_private.hpp" - -#include "util/byte_span_fwd.hpp" - -namespace easymysql { - -void binlog::rpl_deleter::operator()(void *ptr) const noexcept { - if (ptr != nullptr) { - // deleting via std::default_delete to avoid - // cppcoreguidelines-owning-memory warnings - using delete_helper = std::default_delete; - delete_helper{}(static_cast(ptr)); - } -} - -binlog::binlog(connection &conn, std::uint32_t server_id, - std::string_view file_name, std::uint64_t position) - : conn_{&conn}, - impl_{new MYSQL_RPL{.file_name_length = std::size(file_name), - .file_name = std::data(file_name), - .start_position = position, - .server_id = server_id, - // TODO: consider adding (or-ing) - // BINLOG_DUMP_NON_BLOCK and - // MYSQL_RPL_SKIP_HEARTBEAT to flags - .flags = 0U, - .gtid_set_encoded_size = 0U, - .fix_gtid_set = nullptr, - .gtid_set_arg = nullptr, - .size = 0U, - .buffer = nullptr - - }} { - assert(!conn.is_empty()); - - // WL#2540: Replication event checksums - // https://dev.mysql.com/worklog/task/?id=2540 - static constexpr std::string_view crc_query{ - "SET @source_binlog_checksum = 'NONE', " - "@master_binlog_checksum = 'NONE'"}; - conn_->execute_generic_query_noresult(crc_query); - - auto *casted_conn_impl = connection_deimpl::get(conn.impl_); - if (mysql_binlog_open(casted_conn_impl, binlog_deimpl::get(impl_)) != 0) { - raise_core_error_from_connection("cannot open binary log", conn); - } -} - -binlog::~binlog() { - if (conn_ != nullptr) { - assert(!conn_->is_empty()); - assert(!is_empty()); - mysql_binlog_close(connection_deimpl::get(conn_->impl_), - binlog_deimpl::get(impl_)); - } -} - -util::const_byte_span binlog::fetch() { - assert(conn_ != nullptr); - assert(!is_empty()); - auto *casted_conn_impl = connection_deimpl::get(conn_->impl_); - auto *casted_rpl_impl = binlog_deimpl::get(impl_); - if (mysql_binlog_fetch(casted_conn_impl, casted_rpl_impl) != 0) { - raise_core_error_from_connection("cannot fetch binlog event", *conn_); - } - return std::as_bytes( - std::span{casted_rpl_impl->buffer, casted_rpl_impl->size}); -} - -} // namespace easymysql diff --git a/src/easymysql/binlog.hpp b/src/easymysql/binlog.hpp deleted file mode 100644 index e9c95e8..0000000 --- a/src/easymysql/binlog.hpp +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#ifndef EASYMYSQL_BINLOG_HPP -#define EASYMYSQL_BINLOG_HPP - -#include "easymysql/binlog_fwd.hpp" // IWYU pragma: export - -#include -#include -#include -#include - -#include "easymysql/connection_fwd.hpp" - -#include "util/byte_span_fwd.hpp" - -namespace easymysql { - -class [[nodiscard]] binlog { - friend class connection; - -public: - binlog() = default; - - binlog(const binlog &) = delete; - binlog(binlog &&) = default; - binlog &operator=(const binlog &) = delete; - binlog &operator=(binlog &&) = default; - - ~binlog(); - - [[nodiscard]] bool is_empty() const noexcept { return !impl_; } - - // returns empty span on EOF - // throws an exception on error - util::const_byte_span fetch(); - -private: - binlog(connection &conn, std::uint32_t server_id, std::string_view file_name, - std::uint64_t position); - - connection *conn_{nullptr}; - struct rpl_deleter { - void operator()(void *ptr) const noexcept; - }; - using impl_ptr = std::unique_ptr; - impl_ptr impl_; -}; - -} // namespace easymysql - -#endif // EASYMYSQL_BINLOG_HPP diff --git a/src/easymysql/binlog_fwd.hpp b/src/easymysql/binlog_fwd.hpp deleted file mode 100644 index f08d849..0000000 --- a/src/easymysql/binlog_fwd.hpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#ifndef EASYMYSQL_BINLOG_FWD_HPP -#define EASYMYSQL_BINLOG_FWD_HPP - -#include - -namespace easymysql { - -class binlog; - -} // namespace easymysql - -#endif // EASYMYSQL_BINLOG_FWD_HPP diff --git a/src/easymysql/connection.cpp b/src/easymysql/connection.cpp index a061669..4a6ab7e 100644 --- a/src/easymysql/connection.cpp +++ b/src/easymysql/connection.cpp @@ -17,34 +17,119 @@ #include #include +#include +#include #include #include +#include +#include #include -#include "easymysql/binlog.hpp" #include "easymysql/connection_config.hpp" #include "easymysql/connection_deimpl_private.hpp" #include "easymysql/core_error_helpers_private.hpp" +#include "util/byte_span_fwd.hpp" #include "util/conversion_helpers.hpp" #include "util/exception_location_helpers.hpp" namespace easymysql { -void connection::connection_deleter::operator()(void *ptr) const noexcept { +void connection::mysql_deleter::operator()(void *ptr) const noexcept { if (ptr != nullptr) { mysql_close(static_cast(ptr)); } } +class connection::rpl_impl { +public: + rpl_impl(connection &conn, std::uint32_t server_id, + std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode) + : conn_{mysql_deimpl::get(conn.mysql_impl_)}, + rpl_{.file_name_length = std::size(file_name), + .file_name = std::data(file_name), + .start_position = position, + .server_id = server_id, + .flags = get_rpl_flags(blocking_mode), + .gtid_set_encoded_size = 0U, + .fix_gtid_set = nullptr, + .gtid_set_arg = nullptr, + .size = 0U, + .buffer = nullptr} { + if (mysql_binlog_open(conn_, &rpl_) != 0) { + raise_core_error_from_connection("cannot open binary log", conn); + } + } + + rpl_impl(const rpl_impl &) = delete; + rpl_impl &operator=(const rpl_impl &) = delete; + rpl_impl(rpl_impl &&) = delete; + rpl_impl &operator=(rpl_impl &&) = delete; + + ~rpl_impl() { mysql_binlog_close(conn_, &rpl_); } + + [[nodiscard]] bool fetch(util::const_byte_span &result) { + if (mysql_binlog_fetch(conn_, &rpl_) != 0) { + return false; + } + + result = std::as_bytes(std::span{rpl_.buffer, rpl_.size}); + return true; + } + +private: + MYSQL *conn_; + MYSQL_RPL rpl_; + + // MYSQL_RPL_SKIP_HEARTBEAT + // https://github.com/mysql/mysql-server/blob/mysql-cluster-8.0.37/include/mysql.h#L366 + + // USE_HEARTBEAT_EVENT_V2 + // https://github.com/mysql/mysql-server/blob/mysql-cluster-8.0.37/include/mysql.h#L372 + + // Explaining BINLOG_DUMP_NON_BLOCK + // https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/rpl_constants.h#L45 + // https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/rpl_binlog_sender.cc#L313 + + // For some reason BINLOG_DUMP_NON_BLOCK is a private constant, defining is + // locally + static constexpr unsigned int private_binlog_dump_non_block{1U}; + [[nodiscard]] static constexpr unsigned int + get_rpl_flags(connection_replication_mode_type blocking_mode) noexcept { + return MYSQL_RPL_SKIP_HEARTBEAT | + (blocking_mode == connection_replication_mode_type::non_blocking + ? private_binlog_dump_non_block + : 0U); + } +}; + +connection::connection() noexcept = default; + connection::connection(const connection_config &config) - : impl_{mysql_init(nullptr)} { - if (!impl_) { + : mysql_impl_{mysql_init(nullptr)}, rpl_impl_{} { + if (!mysql_impl_) { util::exception_location().raise( "cannot create MYSQL object"); } - auto *casted_impl = connection_deimpl::get(impl_); + auto *casted_impl = mysql_deimpl::get(mysql_impl_); + + const unsigned int connect_timeout{config.get<"connect_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout) != + 0) { + raise_core_error_from_connection("cannot set MySQL connect timeout", *this); + } + const unsigned int read_timeout{config.get<"read_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_READ_TIMEOUT, &read_timeout) != 0) { + raise_core_error_from_connection("cannot set MySQL read timeout", *this); + } + const unsigned int write_timeout{config.get<"write_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_WRITE_TIMEOUT, &write_timeout) != + 0) { + raise_core_error_from_connection("cannot set MySQL write timeout", *this); + } + if (mysql_real_connect(casted_impl, /* host */ config.get<"host">().c_str(), /* user */ config.get<"user">().c_str(), @@ -58,46 +143,124 @@ connection::connection(const connection_config &config) } } +// default move constructor is OK as it will never do any +// 'mysql_impl_' / 'rpl_impl_' destruction +connection::connection(connection &&) noexcept = default; + +// default move assignment operator, e.g +// { +// this->mysql_impl_ = std::move(other.mysql_impl_); +// this->rpl_impl_ = std::move(other.rpl_impl_); +// } +// is not OK as the first statement will destroy the old 'MYSQL*' +// object in 'this->mysql_impl_', which will be needed during +// the destruction of the old 'MYSQL_RPL*' object from +// 'this->rpl_impl_' +connection &connection::operator=(connection &&other) noexcept { + connection local{std::move(other)}; + swap(local); + return *this; +} + +// default destructor is OK as 'mysql_impl_' and 'rpl_impl_' will be +// destrojed in the order reverse to how they were declared, e.g. +// 'rpl_impl_' first, 'mysql_impl_' second +connection::~connection() = default; + +void connection::swap(connection &other) noexcept { + mysql_impl_.swap(other.mysql_impl_); + rpl_impl_.swap(other.rpl_impl_); +} + std::uint32_t connection::get_server_version() const noexcept { assert(!is_empty()); return util::maybe_useless_integral_cast( - mysql_get_server_version(connection_deimpl::get_const_casted(impl_))); + mysql_get_server_version(mysql_deimpl::get_const_casted(mysql_impl_))); } std::string_view connection::get_readable_server_version() const noexcept { assert(!is_empty()); - return {mysql_get_server_info(connection_deimpl::get_const_casted(impl_))}; + return {mysql_get_server_info(mysql_deimpl::get_const_casted(mysql_impl_))}; } std::uint32_t connection::get_protocol_version() const noexcept { assert(!is_empty()); return util::maybe_useless_integral_cast( - mysql_get_proto_info(connection_deimpl::get_const_casted(impl_))); + mysql_get_proto_info(mysql_deimpl::get_const_casted(mysql_impl_))); } std::string_view connection::get_server_connection_info() const noexcept { assert(!is_empty()); - return {mysql_get_host_info(connection_deimpl::get_const_casted(impl_))}; + return {mysql_get_host_info(mysql_deimpl::get_const_casted(mysql_impl_))}; } std::string_view connection::get_character_set_name() const noexcept { assert(!is_empty()); - return {mysql_character_set_name(connection_deimpl::get_const_casted(impl_))}; -} - -binlog connection::create_binlog(std::uint32_t server_id, - std::string_view file_name, - std::uint64_t position) { - assert(!is_empty()); - return {*this, server_id, file_name, position}; + return { + mysql_character_set_name(mysql_deimpl::get_const_casted(mysql_impl_))}; } void connection::execute_generic_query_noresult(std::string_view query) { assert(!is_empty()); - auto *casted_impl = connection_deimpl::get(impl_); + if (is_in_replication_mode()) { + util::exception_location().raise( + "cannot execute query in replication mode"); + } + + auto *casted_impl = mysql_deimpl::get(mysql_impl_); if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) { raise_core_error_from_connection("cannot execute query", *this); } } +bool connection::ping() { + assert(!is_empty()); + if (is_in_replication_mode()) { + util::exception_location().raise( + "cannot perform ping in replication mode"); + } + + auto *casted_impl = mysql_deimpl::get(mysql_impl_); + return mysql_ping(casted_impl) == 0; +} + +void connection::switch_to_replication( + std::uint32_t server_id, std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode) { + assert(!is_empty()); + if (is_in_replication_mode()) { + util::exception_location().raise( + "connection has already been swithed to replication"); + } + + // WL#2540: Replication event checksums + // https://dev.mysql.com/worklog/task/?id=2540 + static constexpr std::string_view crc_query{ + "SET @source_binlog_checksum = 'NONE', " + "@master_binlog_checksum = 'NONE'"}; + execute_generic_query_noresult(crc_query); + + rpl_impl_ = std::make_unique(*this, server_id, file_name, position, + blocking_mode); +} + +bool connection::fetch_binlog_event(util::const_byte_span &portion) { + assert(!is_empty()); + if (!is_in_replication_mode()) { + util::exception_location().raise( + "connection has not been switched to replication"); + } + + const auto impl_fetch_result{rpl_impl_->fetch(portion)}; + + if (!impl_fetch_result) { + const auto native_error = mysql_errno(mysql_deimpl::get(mysql_impl_)); + if (native_error != CR_SERVER_LOST) { + raise_core_error_from_connection("cannot fetch binlog event", *this); + } + } + + return impl_fetch_result; +} + } // namespace easymysql diff --git a/src/easymysql/connection.hpp b/src/easymysql/connection.hpp index f20b494..b89383e 100644 --- a/src/easymysql/connection.hpp +++ b/src/easymysql/connection.hpp @@ -22,28 +22,30 @@ #include #include -#include "easymysql/binlog_fwd.hpp" #include "easymysql/connection_config_fwd.hpp" #include "easymysql/library_fwd.hpp" +#include "util/byte_span_fwd.hpp" + namespace easymysql { class [[nodiscard]] connection { friend class library; - friend class binlog; friend struct raise_access; public: - connection() = default; + connection() noexcept; connection(const connection &) = delete; - connection(connection &&) = default; + connection(connection &&other) noexcept; connection &operator=(const connection &) = delete; - connection &operator=(connection &&) = default; + connection &operator=(connection &&other) noexcept; + + ~connection(); - ~connection() = default; + void swap(connection &other) noexcept; - [[nodiscard]] bool is_empty() const noexcept { return !impl_; } + [[nodiscard]] bool is_empty() const noexcept { return !mysql_impl_; } [[nodiscard]] std::uint32_t get_server_version() const noexcept; [[nodiscard]] std::string_view get_readable_server_version() const noexcept; @@ -53,19 +55,35 @@ class [[nodiscard]] connection { [[nodiscard]] std::string_view get_server_connection_info() const noexcept; [[nodiscard]] std::string_view get_character_set_name() const noexcept; - [[nodiscard]] binlog create_binlog(std::uint32_t server_id, - std::string_view file_name, - std::uint64_t position); void execute_generic_query_noresult(std::string_view query); + [[nodiscard]] bool ping(); + + [[nodiscard]] bool is_in_replication_mode() const noexcept { + return static_cast(rpl_impl_); + } + + void switch_to_replication(std::uint32_t server_id, + std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode); + + // returns false on 'connection closed' / 'timeout' + // returns true and sets 'portion' to en empty span on EOF (last event read) + // returns true and sets 'portion' to event data on success + // throws an exception on any error other than 'connection closed' / 'timeout' + [[nodiscard]] bool fetch_binlog_event(util::const_byte_span &portion); private: explicit connection(const connection_config &config); - struct connection_deleter { + struct mysql_deleter { void operator()(void *ptr) const noexcept; }; - using impl_ptr = std::unique_ptr; - impl_ptr impl_; + using mysql_impl_ptr = std::unique_ptr; + mysql_impl_ptr mysql_impl_; + + class rpl_impl; + using rpl_impl_ptr = std::unique_ptr; + rpl_impl_ptr rpl_impl_; }; } // namespace easymysql diff --git a/src/easymysql/connection_config.hpp b/src/easymysql/connection_config.hpp index c395a29..3d62ce9 100644 --- a/src/easymysql/connection_config.hpp +++ b/src/easymysql/connection_config.hpp @@ -28,10 +28,13 @@ namespace easymysql { struct [[nodiscard]] connection_config : util::nv_tuple< // clang-format off - util::nv<"host" , std::string>, - util::nv<"port" , std::uint16_t>, - util::nv<"user" , std::string>, - util::nv<"password", std::string> + util::nv<"host" , std::string>, + util::nv<"port" , std::uint16_t>, + util::nv<"user" , std::string>, + util::nv<"password" , std::string>, + util::nv<"connect_timeout", std::uint32_t>, + util::nv<"read_timeout" , std::uint32_t>, + util::nv<"write_timeout" , std::uint32_t> // clang-format on > { [[nodiscard]] bool has_password() const noexcept { diff --git a/src/easymysql/connection_deimpl_private.hpp b/src/easymysql/connection_deimpl_private.hpp index 6d2e063..0e338c2 100644 --- a/src/easymysql/connection_deimpl_private.hpp +++ b/src/easymysql/connection_deimpl_private.hpp @@ -22,7 +22,8 @@ namespace easymysql { -using connection_deimpl = util::deimpl; +using mysql_deimpl = util::deimpl; +using rpl_deimpl = util::deimpl; } // namespace easymysql diff --git a/src/easymysql/connection_fwd.hpp b/src/easymysql/connection_fwd.hpp index bacf170..28da75e 100644 --- a/src/easymysql/connection_fwd.hpp +++ b/src/easymysql/connection_fwd.hpp @@ -18,6 +18,8 @@ namespace easymysql { +enum class connection_replication_mode_type { blocking, non_blocking }; + class connection; } // namespace easymysql diff --git a/src/easymysql/core_error_helpers_private.cpp b/src/easymysql/core_error_helpers_private.cpp index 18f5272..54fe7e4 100644 --- a/src/easymysql/core_error_helpers_private.cpp +++ b/src/easymysql/core_error_helpers_private.cpp @@ -30,8 +30,9 @@ namespace easymysql { struct raise_access { - static const connection::impl_ptr &get(const connection &conn) noexcept { - return conn.impl_; + static const connection::mysql_impl_ptr & + get(const connection &conn) noexcept { + return conn.mysql_impl_; } }; @@ -46,8 +47,7 @@ raise_core_error_from_connection(std::string_view user_message, // because otherwise the location will always point to this // particular line on this particular file regardless of the actual // location from where this function is called - auto *casted_impl = - connection_deimpl::get_const_casted(raise_access::get(conn)); + auto *casted_impl = mysql_deimpl::get_const_casted(raise_access::get(conn)); std::string message{}; if (!user_message.empty()) { message += user_message;
Post-Header for Rotate_event