From 05cb5daf9de62c6c9d3819790c66ae456afede41 Mon Sep 17 00:00:00 2001 From: cnbatch Date: Sun, 9 Apr 2023 05:08:55 +0800 Subject: [PATCH] update thread-pool --- README.md | 12 +++++------ src/3rd_party/thread_pool.hpp | 9 +++++++++ src/main.cpp | 38 ++++++++++++++--------------------- src/networks/client.cpp | 8 ++++---- src/networks/client.hpp | 11 ++++++---- src/networks/connections.cpp | 30 ++++++++++++++++++--------- src/networks/connections.hpp | 2 +- src/networks/server.cpp | 6 +++--- src/networks/server.hpp | 11 ++++++---- 9 files changed, 73 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index a447e1e..42829a1 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,10 @@ 实际使用时请根据设备性能适当调整跳换端口的频率,以免对自己的网关设备造成较大的 NAT 压力从而影响网络性能。若条件允许,建议运行在软路由上。如果软路由本身就是网关的话,这样做就可以免除 NAT 负担。 +#### 关联项目 +如果想同时转发TCP流量,可以试试 [KCP Tube](https://github.com/cnbatch/kcptube) + + ## 用法 ### 基本用法 `udphop config.conf` @@ -293,10 +297,6 @@ sysctl -w net.inet6.ip6.v6only=0 因为 OpenBSD 彻底屏蔽了 IPv4 映射地址,所以在 OpenBSD 平台只能使用 IPv6 单栈模式。 ## 关于代码 -### 为什么要用两个 asio::io_context -这里用了两个 asio::io_context,其中一个是用于收发数据的异步循环,另一个用于处理内部逻辑。 - -之所以要这样做,完全是为了迁就 BSD 系统。如果只用一个 io_context 去做所有的事,由于两次接收之间的延迟过高,在 BSD 平台会导致 UDP 丢包率过高。 +代码写得很随意,想到哪写到哪,因此版面混乱。 -## 版面 -代码写得很随意,想到哪写到哪,因此版面混乱。 \ No newline at end of file +至于阅读者的感受嘛…… 那肯定会不爽。 \ No newline at end of file diff --git a/src/3rd_party/thread_pool.hpp b/src/3rd_party/thread_pool.hpp index e2391ec..a77cf00 100644 --- a/src/3rd_party/thread_pool.hpp +++ b/src/3rd_party/thread_pool.hpp @@ -382,6 +382,15 @@ namespace ttp return tasks_total_of_threads[thread_number].load(); } + [[nodiscard]] + size_t get_task_count() const + { + size_t total = 0; + for (size_t i = 0; i < thread_count; ++i) + total += tasks_total_of_threads[i].load(); + return total; + } + /** * @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen. * diff --git a/src/main.cpp b/src/main.cpp index 4b028a3..aefcb48 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -10,22 +11,6 @@ #include "networks/client.hpp" #include "networks/server.hpp" -size_t get_system_memory_size(); -size_t get_system_memory_size() -{ -#ifdef ASIO_HAS_UNISTD_H - long pages = sysconf(_SC_PHYS_PAGES); - long page_size = sysconf(_SC_PAGE_SIZE); - return pages * page_size / 2; -#endif -#ifdef ASIO_HAS_IOCP - MEMORYSTATUSEX status = {}; - status.dwLength = sizeof(status); - GlobalMemoryStatusEx(&status); - return status.ullAvailPhys; -#endif -} - int main(int argc, char *argv[]) { if (argc <= 1) @@ -35,16 +20,23 @@ int main(int argc, char *argv[]) return 0; } - size_t task_count_limit = get_system_memory_size() / BUFFER_SIZE / 4; + constexpr size_t task_count_limit = (size_t)std::numeric_limits::max() >> 3; ttp::concurrency_t thread_counts = 1; + uint16_t thread_group_count = 1; + int io_thread_count = 1; if (std::thread::hardware_concurrency() > 3) - thread_counts = std::thread::hardware_concurrency() / 2; + { + thread_counts = std::thread::hardware_concurrency(); + thread_group_count = (uint16_t)std::log2(thread_counts); + io_thread_count = (int)std::log(thread_counts); + } ttp::task_thread_pool task_pool{ thread_counts }; - ttp::task_group_pool task_groups{ thread_counts }; + ttp::task_group_pool task_groups_local{ thread_group_count }; + ttp::task_group_pool task_groups_peer{ thread_group_count }; - asio::io_context ioc{ (int)thread_counts }; - asio::io_context network_io{ (int)thread_counts }; + asio::io_context ioc{ io_thread_count }; + asio::io_context network_io{ io_thread_count }; std::vector clients; std::vector servers; @@ -77,10 +69,10 @@ int main(int argc, char *argv[]) switch (settings.mode) { case running_mode::client: - clients.emplace_back(client_mode(ioc, network_io, task_pool, task_groups, task_count_limit, settings)); + clients.emplace_back(client_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings)); break; case running_mode::server: - servers.emplace_back(server_mode(ioc, network_io, task_pool, task_groups, task_count_limit, settings)); + servers.emplace_back(server_mode(ioc, network_io, task_pool, task_groups_local, task_groups_peer, task_count_limit, settings)); break; default: break; diff --git a/src/networks/client.cpp b/src/networks/client.cpp index d316932..f0f66e9 100644 --- a/src/networks/client.cpp +++ b/src/networks/client.cpp @@ -48,7 +48,7 @@ bool client_mode::start() try { udp_callback_t udp_func_ap = std::bind(&client_mode::udp_server_incoming, this, _1, _2, _3, _4); - udp_access_point = std::make_unique(network_io, sequence_task_pool, task_limit, false, listen_on_ep, udp_func_ap); + udp_access_point = std::make_unique(network_io, sequence_task_pool_local, task_limit, false, listen_on_ep, udp_func_ap); timer_find_timeout.expires_after(FINDER_TIMEOUT_INTERVAL); timer_find_timeout.async_wait([this](const asio::error_code &e) { find_expires(e); }); @@ -105,7 +105,7 @@ void client_mode::udp_server_incoming(std::unique_ptr data, size_t da std::shared_ptr> data_wrapper_ptr = std::make_shared>(key_number); auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5); - std::shared_ptr udp_forwarder = std::make_shared(io_context, sequence_task_pool, task_limit, true, data_wrapper_ptr, udp_func); + std::shared_ptr udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, true, data_wrapper_ptr, udp_func); if (udp_forwarder == nullptr) return; @@ -236,7 +236,7 @@ void client_mode::udp_client_incoming_to_udp_with_thread_pool(std::weak_ptr unique_nullptr; auto function_and_data = task_assigner.submit(task_function, std::move(unique_nullptr)); - sequence_task_pool.push_task((size_t)this, std::move(function_and_data), std::move(data)); + sequence_task_pool_local.push_task((size_t)this, std::move(function_and_data), std::move(data)); } void client_mode::udp_client_incoming_to_udp_unpack(std::weak_ptr> wrapper_weak_ptr, std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type local_port_number) @@ -431,7 +431,7 @@ void client_mode::loop_change_new_port() asio::error_code ec; auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5); - auto udp_forwarder = std::make_shared(io_context, sequence_task_pool, task_limit, true, wrapper_ptr, udp_func); + auto udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, true, wrapper_ptr, udp_func); if (udp_forwarder == nullptr) continue; diff --git a/src/networks/client.hpp b/src/networks/client.hpp index 517ccd1..b282b12 100644 --- a/src/networks/client.hpp +++ b/src/networks/client.hpp @@ -43,7 +43,8 @@ class client_mode asio::steady_timer timer_keep_alive; //asio::strand asio_strand; ttp::task_thread_pool &task_assigner; - ttp::task_group_pool &sequence_task_pool; + ttp::task_group_pool &sequence_task_pool_local; + ttp::task_group_pool &sequence_task_pool_peer; const size_t task_limit; void udp_server_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number); @@ -69,7 +70,7 @@ class client_mode client_mode(const client_mode &) = delete; client_mode& operator=(const client_mode &) = delete; - client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool, size_t task_count_limit, const user_settings &settings) : + client_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) : io_context(io_context_ref), network_io(net_io), timer_find_timeout(io_context), @@ -78,7 +79,8 @@ class client_mode timer_keep_alive(io_context), //asio_strand(asio::make_strand(io_context.get_executor())), task_assigner(task_pool), - sequence_task_pool(seq_task_pool), + sequence_task_pool_local(seq_task_pool_local), + sequence_task_pool_peer(seq_task_pool_peer), task_limit(task_count_limit), current_settings(settings) {} @@ -91,7 +93,8 @@ class client_mode timer_keep_alive(std::move(existing_client.timer_keep_alive)), //asio_strand(std::move(existing_client.asio_strand)), task_assigner(existing_client.task_assigner), - sequence_task_pool(existing_client.sequence_task_pool), + sequence_task_pool_local(existing_client.sequence_task_pool_local), + sequence_task_pool_peer(existing_client.sequence_task_pool_peer), task_limit(existing_client.task_limit), current_settings(std::move(existing_client.current_settings)) {} diff --git a/src/networks/connections.cpp b/src/networks/connections.cpp index f369d9c..d508812 100644 --- a/src/networks/connections.cpp +++ b/src/networks/connections.cpp @@ -98,6 +98,8 @@ void udp_server::continue_receive() void udp_server::async_send_out(std::unique_ptr> data, udp::endpoint client_endpoint) { + if (data == nullptr) + return; std::vector &buffer = *data; connection_socket.async_send_to(asio::buffer(buffer), client_endpoint, [data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {}); @@ -105,6 +107,8 @@ void udp_server::async_send_out(std::unique_ptr> data, udp: void udp_server::async_send_out(std::unique_ptr data, size_t data_size, udp::endpoint peer_endpoint) { + if (data == nullptr) + return; uint8_t *buffer_raw_ptr = data.get(); connection_socket.async_send_to(asio::buffer(buffer_raw_ptr, data_size), peer_endpoint, [data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {}); @@ -112,6 +116,8 @@ void udp_server::async_send_out(std::unique_ptr data, size_t data_siz void udp_server::async_send_out(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, udp::endpoint client_endpoint) { + if (data == nullptr) + return; connection_socket.async_send_to(asio::buffer(data_ptr, data_size), client_endpoint, [data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {}); } @@ -153,6 +159,10 @@ void udp_server::handle_receive(std::unique_ptr buffer_cache, const a udp::endpoint copy_of_incoming_endpoint = incoming_endpoint; start_receive(); + + if (buffer_cache == nullptr || bytes_transferred == 0) + return; + if (BUFFER_SIZE - bytes_transferred < BUFFER_EXPAND_SIZE) { std::unique_ptr new_buffer = std::make_unique(BUFFER_SIZE + BUFFER_EXPAND_SIZE); @@ -163,9 +173,7 @@ void udp_server::handle_receive(std::unique_ptr buffer_cache, const a if (enable_thread_pool) { size_t pointer_to_number = (size_t)this; - if (auto each_thread_task_limit = task_limit / sequence_task_pool.get_thread_count(); - task_limit > 0 && each_thread_task_limit > 0 && - sequence_task_pool.get_task_count(pointer_to_number) > each_thread_task_limit) + if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit) return; sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr data) mutable { callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, port_number); }, @@ -252,7 +260,7 @@ size_t udp_client::send_out(const std::vector &data, udp::endpoint peer size_t udp_client::send_out(const uint8_t *data, size_t size, udp::endpoint peer_endpoint, asio::error_code &ec) { - if (stopped.load()) + if (stopped.load() || data == nullptr) return 0; size_t sent_size = connection_socket.send_to(asio::buffer(data, size), peer_endpoint, 0, ec); @@ -262,7 +270,7 @@ size_t udp_client::send_out(const uint8_t *data, size_t size, udp::endpoint peer void udp_client::async_send_out(std::unique_ptr> data, udp::endpoint peer_endpoint) { - if (stopped.load()) + if (stopped.load() || data == nullptr) return; std::vector &buffer = *data; @@ -273,7 +281,7 @@ void udp_client::async_send_out(std::unique_ptr> data, udp: void udp_client::async_send_out(std::unique_ptr data, size_t data_size, udp::endpoint peer_endpoint) { - if (stopped.load()) + if (stopped.load() || data == nullptr) return; uint8_t *buffer_raw_ptr = data.get(); @@ -284,6 +292,8 @@ void udp_client::async_send_out(std::unique_ptr data, size_t data_siz void udp_client::async_send_out(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, udp::endpoint client_endpoint) { + if (data == nullptr || data_ptr == nullptr) + return; connection_socket.async_send_to(asio::buffer(data_ptr, data_size), client_endpoint, [data_ = std::move(data)](const asio::error_code &error, size_t bytes_transferred) {}); last_send_time.store(right_now()); @@ -347,6 +357,10 @@ void udp_client::handle_receive(std::unique_ptr buffer_cache, const a udp::endpoint copy_of_incoming_endpoint = incoming_endpoint; start_receive(); + + if (buffer_cache == nullptr || bytes_transferred == 0) + return; + if (BUFFER_SIZE - bytes_transferred < BUFFER_EXPAND_SIZE) { std::unique_ptr new_buffer = std::make_unique(BUFFER_SIZE + BUFFER_EXPAND_SIZE); @@ -357,9 +371,7 @@ void udp_client::handle_receive(std::unique_ptr buffer_cache, const a if (enable_thread_pool) { size_t pointer_to_number = (size_t)this; - if (auto each_thread_task_limit = task_limit / sequence_task_pool.get_thread_count(); - task_limit > 0 && each_thread_task_limit > 0 && - sequence_task_pool.get_task_count(pointer_to_number) > each_thread_task_limit) + if (task_limit > 0 && sequence_task_pool.get_task_count(pointer_to_number) > task_limit) return; sequence_task_pool.push_task(pointer_to_number, [this, bytes_transferred, copy_of_incoming_endpoint](std::unique_ptr data) mutable { callback(std::move(data), bytes_transferred, copy_of_incoming_endpoint, 0); }, diff --git a/src/networks/connections.hpp b/src/networks/connections.hpp index f0704fc..7860c9f 100644 --- a/src/networks/connections.hpp +++ b/src/networks/connections.hpp @@ -25,7 +25,7 @@ using asio::ip::tcp; using asio::ip::udp; constexpr uint8_t TIME_GAP = std::numeric_limits::max(); //seconds -constexpr size_t BUFFER_SIZE = 4096u; +constexpr size_t BUFFER_SIZE = 2048u; constexpr size_t BUFFER_EXPAND_SIZE = 128u; constexpr size_t EMPTY_PACKET_SIZE = 1430u; constexpr size_t RAW_HEADER_SIZE = 12u; diff --git a/src/networks/server.cpp b/src/networks/server.cpp index 2f97ceb..8148f67 100644 --- a/src/networks/server.cpp +++ b/src/networks/server.cpp @@ -78,7 +78,7 @@ void server_mode::udp_server_incoming_with_thread_pool(std::unique_ptr unique_nullptr; auto function_and_data = task_assigner.submit(task_function, std::move(unique_nullptr)); - sequence_task_pool.push_task((size_t)this, std::move(function_and_data), std::move(data)); + sequence_task_pool_peer.push_task((size_t)this, std::move(function_and_data), std::move(data)); } void server_mode::udp_server_incoming_unpack(std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type port_number) @@ -199,7 +199,7 @@ bool server_mode::create_new_udp_connection(std::unique_ptr data, con { udp_client_incoming(std::move(input_data), data_size, peer, port_number, wrapper); }; - std::unique_ptr target_connector = std::make_unique(io_context, sequence_task_pool, task_limit, false, udp_func_ap); + std::unique_ptr target_connector = std::make_unique(io_context, sequence_task_pool_local, task_limit, true, udp_func_ap); asio::error_code ec; target_connector->send_out(create_raw_random_data(EMPTY_PACKET_SIZE), local_empty_target, ec); @@ -471,7 +471,7 @@ bool server_mode::start() listen_on_ep.port(port_number); try { - udp_servers.insert({ port_number, std::make_unique(network_io, sequence_task_pool, task_limit, true, listen_on_ep, func) }); + udp_servers.insert({ port_number, std::make_unique(network_io, sequence_task_pool_peer, task_limit, true, listen_on_ep, func) }); } catch (std::exception &ex) { diff --git a/src/networks/server.hpp b/src/networks/server.hpp index 6abd0a7..de3d2ae 100644 --- a/src/networks/server.hpp +++ b/src/networks/server.hpp @@ -39,7 +39,8 @@ class server_mode asio::steady_timer timer_keep_alive; //asio::strand asio_strand; ttp::task_thread_pool &task_assigner; - ttp::task_group_pool &sequence_task_pool; + ttp::task_group_pool &sequence_task_pool_local; + ttp::task_group_pool &sequence_task_pool_peer; const size_t task_limit; std::unique_ptr udp_target; @@ -70,7 +71,7 @@ class server_mode server_mode(const server_mode &) = delete; server_mode& operator=(const server_mode &) = delete; - server_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool, size_t task_count_limit, const user_settings &settings) + server_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_thread_pool &task_pool, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) : io_context(io_context_ref), network_io(net_io), timer_expiring_wrapper(io_context), @@ -79,7 +80,8 @@ class server_mode timer_keep_alive(io_context), //asio_strand(asio::make_strand(io_context.get_executor())), task_assigner(task_pool), - sequence_task_pool(seq_task_pool), + sequence_task_pool_local(seq_task_pool_local), + sequence_task_pool_peer(seq_task_pool_peer), task_limit(task_count_limit), external_ipv4_port(0), external_ipv4_address(0), @@ -97,7 +99,8 @@ class server_mode timer_keep_alive(std::move(existing_server.timer_keep_alive)), //asio_strand(std::move(existing_server.asio_strand)), task_assigner(existing_server.task_assigner), - sequence_task_pool(existing_server.sequence_task_pool), + sequence_task_pool_local(existing_server.sequence_task_pool_local), + sequence_task_pool_peer(existing_server.sequence_task_pool_peer), task_limit(existing_server.task_limit), external_ipv4_port(existing_server.external_ipv4_port.load()), external_ipv4_address(existing_server.external_ipv4_address.load()),