Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close stale rta connections #200

Open
wants to merge 11 commits into
base: rta-alpha4
Choose a base branch
from
107 changes: 79 additions & 28 deletions contrib/epee/include/net/levin_protocol_handler_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class async_protocol_handler_config
// struct invoke_remote_command2_state_machine<class t_arg : public async_state_machine



void (*m_pcommands_handler_destroy)(levin_commands_handler<t_connection_context>*);

void delete_connections (size_t count, bool incoming);
void delete_connections (const std::vector<boost::uuids::uuid> &connections);

public:
typedef t_connection_context connection_context;
levin_commands_handler<t_connection_context>* m_pcommands_handler;
Expand All @@ -106,6 +112,7 @@ class async_protocol_handler_config
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
{}
void del_out_connections(size_t count);
void del_in_connections(size_t count);
};


Expand Down Expand Up @@ -386,10 +393,14 @@ class async_protocol_handler
is_continue = false;
if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
{
//async call scenario
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
response_handler->reset_timer();
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock);
if (!m_invoke_response_handlers.empty())
{
//async call scenario
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
response_handler->reset_timer();
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
}
}
break;
}
Expand Down Expand Up @@ -733,32 +744,72 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::delete_connections(size_t count, bool incoming)
{
std::vector <boost::uuids::uuid> connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
for (auto& c: m_connects)
{
if (c.second->m_connection_context.m_is_income == incoming)
connections.push_back(c.first);
}

// close random connections from the provided set
// TODO or better just keep removing random elements (performance)
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
while (count > 0 && connections.size() > 0)
{
try
{
auto i = connections.end() - 1;
async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
close(*i);
// TODO: check if connection, connection_context and async_protocol_handler deleted
del_connection(conn);
connections.erase(i);
}
catch (const std::out_of_range &e)
{
MWARNING("Connection not found in m_connects, continuing");
}
--count;
}

CRITICAL_REGION_END();
}


template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::delete_connections(const std::vector<boost::uuids::uuid> &connections)
{
CRITICAL_REGION_BEGIN(m_connects_lock);
for (const auto &conn_id: connections) {
try {
async_protocol_handler<t_connection_context> *conn = m_connects.at(conn_id);
close(conn_id);
// TODO: check if connection, connection_context and async_protocol_handler deleted
del_connection(conn);
}
catch (const std::out_of_range &e)
{
MWARNING("Connection not found in m_connects, continuing");
}
}
CRITICAL_REGION_END();
}
//--
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
{
std::vector <boost::uuids::uuid> out_connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
for (auto& c: m_connects)
{
if (!c.second->m_connection_context.m_is_income)
out_connections.push_back(c.first);
}

if (out_connections.size() == 0)
return;

// close random out connections
// TODO or better just keep removing random elements (performance)
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
while (count > 0 && out_connections.size() > 0)
{
close(*out_connections.begin());
del_connection(m_connects.at(*out_connections.begin()));
out_connections.erase(out_connections.begin());
--count;
}

CRITICAL_REGION_END();
delete_connections(count, false);
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::del_in_connections(size_t count)
{
delete_connections(count, true);
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
Expand Down
2 changes: 1 addition & 1 deletion contrib/epee/include/storages/http_abstract_invoke.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ namespace epee
}
if(resp_t.error.code || resp_t.error.message.size())
{
LOG_ERROR("RPC call of \"" << method_name << "\" returned error: " << resp_t.error.code << ", message: " << resp_t.error.message);
LOG_ERROR("RPC call of \"" << req_t.method << "\" returned error: " << resp_t.error.code << ", message: " << resp_t.error.message);
return false;
}
result_struct = resp_t.result;
Expand Down
16 changes: 8 additions & 8 deletions src/common/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ namespace tools
fail_msg_writer() << "Couldn't connect to daemon: " << m_http_client.get_host() << ":" << m_http_client.get_port();
return false;
}
ok = ok && epee::net_utils::invoke_http_json_rpc("/json_rpc", method_name, req, res, m_http_client, t_http_connection::TIMEOUT());
ok = epee::net_utils::invoke_http_json_rpc("/json_rpc", method_name, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok)
{
fail_msg_writer() << "Daemon request failed";
fail_msg_writer() << "basic_json_rpc_request: Daemon request failed";
return false;
}
else
Expand All @@ -95,15 +95,15 @@ namespace tools
t_http_connection connection(&m_http_client);

bool ok = connection.is_open();
ok = ok && epee::net_utils::invoke_http_json_rpc("/json_rpc", method_name, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok)
{
fail_msg_writer() << "Couldn't connect to daemon: " << m_http_client.get_host() << ":" << m_http_client.get_port();
return false;
}
else if (res.status != CORE_RPC_STATUS_OK) // TODO - handle CORE_RPC_STATUS_BUSY ?
ok = epee::net_utils::invoke_http_json_rpc("/json_rpc", method_name, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok || res.status != CORE_RPC_STATUS_OK) // TODO - handle CORE_RPC_STATUS_BUSY ?
{
fail_msg_writer() << fail_msg << " -- " << res.status;
fail_msg_writer() << fail_msg << " -- json_rpc_request: " << res.status;
return false;
}
else
Expand All @@ -123,15 +123,15 @@ namespace tools
t_http_connection connection(&m_http_client);

bool ok = connection.is_open();
ok = ok && epee::net_utils::invoke_http_json(relative_url, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok)
{
fail_msg_writer() << "Couldn't connect to daemon: " << m_http_client.get_host() << ":" << m_http_client.get_port();
return false;
}
else if (res.status != CORE_RPC_STATUS_OK) // TODO - handle CORE_RPC_STATUS_BUSY ?
ok = epee::net_utils::invoke_http_json(relative_url, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok || res.status != CORE_RPC_STATUS_OK) // TODO - handle CORE_RPC_STATUS_BUSY ?
{
fail_msg_writer() << fail_msg << " -- " << res.status;
fail_msg_writer() << fail_msg << "-- rpc_request: " << res.status;
return false;
}
else
Expand Down
17 changes: 17 additions & 0 deletions src/daemon/command_parser_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,23 @@ bool t_command_parser_executor::out_peers(const std::vector<std::string>& args)
return m_executor.out_peers(limit);
}

bool t_command_parser_executor::in_peers(const std::vector<std::string>& args)
{
if (args.empty()) return false;

unsigned int limit;
try {
limit = std::stoi(args[0]);
}

catch(const std::exception& ex) {
_erro("stoi exception");
return false;
}

return m_executor.in_peers(limit);
}

bool t_command_parser_executor::start_save_graph(const std::vector<std::string>& args)
{
if (!args.empty()) return false;
Expand Down
4 changes: 3 additions & 1 deletion src/daemon/command_parser_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class t_command_parser_executor final
bool set_limit_down(const std::vector<std::string>& args);

bool out_peers(const std::vector<std::string>& args);


bool in_peers(const std::vector<std::string>& args);

bool start_save_graph(const std::vector<std::string>& args);

bool stop_save_graph(const std::vector<std::string>& args);
Expand Down
5 changes: 5 additions & 0 deletions src/daemon/command_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ t_command_server::t_command_server(
, std::bind(&t_command_parser_executor::out_peers, &m_parser, p::_1)
, "Set max number of out peers"
);
m_command_lookup.set_handler(
"in_peers"
, std::bind(&t_command_parser_executor::in_peers, &m_parser, p::_1)
, "in_peers <max_number>"
);
m_command_lookup.set_handler(
"start_save_graph"
, std::bind(&t_command_parser_executor::start_save_graph, &m_parser, p::_1)
Expand Down
34 changes: 33 additions & 1 deletion src/daemon/rpc_command_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ bool t_rpc_command_executor::out_peers(uint64_t limit)

if (m_is_rpc)
{
if (!m_rpc_client->json_rpc_request(req, res, "out_peers", fail_message.c_str()))
if (!m_rpc_client->rpc_request(req, res, "/out_peers", fail_message.c_str()))
{
return true;
}
Expand All @@ -1206,6 +1206,38 @@ bool t_rpc_command_executor::out_peers(uint64_t limit)
return true;
}

bool t_rpc_command_executor::in_peers(uint64_t limit)
{
cryptonote::COMMAND_RPC_IN_PEERS::request req;
cryptonote::COMMAND_RPC_IN_PEERS::response res;

epee::json_rpc::error error_resp;

req.in_peers = limit;

std::string fail_message = "Unsuccessful";

if (m_is_rpc)
{
if (!m_rpc_client->rpc_request(req, res, "/in_peers", fail_message.c_str()))
{
return true;
}
}
else
{
if (!m_rpc_server->on_in_peers(req, res) || res.status != CORE_RPC_STATUS_OK)
{
tools::fail_msg_writer() << make_error(fail_message, res.status);
return true;
}
}

std::cout << "Max number of in peers set to " << limit << std::endl;

return true;
}

bool t_rpc_command_executor::start_save_graph()
{
cryptonote::COMMAND_RPC_START_SAVE_GRAPH::request req;
Expand Down
4 changes: 3 additions & 1 deletion src/daemon/rpc_command_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ class t_rpc_command_executor final {
bool set_limit_down(int limit);

bool out_peers(uint64_t limit);


bool in_peers(uint64_t limit);

bool start_save_graph();

bool stop_save_graph();
Expand Down
69 changes: 69 additions & 0 deletions src/p2p/net_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2014-2018, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers

#include "common/command_line.h"
#include "net_node.h"

namespace nodetool
{
const command_line::arg_descriptor<std::string> arg_p2p_bind_ip = {"p2p-bind-ip", "Interface for p2p network protocol", "0.0.0.0"};
const command_line::arg_descriptor<std::string> arg_p2p_bind_port = {
"p2p-bind-port"
, "Port for p2p network protocol"
, std::to_string(config::P2P_DEFAULT_PORT)
};
const command_line::arg_descriptor<std::string> arg_testnet_p2p_bind_port = {
"testnet-p2p-bind-port"
, "Port for testnet p2p network protocol"
, std::to_string(config::testnet::P2P_DEFAULT_PORT)
};
const command_line::arg_descriptor<uint32_t> arg_p2p_external_port = {"p2p-external-port", "External port for p2p network protocol (if port forwarding used with NAT)", 0};
const command_line::arg_descriptor<bool> arg_p2p_allow_local_ip = {"allow-local-ip", "Allow local ip add to peer list, mostly in debug purposes"};
const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_add_peer = {"add-peer", "Manually add peer to local peerlist"};
const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_add_priority_node = {"add-priority-node", "Specify list of peers to connect to and attempt to keep the connection open"};
const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_add_exclusive_node = {"add-exclusive-node", "Specify list of peers to connect to only."
" If this option is given the options add-priority-node and seed-node are ignored"};
const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_seed_node = {"seed-node", "Connect to a node to retrieve peer addresses, and disconnect"};
const command_line::arg_descriptor<bool> arg_p2p_hide_my_port = {"hide-my-port", "Do not announce yourself as peerlist candidate", false, true};

const command_line::arg_descriptor<bool> arg_no_igd = {"no-igd", "Disable UPnP port mapping"};
const command_line::arg_descriptor<bool> arg_offline = {"offline", "Do not listen for peers, nor connect to any"};
const command_line::arg_descriptor<int64_t> arg_out_peers = {"out-peers", "set max number of out peers", 200};
const command_line::arg_descriptor<int64_t> arg_in_peers = {"in-peers", "set max number of in peers", 100};
const command_line::arg_descriptor<int> arg_tos_flag = {"tos-flag", "set TOS flag", -1};

const command_line::arg_descriptor<int64_t> arg_limit_rate_up = {"limit-rate-up", "set limit-rate-up [kB/s]", -1};
const command_line::arg_descriptor<int64_t> arg_limit_rate_down = {"limit-rate-down", "set limit-rate-down [kB/s]", -1};
const command_line::arg_descriptor<int64_t> arg_limit_rate = {"limit-rate", "set limit-rate [kB/s]", -1};

const command_line::arg_descriptor<bool> arg_save_graph = {"save-graph", "Save data for dr monero", false};
const command_line::arg_descriptor<Uuid> arg_p2p_net_id = {"net-id", "The way to replace hardcoded NETWORK_ID. Effective only with --testnet, ex.: 'net-id = 54686520-4172-7420-6f77-205761722037'", Uuid()};

}
Loading