Commit: update thread-pool
cnbatch committed Apr 8, 2023
1 parent a38c2b4 commit 05cb5da
Showing 9 changed files with 73 additions and 54 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -16,6 +16,10 @@

In real-world use, please adjust the port-hopping frequency to match your device's performance, so as not to put heavy NAT pressure on your own gateway and degrade network performance. If conditions allow, it is recommended to run this on a soft router; if the soft router is itself the gateway, the NAT burden is avoided entirely.

#### Related Projects
If you want to forward TCP traffic as well, try [KCP Tube](https://github.com/cnbatch/kcptube).


## Usage
### Basic Usage
`udphop config.conf`
@@ -293,10 +297,6 @@ sysctl -w net.inet6.ip6.v6only=0
Because OpenBSD blocks IPv4-mapped addresses entirely, only IPv6 single-stack mode can be used on the OpenBSD platform.

## About the Code
### Why two asio::io_context instances
Two asio::io_context instances are used here: one runs the asynchronous loop that sends and receives data, while the other handles internal logic.

This is done entirely to accommodate BSD systems. If a single io_context did everything, the delay between two consecutive receives would be too long, causing a high UDP packet-loss rate on BSD platforms.

## Layout
The code was written casually, jotted down as ideas came, so the layout is messy.
As for how readers will feel about it... they certainly won't be pleased.
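
As a side note to the section above, here is a minimal sketch of the two-io_context arrangement (the context names follow main.cpp; the work guards and the one-thread-per-context setup are simplifying assumptions, not this project's exact code):

```cpp
#include <asio.hpp>
#include <thread>

int main()
{
	asio::io_context ioc;        // internal logic
	asio::io_context network_io; // asynchronous send/receive loop

	// Work guards keep run() from returning while handlers are still being posted.
	auto logic_guard = asio::make_work_guard(ioc);
	auto net_guard = asio::make_work_guard(network_io);

	std::thread logic_thread([&] { ioc.run(); });
	std::thread net_thread([&] { network_io.run(); });

	// ... bind sockets against network_io, post bookkeeping work to ioc ...

	logic_guard.reset();
	net_guard.reset();
	logic_thread.join();
	net_thread.join();
}
```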
9 changes: 9 additions & 0 deletions src/3rd_party/thread_pool.hpp
@@ -382,6 +382,15 @@ namespace ttp
return tasks_total_of_threads[thread_number].load();
}

[[nodiscard]]
size_t get_task_count() const
{
	size_t total = 0;
	for (size_t i = 0; i < thread_count; ++i)
		total += tasks_total_of_threads[i].load();
	return total;
}

/**
* @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
*
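
The new zero-argument get_task_count() overload sums the per-thread counters to give the pool-wide backlog, where the existing get_task_count(n) reads a single thread's counter. A hedged usage sketch; the function name should_shed_load and the global_limit parameter are illustrative, not part of this commit:

```cpp
#include <cstddef>
#include "3rd_party/thread_pool.hpp" // ttp::task_group_pool (include path assumed)

// Pool-wide back-pressure check: totals every thread's queued tasks
// instead of inspecting one thread's share.
bool should_shed_load(ttp::task_group_pool &pool, size_t global_limit)
{
	return global_limit > 0 && pool.get_task_count() > global_limit;
}
```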
38 changes: 15 additions & 23 deletions src/main.cpp
@@ -1,4 +1,5 @@
#include <algorithm>
#include <cmath>
#include <iostream>
#include <iterator>
#include <fstream>
@@ -10,22 +11,6 @@
#include "networks/client.hpp"
#include "networks/server.hpp"

size_t get_system_memory_size();
size_t get_system_memory_size()
{
#ifdef ASIO_HAS_UNISTD_H
long pages = sysconf(_SC_PHYS_PAGES);
long page_size = sysconf(_SC_PAGE_SIZE);
return pages * page_size / 2;
#endif
#ifdef ASIO_HAS_IOCP
MEMORYSTATUSEX status = {};
status.dwLength = sizeof(status);
GlobalMemoryStatusEx(&status);
return status.ullAvailPhys;
#endif
}

int main(int argc, char *argv[])
{
if (argc <= 1)
@@ -35,16 +20,23 @@ int main(int argc, char *argv[])
return 0;
}

size_t task_count_limit = get_system_memory_size() / BUFFER_SIZE / 4;
constexpr size_t task_count_limit = (size_t)std::numeric_limits<int16_t>::max() >> 3;
ttp::concurrency_t thread_counts = 1;
uint16_t thread_group_count = 1;
int io_thread_count = 1;
if (std::thread::hardware_concurrency() > 3)
thread_counts = std::thread::hardware_concurrency() / 2;
{
thread_counts = std::thread::hardware_concurrency();
thread_group_count = (uint16_t)std::log2(thread_counts);
io_thread_count = (int)std::log(thread_counts);
}

ttp::task_thread_pool task_pool{ thread_counts };
ttp::task_group_pool task_groups{ thread_counts };
ttp::task_group_pool task_groups_local{ thread_group_count };
ttp::task_group_pool task_groups_peer{ thread_group_count };

asio::io_context ioc{ (int)thread_counts };
asio::io_context network_io{ (int)thread_counts };
asio::io_context ioc{ io_thread_count };
asio::io_context network_io{ io_thread_count };

std::vector<client_mode> clients;
std::vector<server_mode> servers;
@@ -77,10 +69,10 @@ int main(int argc, char *argv[])
switch (settings.mode)
{
case running_mode::client:
clients.emplace_back(client_mode(ioc, network_io, task_pool, task_groups, task_count_limit, settings));
clients.emplace_back(client_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings));
break;
case running_mode::server:
servers.emplace_back(server_mode(ioc, network_io, task_pool, task_groups, task_count_limit, settings));
servers.emplace_back(server_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings));
break;
default:
break;
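
The sizing heuristic changes here: the task limit is no longer derived from system memory but fixed at INT16_MAX >> 3 = 4095, the number of task groups scales with log2 of the hardware concurrency, and the io_context concurrency hint with its natural log. A worked example for a machine reporting 16 hardware threads (illustrative arithmetic, not program output):

```cpp
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <limits>

int main()
{
	constexpr size_t task_count_limit = (size_t)std::numeric_limits<int16_t>::max() >> 3;
	unsigned int hardware_threads = 16; // stand-in for std::thread::hardware_concurrency()

	unsigned int thread_counts = hardware_threads;                     // 16 pool threads
	uint16_t thread_group_count = (uint16_t)std::log2(thread_counts); // log2(16) = 4 groups
	int io_thread_count = (int)std::log(thread_counts);               // ln(16) ~ 2.77 -> 2

	std::printf("limit=%zu threads=%u groups=%d io=%d\n",
	            task_count_limit, thread_counts, (int)thread_group_count, io_thread_count);
	// prints: limit=4095 threads=16 groups=4 io=2
}
```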
8 changes: 4 additions & 4 deletions src/networks/client.cpp
@@ -48,7 +48,7 @@ bool client_mode::start()
try
{
udp_callback_t udp_func_ap = std::bind(&client_mode::udp_server_incoming, this, _1, _2, _3, _4);
udp_access_point = std::make_unique<udp_server>(network_io, sequence_task_pool, task_limit, false, listen_on_ep, udp_func_ap);
udp_access_point = std::make_unique<udp_server>(network_io, sequence_task_pool_local, task_limit, false, listen_on_ep, udp_func_ap);

timer_find_timeout.expires_after(FINDER_TIMEOUT_INTERVAL);
timer_find_timeout.async_wait([this](const asio::error_code &e) { find_expires(e); });
@@ -105,7 +105,7 @@ void client_mode::udp_server_incoming(std::unique_ptr<uint8_t[]> data, size_t da

std::shared_ptr<data_wrapper<forwarder, udp::endpoint>> data_wrapper_ptr = std::make_shared<data_wrapper<forwarder, udp::endpoint>>(key_number);
auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5);
std::shared_ptr<forwarder> udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool, task_limit, true, data_wrapper_ptr, udp_func);
std::shared_ptr<forwarder> udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, true, data_wrapper_ptr, udp_func);
if (udp_forwarder == nullptr)
return;

@@ -236,7 +236,7 @@ void client_mode::udp_client_incoming_to_udp_with_thread_pool(std::weak_ptr<data

std::unique_ptr<uint8_t[]> unique_nullptr;
auto function_and_data = task_assigner.submit(task_function, std::move(unique_nullptr));
sequence_task_pool.push_task((size_t)this, std::move(function_and_data), std::move(data));
sequence_task_pool_local.push_task((size_t)this, std::move(function_and_data), std::move(data));
}

void client_mode::udp_client_incoming_to_udp_unpack(std::weak_ptr<data_wrapper<forwarder, udp::endpoint>> wrapper_weak_ptr, std::unique_ptr<uint8_t[]> data, size_t plain_size, udp::endpoint peer, asio::ip::port_type local_port_number)
@@ -431,7 +431,7 @@ void client_mode::loop_change_new_port()
asio::error_code ec;

auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5);
auto udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool, task_limit, true, wrapper_ptr, udp_func);
auto udp_forwarder = std::make_shared<forwarder>(io_context, sequence_task_pool_peer, task_limit, true, wrapper_ptr, udp_func);
if (udp_forwarder == nullptr)
continue;

11 changes: 7 additions & 4 deletions src/networks/client.hpp
@@ -43,7 +43,8 @@ class client_mode
asio::steady_timer timer_keep_alive;
//asio::strand<asio::io_context::executor_type> asio_strand;
ttp::task_thread_pool &task_assigner;
ttp::task_group_pool &sequence_task_pool;
ttp::task_group_pool &sequence_task_pool_local;
ttp::task_group_pool &sequence_task_pool_peer;
const size_t task_limit;

void udp_server_incoming(std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number);
@@ -69,7 +70,7 @@ class client_mode
client_mode(const client_mode &) = delete;
client_mode& operator=(const client_mode &) = delete;

client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool, size_t task_count_limit, const user_settings &settings) :
client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) :
io_context(io_context_ref),
network_io(net_io),
timer_find_timeout(io_context),
@@ -78,7 +79,8 @@ class client_mode
timer_keep_alive(io_context),
//asio_strand(asio::make_strand(io_context.get_executor())),
task_assigner(task_pool),
sequence_task_pool(seq_task_pool),
sequence_task_pool_local(seq_task_pool_local),
sequence_task_pool_peer(seq_task_pool_peer),
task_limit(task_count_limit),
current_settings(settings) {}

@@ -91,7 +93,8 @@ class client_mode
timer_keep_alive(std::move(existing_client.timer_keep_alive)),
//asio_strand(std::move(existing_client.asio_strand)),
task_assigner(existing_client.task_assigner),
sequence_task_pool(existing_client.sequence_task_pool),
sequence_task_pool_local(existing_client.sequence_task_pool_local),
sequence_task_pool_peer(existing_client.sequence_task_pool_peer),
task_limit(existing_client.task_limit),
current_settings(std::move(existing_client.current_settings)) {}

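
Across client and server, the single sequence_task_pool reference becomes a local/peer pair, so listen-side and remote-side packets are serialized in separate group pools instead of contending for one queue. A condensed wiring sketch using the new constructor signature (the concrete values and the default-constructed user_settings are assumptions for illustration):

```cpp
#include "networks/client.hpp" // client_mode, user_settings (path as in this repository)

int main()
{
	constexpr size_t task_count_limit = 4095;
	ttp::concurrency_t thread_counts = 8;
	uint16_t thread_group_count = 3; // log2(8)

	ttp::task_thread_pool task_pool{ thread_counts };
	ttp::task_group_pool task_groups_local{ thread_group_count }; // listen side
	ttp::task_group_pool task_groups_peer{ thread_group_count };  // remote side

	asio::io_context ioc{ 2 };
	asio::io_context network_io{ 2 };

	user_settings settings{}; // assumed default-constructible for this sketch
	client_mode client(ioc, network_io, task_pool,
	                   task_groups_local, task_groups_peer,
	                   task_count_limit, settings);
	return 0;
}
```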
30 changes: 21 additions & 9 deletions src/networks/connections.cpp
@@ -98,20 +98,26 @@ void udp_server::continue_receive()

void udp_server::async_send_out(std::unique_ptr<std::vector<uint8_t>> data, udp::endpoint client_endpoint)
{
if (data == nullptr)
return;
std::vector<uint8_t> &buffer = *data;
connection_socket.async_send_to(asio::buffer(buffer), client_endpoint,
[data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {});
}

void udp_server::async_send_out(std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer_endpoint)
{
if (data == nullptr)
return;
uint8_t *buffer_raw_ptr = data.get();
connection_socket.async_send_to(asio::buffer(buffer_raw_ptr, data_size), peer_endpoint,
[data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {});
}

void udp_server::async_send_out(std::unique_ptr<uint8_t[]> data, const uint8_t *data_ptr, size_t data_size, udp::endpoint client_endpoint)
{
if (data == nullptr)
return;
connection_socket.async_send_to(asio::buffer(data_ptr, data_size), client_endpoint,
[data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {});
}
@@ -153,6 +159,10 @@ void udp_server::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a

udp::endpoint copy_of_incoming_endpoint = incoming_endpoint;
start_receive();

if (buffer_cache == nullptr || bytes_transferred == 0)
return;

if (BUFFER_SIZE - bytes_transferred < BUFFER_EXPAND_SIZE)
{
std::unique_ptr<uint8_t[]> new_buffer = std::make_unique<uint8_t[]>(BUFFER_SIZE + BUFFER_EXPAND_SIZE);
@@ -163,9 +173,7 @@ void udp_server::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
if (enable_thread_pool)
{
size_t pointer_to_number = (size_t)this;
if (auto each_thread_task_limit = task_limit / sequence_task_pool.get_thread_count();
task_limit > 0 && each_thread_task_limit > 0 &&
sequence_task_pool.get_task_count(pointer_to_number) > each_thread_task_limit)
if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit)
return;
sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
{ callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, port_number); },
@@ -252,7 +260,7 @@ size_t udp_client::send_out(const std::vector<uint8_t> &data, udp::endpoint peer

size_t udp_client::send_out(const uint8_t *data, size_t size, udp::endpoint peer_endpoint, asio::error_code &ec)
{
if (stopped.load())
if (stopped.load() || data == nullptr)
return 0;

size_t sent_size = connection_socket.send_to(asio::buffer(data, size), peer_endpoint, 0, ec);
@@ -262,7 +270,7 @@ size_t udp_client::send_out(const uint8_t *data, size_t size, udp::endpoint peer

void udp_client::async_send_out(std::unique_ptr<std::vector<uint8_t>> data, udp::endpoint peer_endpoint)
{
if (stopped.load())
if (stopped.load() || data == nullptr)
return;

std::vector<uint8_t> &buffer = *data;
@@ -273,7 +281,7 @@ void udp_client::async_send_out(std::unique_ptr<std::vector<uint8_t>> data, udp:

void udp_client::async_send_out(std::unique_ptr<uint8_t[]> data, size_t data_size, udp::endpoint peer_endpoint)
{
if (stopped.load())
if (stopped.load() || data == nullptr)
return;

uint8_t *buffer_raw_ptr = data.get();
@@ -284,6 +292,8 @@ void udp_client::async_send_out(std::unique_ptr<uint8_t[]> data, size_t data_siz

void udp_client::async_send_out(std::unique_ptr<uint8_t[]> data, const uint8_t *data_ptr, size_t data_size, udp::endpoint client_endpoint)
{
if (data == nullptr || data_ptr == nullptr)
return;
connection_socket.async_send_to(asio::buffer(data_ptr, data_size), client_endpoint,
[data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {});
last_send_time.store(right_now());
@@ -347,6 +357,10 @@ void udp_client::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a

udp::endpoint copy_of_incoming_endpoint = incoming_endpoint;
start_receive();

if (buffer_cache == nullptr || bytes_transferred == 0)
return;

if (BUFFER_SIZE - bytes_transferred < BUFFER_EXPAND_SIZE)
{
std::unique_ptr<uint8_t[]> new_buffer = std::make_unique<uint8_t[]>(BUFFER_SIZE + BUFFER_EXPAND_SIZE);
@@ -357,9 +371,7 @@ void udp_client::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
if (enable_thread_pool)
{
size_t pointer_to_number = (size_t)this;
if (auto each_thread_task_limit = task_limit / sequence_task_pool.get_thread_count();
task_limit > 0 && each_thread_task_limit > 0 &&
sequence_task_pool.get_task_count(pointer_to_number) > each_thread_task_limit)
if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit)
return;
sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr<uint8_t[]> data) mutable
{ callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, 0); },
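
Two recurring changes in this file: null and zero-length guards around the async sends and receive callbacks, and a simpler back-pressure test. The old guard divided task_limit by the pool's thread count, and that per-thread share could round down to zero, which disabled the limit entirely; the new guard compares the object's queued-task total against the whole limit. Distilled into a standalone helper (the function name is illustrative; the names inside match the diff):

```cpp
#include <cstddef>
#include "3rd_party/thread_pool.hpp" // ttp::task_group_pool (include path assumed)

// "key" stands for the (size_t)this value used as the per-object
// serialization key in handle_receive().
bool over_task_limit(ttp::task_group_pool &sequence_task_pool, size_t key, size_t task_limit)
{
	// Old: task_limit / get_thread_count() could truncate to 0, never triggering.
	// New: one comparison against the full limit for this object's queue.
	return task_limit > 0 && sequence_task_pool.get_task_count(key) > task_limit;
}
```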
2 changes: 1 addition & 1 deletion src/networks/connections.hpp
@@ -25,7 +25,7 @@ using asio::ip::tcp;
using asio::ip::udp;

constexpr uint8_t TIME_GAP = std::numeric_limits<uint8_t>::max(); //seconds
constexpr size_t BUFFER_SIZE = 4096u;
constexpr size_t BUFFER_SIZE = 2048u;
constexpr size_t BUFFER_EXPAND_SIZE = 128u;
constexpr size_t EMPTY_PACKET_SIZE = 1430u;
constexpr size_t RAW_HEADER_SIZE = 12u;
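
BUFFER_SIZE is halved from 4096 to 2048 bytes, which pairs with the fixed task limit of 4095 set in main.cpp. A back-of-envelope bound on the receive-buffer memory one serialization key can hold when fully backlogged (an estimate assuming one buffer per queued task, not a measured figure):

```cpp
#include <cstddef>
#include <cstdio>

int main()
{
	constexpr size_t BUFFER_SIZE = 2048u;      // new value in this commit
	constexpr size_t BUFFER_EXPAND_SIZE = 128u;
	constexpr size_t task_count_limit = 4095u; // INT16_MAX >> 3, from main.cpp

	// Each queued task holds at most one (possibly expanded) receive buffer.
	constexpr size_t worst_case = task_count_limit * (BUFFER_SIZE + BUFFER_EXPAND_SIZE);
	std::printf("worst case: %zu bytes (~%zu MiB)\n", worst_case, worst_case >> 20);
	// roughly 8.9 MB queued per key; the old 4096-byte buffers allowed about double that
}
```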
6 changes: 3 additions & 3 deletions src/networks/server.cpp
@@ -78,7 +78,7 @@ void server_mode::udp_server_incoming_with_thread_pool(std::unique_ptr<uint8_t[]

std::unique_ptr<uint8_t[]> unique_nullptr;
auto function_and_data = task_assigner.submit(task_function, std::move(unique_nullptr));
sequence_task_pool.push_task((size_t)this, std::move(function_and_data), std::move(data));
sequence_task_pool_peer.push_task((size_t)this, std::move(function_and_data), std::move(data));
}

void server_mode::udp_server_incoming_unpack(std::unique_ptr<uint8_t[]> data, size_t plain_size, udp::endpoint peer, asio::ip::port_type port_number)
@@ -199,7 +199,7 @@ bool server_mode::create_new_udp_connection(std::unique_ptr<uint8_t[]> data, con
{
udp_client_incoming(std::move(input_data), data_size, peer, port_number, wrapper);
};
std::unique_ptr<udp_client> target_connector = std::make_unique<udp_client>(io_context, sequence_task_pool, task_limit, false, udp_func_ap);
std::unique_ptr<udp_client> target_connector = std::make_unique<udp_client>(io_context, sequence_task_pool_local, task_limit, true, udp_func_ap);

asio::error_code ec;
target_connector->send_out(create_raw_random_data(EMPTY_PACKET_SIZE), local_empty_target, ec);
@@ -471,7 +471,7 @@ bool server_mode::start()
listen_on_ep.port(port_number);
try
{
udp_servers.insert({ port_number, std::make_unique<udp_server>(network_io, sequence_task_pool, task_limit, true, listen_on_ep, func) });
udp_servers.insert({ port_number, std::make_unique<udp_server>(network_io, sequence_task_pool_peer, task_limit, true, listen_on_ep, func) });
}
catch (std::exception &ex)
{
11 changes: 7 additions & 4 deletions src/networks/server.hpp
@@ -39,7 +39,8 @@ class server_mode
asio::steady_timer timer_keep_alive;
//asio::strand<asio::io_context::executor_type> asio_strand;
ttp::task_thread_pool &task_assigner;
ttp::task_group_pool &sequence_task_pool;
ttp::task_group_pool &sequence_task_pool_local;
ttp::task_group_pool &sequence_task_pool_peer;
const size_t task_limit;

std::unique_ptr<udp::endpoint> udp_target;
@@ -70,7 +71,7 @@ class server_mode
server_mode(const server_mode &) = delete;
server_mode& operator=(const server_mode &) = delete;

server_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool, size_t task_count_limit, const user_settings &settings)
server_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings)
: io_context(io_context_ref),
network_io(net_io),
timer_expiring_wrapper(io_context),
Expand All @@ -79,7 +80,8 @@ class server_mode
timer_keep_alive(io_context),
//asio_strand(asio::make_strand(io_context.get_executor())),
task_assigner(task_pool),
sequence_task_pool(seq_task_pool),
sequence_task_pool_local(seq_task_pool_local),
sequence_task_pool_peer(seq_task_pool_peer),
task_limit(task_count_limit),
external_ipv4_port(0),
external_ipv4_address(0),
Expand All @@ -97,7 +99,8 @@ class server_mode
timer_keep_alive(std::move(existing_server.timer_keep_alive)),
//asio_strand(std::move(existing_server.asio_strand)),
task_assigner(existing_server.task_assigner),
sequence_task_pool(existing_server.sequence_task_pool),
sequence_task_pool_local(existing_server.sequence_task_pool_local),
sequence_task_pool_peer(existing_server.sequence_task_pool_peer),
task_limit(existing_server.task_limit),
external_ipv4_port(existing_server.external_ipv4_port.load()),
external_ipv4_address(existing_server.external_ipv4_address.load()),
