diff --git a/CMakeLists.txt b/CMakeLists.txt index fbc4059..a9d8887 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,9 @@ set(source_files src/binsrv/main_config.hpp src/binsrv/main_config.cpp + src/binsrv/operation_mode_type_fwd.hpp + src/binsrv/operation_mode_type.hpp + src/binsrv/s3_error_helpers_private.hpp src/binsrv/s3_error_helpers_private.cpp src/binsrv/s3_error.hpp @@ -241,11 +244,6 @@ set(source_files src/easymysql/core_error.hpp src/easymysql/core_error.cpp - src/easymysql/binlog_deimpl_private.hpp - src/easymysql/binlog_fwd.hpp - src/easymysql/binlog.hpp - src/easymysql/binlog.cpp - src/easymysql/connection_deimpl_private.hpp src/easymysql/connection_fwd.hpp src/easymysql/connection.hpp diff --git a/main_config.json b/main_config.json index b6d07d9..28d4452 100644 --- a/main_config.json +++ b/main_config.json @@ -7,7 +7,10 @@ "host": "127.0.0.1", "port": 3306, "user": "root", - "password": "" + "password": "", + "connect_timeout": 20, + "read_timeout": 60, + "write_timeout": 60 }, "storage": { "uri": "file:///home/user/vault" diff --git a/mtr/binlog_streaming/t/binsrv.test b/mtr/binlog_streaming/t/binsrv.test index 2441b36..62ac0d4 100644 --- a/mtr/binlog_streaming/t/binsrv.test +++ b/mtr/binlog_streaming/t/binsrv.test @@ -105,7 +105,10 @@ eval SET @binsrv_config_json = JSON_OBJECT( 'host', @connection_host, 'port', @@global.port, 'user', @connection_user, - 'password', '' + 'password', '', + 'connect_timeout', 20, + 'read_timeout', 60, + 'write_timeout', 60 ), 'storage', JSON_OBJECT( 'uri', @storage_uri @@ -144,7 +147,7 @@ if ($storage_backend == file) --echo *** Executing the Binlog Server utility to download all binlog data --echo *** from the server to the directory (second --echo *** binlog is still open / in use). ---exec $BINSRV $binsrv_config_file_path > /dev/null +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null --echo --echo *** Checking that the Binlog Server utility detected an empty storage @@ -220,7 +223,7 @@ FLUSH BINARY LOGS; --echo *** Executing the Binlog Server utility one more time (the second --echo *** binlog is no longer open / in use). Here we should also continue --echo *** streaming binlog events from the last saved position. ---exec $BINSRV $binsrv_config_file_path > /dev/null +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null --echo --echo *** Checking that the Binlog Server utility detected a previously diff --git a/src/app.cpp b/src/app.cpp index 17391e0..1f46672 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -26,9 +27,11 @@ #include #include #include +#include #include #include +#include #include "binsrv/basic_logger.hpp" #include "binsrv/basic_storage_backend.hpp" @@ -36,6 +39,7 @@ #include "binsrv/log_severity.hpp" #include "binsrv/logger_factory.hpp" #include "binsrv/main_config.hpp" +#include "binsrv/operation_mode_type.hpp" #include "binsrv/storage.hpp" #include "binsrv/storage_backend_factory.hpp" @@ -45,9 +49,9 @@ #include "binsrv/event/protocol_traits_fwd.hpp" #include "binsrv/event/reader_context.hpp" -#include "easymysql/binlog.hpp" #include "easymysql/connection.hpp" #include "easymysql/connection_config.hpp" +#include "easymysql/core_error.hpp" #include "easymysql/library.hpp" #include "util/byte_span_fwd.hpp" @@ -57,6 +61,67 @@ namespace { +void log_storage_info(binsrv::basic_logger &logger, + const binsrv::storage &storage) { + std::string msg{}; + if (storage.has_current_binlog_name()) { + msg = "binlog storage initialized at \""; + msg += storage.get_current_binlog_name(); + msg += "\":"; + msg += std::to_string(storage.get_current_position()); + } else { + msg = "binlog storage initialized on an empty directory"; + } + logger.log(binsrv::log_severity::info, msg); +} + +void log_library_info(binsrv::basic_logger &logger, + const easymysql::library &mysql_lib) { + std::string msg{}; + msg = "mysql client version: "; + msg += mysql_lib.get_readable_client_version(); + logger.log(binsrv::log_severity::info, msg); +} + +void log_connection_info(binsrv::basic_logger &logger, + const easymysql::connection &connection) { + std::string msg{}; + msg = "mysql server version: "; + msg += connection.get_readable_server_version(); + logger.log(binsrv::log_severity::info, msg); + + logger.log(binsrv::log_severity::info, + "mysql protocol version: " + + std::to_string(connection.get_protocol_version())); + + msg = "mysql server connection info: "; + msg += connection.get_server_connection_info(); + logger.log(binsrv::log_severity::info, msg); + + msg = "mysql connection character set: "; + msg += connection.get_character_set_name(); + logger.log(binsrv::log_severity::info, msg); +} + +void log_replication_info( + binsrv::basic_logger &logger, std::uint32_t server_id, + std::string_view current_binlog_name, std::uint64_t current_binlog_position, + easymysql::connection_replication_mode_type blocking_mode) { + std::string msg{}; + msg = "replication info (server id "; + msg += std::to_string(server_id); + msg += ", "; + msg += (current_binlog_name.empty() ? "" : current_binlog_name); + msg += ":"; + msg += std::to_string(current_binlog_position); + msg += ", "; + msg += (blocking_mode == easymysql::connection_replication_mode_type::blocking + ? "blocking" + : "non-blocking"); + msg += ")"; + logger.log(binsrv::log_severity::info, msg); +} + void log_span_dump(binsrv::basic_logger &logger, util::const_byte_span portion) { static constexpr std::size_t bytes_per_dump_line{16U}; @@ -91,9 +156,101 @@ void log_span_dump(binsrv::basic_logger &logger, } } -void receive_binlog_events(binsrv::basic_logger &logger, - easymysql::binlog &binlog, - binsrv::storage &storage) { +void process_binlog_event(const binsrv::event::event ¤t_event, + util::const_byte_span portion, + binsrv::storage &storage, bool &skip_open_binlog) { + const auto ¤t_common_header = current_event.get_common_header(); + const auto code = current_common_header.get_type_code(); + + const auto is_artificial{current_common_header.get_flags().has_element( + binsrv::event::flag_type::artificial)}; + const auto is_pseudo{current_common_header.get_next_event_position_raw() == + 0U}; + + if (code == binsrv::event::code_type::rotate && is_artificial) { + const auto ¤t_rotate_body = + current_event.get_body(); + if (skip_open_binlog) { + // we are supposed to get here after reconnection, so doing + // basic integrity checks + if (current_rotate_body.get_binlog() != + storage.get_current_binlog_name()) { + util::exception_location().raise( + "unexpected binlog name in artificial rotate event after " + "reconnection"); + } + const auto ¤t_rotate_post_header = + current_event.get_post_header(); + if (current_rotate_post_header.get_position_raw() != + storage.get_current_position()) { + util::exception_location().raise( + "unexpected binlog position in artificial rotate event after " + "reconnection"); + } + + skip_open_binlog = false; + } else { + storage.open_binlog(current_rotate_body.get_binlog()); + } + } + if (!is_artificial && !is_pseudo) { + storage.write_event(portion); + } + if (code == binsrv::event::code_type::rotate && !is_artificial) { + storage.close_binlog(); + } +} + +void receive_binlog_events( + binsrv::operation_mode_type operation_mode, binsrv::basic_logger &logger, + const easymysql::library &mysql_lib, + const easymysql::connection_config &connection_config, + std::uint32_t server_id, binsrv::storage &storage) { + easymysql::connection connection{}; + try { + connection = mysql_lib.create_connection(connection_config); + } catch (const easymysql::core_error &) { + if (operation_mode == binsrv::operation_mode_type::fetch) { + throw; + } + logger.log(binsrv::log_severity::info, + "unable to establish connection to mysql server"); + return; + } + + logger.log(binsrv::log_severity::info, + "established connection to mysql server"); + + log_connection_info(logger, connection); + + const auto current_binlog_name{storage.get_current_binlog_name()}; + // if storage binlog name is detected to be empty (empty data directory), we + // start streaming from the position 'magic_binlog_offset' (4) + const auto current_binlog_position{current_binlog_name.empty() + ? binsrv::event::magic_binlog_offset + : storage.get_current_position()}; + + const auto blocking_mode{ + operation_mode == binsrv::operation_mode_type::fetch + ? easymysql::connection_replication_mode_type::non_blocking + : easymysql::connection_replication_mode_type::blocking}; + + try { + connection.switch_to_replication(server_id, current_binlog_name, + current_binlog_position, blocking_mode); + } catch (const easymysql::core_error &) { + if (operation_mode == binsrv::operation_mode_type::fetch) { + throw; + } + logger.log(binsrv::log_severity::info, "unable to switch to replication"); + return; + } + + logger.log(binsrv::log_severity::info, "switched to replication"); + + log_replication_info(logger, server_id, current_binlog_name, + current_binlog_position, blocking_mode); + // Network streams are requested with COM_BINLOG_DUMP and // each Binlog Event response is prepended with 00 OK-byte. static constexpr std::byte expected_event_packet_prefix{'\0'}; @@ -102,7 +259,14 @@ void receive_binlog_events(binsrv::basic_logger &logger, binsrv::event::reader_context context{}; - while (!(portion = binlog.fetch()).empty()) { + // if binlog is still open, there is no sense to close it and re-open + // instead, we will just instruct this loop to process the + // very first artificial rotate event in a special way + bool skip_open_binlog{storage.is_binlog_open()}; + bool fetch_result{}; + + while ((fetch_result = connection.fetch_binlog_event(portion)) && + !portion.empty()) { if (portion[0] != expected_event_packet_prefix) { util::exception_location().raise( "unexpected event prefix"); @@ -120,28 +284,20 @@ void receive_binlog_events(binsrv::basic_logger &logger, logger.log(binsrv::log_severity::debug, "Parsed event:\n" + boost::lexical_cast(current_event)); - - const auto ¤t_common_header = current_event.get_common_header(); - const auto code = current_common_header.get_type_code(); log_span_dump(logger, portion); - const auto is_artificial{current_common_header.get_flags().has_element( - binsrv::event::flag_type::artificial)}; - const auto is_pseudo{current_common_header.get_next_event_position_raw() == - 0U}; - - if (code == binsrv::event::code_type::rotate && is_artificial) { - const auto ¤t_rotate_body = - current_event.get_body(); - - storage.open_binlog(current_rotate_body.get_binlog()); - } - if (!is_artificial && !is_pseudo) { - storage.write_event(portion); - } - if (code == binsrv::event::code_type::rotate && !is_artificial) { - storage.close_binlog(); + process_binlog_event(current_event, portion, storage, skip_open_binlog); + } + if (fetch_result) { + logger.log(binsrv::log_severity::info, + "fetched everything and disconnected"); + } else { + if (operation_mode == binsrv::operation_mode_type::fetch) { + util::exception_location().raise( + "fetch operation did not reach EOF reading binlog events"); } + logger.log(binsrv::log_severity::info, + "timed out waiting for events and disconnected"); } } @@ -156,19 +312,28 @@ int main(int argc, char *argv[]) { const auto number_of_cmd_args = std::size(cmd_args); const auto executable_name = util::extract_executable_name(cmd_args); - if (number_of_cmd_args != binsrv::main_config::flattened_size + 1 && - number_of_cmd_args != 2) { - std::cerr << "usage: " << executable_name + binsrv::operation_mode_type operation_mode{ + binsrv::operation_mode_type::delimiter}; + auto cmd_args_validated{ + (number_of_cmd_args == binsrv::main_config::flattened_size + 2U || + number_of_cmd_args == 3U) && + boost::conversion::try_lexical_convert(cmd_args[1], operation_mode) && + operation_mode != binsrv::operation_mode_type::delimiter}; + + if (!cmd_args_validated) { + std::cerr << "usage: " << executable_name << " (fetch|pull))" << " " " " + " " " \n" - << " " << executable_name << " \n"; + << " " << executable_name + << " (fetch|pull)) \n"; return exit_code; } binsrv::basic_logger_ptr logger; try { - const auto default_log_level = binsrv::log_severity::trace; + static constexpr auto default_log_level = binsrv::log_severity::trace; const binsrv::logger_config initial_logger_config{ {{default_log_level}, {""}}}; @@ -181,16 +346,26 @@ int main(int argc, char *argv[]) { logger->log(binsrv::log_severity::delimiter, util::get_readable_command_line_arguments(cmd_args)); + if (operation_mode == binsrv::operation_mode_type::fetch) { + logger->log(binsrv::log_severity::delimiter, + "'fetch' operation mode specified"); + } else if (operation_mode == binsrv::operation_mode_type::pull) { + logger->log(binsrv::log_severity::delimiter, + "'pull' operation mode specified"); + } else { + assert(false); + } + binsrv::main_config_ptr config; - if (number_of_cmd_args == 2U) { + if (number_of_cmd_args == 3U) { logger->log(binsrv::log_severity::delimiter, - "Reading connection configuration from the JSON file."); - config = std::make_shared(cmd_args[1]); - } else if (number_of_cmd_args == binsrv::main_config::flattened_size + 1) { + "reading connection configuration from the JSON file."); + config = std::make_shared(cmd_args[2]); + } else if (number_of_cmd_args == binsrv::main_config::flattened_size + 2U) { logger->log(binsrv::log_severity::delimiter, - "Reading connection configuration from the command line " + "reading connection configuration from the command line " "arguments."); - config = std::make_shared(cmd_args.subspan(1U)); + config = std::make_shared(cmd_args.subspan(2U)); } else { assert(false); } @@ -201,7 +376,7 @@ int main(int argc, char *argv[]) { logger->set_min_level(logger_config.get<"level">()); } else { logger->log(binsrv::log_severity::delimiter, - "Redirecting logging to \"" + logger_config.get<"file">() + + "redirecting logging to \"" + logger_config.get<"file">() + "\""); auto new_logger = binsrv::logger_factory::create(logger_config); std::swap(logger, new_logger); @@ -226,58 +401,41 @@ int main(int argc, char *argv[]) { logger->log(binsrv::log_severity::info, "created binlog storage with the provided backend"); - const auto last_binlog_name{storage.get_binlog_name()}; - // if storage position is detected to be 0 (empty data directory), we - // start streaming from the position magic_binlog_offset (4) - const auto last_binlog_position{ - std::max(binsrv::event::magic_binlog_offset, storage.get_position())}; - if (last_binlog_name.empty()) { - msg = "binlog storage initialized on an empty directory"; - } else { - msg = "binlog storage initialized at \""; - msg += last_binlog_name; - msg += "\":"; - msg += std::to_string(last_binlog_position); - } - logger->log(binsrv::log_severity::info, msg); + log_storage_info(*logger, storage); const auto &connection_config = config->root().get<"connection">(); logger->log(binsrv::log_severity::info, "mysql connection string: " + connection_config.get_connection_string()); - const easymysql::library mysql_lib; - logger->log(binsrv::log_severity::info, "initialized mysql client library"); + // TODO: put these parameters into configuration + static constexpr std::uint32_t default_server_id{42U}; + static constexpr std::uint32_t default_idle_time_seconds{5U}; - msg = "mysql client version: "; - msg += mysql_lib.get_readable_client_version(); - logger->log(binsrv::log_severity::info, msg); + const auto server_id{default_server_id}; + const auto idle_time_seconds{default_idle_time_seconds}; - auto connection = mysql_lib.create_connection(connection_config); - logger->log(binsrv::log_severity::info, - "established connection to mysql server"); - msg = "mysql server version: "; - msg += connection.get_readable_server_version(); - logger->log(binsrv::log_severity::info, msg); - - logger->log(binsrv::log_severity::info, - "mysql protocol version: " + - std::to_string(connection.get_protocol_version())); + const easymysql::library mysql_lib; + logger->log(binsrv::log_severity::info, "initialized mysql client library"); - msg = "mysql server connection info: "; - msg += connection.get_server_connection_info(); - logger->log(binsrv::log_severity::info, msg); + log_library_info(*logger, mysql_lib); - msg = "mysql connection character set: "; - msg += connection.get_character_set_name(); - logger->log(binsrv::log_severity::info, msg); + receive_binlog_events(operation_mode, *logger, mysql_lib, connection_config, + server_id, storage); - static constexpr std::uint32_t default_server_id{0U}; - auto binlog = connection.create_binlog(default_server_id, last_binlog_name, - last_binlog_position); - logger->log(binsrv::log_severity::info, "opened binary log connection"); + if (operation_mode == binsrv::operation_mode_type::pull) { + std::size_t iteration_number{1U}; + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(idle_time_seconds)); - receive_binlog_events(*logger, binlog, storage); + logger->log(binsrv::log_severity::info, + "awoke after sleep and trying to reconnect (" + + std::to_string(iteration_number) + ")"); + receive_binlog_events(operation_mode, *logger, mysql_lib, + connection_config, server_id, storage); + ++iteration_number; + } + } logger->log(binsrv::log_severity::info, "successfully shut down"); exit_code = EXIT_SUCCESS; diff --git a/src/binsrv/event/checksum_algorithm_type.hpp b/src/binsrv/event/checksum_algorithm_type.hpp index a02f1ec..242ef64 100644 --- a/src/binsrv/event/checksum_algorithm_type.hpp +++ b/src/binsrv/event/checksum_algorithm_type.hpp @@ -27,7 +27,7 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Checksum algorithm type codes copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L425 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L426 // clang-format off #define BINSRV_CHECKSUM_ALGORITHM_TYPE_XY_SEQUENCE() \ BINSRV_CHECKSUM_ALGORITHM_TYPE_XY_MACRO(off , 0), \ diff --git a/src/binsrv/event/code_type.hpp b/src/binsrv/event/code_type.hpp index a13c26e..adc03f1 100644 --- a/src/binsrv/event/code_type.hpp +++ b/src/binsrv/event/code_type.hpp @@ -27,7 +27,7 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Event type codes copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L274 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L275 // clang-format off #define BINSRV_EVENT_CODE_TYPE_XY_SEQUENCE() \ BINSRV_EVENT_CODE_TYPE_XY_MACRO(unknown , 0), \ diff --git a/src/binsrv/event/common_header.cpp b/src/binsrv/event/common_header.cpp index 4c53f7a..e00ce55 100644 --- a/src/binsrv/event/common_header.cpp +++ b/src/binsrv/event/common_header.cpp @@ -40,7 +40,7 @@ common_header::common_header(util::const_byte_span portion) { // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/src/binlog_event.cpp#L197 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/src/binlog_event.cpp#L198 The first 19 bytes in the header is as follows: +============================================+ diff --git a/src/binsrv/event/flag_type.hpp b/src/binsrv/event/flag_type.hpp index 3cb56c8..ea5458d 100644 --- a/src/binsrv/event/flag_type.hpp +++ b/src/binsrv/event/flag_type.hpp @@ -29,9 +29,9 @@ namespace binsrv::event { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // Event flags copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/sql/log_event.h#L246 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/log_event.h#L247 // 'binlog_in_use' flag is copied from -// https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/binlog_event.h#L269 +// https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/binlog_event.h#L270 // This flag is used as a marker in the common header section of the very // first format description event that this particular binlog is currently in // use. It us cleared(rewritten) by the server when the binary log is diff --git a/src/binsrv/event/format_description_post_header_impl.cpp b/src/binsrv/event/format_description_post_header_impl.cpp index 865ea6d..07842b8 100644 --- a/src/binsrv/event/format_description_post_header_impl.cpp +++ b/src/binsrv/event/format_description_post_header_impl.cpp @@ -40,7 +40,7 @@ generic_post_header_impl:: // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/control_events.h#L286 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/control_events.h#L287 +=====================================+ | event | binlog_version 19 : 2 | = 4 diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 2373b2e..439e2ec 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -39,8 +39,8 @@ void reader_context::process_event(const event ¤t_event) { const auto is_artificial{ common_header.get_flags().has_element(flag_type::artificial)}; const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; - // artificial rotate events must always have enext event position and - // timestamp setto 0 + // artificial rotate events must always have next event position and + // timestamp set to 0 if (code == code_type::rotate && is_artificial) { if (common_header.get_timestamp_raw() != 0U) { util::exception_location().raise( diff --git a/src/binsrv/event/rotate_post_header_impl.cpp b/src/binsrv/event/rotate_post_header_impl.cpp index 5a70a05..e15eedc 100644 --- a/src/binsrv/event/rotate_post_header_impl.cpp +++ b/src/binsrv/event/rotate_post_header_impl.cpp @@ -34,7 +34,7 @@ generic_post_header_impl::generic_post_header_impl( // TODO: rework with direct member initialization /* - https://github.com/mysql/mysql-server/blob/mysql-8.0.36/libbinlogevents/include/control_events.h#L53 + https://github.com/mysql/mysql-server/blob/mysql-8.0.37/libbinlogevents/include/control_events.h#L54 diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index b1c9661..c420f25 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -167,6 +167,10 @@ void filesystem_storage_backend::do_write_data_to_stream( util::exception_location().raise( "cannot write data to the underlying stream file"); } + if (!ofs_.flush()) { + util::exception_location().raise( + "cannot flush the underlying stream file"); + } // TODO: make sure that the data is properly written to the disk // use fsync() system call here } diff --git a/src/binsrv/operation_mode_type.hpp b/src/binsrv/operation_mode_type.hpp new file mode 100644 index 0000000..1648974 --- /dev/null +++ b/src/binsrv/operation_mode_type.hpp @@ -0,0 +1,96 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_OPERATION_MODE_TYPE_HPP +#define BINSRV_OPERATION_MODE_TYPE_HPP + +#include "binsrv/operation_mode_type_fwd.hpp" // IWYU pragma: export + +#include +#include +#include +#include +#include +#include + +#include "util/conversion_helpers.hpp" + +namespace binsrv { + +// NOLINTBEGIN(cppcoreguidelines-macro-usage) +// clang-format off +#define BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE() \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(fetch), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(pull ) +// clang-format on + +#define BINSRV_OPERATION_MODE_TYPE_X_MACRO(X) X +enum class operation_mode_type : std::uint8_t { + BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE(), + delimiter +}; +#undef BINSRV_OPERATION_MODE_TYPE_X_MACRO + +inline std::string_view +to_string_view(operation_mode_type operation_mode) noexcept { + using namespace std::string_view_literals; +#define BINSRV_OPERATION_MODE_TYPE_X_MACRO(X) #X##sv + static constexpr std::array labels{BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE(), + ""sv}; +#undef BINSRV_OPERATION_MODE_TYPE_X_MACRO + const auto index{util::enum_to_index( + std::min(operation_mode_type::delimiter, operation_mode))}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) + return labels[index]; +} +#undef BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE +// NOLINTEND(cppcoreguidelines-macro-usage) + +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + operation_mode_type operation_mode) { + return output << to_string_view(operation_mode); +} + +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + operation_mode_type &operation_mode) { + std::string operation_mode_str; + input >> operation_mode_str; + if (!input) { + return input; + } + std::size_t index{0U}; + const auto max_index = util::enum_to_index(operation_mode_type::delimiter); + while (index < max_index && + to_string_view(util::index_to_enum(index)) != + operation_mode_str) { + ++index; + } + if (index < max_index) { + operation_mode = util::index_to_enum(index); + } else { + input.setstate(std::ios_base::failbit); + } + return input; +} + +} // namespace binsrv + +#endif // BINSRV_OPERATION_MODE_TYPE_HPP diff --git a/src/easymysql/binlog_deimpl_private.hpp b/src/binsrv/operation_mode_type_fwd.hpp similarity index 51% rename from src/easymysql/binlog_deimpl_private.hpp rename to src/binsrv/operation_mode_type_fwd.hpp index 6b65ac3..b69f1fe 100644 --- a/src/easymysql/binlog_deimpl_private.hpp +++ b/src/binsrv/operation_mode_type_fwd.hpp @@ -13,17 +13,29 @@ // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -#ifndef EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP -#define EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP +#ifndef BINSRV_OPERATION_MODE_TYPE_FWD_HPP +#define BINSRV_OPERATION_MODE_TYPE_FWD_HPP -#include +#include +#include +#include -#include "util/impl_helpers.hpp" +namespace binsrv { -namespace easymysql { +enum class operation_mode_type : std::uint8_t; -using binlog_deimpl = util::deimpl; +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + operation_mode_type operation_mode); -} // namespace easymysql +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + operation_mode_type &operation_mode); -#endif // EASYMYSQL_BINLOG_DEIMPL_PRIVATE_HPP +} // namespace binsrv + +#endif // BINSRV_OPERATION_MODE_TYPE_FWD_HPP diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index 9b91f6a..3332657 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -521,7 +521,11 @@ void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) { if (!tmp_fstream_.write(std::data(data_sv), static_cast(std::size(data_sv)))) { util::exception_location().raise( - "cannot write data to the underlying stream file"); + "cannot write data to the temporary file for S3 object"); + } + if (!tmp_fstream_.flush()) { + util::exception_location().raise( + "cannot flush the temporary file for S3 object"); } } diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index a45f97e..233df6e 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -78,6 +78,10 @@ storage::check_binlog_name(std::string_view binlog_name) noexcept { std::string_view::npos; } +[[nodiscard]] bool storage::is_binlog_open() const noexcept { + return backend_->is_stream_open(); +} + void storage::open_binlog(std::string_view binlog_name) { if (!check_binlog_name(binlog_name)) { util::exception_location().raise( diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 545570e..6a6f6bc 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -44,16 +44,21 @@ class [[nodiscard]] storage { // desctuctor is explicitly defined as default here to complete the rule of 5 ~storage(); - [[nodiscard]] std::string_view get_binlog_name() const noexcept { - return binlog_names_.empty() ? std::string_view{} : binlog_names_.back(); + [[nodiscard]] bool has_current_binlog_name() const noexcept { + return !binlog_names_.empty(); } - [[nodiscard]] std::uint64_t get_position() const noexcept { + [[nodiscard]] std::string_view get_current_binlog_name() const noexcept { + return has_current_binlog_name() ? binlog_names_.back() + : std::string_view{}; + } + [[nodiscard]] std::uint64_t get_current_position() const noexcept { return position_; } [[nodiscard]] static bool check_binlog_name(std::string_view binlog_name) noexcept; + [[nodiscard]] bool is_binlog_open() const noexcept; void open_binlog(std::string_view binlog_name); void write_event(util::const_byte_span event_data); void close_binlog(); diff --git a/src/easymysql/binlog.cpp b/src/easymysql/binlog.cpp deleted file mode 100644 index 73231f5..0000000 --- a/src/easymysql/binlog.cpp +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#include "easymysql/binlog.hpp" - -#include -#include -#include -#include -#include - -#include - -#include "easymysql/binlog_deimpl_private.hpp" -#include "easymysql/connection.hpp" -#include "easymysql/connection_deimpl_private.hpp" -#include "easymysql/core_error_helpers_private.hpp" - -#include "util/byte_span_fwd.hpp" - -namespace easymysql { - -void binlog::rpl_deleter::operator()(void *ptr) const noexcept { - if (ptr != nullptr) { - // deleting via std::default_delete to avoid - // cppcoreguidelines-owning-memory warnings - using delete_helper = std::default_delete; - delete_helper{}(static_cast(ptr)); - } -} - -binlog::binlog(connection &conn, std::uint32_t server_id, - std::string_view file_name, std::uint64_t position) - : conn_{&conn}, - impl_{new MYSQL_RPL{.file_name_length = std::size(file_name), - .file_name = std::data(file_name), - .start_position = position, - .server_id = server_id, - // TODO: consider adding (or-ing) - // BINLOG_DUMP_NON_BLOCK and - // MYSQL_RPL_SKIP_HEARTBEAT to flags - .flags = 0U, - .gtid_set_encoded_size = 0U, - .fix_gtid_set = nullptr, - .gtid_set_arg = nullptr, - .size = 0U, - .buffer = nullptr - - }} { - assert(!conn.is_empty()); - - // WL#2540: Replication event checksums - // https://dev.mysql.com/worklog/task/?id=2540 - static constexpr std::string_view crc_query{ - "SET @source_binlog_checksum = 'NONE', " - "@master_binlog_checksum = 'NONE'"}; - conn_->execute_generic_query_noresult(crc_query); - - auto *casted_conn_impl = connection_deimpl::get(conn.impl_); - if (mysql_binlog_open(casted_conn_impl, binlog_deimpl::get(impl_)) != 0) { - raise_core_error_from_connection("cannot open binary log", conn); - } -} - -binlog::~binlog() { - if (conn_ != nullptr) { - assert(!conn_->is_empty()); - assert(!is_empty()); - mysql_binlog_close(connection_deimpl::get(conn_->impl_), - binlog_deimpl::get(impl_)); - } -} - -util::const_byte_span binlog::fetch() { - assert(conn_ != nullptr); - assert(!is_empty()); - auto *casted_conn_impl = connection_deimpl::get(conn_->impl_); - auto *casted_rpl_impl = binlog_deimpl::get(impl_); - if (mysql_binlog_fetch(casted_conn_impl, casted_rpl_impl) != 0) { - raise_core_error_from_connection("cannot fetch binlog event", *conn_); - } - return std::as_bytes( - std::span{casted_rpl_impl->buffer, casted_rpl_impl->size}); -} - -} // namespace easymysql diff --git a/src/easymysql/binlog.hpp b/src/easymysql/binlog.hpp deleted file mode 100644 index e9c95e8..0000000 --- a/src/easymysql/binlog.hpp +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#ifndef EASYMYSQL_BINLOG_HPP -#define EASYMYSQL_BINLOG_HPP - -#include "easymysql/binlog_fwd.hpp" // IWYU pragma: export - -#include -#include -#include -#include - -#include "easymysql/connection_fwd.hpp" - -#include "util/byte_span_fwd.hpp" - -namespace easymysql { - -class [[nodiscard]] binlog { - friend class connection; - -public: - binlog() = default; - - binlog(const binlog &) = delete; - binlog(binlog &&) = default; - binlog &operator=(const binlog &) = delete; - binlog &operator=(binlog &&) = default; - - ~binlog(); - - [[nodiscard]] bool is_empty() const noexcept { return !impl_; } - - // returns empty span on EOF - // throws an exception on error - util::const_byte_span fetch(); - -private: - binlog(connection &conn, std::uint32_t server_id, std::string_view file_name, - std::uint64_t position); - - connection *conn_{nullptr}; - struct rpl_deleter { - void operator()(void *ptr) const noexcept; - }; - using impl_ptr = std::unique_ptr; - impl_ptr impl_; -}; - -} // namespace easymysql - -#endif // EASYMYSQL_BINLOG_HPP diff --git a/src/easymysql/binlog_fwd.hpp b/src/easymysql/binlog_fwd.hpp deleted file mode 100644 index f08d849..0000000 --- a/src/easymysql/binlog_fwd.hpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2023-2024 Percona and/or its affiliates. -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License, version 2.0, -// as published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License, version 2.0, for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -#ifndef EASYMYSQL_BINLOG_FWD_HPP -#define EASYMYSQL_BINLOG_FWD_HPP - -#include - -namespace easymysql { - -class binlog; - -} // namespace easymysql - -#endif // EASYMYSQL_BINLOG_FWD_HPP diff --git a/src/easymysql/connection.cpp b/src/easymysql/connection.cpp index a061669..4a6ab7e 100644 --- a/src/easymysql/connection.cpp +++ b/src/easymysql/connection.cpp @@ -17,34 +17,119 @@ #include #include +#include +#include #include #include +#include +#include #include -#include "easymysql/binlog.hpp" #include "easymysql/connection_config.hpp" #include "easymysql/connection_deimpl_private.hpp" #include "easymysql/core_error_helpers_private.hpp" +#include "util/byte_span_fwd.hpp" #include "util/conversion_helpers.hpp" #include "util/exception_location_helpers.hpp" namespace easymysql { -void connection::connection_deleter::operator()(void *ptr) const noexcept { +void connection::mysql_deleter::operator()(void *ptr) const noexcept { if (ptr != nullptr) { mysql_close(static_cast(ptr)); } } +class connection::rpl_impl { +public: + rpl_impl(connection &conn, std::uint32_t server_id, + std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode) + : conn_{mysql_deimpl::get(conn.mysql_impl_)}, + rpl_{.file_name_length = std::size(file_name), + .file_name = std::data(file_name), + .start_position = position, + .server_id = server_id, + .flags = get_rpl_flags(blocking_mode), + .gtid_set_encoded_size = 0U, + .fix_gtid_set = nullptr, + .gtid_set_arg = nullptr, + .size = 0U, + .buffer = nullptr} { + if (mysql_binlog_open(conn_, &rpl_) != 0) { + raise_core_error_from_connection("cannot open binary log", conn); + } + } + + rpl_impl(const rpl_impl &) = delete; + rpl_impl &operator=(const rpl_impl &) = delete; + rpl_impl(rpl_impl &&) = delete; + rpl_impl &operator=(rpl_impl &&) = delete; + + ~rpl_impl() { mysql_binlog_close(conn_, &rpl_); } + + [[nodiscard]] bool fetch(util::const_byte_span &result) { + if (mysql_binlog_fetch(conn_, &rpl_) != 0) { + return false; + } + + result = std::as_bytes(std::span{rpl_.buffer, rpl_.size}); + return true; + } + +private: + MYSQL *conn_; + MYSQL_RPL rpl_; + + // MYSQL_RPL_SKIP_HEARTBEAT + // https://github.com/mysql/mysql-server/blob/mysql-cluster-8.0.37/include/mysql.h#L366 + + // USE_HEARTBEAT_EVENT_V2 + // https://github.com/mysql/mysql-server/blob/mysql-cluster-8.0.37/include/mysql.h#L372 + + // Explaining BINLOG_DUMP_NON_BLOCK + // https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/rpl_constants.h#L45 + // https://github.com/mysql/mysql-server/blob/mysql-8.0.37/sql/rpl_binlog_sender.cc#L313 + + // For some reason BINLOG_DUMP_NON_BLOCK is a private constant, defining is + // locally + static constexpr unsigned int private_binlog_dump_non_block{1U}; + [[nodiscard]] static constexpr unsigned int + get_rpl_flags(connection_replication_mode_type blocking_mode) noexcept { + return MYSQL_RPL_SKIP_HEARTBEAT | + (blocking_mode == connection_replication_mode_type::non_blocking + ? private_binlog_dump_non_block + : 0U); + } +}; + +connection::connection() noexcept = default; + connection::connection(const connection_config &config) - : impl_{mysql_init(nullptr)} { - if (!impl_) { + : mysql_impl_{mysql_init(nullptr)}, rpl_impl_{} { + if (!mysql_impl_) { util::exception_location().raise( "cannot create MYSQL object"); } - auto *casted_impl = connection_deimpl::get(impl_); + auto *casted_impl = mysql_deimpl::get(mysql_impl_); + + const unsigned int connect_timeout{config.get<"connect_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout) != + 0) { + raise_core_error_from_connection("cannot set MySQL connect timeout", *this); + } + const unsigned int read_timeout{config.get<"read_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_READ_TIMEOUT, &read_timeout) != 0) { + raise_core_error_from_connection("cannot set MySQL read timeout", *this); + } + const unsigned int write_timeout{config.get<"write_timeout">()}; + if (mysql_options(casted_impl, MYSQL_OPT_WRITE_TIMEOUT, &write_timeout) != + 0) { + raise_core_error_from_connection("cannot set MySQL write timeout", *this); + } + if (mysql_real_connect(casted_impl, /* host */ config.get<"host">().c_str(), /* user */ config.get<"user">().c_str(), @@ -58,46 +143,124 @@ connection::connection(const connection_config &config) } } +// default move constructor is OK as it will never do any +// 'mysql_impl_' / 'rpl_impl_' destruction +connection::connection(connection &&) noexcept = default; + +// default move assignment operator, e.g +// { +// this->mysql_impl_ = std::move(other.mysql_impl_); +// this->rpl_impl_ = std::move(other.rpl_impl_); +// } +// is not OK as the first statement will destroy the old 'MYSQL*' +// object in 'this->mysql_impl_', which will be needed during +// the destruction of the old 'MYSQL_RPL*' object from +// 'this->rpl_impl_' +connection &connection::operator=(connection &&other) noexcept { + connection local{std::move(other)}; + swap(local); + return *this; +} + +// default destructor is OK as 'mysql_impl_' and 'rpl_impl_' will be +// destrojed in the order reverse to how they were declared, e.g. +// 'rpl_impl_' first, 'mysql_impl_' second +connection::~connection() = default; + +void connection::swap(connection &other) noexcept { + mysql_impl_.swap(other.mysql_impl_); + rpl_impl_.swap(other.rpl_impl_); +} + std::uint32_t connection::get_server_version() const noexcept { assert(!is_empty()); return util::maybe_useless_integral_cast( - mysql_get_server_version(connection_deimpl::get_const_casted(impl_))); + mysql_get_server_version(mysql_deimpl::get_const_casted(mysql_impl_))); } std::string_view connection::get_readable_server_version() const noexcept { assert(!is_empty()); - return {mysql_get_server_info(connection_deimpl::get_const_casted(impl_))}; + return {mysql_get_server_info(mysql_deimpl::get_const_casted(mysql_impl_))}; } std::uint32_t connection::get_protocol_version() const noexcept { assert(!is_empty()); return util::maybe_useless_integral_cast( - mysql_get_proto_info(connection_deimpl::get_const_casted(impl_))); + mysql_get_proto_info(mysql_deimpl::get_const_casted(mysql_impl_))); } std::string_view connection::get_server_connection_info() const noexcept { assert(!is_empty()); - return {mysql_get_host_info(connection_deimpl::get_const_casted(impl_))}; + return {mysql_get_host_info(mysql_deimpl::get_const_casted(mysql_impl_))}; } std::string_view connection::get_character_set_name() const noexcept { assert(!is_empty()); - return {mysql_character_set_name(connection_deimpl::get_const_casted(impl_))}; -} - -binlog connection::create_binlog(std::uint32_t server_id, - std::string_view file_name, - std::uint64_t position) { - assert(!is_empty()); - return {*this, server_id, file_name, position}; + return { + mysql_character_set_name(mysql_deimpl::get_const_casted(mysql_impl_))}; } void connection::execute_generic_query_noresult(std::string_view query) { assert(!is_empty()); - auto *casted_impl = connection_deimpl::get(impl_); + if (is_in_replication_mode()) { + util::exception_location().raise( + "cannot execute query in replication mode"); + } + + auto *casted_impl = mysql_deimpl::get(mysql_impl_); if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) { raise_core_error_from_connection("cannot execute query", *this); } } +bool connection::ping() { + assert(!is_empty()); + if (is_in_replication_mode()) { + util::exception_location().raise( + "cannot perform ping in replication mode"); + } + + auto *casted_impl = mysql_deimpl::get(mysql_impl_); + return mysql_ping(casted_impl) == 0; +} + +void connection::switch_to_replication( + std::uint32_t server_id, std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode) { + assert(!is_empty()); + if (is_in_replication_mode()) { + util::exception_location().raise( + "connection has already been swithed to replication"); + } + + // WL#2540: Replication event checksums + // https://dev.mysql.com/worklog/task/?id=2540 + static constexpr std::string_view crc_query{ + "SET @source_binlog_checksum = 'NONE', " + "@master_binlog_checksum = 'NONE'"}; + execute_generic_query_noresult(crc_query); + + rpl_impl_ = std::make_unique(*this, server_id, file_name, position, + blocking_mode); +} + +bool connection::fetch_binlog_event(util::const_byte_span &portion) { + assert(!is_empty()); + if (!is_in_replication_mode()) { + util::exception_location().raise( + "connection has not been switched to replication"); + } + + const auto impl_fetch_result{rpl_impl_->fetch(portion)}; + + if (!impl_fetch_result) { + const auto native_error = mysql_errno(mysql_deimpl::get(mysql_impl_)); + if (native_error != CR_SERVER_LOST) { + raise_core_error_from_connection("cannot fetch binlog event", *this); + } + } + + return impl_fetch_result; +} + } // namespace easymysql diff --git a/src/easymysql/connection.hpp b/src/easymysql/connection.hpp index f20b494..b89383e 100644 --- a/src/easymysql/connection.hpp +++ b/src/easymysql/connection.hpp @@ -22,28 +22,30 @@ #include #include -#include "easymysql/binlog_fwd.hpp" #include "easymysql/connection_config_fwd.hpp" #include "easymysql/library_fwd.hpp" +#include "util/byte_span_fwd.hpp" + namespace easymysql { class [[nodiscard]] connection { friend class library; - friend class binlog; friend struct raise_access; public: - connection() = default; + connection() noexcept; connection(const connection &) = delete; - connection(connection &&) = default; + connection(connection &&other) noexcept; connection &operator=(const connection &) = delete; - connection &operator=(connection &&) = default; + connection &operator=(connection &&other) noexcept; + + ~connection(); - ~connection() = default; + void swap(connection &other) noexcept; - [[nodiscard]] bool is_empty() const noexcept { return !impl_; } + [[nodiscard]] bool is_empty() const noexcept { return !mysql_impl_; } [[nodiscard]] std::uint32_t get_server_version() const noexcept; [[nodiscard]] std::string_view get_readable_server_version() const noexcept; @@ -53,19 +55,35 @@ class [[nodiscard]] connection { [[nodiscard]] std::string_view get_server_connection_info() const noexcept; [[nodiscard]] std::string_view get_character_set_name() const noexcept; - [[nodiscard]] binlog create_binlog(std::uint32_t server_id, - std::string_view file_name, - std::uint64_t position); void execute_generic_query_noresult(std::string_view query); + [[nodiscard]] bool ping(); + + [[nodiscard]] bool is_in_replication_mode() const noexcept { + return static_cast(rpl_impl_); + } + + void switch_to_replication(std::uint32_t server_id, + std::string_view file_name, std::uint64_t position, + connection_replication_mode_type blocking_mode); + + // returns false on 'connection closed' / 'timeout' + // returns true and sets 'portion' to en empty span on EOF (last event read) + // returns true and sets 'portion' to event data on success + // throws an exception on any error other than 'connection closed' / 'timeout' + [[nodiscard]] bool fetch_binlog_event(util::const_byte_span &portion); private: explicit connection(const connection_config &config); - struct connection_deleter { + struct mysql_deleter { void operator()(void *ptr) const noexcept; }; - using impl_ptr = std::unique_ptr; - impl_ptr impl_; + using mysql_impl_ptr = std::unique_ptr; + mysql_impl_ptr mysql_impl_; + + class rpl_impl; + using rpl_impl_ptr = std::unique_ptr; + rpl_impl_ptr rpl_impl_; }; } // namespace easymysql diff --git a/src/easymysql/connection_config.hpp b/src/easymysql/connection_config.hpp index c395a29..3d62ce9 100644 --- a/src/easymysql/connection_config.hpp +++ b/src/easymysql/connection_config.hpp @@ -28,10 +28,13 @@ namespace easymysql { struct [[nodiscard]] connection_config : util::nv_tuple< // clang-format off - util::nv<"host" , std::string>, - util::nv<"port" , std::uint16_t>, - util::nv<"user" , std::string>, - util::nv<"password", std::string> + util::nv<"host" , std::string>, + util::nv<"port" , std::uint16_t>, + util::nv<"user" , std::string>, + util::nv<"password" , std::string>, + util::nv<"connect_timeout", std::uint32_t>, + util::nv<"read_timeout" , std::uint32_t>, + util::nv<"write_timeout" , std::uint32_t> // clang-format on > { [[nodiscard]] bool has_password() const noexcept { diff --git a/src/easymysql/connection_deimpl_private.hpp b/src/easymysql/connection_deimpl_private.hpp index 6d2e063..0e338c2 100644 --- a/src/easymysql/connection_deimpl_private.hpp +++ b/src/easymysql/connection_deimpl_private.hpp @@ -22,7 +22,8 @@ namespace easymysql { -using connection_deimpl = util::deimpl; +using mysql_deimpl = util::deimpl; +using rpl_deimpl = util::deimpl; } // namespace easymysql diff --git a/src/easymysql/connection_fwd.hpp b/src/easymysql/connection_fwd.hpp index bacf170..28da75e 100644 --- a/src/easymysql/connection_fwd.hpp +++ b/src/easymysql/connection_fwd.hpp @@ -18,6 +18,8 @@ namespace easymysql { +enum class connection_replication_mode_type { blocking, non_blocking }; + class connection; } // namespace easymysql diff --git a/src/easymysql/core_error_helpers_private.cpp b/src/easymysql/core_error_helpers_private.cpp index 18f5272..54fe7e4 100644 --- a/src/easymysql/core_error_helpers_private.cpp +++ b/src/easymysql/core_error_helpers_private.cpp @@ -30,8 +30,9 @@ namespace easymysql { struct raise_access { - static const connection::impl_ptr &get(const connection &conn) noexcept { - return conn.impl_; + static const connection::mysql_impl_ptr & + get(const connection &conn) noexcept { + return conn.mysql_impl_; } }; @@ -46,8 +47,7 @@ raise_core_error_from_connection(std::string_view user_message, // because otherwise the location will always point to this // particular line on this particular file regardless of the actual // location from where this function is called - auto *casted_impl = - connection_deimpl::get_const_casted(raise_access::get(conn)); + auto *casted_impl = mysql_deimpl::get_const_casted(raise_access::get(conn)); std::string message{}; if (!user_message.empty()) { message += user_message;
Post-Header for Rotate_event