Skip to content

Commit

Permalink
Version 20230415
Browse files Browse the repository at this point in the history
Change STUN outputs

Change the use of thread-pool
  • Loading branch information
cnbatch committed Apr 15, 2023
1 parent 05cb5da commit 9326f13
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 145 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ encryption_algorithm=AES-GCM
| log_path | 存放 Log 的目录 ||不能指向文件本身|

### Log 文件
在首次获取打洞后的 IP 地址与端口后,以及打洞的 IP 地址与端口发生变化后,会向 Log 目录创建 ip_address.txt 文件(若存在就追加),将 IP 地址与端口写进去。
在首次获取打洞后的 IP 地址与端口后,以及打洞的 IP 地址与端口发生变化后,会向 Log 目录创建 ip_address.txt 文件(若存在就覆盖),将 IP 地址与端口写进去。

获取到的打洞地址会同时显示在控制台当中。

Expand Down
18 changes: 9 additions & 9 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ int main(int argc, char *argv[])
{
if (argc <= 1)
{
printf("Usage: %s config1.conf\n", argv[0]);
printf(" %s config1.conf config2.conf...\n", argv[0]);
char app_name[] = "udphop";
printf("%s version 20230415\n", app_name);
printf("Usage: %s config1.conf\n", app_name);
printf(" %s config1.conf config2.conf...\n", app_name);
return 0;
}

constexpr size_t task_count_limit = (size_t)std::numeric_limits<int16_t>::max() >> 3;
ttp::concurrency_t thread_counts = 1;
uint16_t thread_group_count = 1;
int io_thread_count = 1;
if (std::thread::hardware_concurrency() > 3)
{
thread_counts = std::thread::hardware_concurrency();
thread_group_count = (uint16_t)std::log2(thread_counts);
io_thread_count = (int)std::log(thread_counts);
auto thread_counts = std::thread::hardware_concurrency();
thread_group_count = (uint16_t)(thread_counts / 2);
io_thread_count = (int)std::log2(thread_counts);
}

ttp::task_thread_pool task_pool{ thread_counts };
ttp::task_group_pool task_groups_local{ thread_group_count };
ttp::task_group_pool task_groups_peer{ thread_group_count };

Expand Down Expand Up @@ -69,10 +69,10 @@ int main(int argc, char *argv[])
switch (settings.mode)
{
case running_mode::client:
clients.emplace_back(client_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings));
clients.emplace_back(client_mode(ioc, network_io, task_groups_local, task_groups_peer, task_count_limit, settings));
break;
case running_mode::server:
servers.emplace_back(server_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings));
servers.emplace_back(server_mode(ioc, network_io, task_groups_local, task_groups_peer, task_count_limit, settings));
break;
default:
break;
Expand Down
46 changes: 3 additions & 43 deletions src/networks/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ bool client_mode::start()
try
{
udp_callback_t udp_func_ap = std::bind(&client_mode::udp_server_incoming, this, _1, _2, _3, _4);
udp_access_point = std::make_unique<udp_server>(network_io, sequence_task_pool_local, task_limit, false, listen_on_ep, udp_func_ap);
udp_access_point = std::make_unique<udp_server>(network_io, sequence_task_pool_local, task_limit, listen_on_ep, udp_func_ap);

timer_find_timeout.expires_after(FINDER_TIMEOUT_INTERVAL);
timer_find_timeout.async_wait([this](const asio::error_code &e) { find_expires(e); });
Expand Down Expand Up @@ -105,7 +105,7 @@ void client_mode::udp_server_incoming(std::unique_ptr<uint8_t[]> data, size_t da

std::shared_ptr<data_wrapper<forwarder, udp::endpoint>> data_wrapper_ptr = std::make_shared<data_wrapper<forwarder, udp::endpoint>>(key_number);
auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5);
std::shared_ptr<forwarder> udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, true, data_wrapper_ptr, udp_func);
std::shared_ptr<forwarder> udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, data_wrapper_ptr, udp_func);
if (udp_forwarder == nullptr)
return;

Expand Down Expand Up @@ -211,34 +211,6 @@ void client_mode::udp_client_incoming_to_udp(std::weak_ptr<data_wrapper<forwarde
udp_client_incoming_to_udp_unpack(wrapper_weak_ptr, std::move(data), plain_size, peer, local_port_number);
}

void client_mode::udp_client_incoming_to_udp_with_thread_pool(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper_weak_ptr, std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number)
{
std::shared_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper = wrapper_weak_ptr.lock();
if (data_size == 0 || wrapper == nullptr)
return;

if (data_size < RAW_HEADER_SIZE)
return;

uint8_t *data_ptr = data.get();

std::function<ttp::task_callback(std::unique_ptr<uint8_t[]>)> task_function =
[this, wrapper_weak_ptr, data_size, peer, local_port_number, data_ptr](std::unique_ptr<uint8_t[]> null_data) -> ttp::task_callback
{
auto [error_message, plain_size] = decrypt_data(current_settings.encryption_password, current_settings.encryption, data_ptr, (int)data_size);
if (!error_message.empty() || plain_size == 0)
return [](std::unique_ptr<uint8_t[]> null_data) {};

auto return_function = [this, wrapper_weak_ptr, plain_size = plain_size, peer, local_port_number](std::unique_ptr<uint8_t[]> data)
{ udp_client_incoming_to_udp_unpack(wrapper_weak_ptr, std::move(data), plain_size, peer, local_port_number); };
return return_function;
};

std::unique_ptr<uint8_t[]> unique_nullptr;
auto function_and_data = task_assigner.submit(task_function, std::move(unique_nullptr));
sequence_task_pool_local.push_task((size_t)this, std::move(function_and_data), std::move(data));
}

void client_mode::udp_client_incoming_to_udp_unpack(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper_weak_ptr, std::unique_ptr<uint8_t[]> data, size_t plain_size, udp::endpoint peer, asio::ip::port_type local_port_number)
{
std::shared_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper = wrapper_weak_ptr.lock();
Expand Down Expand Up @@ -269,12 +241,6 @@ void client_mode::udp_client_incoming_to_udp_unpack(std::weak_ptr<data_wrapper<f
if (calculate_difference(timestamp, packet_timestamp) > TIME_GAP)
return;

//std::shared_lock lock_wrapper_session_map_to_udp{ mutex_wrapper_session_map_to_udp };
//auto session_iter = wrapper_session_map_to_udp.find(iden);
//if (session_iter == wrapper_session_map_to_udp.end())
// return;
//udp::endpoint& udp_endpoint = session_iter->second;
//lock_wrapper_session_map_to_udp.unlock();
const udp::endpoint &udp_endpoint = wrapper->cached_data;
udp_access_point->async_send_out(std::move(data), received_data_ptr, received_size, udp_endpoint);
}
Expand Down Expand Up @@ -361,15 +327,11 @@ void client_mode::cleanup_expiring_data_connections()
if (calculate_difference(time_right_now, expire_time) < CLEANUP_WAITS)
continue;

//std::scoped_lock lockers{ mutex_udp_session_map_to_wrapper, mutex_wrapper_session_map_to_udp,
// mutex_expiring_forwarders, mutex_wrapper_changeport_timestamp,
// mutex_wrapper_channels, mutex_id_map_to_forwarder };
std::unique_lock locker_id_map_to_forwarder{ mutex_id_map_to_forwarder };
if (auto forwarder_iter = id_map_to_forwarder.find(iden);
forwarder_iter != id_map_to_forwarder.end())
{
std::shared_ptr<forwarder> forwarder_ptr = forwarder_iter->second;
//forwarder *forwarder_ptr = forwarder_ptr_owner.get();
forwarder_ptr->remove_callback();
forwarder_ptr->stop();
std::unique_lock locker_expiring_forwarders{ mutex_expiring_forwarders };
Expand Down Expand Up @@ -407,12 +369,10 @@ void client_mode::loop_timeout_sessions()
if (udp_forwarder->time_gap_of_receive() > current_settings.timeout &&
udp_forwarder->time_gap_of_send() > current_settings.timeout)
{
//std::scoped_lock locker_expiring_wrapper{ mutex_expiring_wrapper };
if (expiring_wrapper.find(iden) == expiring_wrapper.end())
expiring_wrapper.insert({ iden, std::pair{ data_ptr, right_now() } });

wrapper_channels.erase(iter);
//std::scoped_lock locker_wrapper_changeport_timestamp{ mutex_wrapper_changeport_timestamp };
wrapper_changeport_timestamp.erase(data_ptr);
}
}
Expand All @@ -431,7 +391,7 @@ void client_mode::loop_change_new_port()
asio::error_code ec;

auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5);
auto udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, true, wrapper_ptr, udp_func);
auto udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, wrapper_ptr, udp_func);
if (udp_forwarder == nullptr)
continue;

Expand Down
9 changes: 1 addition & 8 deletions src/networks/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ class client_mode
asio::steady_timer timer_expiring_wrapper;
asio::steady_timer timer_change_ports;
asio::steady_timer timer_keep_alive;
//asio::strand<asio::io_context::executor_type> asio_strand;
ttp::task_thread_pool &task_assigner;
ttp::task_group_pool &sequence_task_pool_local;
ttp::task_group_pool &sequence_task_pool_peer;
const size_t task_limit;

void udp_server_incoming(std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number);
void udp_client_incoming_to_udp(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>>, std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number);
void udp_client_incoming_to_udp_with_thread_pool(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper_weak_ptr, std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number);
void udp_client_incoming_to_udp_unpack(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper_weak_ptr, std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number);
udp::endpoint get_remote_address();

Expand All @@ -70,15 +67,13 @@ class client_mode
client_mode(const client_mode &) = delete;
client_mode& operator=(const client_mode &) = delete;

client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) :
client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) :
io_context(io_context_ref),
network_io(net_io),
timer_find_timeout(io_context),
timer_expiring_wrapper(io_context),
timer_change_ports(io_context),
timer_keep_alive(io_context),
//asio_strand(asio::make_strand(io_context.get_executor())),
task_assigner(task_pool),
sequence_task_pool_local(seq_task_pool_local),
sequence_task_pool_peer(seq_task_pool_peer),
task_limit(task_count_limit),
Expand All @@ -91,8 +86,6 @@ class client_mode
timer_expiring_wrapper(std::move(existing_client.timer_expiring_wrapper)),
timer_change_ports(std::move(existing_client.timer_change_ports)),
timer_keep_alive(std::move(existing_client.timer_keep_alive)),
//asio_strand(std::move(existing_client.asio_strand)),
task_assigner(existing_client.task_assigner),
sequence_task_pool_local(existing_client.sequence_task_pool_local),
sequence_task_pool_peer(existing_client.sequence_task_pool_peer),
task_limit(existing_client.task_limit),
Expand Down
12 changes: 6 additions & 6 deletions src/networks/connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ void udp_server::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
buffer_cache.swap(new_buffer);
}

if (enable_thread_pool)
if (sequence_task_pool != nullptr)
{
size_t pointer_to_number = (size_t)this;
if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit)
if (task_limit > 0 && sequence_task_pool->get_task_count(pointer_to_number) > task_limit)
return;
sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
sequence_task_pool->push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
{ callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, port_number); },
std::move(buffer_cache));
}
Expand Down Expand Up @@ -368,12 +368,12 @@ void udp_client::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
buffer_cache.swap(new_buffer);
}

if (enable_thread_pool)
if (sequence_task_pool != nullptr)
{
size_t pointer_to_number = (size_t)this;
if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit)
if (task_limit > 0 && sequence_task_pool->get_task_count(pointer_to_number) > task_limit)
return;
sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
sequence_task_pool->push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
{ callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, 0); },
std::move(buffer_cache));
}
Expand Down
40 changes: 22 additions & 18 deletions src/networks/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ class udp_server
{
public:
udp_server() = delete;
udp_server(asio::io_context &net_io, ttp::task_group_pool &task_pool, size_t task_count_limit, bool use_thread_pool, const udp::endpoint &ep, udp_callback_t callback_func)
: port_number(ep.port()), sequence_task_pool(task_pool), resolver(net_io), connection_socket(net_io), callback(callback_func), task_limit(task_count_limit), enable_thread_pool(use_thread_pool)
udp_server(asio::io_context &net_io, const udp::endpoint &ep, udp_callback_t callback_func)
: port_number(ep.port()), sequence_task_pool(nullptr), resolver(net_io), connection_socket(net_io), callback(callback_func), task_limit(0)
{
initialise(ep);
start_receive();
}
udp_server(asio::io_context &net_io, ttp::task_group_pool &task_pool, size_t task_count_limit, const udp::endpoint &ep, udp_callback_t callback_func)
: port_number(ep.port()), sequence_task_pool(&task_pool), resolver(net_io), connection_socket(net_io), callback(callback_func), task_limit(task_count_limit)
{
initialise(ep);
start_receive();
Expand All @@ -73,24 +79,27 @@ class udp_server
asio::ip::port_type get_port_number();

asio::ip::port_type port_number;
//asio::strand<asio::io_context::executor_type> &task_assigner;
ttp::task_group_pool &sequence_task_pool;
ttp::task_group_pool *sequence_task_pool;
udp::resolver resolver;
udp::socket connection_socket;
udp::endpoint incoming_endpoint;
udp_callback_t callback;
const size_t task_limit;
const bool enable_thread_pool;
};

class udp_client
{
public:
udp_client() = delete;
udp_client(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, bool use_thread_pool, udp_callback_t callback_func)
: sequence_task_pool(task_pool), connection_socket(io_context), resolver(io_context), callback(callback_func), task_limit(task_count_limit), enable_thread_pool(use_thread_pool),
last_receive_time(right_now()), last_send_time(right_now()),
paused(false), stopped(false)
udp_client(asio::io_context &io_context, udp_callback_t callback_func)
: sequence_task_pool(nullptr), connection_socket(io_context), resolver(io_context), callback(callback_func),
task_limit(0), last_receive_time(right_now()), last_send_time(right_now()), paused(false), stopped(false)
{
initialise();
}
udp_client(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, udp_callback_t callback_func)
: sequence_task_pool(&task_pool), connection_socket(io_context), resolver(io_context), callback(callback_func),
task_limit(task_count_limit), last_receive_time(right_now()), last_send_time(right_now()), paused(false), stopped(false)
{
initialise();
}
Expand Down Expand Up @@ -125,8 +134,7 @@ class udp_client

void handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const asio::error_code &error, std::size_t bytes_transferred);

//asio::strand<asio::io_context::executor_type> &task_assigner;
ttp::task_group_pool &sequence_task_pool;
ttp::task_group_pool *sequence_task_pool;
udp::socket connection_socket;
udp::resolver resolver;
udp::endpoint incoming_endpoint;
Expand All @@ -136,7 +144,6 @@ class udp_client
std::atomic<bool> paused;
std::atomic<bool> stopped;
const size_t task_limit;
const bool enable_thread_pool;
};

template<typename F, typename C>
Expand Down Expand Up @@ -269,9 +276,9 @@ class forwarder : public udp_client
public:
using process_data_t = std::function<void(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>>, std::unique_ptr<uint8_t[]>, size_t, udp::endpoint, asio::ip::port_type)>;
forwarder() = delete;
forwarder(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, bool use_thread_pool, std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> input_wrapper, process_data_t callback_func) :
udp_client(io_context, task_pool, task_count_limit, use_thread_pool, std::bind(&forwarder::handle_receive, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)),
wrapper(input_wrapper), callback(callback_func)/*, cached_local_address(back_to_local_address)*/ {}
forwarder(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> input_wrapper, process_data_t callback_func) :
udp_client(io_context, task_pool, task_count_limit, std::bind(&forwarder::handle_receive, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)),
wrapper(input_wrapper), callback(callback_func) {}

void replace_callback(process_data_t callback_func)
{
Expand All @@ -291,14 +298,11 @@ class forwarder : public udp_client

if (wrapper.expired())
return;
//asio::post(task_assigner, [this, data_ = std::move(data), data_size, peer, local_port_number]() mutable
// { callback(wrapper, std::move(data_), data_size, peer, local_port_number); });
callback(wrapper, std::move(data), data_size, peer, local_port_number);
}

std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper;
process_data_t callback;
//asio::strand<asio::io_context::executor_type> &task_assigner;
};

std::unique_ptr<rfc3489::stun_header> send_stun_3489_request(udp_server &sender, const std::string &stun_host);
Expand Down
Loading

0 comments on commit 9326f13

Please sign in to comment.