Try reconnecting to SHiP again when read fails. #614

Closed
wants to merge 2 commits into from
61 changes: 49 additions & 12 deletions cmd/ship_receiver_plugin.cpp
@@ -52,7 +52,10 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this<ship_receiver_plu
SILK_CRIT << "SHiP initial read failed : " << ec.message();
sys::error();
}
abi = load_abi(eosio::json_token_stream{(char*)buff.data().data()});
auto end = buff.prepare(1);
((char *)end.data())[0] = '\0';
buff.commit(1);
Member:
It seems rather odd that the buffer would not always contain the terminating character.

Contributor Author:
Yeah, I am not sure whether we should fix it like this or not.

I don't think the Boost websocket stream will write a zero there, so maybe we should fix json_token_stream instead.

But I am not sure it's a good idea to touch CDT for this issue.
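
The change under discussion appends a single zero byte to the read buffer before handing its contents to a parser that takes a `char*`. A minimal sketch of the same idea, using a `std::vector<char>` as a stand-in for the websocket read buffer; `byte_buffer`, `null_terminate`, and `c_string_length` are hypothetical names for illustration and are not part of the PR:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Stand-in for the bytes a websocket read leaves behind: exactly the payload,
// with no trailing '\0' guaranteed.
using byte_buffer = std::vector<char>;

// Append one '\0' (if not already present) so the contents can be passed to an
// API that expects a NUL-terminated, mutable char*.
inline char* null_terminate(byte_buffer& buf) {
    if (buf.empty() || buf.back() != '\0') {
        buf.push_back('\0');
    }
    return buf.data();
}

// Length of the payload as a C-string consumer would see it.
inline std::size_t c_string_length(const char* s) {
    return std::char_traits<char>::length(s);
}
```

Without the terminator, a parser that scans for `'\0'` may read past the end of the received payload, which is why the terminator has to be added somewhere: either by the caller, as here, or inside the parser by passing an explicit length.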

abi = load_abi(eosio::json_token_stream{(char *)buff.data().data()});
}

void send_request(const eosio::ship_protocol::request& req) {
@@ -108,11 +111,9 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this<ship_receiver_plu
stream->async_read(*buff, appbase::app().get_priority_queue().wrap(80,
[buff, func](const auto ec, auto) {
if (ec) {
SILK_CRIT << "SHiP read failed : " << ec.message();
sys::error();
} else {
func(buff);
}
SILK_ERROR << "SHiP read failed : " << ec.message();
}
func(ec, buff);
})
);
}
@@ -232,7 +233,31 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this<ship_receiver_plu

void start_read() {
static size_t num_messages = 0;
async_read([this](auto buff) {
async_read([this](const auto ec, auto buff) {
if (ec) {
// Failed in read, retry connecting!
// Note that we only try to reconnect when error happened during read.
// Any other error will still result in exit.
SILK_INFO << "Trying to recover from SHiP read failure.";
// Wait for a while before doing anything in case we hit some network jam.
std::this_thread::sleep_for(std::chrono::seconds(3));
Member:
3 seconds seems like a rather long time.
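
One possible way to address the concern about the fixed 3-second wait, offered only as an assumption and not something the PR implements, is a short initial delay that grows on repeated failures and is capped at the current 3 seconds. `reconnect_delay`, `base`, and `cap` are hypothetical names:

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>

// Hypothetical helper: delay to wait before reconnect attempt `attempt`
// (0-based). Doubles from `base` on every attempt and never exceeds `cap`.
inline std::chrono::milliseconds reconnect_delay(
    std::uint32_t attempt,
    std::chrono::milliseconds base = std::chrono::milliseconds(100),
    std::chrono::milliseconds cap = std::chrono::milliseconds(3000)) {
    // Bound the shift so the doubling factor stays small.
    const std::uint32_t shift = std::min<std::uint32_t>(attempt, 16);
    const std::chrono::milliseconds scaled = base * (1LL << shift);
    return std::min(scaled, cap);
}
```

The first retry would then wait 100 ms, and only a persistently failing connection would reach the 3-second ceiling. Note that `std::this_thread::sleep_for` blocks the thread running the handler for the whole delay, whatever its length.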


// Try close connection gracefully but ignore return value.
boost::system::error_code ec2;
stream->close(websocket::close_reason(websocket::close_code::normal),ec2);

// Reconnect and restart sync.
SILK_INFO << "Trying to reconnect.";
num_messages = 0;
stream = std::make_shared<websocket::stream<tcp::socket>>(appbase::app().get_io_service());
stream->binary(true);
stream->read_message_max(0x1ull << 36);
connect_stream();
initial_read();
Member:
I would refactor this and the code from init() into a common method.

SILK_INFO << "Trying to sync again.";
sync(true);
return;
}
auto block = to_native(std::get<eosio::ship_protocol::get_blocks_result_v0>(get_result(buff)));
if(!block) {
sys::error("Unable to generate native block");
@@ -250,7 +275,7 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this<ship_receiver_plu
});
}

void sync() {
void sync(bool restart = false) {
// get available blocks range we can grab
auto res = get_status();

@@ -267,11 +292,23 @@ class ship_receiver_plugin_impl : std::enable_shared_from_this<ship_receiver_plu
auto start_from = utils::to_block_num(head_header->mix_hash.bytes) + 1;
SILK_INFO << "Canonical header start from block: " << start_from;

if( start_from_block_id ) {
uint32_t block_num = utils::to_block_num(*start_from_block_id);
SILK_INFO << "Using specified start block number:" << block_num;
start_from = block_num;
if (restart) {
// Always start from early blocks to handle potential forks.
// Clearly this approach will introduce overhead, but since this piece of code
// will only execute during recovery, it should be fine.
if (head_header->number > 250) {
start_from -= 500;
Member:
It is trivial to keep track of LIB which would be better than just guessing at a block number. See block.last_irreversible.block_num.

Contributor Author:
That means we would need a set of functions to query a single block, etc.

And there's a corner case:
FORK ------ A (original HEAD)
| -LIB--------A' (new block at same height)

In the extreme case where LIB has already passed the fork block by the time we try to restart, LIB alone is not enough to recover.
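
Whichever rewind distance is chosen, the restart branch in the diff guards on `head_header->number > 250` while subtracting 500 from `start_from`, so the subtraction itself is not bounded by the guard. A minimal sketch of a saturating rewind, assuming `start_from` is a 32-bit unsigned block number (the diff declares it with `auto`, so this is an assumption); `rewind_start` and `first_block` are hypothetical names:

```cpp
#include <cstdint>

// Hypothetical helper: move the start block back by `rewind` blocks without
// wrapping around, never going below `first_block`.
inline std::uint32_t rewind_start(std::uint32_t start_from,
                                  std::uint32_t rewind,
                                  std::uint32_t first_block = 1) {
    // If fewer than `rewind` blocks lie between first_block and start_from,
    // clamp to first_block instead of subtracting.
    if (start_from <= first_block || start_from - first_block < rewind) {
        return first_block;
    }
    return start_from - rewind;
}
```

With this shape the rewind distance could later come from a tracked LIB or stay a fixed guess, since the clamping does not depend on where the distance comes from.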

}
}
else {
// Only take care of input option when it's initial sync.
if( start_from_block_id ) {
uint32_t block_num = utils::to_block_num(*start_from_block_id);
SILK_INFO << "Using specified start block number:" << block_num;
start_from = block_num;
}
}


if( res.trace_begin_block > start_from ) {
SILK_ERROR << "Block #" << start_from << " not available in SHiP";