From a1b1d031125e8bc551edae83d933345cae1af3e1 Mon Sep 17 00:00:00 2001 From: yarkin Date: Tue, 11 Jul 2023 16:57:14 +0800 Subject: [PATCH 1/2] Refactor to allow reconnection to SHiP endpoints after connection lost. --- src/ship_receiver_plugin.cpp | 258 +++++++++++++++++++++++++---------- 1 file changed, 188 insertions(+), 70 deletions(-) diff --git a/src/ship_receiver_plugin.cpp b/src/ship_receiver_plugin.cpp index 8c2f260..39108fc 100644 --- a/src/ship_receiver_plugin.cpp +++ b/src/ship_receiver_plugin.cpp @@ -30,75 +30,77 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this start_block_id, int64_t start_block_timestamp) { + std::optional start_block_id, int64_t start_block_timestamp, + uint32_t input_max_retry, uint32_t input_delay_second) { SILK_DEBUG << "ship_receiver_plugin_impl INIT"; host = std::move(h); port = std::move(p); core_account = ca; start_from_block_id = start_block_id; start_from_block_timestamp = start_block_timestamp; + last_lib = 0; + last_block_num = 0; + delay_second = input_delay_second; + max_retry = input_max_retry; + retry_count = 0; resolver = std::make_shared(appbase::app().get_io_context()); - stream = std::make_shared>(appbase::app().get_io_context()); - stream->binary(true); - stream->read_message_max(0x1ull << 36); - connect_stream(); + // Defer connection to plugin_start() } - void initial_read() { - flat_buffer buff; - boost::system::error_code ec; - stream->read(buff, ec); - if (ec) { - SILK_CRIT << "SHiP initial read failed : " << ec.message(); - sys::error(); - } - abi = load_abi(eosio::json_token_stream{(char*)buff.data().data()}); + void shutdown() { } - void send_request(const eosio::ship_protocol::request& req) { + auto send_request(const eosio::ship_protocol::request& req) { auto bin = eosio::convert_to_bin(req); boost::system::error_code ec; stream->write(asio::buffer(bin), ec); if (ec) { - SILK_CRIT << "Sending request failed : " << ec.message(); - sys::error(); + SILK_ERROR << "Sending request failed : " << ec.message(); } + return ec; } - void connect_stream() { + auto connect_stream(auto& resolve_it) { boost::system::error_code ec; - auto res = resolver->resolve( tcp::v4(), host, port, ec); - if (ec) { - SILK_CRIT << "Resolver failed : " << ec.message(); - sys::error(); - } - - boost::asio::connect(stream->next_layer(), res, ec); + boost::asio::connect(stream->next_layer(), resolve_it, ec); if (ec) { - SILK_CRIT << "SHiP connection failed : " << ec.message(); - sys::error(); + SILK_ERROR << "SHiP connection failed : " << ec.message(); + return ec; } stream->handshake(host, "/", ec); if (ec) { - SILK_CRIT << "SHiP failed handshake : " << ec.message(); - sys::error(); + SILK_ERROR << "SHiP failed handshake : " << ec.message(); + return ec; } SILK_INFO << "Connected to SHiP at " << host << ":" << port; + return ec; } - inline auto read() const { - auto buff = std::make_shared(); + auto initial_read() { + flat_buffer buff; boost::system::error_code ec; - stream->read(*buff, ec); + stream->read(buff, ec); if (ec) { - SILK_CRIT << "SHiP read failed : " << ec.message(); - sys::error(); + SILK_ERROR << "SHiP initial read failed : " << ec.message(); } - return buff; + auto end = buff.prepare(1); + ((char *)end.data())[0] = '\0'; + buff.commit(1); + abi = load_abi(eosio::json_token_stream{(char *)buff.data().data()}); + return ec; + } + + inline auto read(flat_buffer& buff) const { + boost::system::error_code ec; + stream->read(buff, ec); + if (ec) { + SILK_ERROR << "SHiP read failed : " << ec.message(); + } + return ec; } template @@ -108,16 +110,14 @@ class ship_receiver_plugin_impl : std::enable_shared_from_thisasync_read(*buff, appbase::app().executor().get_priority_queue().wrap(80, 0, [buff, func](const auto ec, auto) { if (ec) { - SILK_CRIT << "SHiP read failed : " << ec.message(); - sys::error(); - } else { - func(buff); + SILK_ERROR << "SHiP read failed : " << ec.message(); } + func(ec, buff); }) ); } - void send_get_blocks_request(uint32_t start) { + auto send_get_blocks_request(uint32_t start) { eosio::ship_protocol::request req = eosio::ship_protocol::get_blocks_request_v0{ .start_block_num = start, .end_block_num = std::numeric_limits::max(), @@ -128,17 +128,17 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this @@ -220,30 +220,102 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this(get_result(read()));; + auto get_status(auto& r){ + auto ec = send_get_status_request(); + if (ec) { + return ec; + } + auto buff = std::make_shared(); + ec = read(*buff); + if (ec) { + return ec; + } + r = std::get(get_result(buff)); + return ec; } - void shutdown() { + void reset_connection() { + // De facto entry point. + if (stream) { + // Try close connection gracefully but ignore return value. + boost::system::error_code ec; + stream->close(websocket::close_reason(websocket::close_code::normal),ec); + + // Determine if we should re-connect. + if (++retry_count > max_retry) { + // No more retry; + sys::error("Max retry reached. No more reconnections."); + return; + } + + // Delay in the case of reconnection. + std::this_thread::sleep_for(std::chrono::seconds(delay_second)); + SILK_INFO << "Trying to reconnect "<< retry_count << "/" << max_retry; + } + stream = std::make_shared>(appbase::app().get_io_context()); + stream->binary(true); + stream->read_message_max(0x1ull << 36); + + // CAUTION: we have to use async call here to avoid recursive reset_connection() calls. + resolver->async_resolve( tcp::v4(), host, port, [this](const auto ec, auto res) { + if (ec) { + SILK_ERROR << "Resolver failed : " << ec.message(); + reset_connection(); + return; + } + + // It should be fine to call connection and initial read synchronously as though they are + // blocking calls, it's only one thread and we have nothing more important to run anyway. + auto ec2 = connect_stream(res); + if (ec2) { + reset_connection(); + return; + } + + ec2 = initial_read(); + if (ec2) { + reset_connection(); + return; + } + + // Will call reset connection if necessary internally. + sync(); + }); } void start_read() { static size_t num_messages = 0; - async_read([this](auto buff) { + async_read([this](const auto ec, auto buff) { + if (ec) { + SILK_INFO << "Trying to recover from SHiP read failure."; + // Reconnect and restart sync. + num_messages = 0; + reset_connection(); + return; + } auto block = to_native(std::get(get_result(buff))); if(!block) { sys::error("Unable to generate native block"); + // No reset! return; } + last_lib = block->lib; + last_block_num = block->block_num; + // reset retry_count upon successful read. + retry_count = 0; + native_blocks_channel.publish(80, std::make_shared(std::move(*block))); if(++num_messages % 1024 == 0) { //SILK_INFO << "Block #" << block->block_num; - send_get_blocks_ack_request(num_messages); + auto ec = send_get_blocks_ack_request(num_messages); + if (ec) { + num_messages = 0; + reset_connection(); + return; + } } start_read(); @@ -251,36 +323,63 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this().get_head_canonical_header(); - if (!head_header) { - sys::error("Unable to read canonical header"); + eosio::ship_protocol::get_status_result_v0 res = {}; + auto ec = get_status(res); + if (ec) { + reset_connection(); return; } - SILK_INFO << "get_head_canonical_header: " + + uint32_t start_from = 0; + + if (last_lib > 0) { + // None zero last_lib means we are in the process of reconnection. + // If last pushed block number is higher than LIB, we have the risk of fork and need to start from LIB. + // Otherwise it means we are catching up blocks and can safely continue from next block. + start_from = (last_lib > last_block_num ? last_block_num : last_lib) + 1; + SILK_INFO << "Recover from disconnection, " << "last LIB is: " << last_lib + << ", last block num is: " << last_block_num + << ", continue from: " << start_from; + } + else { + auto head_header = appbase::app().get_plugin().get_head_canonical_header(); + if (!head_header) { + sys::error("Unable to read canonical header"); + // No reset! + return; + } + + // Only take care of canonical header and input options when it's initial sync. + SILK_INFO << "Get_head_canonical_header: " << "#" << head_header->number << ", hash:" << silkworm::to_hex(head_header->hash()) << ", mixHash:" << silkworm::to_hex(head_header->prev_randao); - auto start_from = utils::to_block_num(head_header->prev_randao.bytes) + 1; - SILK_INFO << "Canonical header start from block: " << start_from; - - if( start_from_block_id ) { - uint32_t block_num = utils::to_block_num(*start_from_block_id); - SILK_INFO << "Using specified start block number:" << block_num; - start_from = block_num; + start_from = utils::to_block_num(head_header->prev_randao.bytes) + 1; + SILK_INFO << "Canonical header start from block: " << start_from; + + if( start_from_block_id ) { + uint32_t block_num = utils::to_block_num(*start_from_block_id); + SILK_INFO << "Using specified start block number:" << block_num; + start_from = block_num; + } } - + if( res.trace_begin_block > start_from ) { SILK_ERROR << "Block #" << start_from << " not available in SHiP"; sys::error("Start block not available in SHiP"); + // No reset! return; } SILK_INFO << "Starting from block #" << start_from; - send_get_blocks_request(start_from); + ec = send_get_blocks_request(start_from); + if (ec) { + reset_connection(); + return; + } start_read(); } @@ -305,6 +404,11 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this start_from_block_id; int64_t start_from_block_timestamp{}; + uint32_t last_lib; + uint32_t last_block_num; + uint32_t delay_second; + uint32_t max_retry; + uint32_t retry_count; }; ship_receiver_plugin::ship_receiver_plugin() : my(new ship_receiver_plugin_impl) {} @@ -320,6 +424,10 @@ void ship_receiver_plugin::set_program_options( appbase::options_description& cl "Override Antelope block id to start syncing from" ) ("ship-start-from-block-timestamp", boost::program_options::value(), "Timestamp for the provided ship-start-from-block-id, required if block-id provided" ) + ("ship-max-retry", boost::program_options::value(), + "Max retry times before give up when trying to reconnect to SHiP endpoints" ) + ("ship-delay-second", boost::program_options::value(), + "Deply in seconds between each retry when trying to reconnect to SHiP endpoints" ) ; } @@ -329,6 +437,8 @@ void ship_receiver_plugin::plugin_initialize( const appbase::variables_map& opti auto core = options.at("ship-core-account").as(); std::optional start_block_id; int64_t start_block_timestamp = 0; + uint32_t delay_second = 10; + uint32_t max_retry = 0; if (options.contains("ship-start-from-block-id")) { if (!options.contains("ship-start-from-block-timestamp")) { throw std::runtime_error("ship-start-from-block-timestamp required if ship-start-from-block-id provided"); @@ -340,14 +450,22 @@ void ship_receiver_plugin::plugin_initialize( const appbase::variables_map& opti throw std::runtime_error("ship-start-from-block-timestamp only valid if ship-start-from-block-id provided"); } - my->init(endpoint.substr(0, i), endpoint.substr(i+1), eosio::name(core), start_block_id, start_block_timestamp); + if (options.contains("ship-max-retry")) { + max_retry = options.at("ship-max-retry").as(); + } + + if (options.contains("ship-delay-second")) { + delay_second = options.at("ship-delay-second").as(); + } + + my->init(endpoint.substr(0, i), endpoint.substr(i+1), eosio::name(core), start_block_id, start_block_timestamp, max_retry, delay_second); SILK_INFO << "Initialized SHiP Receiver Plugin"; } void ship_receiver_plugin::plugin_startup() { SILK_INFO << "Started SHiP Receiver"; - my->initial_read(); - my->sync(); + my->reset_connection(); + } void ship_receiver_plugin::plugin_shutdown() { From af79548691671eb2c0d3f50b799116f72501263d Mon Sep 17 00:00:00 2001 From: yarkin Date: Fri, 28 Jul 2023 08:42:50 +0800 Subject: [PATCH 2/2] Skip block import logic if duplicated blocks received that might happen during reconnection to SHiP. --- src/block_conversion_plugin.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/block_conversion_plugin.cpp b/src/block_conversion_plugin.cpp index 9fb69e4..6b6c27a 100644 --- a/src/block_conversion_plugin.cpp +++ b/src/block_conversion_plugin.cpp @@ -170,6 +170,14 @@ class block_conversion_plugin_impl : std::enable_shared_from_thisid; }); + if( dup_block != native_blocks.end() ) { + SILK_WARN << "Receiving duplicated blocks " << new_block->id << " It's normal if it's caused by reconnection to SHiP."; + return; + } + // Find fork block auto fork_block = std::find_if(native_blocks.begin(), native_blocks.end(), [&new_block](const auto& nb){ return nb.id == new_block->prev; }); if( fork_block == native_blocks.end() ) {