From 6af4374448665fb856e58e2fe1b42a24f16d41fa Mon Sep 17 00:00:00 2001 From: cnbatch Date: Sat, 16 Sep 2023 17:09:57 +0800 Subject: [PATCH] stability update, enhancing stability during multi-connection. --- src/3rd_party/thread_pool.hpp | 38 ++-- src/main.cpp | 2 +- src/networks/client.cpp | 377 +++++++++++++++------------------ src/networks/client.hpp | 48 ++--- src/networks/connections.cpp | 1 + src/networks/connections.hpp | 383 ++++++++++++++++++++++------------ src/networks/server.cpp | 144 ++++++------- src/networks/server.hpp | 19 +- src/shares/CMakeLists.txt | 2 +- 9 files changed, 536 insertions(+), 478 deletions(-) diff --git a/src/3rd_party/thread_pool.hpp b/src/3rd_party/thread_pool.hpp index 90e090c..3a976a1 100644 --- a/src/3rd_party/thread_pool.hpp +++ b/src/3rd_party/thread_pool.hpp @@ -73,7 +73,7 @@ namespace ttp } [[nodiscard]] - size_t get_task_count(size_t number) const + size_t get_task_count() const { return tasks_total.load(); } @@ -109,8 +109,8 @@ namespace ttp { std::scoped_lock tasks_lock(tasks_mutex); tasks.push_back({ task_function, std::move(data) }); + ++tasks_total; } - ++tasks_total; task_available_cv.notify_one(); } @@ -198,10 +198,13 @@ namespace ttp */ void wait_for_tasks() { - waiting = true; - std::unique_lock tasks_lock(tasks_mutex); - task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); }); - waiting = false; + if (!waiting) + { + waiting = true; + std::unique_lock tasks_lock(tasks_mutex); + task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); }); + waiting = false; + } } private: @@ -227,7 +230,10 @@ namespace ttp void destroy_threads() { running = false; - task_available_cv.notify_all(); + { + const std::scoped_lock tasks_lock(tasks_mutex); + task_available_cv.notify_all(); + } for (concurrency_t i = 0; i < thread_count; ++i) { threads[i].join(); @@ -423,8 +429,8 @@ namespace ttp { std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]); task_queue_of_threads[thread_number].push_back({ task_function, std::move(data) }); + ++tasks_total_of_threads[thread_number]; } - ++tasks_total_of_threads[thread_number]; task_available_cv[thread_number].notify_one(); } @@ -439,8 +445,8 @@ namespace ttp task_function(std::move(data)); }; task_queue_of_threads[thread_number].push_back({ task_func, std::move(data) }); + ++tasks_total_of_threads[thread_number]; } - ++tasks_total_of_threads[thread_number]; task_available_cv[thread_number].notify_one(); } @@ -527,13 +533,16 @@ namespace ttp */ void wait_for_tasks() { - waiting = true; - for (concurrency_t i = 0; i < thread_count; ++i) + if (!waiting) { - std::unique_lock tasks_lock(tasks_mutex_of_threads[i]); - task_done_cv.wait(tasks_lock, [this, i] { return (tasks_total_of_threads[i].load() == 0); }); + waiting = true; + for (concurrency_t i = 0; i < thread_count; ++i) + { + std::unique_lock tasks_lock(tasks_mutex_of_threads[i]); + task_done_cv.wait(tasks_lock, [this, i] { return (tasks_total_of_threads[i].load() == 0); }); + } + waiting = false; } - waiting = false; } private: @@ -561,6 +570,7 @@ namespace ttp running = false; for (concurrency_t i = 0; i < thread_count; ++i) { + const std::scoped_lock tasks_lock(tasks_mutex_of_threads[i]); task_available_cv[i].notify_all(); } diff --git a/src/main.cpp b/src/main.cpp index 6a6347b..5af2541 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -16,7 +16,7 @@ int main(int argc, char *argv[]) if (argc <= 1) { char app_name[] = "udphop"; - printf("%s version 20230427\n", app_name); + printf("%s version 20230916\n", app_name); printf("Usage: %s config1.conf\n", app_name); printf(" %s config1.conf config2.conf...\n", app_name); return 0; diff --git a/src/networks/client.cpp b/src/networks/client.cpp index d8d11c6..257d734 100644 --- a/src/networks/client.cpp +++ b/src/networks/client.cpp @@ -13,7 +13,7 @@ using namespace std::literals; client_mode::~client_mode() { timer_find_timeout.cancel(); - timer_change_ports.cancel(); + timer_expiring_sessions.cancel(); timer_keep_alive.cancel(); } @@ -52,17 +52,14 @@ bool client_mode::start() try { - udp_callback_t udp_func_ap = std::bind(&client_mode::udp_server_incoming, this, _1, _2, _3, _4); + udp_callback_t udp_func_ap = std::bind(&client_mode::udp_listener_incoming, this, _1, _2, _3, _4); udp_access_point = std::make_unique(network_io, sequence_task_pool_local, task_limit, listen_on_ep, udp_func_ap); timer_find_timeout.expires_after(FINDER_TIMEOUT_INTERVAL); timer_find_timeout.async_wait([this](const asio::error_code &e) { find_expires(e); }); - timer_expiring_wrapper.expires_after(EXPRING_UPDATE_INTERVAL); - timer_expiring_wrapper.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); - - timer_change_ports.expires_after(CHANGEPORT_UPDATE_INTERVAL); - timer_change_ports.async_wait([this](const asio::error_code &e) { change_new_port(e); }); + timer_expiring_sessions.expires_after(EXPRING_UPDATE_INTERVAL); + timer_expiring_sessions.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); if (current_settings.keep_alive > 0) { @@ -81,25 +78,25 @@ bool client_mode::start() return true; } -void client_mode::udp_server_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number) +void client_mode::udp_listener_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number) { if (data_size == 0) return; - std::shared_ptr> wrapper_session = nullptr; + std::shared_ptr udp_session = nullptr; { - std::shared_lock share_locker_udp_session_map_to_wrapper{ mutex_udp_session_map_to_wrapper, std::defer_lock }; - std::unique_lock unique_locker_udp_session_map_to_wrapper{ mutex_udp_session_map_to_wrapper, std::defer_lock }; + std::shared_lock share_locker_udp_session_map_to_wrapper{ mutex_udp_endpoint_map_to_session, std::defer_lock }; + std::unique_lock unique_locker_udp_session_map_to_wrapper{ mutex_udp_endpoint_map_to_session, std::defer_lock }; share_locker_udp_session_map_to_wrapper.lock(); - auto iter = udp_session_map_to_wrapper.find(peer); - if (iter == udp_session_map_to_wrapper.end()) + auto iter = udp_endpoint_map_to_session.find(peer); + if (iter == udp_endpoint_map_to_session.end()) { share_locker_udp_session_map_to_wrapper.unlock(); unique_locker_udp_session_map_to_wrapper.lock(); - iter = udp_session_map_to_wrapper.find(peer); - if (iter == udp_session_map_to_wrapper.end()) + iter = udp_endpoint_map_to_session.find(peer); + if (iter == udp_endpoint_map_to_session.end()) { const std::string& destination_address = current_settings.destination_address; uint16_t destination_port = current_settings.destination_port; @@ -108,51 +105,31 @@ void client_mode::udp_server_incoming(std::unique_ptr data, size_t da uint32_t key_number = generate_token_number(); - std::shared_ptr> data_wrapper_ptr = std::make_shared>(key_number); - auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5); - std::shared_ptr udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, data_wrapper_ptr, udp_func, current_settings.ipv4_only); + std::shared_ptr udp_session_ptr = std::make_shared(); + auto udp_func = std::bind(&client_mode::udp_connector_incoming_to_udp, this, _1, _2, _3, _4, _5); + std::shared_ptr udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, udp_session_ptr, udp_func, current_settings.ipv4_only); if (udp_forwarder == nullptr) return; - asio::error_code ec; - udp::endpoint endpoint_target; - for (int i = 0; i <= RETRY_TIMES; ++i) - { - udp::resolver::results_type udp_endpoints = udp_forwarder->get_remote_hostname(destination_address, destination_port, ec); - if (ec) - { - std::string error_message = time_to_string_with_square_brackets() + ec.message(); - std::cerr << error_message << "\n"; - print_message_to_file(error_message + "\n", current_settings.log_messages); - std::this_thread::sleep_for(std::chrono::seconds(RETRY_WAITS)); - } - else if (udp_endpoints.size() == 0) - { - std::string error_message = time_to_string_with_square_brackets() + "destination address not found\n"; - std::cerr << error_message; - if (!current_settings.log_messages.empty()) - print_message_to_file(error_message, current_settings.log_messages); - std::this_thread::sleep_for(std::chrono::seconds(RETRY_WAITS)); - } - else - { - endpoint_target = *udp_endpoints.begin(); - std::scoped_lock locker{ mutex_udp_target }; - udp_target = std::make_unique(endpoint_target); - previous_udp_target = std::make_unique(endpoint_target); - break; - } - } - - if (ec) + bool success = get_udp_target(udp_forwarder, udp_session_ptr->egress_target_endpoint); + if (!success) return; - uint8_t* packing_data_ptr = data.get(); + std::shared_ptr data_wrapper_ptr = std::make_shared(key_number, udp_session_ptr); + udp_session_ptr->wrapper_ptr = data_wrapper_ptr; + udp_session_ptr->changeport_timestamp.store(right_now() + current_settings.dynamic_port_refresh); + udp_session_ptr->egress_forwarder = udp_forwarder; + udp_session_ptr->egress_previous_target_endpoint = udp_session_ptr->egress_target_endpoint; + udp_session_ptr->ingress_source_endpoint = peer; + + uint8_t *packing_data_ptr = data.get(); size_t packed_data_size = data_wrapper_ptr->pack_data(packing_data_ptr, data_size); auto [error_message, cipher_size] = encrypt_data(current_settings.encryption_password, current_settings.encryption, packing_data_ptr, (int)packed_data_size); if (!error_message.empty() || cipher_size == 0) return; - udp_forwarder->send_out(packing_data_ptr, cipher_size, endpoint_target, ec); + + asio::error_code ec; + udp_forwarder->send_out(packing_data_ptr, cipher_size, udp_session_ptr->egress_target_endpoint, ec); if (ec) { std::string error_message = time_to_string_with_square_brackets() + "Cannot Send Data: " + ec.message(); @@ -162,47 +139,37 @@ void client_mode::udp_server_incoming(std::unique_ptr data, size_t da } udp_forwarder->async_receive(); - data_wrapper_ptr->forwarder_ptr.store(udp_forwarder.get()); - data_wrapper_ptr->cached_data = peer; - - std::unique_lock lock_wrapper_changeport_timestamp{ mutex_wrapper_changeport_timestamp }; - wrapper_changeport_timestamp[data_wrapper_ptr].store(right_now() + current_settings.dynamic_port_refresh); - lock_wrapper_changeport_timestamp.unlock(); - - udp_session_map_to_wrapper.insert({ peer, data_wrapper_ptr }); - { - std::scoped_lock lockers{ mutex_wrapper_session_map_to_udp, mutex_id_map_to_forwarder, mutex_wrapper_channels }; - wrapper_session_map_to_udp[key_number] = peer; - id_map_to_forwarder.insert({ key_number, udp_forwarder }); - wrapper_channels.insert({ key_number, data_wrapper_ptr }); - } + std::scoped_lock locker{ mutex_udp_session_channels }; + udp_session_channels[key_number] = udp_session_ptr; + udp_endpoint_map_to_session[peer] = udp_session_ptr; return; } else { - wrapper_session = iter->second; + udp_session = iter->second; } } else { - wrapper_session = iter->second; + udp_session = iter->second; } } uint8_t *packing_data_ptr = data.get(); - auto packed_data_size = wrapper_session->pack_data(packing_data_ptr, data_size); + auto packed_data_size = udp_session->wrapper_ptr->pack_data(packing_data_ptr, data_size); auto [error_message, cipher_size] = encrypt_data(current_settings.encryption_password, current_settings.encryption, packing_data_ptr, (int)packed_data_size); if (error_message.empty() && cipher_size > 0) - wrapper_session->send_data(std::move(data), packing_data_ptr, cipher_size, get_remote_address()); + udp_session->egress_forwarder->async_send_out(std::move(data), packing_data_ptr, cipher_size, udp_session->egress_target_endpoint); + change_new_port(udp_session); } -void client_mode::udp_client_incoming_to_udp(std::weak_ptr> wrapper_weak_ptr, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number) +void client_mode::udp_connector_incoming_to_udp(std::weak_ptr udp_session_weak_ptr, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number) { - std::shared_ptr> wrapper = wrapper_weak_ptr.lock(); - if (data_size == 0 || wrapper == nullptr) + std::shared_ptr udp_session_ptr = udp_session_weak_ptr.lock(); + if (data_size == 0 || udp_session_ptr == nullptr) return; if (data_size < RAW_HEADER_SIZE) @@ -213,30 +180,23 @@ void client_mode::udp_client_incoming_to_udp(std::weak_ptr> wrapper_weak_ptr, std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type local_port_number) +void client_mode::udp_connector_incoming_to_udp_unpack(std::shared_ptr udp_session_ptr, std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type local_port_number) { - std::shared_ptr> wrapper = wrapper_weak_ptr.lock(); - if (plain_size == 0 || wrapper == nullptr) + if (plain_size == 0 || udp_session_ptr == nullptr) return; uint8_t *data_ptr = data.get(); - uint32_t iden = data_wrapper::extract_iden(data_ptr); - if (wrapper->get_iden() != iden) - { - return; - } - - if (std::shared_lock lock_id_map_to_forwarder{ mutex_id_map_to_forwarder }; - id_map_to_forwarder.find(iden) == id_map_to_forwarder.end()) + uint32_t iden = udp_session_ptr->wrapper_ptr->extract_iden(data_ptr); + if (udp_session_ptr->wrapper_ptr->get_iden() != iden) { return; } - auto [packet_timestamp, received_data_ptr, received_size] = wrapper->receive_data(data_ptr, plain_size); + auto [packet_timestamp, received_data_ptr, received_size] = udp_session_ptr->wrapper_ptr->receive_data(data_ptr, plain_size); if (received_size == 0) return; @@ -246,30 +206,79 @@ void client_mode::udp_client_incoming_to_udp_unpack(std::weak_ptr TIME_GAP) return; - const udp::endpoint &udp_endpoint = wrapper->cached_data; + std::shared_lock shared_locker_ingress_endpoint{ udp_session_ptr->mutex_ingress_endpoint }; + udp::endpoint udp_endpoint = udp_session_ptr->ingress_source_endpoint; + shared_locker_ingress_endpoint.unlock(); + udp_access_point->async_send_out(std::move(data), received_data_ptr, received_size, udp_endpoint); } - std::shared_lock shared_lock_udp_target{ mutex_udp_target }; - if (*udp_target != peer && *previous_udp_target != peer) + std::shared_lock shared_lock_udp_target{ udp_session_ptr->mutex_egress_endpoint }; + if (udp_session_ptr->egress_target_endpoint != peer && udp_session_ptr->egress_previous_target_endpoint != peer) { shared_lock_udp_target.unlock(); - std::unique_lock unique_lock_udp_target{ mutex_udp_target }; - if (*udp_target != peer) + std::scoped_lock unique_lock_udp_target{ udp_session_ptr->mutex_egress_endpoint, mutex_target_address }; + if (udp_session_ptr->egress_target_endpoint != peer) { - *previous_udp_target = *udp_target; - *udp_target = peer; + udp_session_ptr->egress_previous_target_endpoint = udp_session_ptr->egress_target_endpoint; + udp_session_ptr->egress_target_endpoint = peer; + *target_address = peer.address(); } } } -udp::endpoint client_mode::get_remote_address() +bool client_mode::get_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target) { - udp::endpoint ep; - std::shared_lock locker{ mutex_udp_target }; - ep = *udp_target; - locker.unlock(); - return ep; + if (target_address != nullptr) + { + uint16_t destination_port = current_settings.destination_port; + if (destination_port == 0) + destination_port = generate_new_port_number(current_settings.destination_port_start, current_settings.destination_port_end); + + udp_target = udp::endpoint(*target_address, destination_port); + return true; + } + + return update_udp_target(target_connector, udp_target); +} + +bool client_mode::update_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target) +{ + uint16_t destination_port = current_settings.destination_port; + if (destination_port == 0) + destination_port = generate_new_port_number(current_settings.destination_port_start, current_settings.destination_port_end); + + bool connect_success = false; + asio::error_code ec; + for (int i = 0; i <= RETRY_TIMES; ++i) + { + const std::string &destination_address = current_settings.destination_address; + udp::resolver::results_type udp_endpoints = target_connector->get_remote_hostname(destination_address, destination_port, ec); + if (ec) + { + std::string error_message = time_to_string_with_square_brackets() + ec.message() + "\n"; + std::cerr << error_message; + print_message_to_file(error_message, current_settings.log_messages); + std::this_thread::sleep_for(std::chrono::seconds(RETRY_WAITS)); + } + else if (udp_endpoints.size() == 0) + { + std::string error_message = time_to_string_with_square_brackets() + "destination address not found\n"; + std::cerr << error_message; + print_message_to_file(error_message, current_settings.log_messages); + std::this_thread::sleep_for(std::chrono::seconds(RETRY_WAITS)); + } + else + { + std::scoped_lock locker{ mutex_target_address }; + udp_target = *udp_endpoints.begin(); + target_address = std::make_unique(udp_target.address()); + connect_success = true; + break; + } + } + + return connect_success; } uint16_t client_mode::generate_new_port_number(uint16_t start_port_num, uint16_t end_port_num) @@ -321,133 +330,52 @@ void client_mode::cleanup_expiring_data_connections() { auto time_right_now = right_now(); - std::scoped_lock lockers{ mutex_wrapper_channels, mutex_expiring_wrapper, mutex_wrapper_changeport_timestamp, - mutex_udp_session_map_to_wrapper, mutex_wrapper_session_map_to_udp }; - for (auto iter = expiring_wrapper.begin(), next_iter = iter; iter != expiring_wrapper.end(); iter = next_iter) + std::scoped_lock lockers{ mutex_expiring_sessions, mutex_udp_endpoint_map_to_session }; + for (auto iter = expiring_sessions.begin(), next_iter = iter; iter != expiring_sessions.end(); iter = next_iter) { ++next_iter; - uint32_t iden = iter->first; - auto &[wrapper_ptr, expire_time] = iter->second; + auto &[udp_session_ptr, expire_time] = *iter; + uint32_t iden = udp_session_ptr->wrapper_ptr->get_iden(); if (calculate_difference(time_right_now, expire_time) < CLEANUP_WAITS) continue; - std::unique_lock locker_id_map_to_forwarder{ mutex_id_map_to_forwarder }; - if (auto forwarder_iter = id_map_to_forwarder.find(iden); - forwarder_iter != id_map_to_forwarder.end()) - { - std::shared_ptr forwarder_ptr = forwarder_iter->second; - forwarder_ptr->remove_callback(); - forwarder_ptr->stop(); - std::unique_lock locker_expiring_forwarders{ mutex_expiring_forwarders }; - if (expiring_forwarders.find(forwarder_ptr) == expiring_forwarders.end()) - expiring_forwarders.insert({ forwarder_ptr, right_now() }); - locker_expiring_forwarders.unlock(); - id_map_to_forwarder.erase(forwarder_iter); - } - locker_id_map_to_forwarder.unlock(); - - udp::endpoint &udp_endpoint = wrapper_session_map_to_udp[iden]; - udp_session_map_to_wrapper.erase(udp_endpoint); - wrapper_session_map_to_udp.erase(iden); - wrapper_changeport_timestamp.erase(wrapper_ptr); - expiring_wrapper.erase(iter); + udp_endpoint_map_to_session.erase(udp_session_ptr->ingress_source_endpoint); + expiring_sessions.erase(iter); } } void client_mode::loop_timeout_sessions() { - std::scoped_lock lockers{ mutex_wrapper_channels, mutex_expiring_wrapper, mutex_wrapper_changeport_timestamp }; - for (auto iter = wrapper_channels.begin(), next_iter = iter; iter != wrapper_channels.end(); iter = next_iter) + std::scoped_lock lockers{ mutex_udp_session_channels, mutex_expiring_sessions }; + for (auto iter = udp_session_channels.begin(), next_iter = iter; iter != udp_session_channels.end(); iter = next_iter) { ++next_iter; uint32_t iden = iter->first; - std::shared_ptr> data_ptr = iter->second; - - std::shared_lock locker_id_map_to_forwarder{ mutex_id_map_to_forwarder }; - auto fordwarder_iter = id_map_to_forwarder.find(iden); - if (fordwarder_iter == id_map_to_forwarder.end()) - continue; - std::shared_ptr udp_forwarder = fordwarder_iter->second; - locker_id_map_to_forwarder.unlock(); + std::shared_ptr udp_session_ptr = iter->second; - if (udp_forwarder->time_gap_of_receive() > current_settings.timeout && - udp_forwarder->time_gap_of_send() > current_settings.timeout) + if (udp_session_ptr->egress_forwarder->time_gap_of_receive() > current_settings.timeout && + udp_session_ptr->egress_forwarder->time_gap_of_send() > current_settings.timeout) { - if (expiring_wrapper.find(iden) == expiring_wrapper.end()) - expiring_wrapper.insert({ iden, std::pair{ data_ptr, right_now() } }); + if (expiring_sessions.find(udp_session_ptr) == expiring_sessions.end()) + expiring_sessions[udp_session_ptr] = right_now(); - wrapper_channels.erase(iter); - wrapper_changeport_timestamp.erase(data_ptr); + udp_session_channels.erase(iter); + udp_session_ptr->changeport_timestamp.store(LLONG_MAX); } } } -void client_mode::loop_change_new_port() -{ - std::shared_lock locker{ mutex_wrapper_changeport_timestamp }; - for (auto &[wrapper_ptr, timestamp] : wrapper_changeport_timestamp) - { - if (timestamp.load() > right_now()) - continue; - timestamp += current_settings.dynamic_port_refresh; - - uint32_t iden = wrapper_ptr->get_iden(); - asio::error_code ec; - - auto udp_func = std::bind(&client_mode::udp_client_incoming_to_udp, this, _1, _2, _3, _4, _5); - auto udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, wrapper_ptr, udp_func, current_settings.ipv4_only); - if (udp_forwarder == nullptr) - continue; - - if (current_settings.destination_port_start != current_settings.destination_port_end) - { - uint16_t new_port_numer = generate_new_port_number(current_settings.destination_port_start, current_settings.destination_port_end); - std::scoped_lock locker{ mutex_udp_target }; - *previous_udp_target = *udp_target; - *udp_target = udp::endpoint(udp_target->address(), new_port_numer); - } - - std::shared_ptr new_forwarder = udp_forwarder; - std::vector keep_alive_packet = create_empty_data(current_settings.encryption_password, current_settings.encryption, EMPTY_PACKET_SIZE); - wrapper_ptr->write_iden(keep_alive_packet.data()); - if (current_settings.ipv4_only) - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); - else - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); - - if (ec) - { - timestamp += current_settings.dynamic_port_refresh; - return; - } - new_forwarder->async_receive(); - - std::unique_lock locker_id_map_to_forwarder{ mutex_id_map_to_forwarder }; - auto iter_forwarder = id_map_to_forwarder.find(iden); - if (iter_forwarder == id_map_to_forwarder.end()) - continue; - - std::shared_ptr old_forwarder = iter_forwarder->second; - std::swap(udp_forwarder, iter_forwarder->second); - locker_id_map_to_forwarder.unlock(); - wrapper_ptr->forwarder_ptr.store(new_forwarder.get()); - - std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; - expiring_forwarders.insert({ old_forwarder, right_now() }); - } -} - void client_mode::loop_keep_alive() { - std::shared_lock locker{ mutex_wrapper_changeport_timestamp }; - for (auto& [wrapper_ptr, timestamp] : wrapper_changeport_timestamp) + std::shared_lock locker{ mutex_udp_session_channels }; + for (auto &[iden, udp_session_ptr] : udp_session_channels) { - if (timestamp.load() > right_now()) + if (udp_session_ptr->changeport_timestamp.load() > right_now()) { std::vector keep_alive_packet = create_empty_data(current_settings.encryption_password, current_settings.encryption, EMPTY_PACKET_SIZE); - wrapper_ptr->write_iden(keep_alive_packet.data()); - wrapper_ptr->send_data(std::move(keep_alive_packet), get_remote_address()); + udp_session_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); + udp_session_ptr->egress_forwarder->async_send_out(std::move(keep_alive_packet), udp_session_ptr->egress_target_endpoint); } } } @@ -471,19 +399,56 @@ void client_mode::expiring_wrapper_loops(const asio::error_code & e) cleanup_expiring_forwarders(); cleanup_expiring_data_connections(); - timer_expiring_wrapper.expires_after(EXPRING_UPDATE_INTERVAL); - timer_expiring_wrapper.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); + timer_expiring_sessions.expires_after(EXPRING_UPDATE_INTERVAL); + timer_expiring_sessions.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); } -void client_mode::change_new_port(const asio::error_code & e) +void client_mode::change_new_port(std::shared_ptr udp_mappings_ptr) { - if (e == asio::error::operation_aborted) + if (udp_mappings_ptr->changeport_timestamp.load() > right_now()) + return; + udp_mappings_ptr->changeport_timestamp += current_settings.dynamic_port_refresh; + + uint32_t iden = udp_mappings_ptr->wrapper_ptr->get_iden(); + asio::error_code ec; + + auto udp_func = std::bind(&client_mode::udp_connector_incoming_to_udp, this, _1, _2, _3, _4, _5); + auto udp_forwarder = std::make_shared(io_context, sequence_task_pool_peer, task_limit, udp_mappings_ptr, udp_func, current_settings.ipv4_only); + if (udp_forwarder == nullptr) return; - loop_change_new_port(); + uint16_t destination_port_start = current_settings.destination_port_start; + uint16_t destination_port_end = current_settings.destination_port_end; + if (destination_port_start != destination_port_end) + { + uint16_t new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); + std::shared_lock locker{ mutex_target_address }; + asio::ip::address temp_address = *target_address; + locker.unlock(); + std::scoped_lock locker_egress{ udp_mappings_ptr->mutex_egress_endpoint }; + udp_mappings_ptr->egress_target_endpoint.address(temp_address); + udp_mappings_ptr->egress_target_endpoint.port(new_port_numer); + } + + std::shared_ptr new_forwarder = udp_forwarder; + std::vector keep_alive_packet = create_empty_data(current_settings.encryption_password, current_settings.encryption, EMPTY_PACKET_SIZE); + udp_mappings_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); + + if (current_settings.ipv4_only) + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); + else + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); + + if (ec) + return; + + new_forwarder->async_receive(); + + std::shared_ptr old_forwarder = udp_mappings_ptr->egress_forwarder; + udp_mappings_ptr->egress_forwarder = new_forwarder; - timer_change_ports.expires_after(CHANGEPORT_UPDATE_INTERVAL); - timer_change_ports.async_wait([this](const asio::error_code &e) { change_new_port(e); }); + std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; + expiring_forwarders.insert({ old_forwarder, right_now() }); } void client_mode::keep_alive(const asio::error_code& e) diff --git a/src/networks/client.hpp b/src/networks/client.hpp index 2d9f984..cf24fb7 100644 --- a/src/networks/client.hpp +++ b/src/networks/client.hpp @@ -12,54 +12,42 @@ class client_mode user_settings current_settings; std::unique_ptr udp_access_point; - std::shared_mutex mutex_id_map_to_forwarder; - std::map> id_map_to_forwarder; + std::shared_mutex mutex_udp_endpoint_map_to_session; + std::map> udp_endpoint_map_to_session; + std::shared_mutex mutex_udp_session_channels; + std::map> udp_session_channels; - std::shared_mutex mutex_udp_session_map_to_wrapper; - std::map>> udp_session_map_to_wrapper; - std::shared_mutex mutex_wrapper_session_map_to_udp; - std::map wrapper_session_map_to_udp; - - - std::mutex mutex_wrapper_channels; - std::map>> wrapper_channels; - - std::mutex mutex_expiring_wrapper; - std::map>, int64_t>> expiring_wrapper; + std::mutex mutex_expiring_sessions; + std::map, int64_t, std::owner_less<>> expiring_sessions; std::mutex mutex_expiring_forwarders; std::map, int64_t, std::owner_less<>> expiring_forwarders; - std::shared_mutex mutex_udp_target; - std::shared_ptr udp_target; - std::shared_ptr previous_udp_target; - - std::shared_mutex mutex_wrapper_changeport_timestamp; - std::map>, std::atomic, std::owner_less<>> wrapper_changeport_timestamp; + std::shared_mutex mutex_target_address; + std::unique_ptr target_address; asio::steady_timer timer_find_timeout; - asio::steady_timer timer_expiring_wrapper; - asio::steady_timer timer_change_ports; + asio::steady_timer timer_expiring_sessions; asio::steady_timer timer_keep_alive; ttp::task_group_pool &sequence_task_pool_local; ttp::task_group_pool &sequence_task_pool_peer; const size_t task_limit; - void udp_server_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number); - void udp_client_incoming_to_udp(std::weak_ptr>, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number); - void udp_client_incoming_to_udp_unpack(std::weak_ptr> wrapper_weak_ptr, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number); - udp::endpoint get_remote_address(); + void udp_listener_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number); + void udp_connector_incoming_to_udp(std::weak_ptr udp_session_weak_ptr, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number); + void udp_connector_incoming_to_udp_unpack(std::shared_ptr udp_session_weak_ptr, std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type local_port_number); + bool get_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target); + bool update_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target); uint16_t generate_new_port_number(uint16_t start_port_num, uint16_t end_port_num); uint32_t generate_token_number(); void cleanup_expiring_forwarders(); void cleanup_expiring_data_connections(); void loop_timeout_sessions(); - void loop_change_new_port(); void loop_keep_alive(); void find_expires(const asio::error_code &e); void expiring_wrapper_loops(const asio::error_code &e); - void change_new_port(const asio::error_code &e); + void change_new_port(std::shared_ptr udp_mappings_ptr); void keep_alive(const asio::error_code &e); public: @@ -71,8 +59,7 @@ class client_mode io_context(io_context_ref), network_io(net_io), timer_find_timeout(io_context), - timer_expiring_wrapper(io_context), - timer_change_ports(io_context), + timer_expiring_sessions(io_context), timer_keep_alive(io_context), sequence_task_pool_local(seq_task_pool_local), sequence_task_pool_peer(seq_task_pool_peer), @@ -83,8 +70,7 @@ class client_mode io_context(existing_client.io_context), network_io(existing_client.network_io), timer_find_timeout(std::move(existing_client.timer_find_timeout)), - timer_expiring_wrapper(std::move(existing_client.timer_expiring_wrapper)), - timer_change_ports(std::move(existing_client.timer_change_ports)), + timer_expiring_sessions(std::move(existing_client.timer_expiring_sessions)), timer_keep_alive(std::move(existing_client.timer_keep_alive)), sequence_task_pool_local(existing_client.sequence_task_pool_local), sequence_task_pool_peer(existing_client.sequence_task_pool_peer), diff --git a/src/networks/connections.cpp b/src/networks/connections.cpp index 3a28aae..a05eb54 100644 --- a/src/networks/connections.cpp +++ b/src/networks/connections.cpp @@ -19,6 +19,7 @@ void empty_udp_callback(std::unique_ptr tmp1, size_t tmps, udp::endpo { } + std::unique_ptr send_stun_3489_request(udp_server &sender, const std::string &stun_host, bool v4_only) { auto udp_version = v4_only ? udp::v4() : udp::v6(); diff --git a/src/networks/connections.hpp b/src/networks/connections.hpp index b68b3a3..f43b0ad 100644 --- a/src/networks/connections.hpp +++ b/src/networks/connections.hpp @@ -40,7 +40,7 @@ const asio::ip::udp::endpoint local_empty_target_v4(asio::ip::make_address_v4("1 const asio::ip::udp::endpoint local_empty_target_v6(asio::ip::make_address_v6("::1"), 70); -class forwarder; +struct udp_mappings; using udp_callback_t = std::function, size_t, udp::endpoint, asio::ip::port_type)>; @@ -48,6 +48,111 @@ int64_t right_now(); void empty_udp_callback(std::unique_ptr tmp1, size_t tmps, udp::endpoint tmp2, asio::ip::port_type tmp3); +namespace packet +{ +#pragma pack (push, 1) + struct packet_layer + { + uint32_t iden; + int64_t timestamp; + uint8_t data[1]; + }; +#pragma pack(pop) + + class data_wrapper + { + private: + const uint32_t iden; + std::weak_ptr udp_session_ptr; + + public: + data_wrapper() = delete; + data_wrapper(uint32_t id, std::weak_ptr related_session_ptr) : + iden(id), udp_session_ptr(related_session_ptr) {} + + static uint32_t extract_iden(const std::vector &input_data) + { + const packet_layer *ptr = (const packet_layer *)input_data.data(); + return ptr->iden; + } + + static uint32_t extract_iden(const uint8_t *input_data) + { + const packet_layer *ptr = (const packet_layer *)input_data; + return ptr->iden; + } + + uint32_t get_iden() { return iden; } + + void write_iden(uint8_t *input_data) + { + packet_layer *ptr = (packet_layer *)input_data; + ptr->iden = iden; + } + + std::tuple receive_data(const uint8_t *input_data, size_t length) + { + const packet_layer *ptr = (const packet_layer *)input_data; + int64_t timestamp = ptr->timestamp; + const uint8_t *data_ptr = ptr->data; + size_t data_size = length - (data_ptr - input_data); + + return { timestamp, data_ptr, data_size }; + } + + std::pair> receive_data(const std::vector &input_data) + { + const packet_layer *ptr = (const packet_layer *)input_data.data(); + int64_t timestamp = ptr->timestamp; + const uint8_t *data_ptr = ptr->data; + + size_t data_size = input_data.size() - (data_ptr - input_data.data()); + + return { timestamp, std::vector(data_ptr, data_ptr + data_size) }; + } + + std::vector pack_data(const uint8_t *input_data, size_t data_size) + { + auto timestamp = right_now(); + size_t new_size = sizeof(packet_layer) - 1 + data_size; + + std::vector new_data(new_size); + packet_layer *ptr = (packet_layer *)new_data.data(); + ptr->iden = iden; + ptr->timestamp = timestamp; + uint8_t *data_ptr = ptr->data; + + if (data_size > 0) + std::copy_n(input_data, data_size, data_ptr); + + return new_data; + } + + size_t pack_data(uint8_t *input_data, size_t data_size) + { + auto timestamp = right_now(); + size_t new_size = sizeof(packet_layer) - 1 + data_size; + uint8_t new_data[BUFFER_SIZE + BUFFER_EXPAND_SIZE] = {}; + + packet_layer *ptr = (packet_layer *)new_data; + ptr->iden = iden; + ptr->timestamp = timestamp; + uint8_t *data_ptr = ptr->data; + + if (data_size > 0) + std::copy_n(input_data, data_size, data_ptr); + + std::copy_n(new_data, new_size, input_data); + + return new_size; + } + + std::vector pack_data(const std::vector &input_data) + { + return pack_data(input_data.data(), input_data.size()); + } + }; +} class udp_server { @@ -148,139 +253,139 @@ class udp_client const bool ipv4_only; }; -template -class data_wrapper -{ -private: - uint32_t iden; - -public: - std::atomic forwarder_ptr; - // forwarder: local endpoint; - // udp_server: weak_ptr of local udp_channel (udp_client) - C cached_data; - - data_wrapper() = delete; - data_wrapper(uint32_t id) : iden(id) {} - - static uint32_t extract_iden(const std::vector &input_data) - { - const uint8_t *ptr = input_data.data(); - uint32_t ident = reinterpret_cast(ptr)[0]; - return ident; - } - - static uint32_t extract_iden(const uint8_t *input_data) - { - uint32_t ident = reinterpret_cast(input_data)[0]; - return ident; - } - - void write_iden(uint8_t *input_data) - { - reinterpret_cast(input_data)[0] = iden; - } - - uint32_t get_iden() { return iden; } - - std::tuple receive_data(const uint8_t *input_data, size_t data_size) - { - const uint8_t *ptr = input_data; - uint32_t iden = reinterpret_cast(ptr)[0]; - - ptr = ptr + sizeof(iden); - int64_t timestamp = reinterpret_cast(ptr)[0]; - - ptr = ptr + sizeof(timestamp); - size_t new_data_size = data_size - (ptr - input_data); - - return { timestamp, ptr, new_data_size }; - } - - std::pair> receive_data(const std::vector &input_data) - { - const uint8_t *ptr = input_data.data(); - uint32_t iden = reinterpret_cast(ptr)[0]; - - ptr = ptr + sizeof(iden); - int64_t timestamp = reinterpret_cast(ptr)[0]; - - ptr = ptr + sizeof(timestamp); - size_t data_size = input_data.size() - (ptr - input_data.data()); - - return { timestamp, std::vector(ptr, ptr + data_size) }; - } - - std::vector pack_data(const uint8_t *input_data, size_t data_size) - { - auto timestamp = right_now(); - - std::vector new_data(sizeof(iden) + sizeof(timestamp) + data_size); - uint8_t *ptr = new_data.data(); - reinterpret_cast(ptr)[0] = iden; - - ptr = ptr + sizeof(iden); - reinterpret_cast(ptr)[0] = timestamp; - - ptr = ptr + sizeof(timestamp); - if (data_size > 0) - std::copy_n(input_data, data_size, ptr); - - return new_data; - } - - size_t pack_data(uint8_t *input_data, size_t data_size) - { - auto timestamp = right_now(); - size_t new_size = sizeof(iden) + sizeof(timestamp) + data_size; - uint8_t new_data[BUFFER_SIZE + BUFFER_EXPAND_SIZE] = {}; - - uint8_t *ptr = new_data; - reinterpret_cast(ptr)[0] = iden; - - ptr = ptr + sizeof(iden); - reinterpret_cast(ptr)[0] = timestamp; - - ptr = ptr + sizeof(timestamp); - if (data_size > 0) - std::copy_n(input_data, data_size, ptr); - - std::copy_n(new_data, new_size, input_data); - - return new_size; - } - - std::vector pack_data(const std::vector &input_data) - { - return pack_data(input_data.data(), input_data.size()); - } - - void send_data(std::vector &&output_data, udp::endpoint peer_endpoint) - { - if (forwarder_ptr.load() == nullptr) - return; - - forwarder_ptr.load()->async_send_out(std::move(output_data), peer_endpoint); - } - - void send_data(std::unique_ptr output_data, uint8_t *start_pos, size_t data_size, udp::endpoint peer_endpoint) - { - if (forwarder_ptr.load() == nullptr) - return; - - forwarder_ptr.load()->async_send_out(std::move(output_data), start_pos, data_size, peer_endpoint); - } -}; +//template +//class data_wrapper +//{ +//private: +// uint32_t iden; +// +//public: +// std::atomic forwarder_ptr; +// // forwarder: local endpoint; +// // udp_server: weak_ptr of local udp_channel (udp_client) +// C cached_data; +// +// data_wrapper() = delete; +// data_wrapper(uint32_t id) : iden(id) {} +// +// static uint32_t extract_iden(const std::vector &input_data) +// { +// const uint8_t *ptr = input_data.data(); +// uint32_t ident = reinterpret_cast(ptr)[0]; +// return ident; +// } +// +// static uint32_t extract_iden(const uint8_t *input_data) +// { +// uint32_t ident = reinterpret_cast(input_data)[0]; +// return ident; +// } +// +// void write_iden(uint8_t *input_data) +// { +// reinterpret_cast(input_data)[0] = iden; +// } +// +// uint32_t get_iden() { return iden; } +// +// std::tuple receive_data(const uint8_t *input_data, size_t data_size) +// { +// const uint8_t *ptr = input_data; +// uint32_t iden = reinterpret_cast(ptr)[0]; +// +// ptr = ptr + sizeof(iden); +// int64_t timestamp = reinterpret_cast(ptr)[0]; +// +// ptr = ptr + sizeof(timestamp); +// size_t new_data_size = data_size - (ptr - input_data); +// +// return { timestamp, ptr, new_data_size }; +// } +// +// std::pair> receive_data(const std::vector &input_data) +// { +// const uint8_t *ptr = input_data.data(); +// uint32_t iden = reinterpret_cast(ptr)[0]; +// +// ptr = ptr + sizeof(iden); +// int64_t timestamp = reinterpret_cast(ptr)[0]; +// +// ptr = ptr + sizeof(timestamp); +// size_t data_size = input_data.size() - (ptr - input_data.data()); +// +// return { timestamp, std::vector(ptr, ptr + data_size) }; +// } +// +// std::vector pack_data(const uint8_t *input_data, size_t data_size) +// { +// auto timestamp = right_now(); +// +// std::vector new_data(sizeof(iden) + sizeof(timestamp) + data_size); +// uint8_t *ptr = new_data.data(); +// reinterpret_cast(ptr)[0] = iden; +// +// ptr = ptr + sizeof(iden); +// reinterpret_cast(ptr)[0] = timestamp; +// +// ptr = ptr + sizeof(timestamp); +// if (data_size > 0) +// std::copy_n(input_data, data_size, ptr); +// +// return new_data; +// } +// +// size_t pack_data(uint8_t *input_data, size_t data_size) +// { +// auto timestamp = right_now(); +// size_t new_size = sizeof(iden) + sizeof(timestamp) + data_size; +// uint8_t new_data[BUFFER_SIZE + BUFFER_EXPAND_SIZE] = {}; +// +// uint8_t *ptr = new_data; +// reinterpret_cast(ptr)[0] = iden; +// +// ptr = ptr + sizeof(iden); +// reinterpret_cast(ptr)[0] = timestamp; +// +// ptr = ptr + sizeof(timestamp); +// if (data_size > 0) +// std::copy_n(input_data, data_size, ptr); +// +// std::copy_n(new_data, new_size, input_data); +// +// return new_size; +// } +// +// std::vector pack_data(const std::vector &input_data) +// { +// return pack_data(input_data.data(), input_data.size()); +// } +// +// void send_data(std::vector &&output_data, udp::endpoint peer_endpoint) +// { +// if (forwarder_ptr.load() == nullptr) +// return; +// +// forwarder_ptr.load()->async_send_out(std::move(output_data), peer_endpoint); +// } +// +// void send_data(std::unique_ptr output_data, uint8_t *start_pos, size_t data_size, udp::endpoint peer_endpoint) +// { +// if (forwarder_ptr.load() == nullptr) +// return; +// +// forwarder_ptr.load()->async_send_out(std::move(output_data), start_pos, data_size, peer_endpoint); +// } +//}; class forwarder : public udp_client { public: - using process_data_t = std::function>, std::unique_ptr, size_t, udp::endpoint, asio::ip::port_type)>; + using process_data_t = std::function, std::unique_ptr, size_t, udp::endpoint, asio::ip::port_type)>; forwarder() = delete; - forwarder(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, std::weak_ptr> input_wrapper, process_data_t callback_func, bool v4_only = false) : + forwarder(asio::io_context &io_context, ttp::task_group_pool &task_pool, size_t task_count_limit, std::weak_ptr input_session, process_data_t callback_func, bool v4_only = false) : udp_client(io_context, task_pool, task_count_limit, std::bind(&forwarder::handle_receive, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), v4_only), - wrapper(input_wrapper), callback(callback_func) {} + udp_session_mappings(input_session), callback(callback_func) {} void replace_callback(process_data_t callback_func) { @@ -289,7 +394,7 @@ class forwarder : public udp_client void remove_callback() { - callback = [](std::weak_ptr> wrapper, std::unique_ptr data, size_t data_size, udp::endpoint ep, asio::ip::port_type num) {}; + callback = [](std::weak_ptr udp_session_mappings, std::unique_ptr data, size_t data_size, udp::endpoint ep, asio::ip::port_type num) {}; } private: @@ -298,15 +403,31 @@ class forwarder : public udp_client if (paused.load() || stopped.load()) return; - if (wrapper.expired()) + if (udp_session_mappings.expired()) return; - callback(wrapper, std::move(data), data_size, peer, local_port_number); + callback(udp_session_mappings, std::move(data), data_size, peer, local_port_number); } - std::weak_ptr> wrapper; + std::weak_ptr udp_session_mappings; process_data_t callback; }; + +struct udp_mappings +{ + std::shared_ptr wrapper_ptr; + std::shared_mutex mutex_ingress_endpoint; + udp::endpoint ingress_source_endpoint; + std::shared_mutex mutex_egress_endpoint; + udp::endpoint egress_target_endpoint; + udp::endpoint egress_previous_target_endpoint; + std::shared_ptr egress_forwarder; // client only + std::atomic ingress_sender; // server only + std::unique_ptr local_udp; // server only + std::atomic changeport_timestamp; +}; + + std::unique_ptr send_stun_3489_request(udp_server &sender, const std::string &stun_host, bool v4_only = false); std::unique_ptr send_stun_8489_request(udp_server &sender, const std::string &stun_host, bool v4_only = false); void resend_stun_8489_request(udp_server &sender, const std::string &stun_host, rfc8489::stun_header *header, bool v4_only = false); diff --git a/src/networks/server.cpp b/src/networks/server.cpp index e88a4cf..4f5c6ce 100644 --- a/src/networks/server.cpp +++ b/src/networks/server.cpp @@ -43,41 +43,41 @@ void server_mode::udp_server_incoming(std::unique_ptr data, size_t da void server_mode::udp_server_incoming_unpack(std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type port_number) { uint8_t *data_ptr = data.get(); - uint32_t iden = data_wrapper>::extract_iden(data_ptr); + uint32_t iden = packet::data_wrapper::extract_iden(data_ptr); if (iden == 0) { return; } - std::shared_ptr>> wrapper = nullptr; + std::shared_ptr udp_session_ptr = nullptr; { std::shared_lock share_locker_wrapper_channels{ mutex_wrapper_channels, std::defer_lock }; std::unique_lock unique_locker_wrapper_channels{ mutex_wrapper_channels, std::defer_lock }; share_locker_wrapper_channels.lock(); - auto wrapper_channel_iter = wrapper_channels.find(iden); - if (wrapper_channel_iter == wrapper_channels.end()) + auto wrapper_channel_iter = udp_session_channels.find(iden); + if (wrapper_channel_iter == udp_session_channels.end()) { share_locker_wrapper_channels.unlock(); unique_locker_wrapper_channels.lock(); - wrapper_channel_iter = wrapper_channels.find(iden); - if (wrapper_channel_iter == wrapper_channels.end()) + wrapper_channel_iter = udp_session_channels.find(iden); + if (wrapper_channel_iter == udp_session_channels.end()) { udp_server_incoming_new_connection(std::move(data), plain_size, peer, port_number); return; } else { - wrapper = wrapper_channel_iter->second; + udp_session_ptr = wrapper_channel_iter->second; } } else { - wrapper = wrapper_channel_iter->second; + udp_session_ptr = wrapper_channel_iter->second; } } - auto [packet_timestamp, received_data, received_size] = wrapper->receive_data(data_ptr, plain_size); + auto [packet_timestamp, received_data, received_size] = udp_session_ptr->wrapper_ptr->receive_data(data_ptr, plain_size); if (received_size == 0) return; @@ -87,35 +87,40 @@ void server_mode::udp_server_incoming_unpack(std::unique_ptr data, si if (calculate_difference(timestamp, packet_timestamp) > TIME_GAP) return; - wrapper->forwarder_ptr.store(udp_servers[port_number].get()); + udp_session_ptr->ingress_sender.store(udp_servers[port_number].get()); - udp_client *udp_channel = wrapper->cached_data.get(); + udp_client *udp_channel = udp_session_ptr->local_udp.get(); if (udp_channel == nullptr) return; udp_channel->async_send_out(std::move(data), received_data, received_size, *udp_target); } - std::shared_lock shared_locker_wrapper_session_map_to_source_udp{ mutex_wrapper_session_map_to_source_udp }; - if (auto wrapper_iter = wrapper_session_map_to_source_udp.find(wrapper); wrapper_iter != wrapper_session_map_to_source_udp.end()) + std::shared_lock shared_locker_ingress_endpoint{ udp_session_ptr->mutex_ingress_endpoint }; + if (udp_session_ptr->ingress_source_endpoint != peer) { - if (wrapper_iter->second != peer) - { - shared_locker_wrapper_session_map_to_source_udp.unlock(); - std::unique_lock unique_locker_wrapper_session_map_to_source_udp{ mutex_wrapper_session_map_to_source_udp }; - if (wrapper_iter->second != peer) - wrapper_iter->second = peer; - } + shared_locker_ingress_endpoint.unlock(); + std::unique_lock unique_locker_ingress_endpoint{ udp_session_ptr->mutex_ingress_endpoint }; + if (udp_session_ptr->ingress_source_endpoint != peer) + udp_session_ptr->ingress_source_endpoint = peer; } } -void server_mode::udp_client_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number, std::shared_ptr>> wrapper_session) +void server_mode::udp_client_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number, std::weak_ptr udp_session_weak_ptr) { uint8_t *packing_data_ptr = data.get(); - auto packed_data_size = wrapper_session->pack_data(packing_data_ptr, data_size); + std::shared_ptr udp_session_ptr = udp_session_weak_ptr.lock(); + if (packing_data_ptr == nullptr || udp_session_ptr == nullptr) + return; + + std::shared_lock shared_locker_ingress_endpoint{ udp_session_ptr->mutex_ingress_endpoint }; + udp::endpoint udp_endpoint = udp_session_ptr->ingress_source_endpoint; + shared_locker_ingress_endpoint.unlock(); + + auto packed_data_size = udp_session_ptr->wrapper_ptr->pack_data(packing_data_ptr, data_size); auto [error_message, cipher_size] = encrypt_data(current_settings.encryption_password, current_settings.encryption, packing_data_ptr, (int)packed_data_size); if (error_message.empty() && cipher_size > 0) - wrapper_session->send_data(std::move(data), packing_data_ptr, cipher_size, get_remote_address(wrapper_session)); + udp_session_ptr->ingress_sender.load()->async_send_out(std::move(data), packing_data_ptr, cipher_size, udp_endpoint); } void server_mode::udp_server_incoming_new_connection(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number) @@ -125,8 +130,10 @@ void server_mode::udp_server_incoming_new_connection(std::unique_ptr uint8_t *data_ptr = data.get(); - uint32_t iden = data_wrapper>::extract_iden(data_ptr); - std::shared_ptr>> wrapper = std::make_shared>>(iden); + uint32_t iden = packet::data_wrapper::extract_iden(data_ptr); + std::shared_ptr udp_session_ptr = std::make_shared(); + std::shared_ptr wrapper = std::make_shared(iden, udp_session_ptr); + udp_session_ptr->wrapper_ptr = wrapper; auto [packet_timestamp, received_data, received_size] = wrapper->receive_data(data_ptr, data_size); if (received_size == 0) @@ -136,27 +143,23 @@ void server_mode::udp_server_incoming_new_connection(std::unique_ptr if (calculate_difference(timestamp, packet_timestamp) > TIME_GAP) return; - std::unique_lock locker_wrapper_session_map_to_source_udp{ mutex_wrapper_session_map_to_source_udp }; - wrapper_session_map_to_source_udp[wrapper] = peer; + std::unique_lock locker_wrapper_session_map_to_source_udp{ udp_session_ptr->mutex_ingress_endpoint }; + udp_session_ptr->ingress_source_endpoint = peer; locker_wrapper_session_map_to_source_udp.unlock(); - wrapper->forwarder_ptr.store(udp_servers[port_number].get()); + udp_session_ptr->ingress_sender.store(udp_servers[port_number].get()); - if (create_new_udp_connection(std::move(data), received_data, received_size, wrapper, peer)) - { - wrapper_channels.insert({ iden, wrapper }); - locker_wrapper_session_map_to_source_udp.lock(); - wrapper_session_map_to_source_udp[wrapper] = peer; - locker_wrapper_session_map_to_source_udp.unlock(); - } + if (create_new_udp_connection(std::move(data), received_data, received_size, udp_session_ptr, peer)) + udp_session_channels[iden] = udp_session_ptr; } -bool server_mode::create_new_udp_connection(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, std::shared_ptr>> wrapper, udp::endpoint peer) +bool server_mode::create_new_udp_connection(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, std::shared_ptr udp_session_ptr, udp::endpoint peer) { bool connect_success = false; - udp_callback_t udp_func_ap = [wrapper, this](std::unique_ptr input_data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number) + std::weak_ptr udp_session_weak_ptr = udp_session_ptr; + udp_callback_t udp_func_ap = [udp_session_weak_ptr, this](std::unique_ptr input_data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number) { - udp_client_incoming(std::move(input_data), data_size, peer, port_number, wrapper); + udp_client_incoming(std::move(input_data), data_size, peer, port_number, udp_session_weak_ptr); }; std::unique_ptr target_connector = std::make_unique(io_context, sequence_task_pool_local, task_limit, udp_func_ap, current_settings.ipv4_only); @@ -173,10 +176,7 @@ bool server_mode::create_new_udp_connection(std::unique_ptr data, con { target_connector->async_receive(); target_connector->async_send_out(std::move(data), data_ptr, data_size, *udp_target); - wrapper->cached_data = std::move(target_connector); - //std::unique_lock locker{ mutex_wrapper_session_map_to_target_udp }; - //wrapper_session_map_to_target_udp.insert({ wrapper, target_connector }); - //locker.unlock(); + udp_session_ptr->local_udp = std::move(target_connector); return true; } @@ -188,18 +188,6 @@ bool server_mode::create_new_udp_connection(std::unique_ptr data, con return connect_success; } -udp::endpoint server_mode::get_remote_address(std::shared_ptr>> wrapper_ptr) -{ - udp::endpoint ep; - std::shared_lock locker_wrapper_session_map_to_source_udp{ mutex_wrapper_session_map_to_source_udp }; - auto iter = wrapper_session_map_to_source_udp.find(wrapper_ptr); - if (iter != wrapper_session_map_to_source_udp.end()) - ep = iter->second; - locker_wrapper_session_map_to_source_udp.unlock(); - - return ep; -} - bool server_mode::update_local_udp_target(udp_client *target_connector) { bool connect_success = false; @@ -278,46 +266,38 @@ void server_mode::cleanup_expiring_data_connections() { auto time_right_now = right_now(); - std::scoped_lock lockers{ mutex_expiring_wrapper, mutex_wrapper_channels }; - for (auto iter = expiring_wrapper.begin(), next_iter = iter; iter != expiring_wrapper.end(); iter = next_iter) + std::scoped_lock lockers{ mutex_expiring_wrapper }; + for (auto iter = expiring_udp_sessions.begin(), next_iter = iter; iter != expiring_udp_sessions.end(); iter = next_iter) { ++next_iter; - std::shared_ptr>> wrapper_ptr = iter->first; + std::shared_ptr udp_session_ptr = iter->first; int64_t expire_time = iter->second; - uint32_t iden = wrapper_ptr->get_iden(); + uint32_t iden = udp_session_ptr->wrapper_ptr->get_iden(); if (calculate_difference(time_right_now, expire_time) < CLEANUP_WAITS) continue; - std::unique_lock locker_wrapper_session_map_to_source_udp{ mutex_wrapper_session_map_to_source_udp }; - wrapper_channels.erase(iden); - wrapper_session_map_to_source_udp.erase(wrapper_ptr); - - expiring_wrapper.erase(iter); + expiring_udp_sessions.erase(iter); } } void server_mode::loop_timeout_sessions() { std::scoped_lock locker_wrapper_looping{ mutex_wrapper_channels, mutex_expiring_wrapper }; - for (auto iter = wrapper_channels.begin(), next_iter = iter; iter != wrapper_channels.end(); iter = next_iter) + for (auto iter = udp_session_channels.begin(), next_iter = iter; iter != udp_session_channels.end(); iter = next_iter) { ++next_iter; uint32_t iden = iter->first; - std::shared_ptr>> wrapper_ptr = iter->second; - //std::unique_lock locker_wrapper_session_map_to_udp{ mutex_wrapper_session_map_to_target_udp }; - udp_client *local_session = wrapper_ptr->cached_data.get(); + std::shared_ptr udp_session_ptr = iter->second; + udp_client *local_session = udp_session_ptr->local_udp.get(); if (local_session == nullptr) continue; if (local_session->time_gap_of_receive() > current_settings.timeout && local_session->time_gap_of_send() > current_settings.timeout) { local_session->stop(); - wrapper_channels.erase(iter); - //wrapper_session_map_to_target_udp.erase(wrapper_ptr); - //std::unique_lock locker_expiring_wrapper{ mutex_expiring_wrapper }; - if (expiring_wrapper.find(wrapper_ptr) == expiring_wrapper.end()) - expiring_wrapper.insert({ wrapper_ptr, right_now() - current_settings.timeout }); - //locker_expiring_wrapper.unlock(); + udp_session_channels.erase(iter); + if (expiring_udp_sessions.find(udp_session_ptr) == expiring_udp_sessions.end()) + expiring_udp_sessions.insert({ udp_session_ptr, right_now() - current_settings.timeout }); } } } @@ -328,14 +308,14 @@ void server_mode::loop_keep_alive() encryption_mode encryption = current_settings.encryption; std::scoped_lock locker_wrapper_looping{ mutex_wrapper_channels }; - for (auto iter = wrapper_channels.begin(), next_iter = iter; iter != wrapper_channels.end(); iter = next_iter) + for (auto iter = udp_session_channels.begin(), next_iter = iter; iter != udp_session_channels.end(); iter = next_iter) { ++next_iter; uint32_t iden = iter->first; - std::shared_ptr>> wrapper_ptr = iter->second; + std::shared_ptr udp_session_ptr = iter->second; std::vector keep_alive_packet = create_empty_data(encryption_password, encryption, EMPTY_PACKET_SIZE); - wrapper_ptr->write_iden(keep_alive_packet.data()); - wrapper_ptr->send_data(std::move(keep_alive_packet), get_remote_address(wrapper_ptr)); + udp_session_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); + udp_session_ptr->ingress_sender.load()->async_send_out(std::move(keep_alive_packet), udp_session_ptr->ingress_source_endpoint); } } @@ -361,8 +341,8 @@ void server_mode::expiring_wrapper_loops(const asio::error_code &e) cleanup_expiring_data_connections(); - timer_expiring_wrapper.expires_after(EXPRING_UPDATE_INTERVAL); - timer_expiring_wrapper.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); + timer_expiring_sessions.expires_after(EXPRING_UPDATE_INTERVAL); + timer_expiring_sessions.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); } void server_mode::keep_alive(const asio::error_code& e) @@ -394,7 +374,7 @@ void server_mode::send_stun_request(const asio::error_code &e) server_mode::~server_mode() { - timer_expiring_wrapper.cancel(); + timer_expiring_sessions.cancel(); timer_find_timeout.cancel(); timer_stun.cancel(); timer_keep_alive.cancel(); @@ -461,8 +441,8 @@ bool server_mode::start() try { - timer_expiring_wrapper.expires_after(EXPRING_UPDATE_INTERVAL); - timer_expiring_wrapper.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); + timer_expiring_sessions.expires_after(EXPRING_UPDATE_INTERVAL); + timer_expiring_sessions.async_wait([this](const asio::error_code &e) { expiring_wrapper_loops(e); }); timer_find_timeout.expires_after(FINDER_TIMEOUT_INTERVAL); timer_find_timeout.async_wait([this](const asio::error_code &e) { find_expires(e); }); diff --git a/src/networks/server.hpp b/src/networks/server.hpp index 7ec96e4..90aaa45 100644 --- a/src/networks/server.hpp +++ b/src/networks/server.hpp @@ -19,19 +19,15 @@ class server_mode std::array external_ipv6_address; const std::array zero_value_array; - std::map> udp_servers; - std::shared_mutex mutex_wrapper_session_map_to_source_udp; - std::map>>, udp::endpoint, std::owner_less<>> wrapper_session_map_to_source_udp; - std::shared_mutex mutex_wrapper_channels; - std::map>>> wrapper_channels; + std::map> udp_session_channels; std::mutex mutex_expiring_wrapper; - std::map>>, int64_t, std::owner_less<>> expiring_wrapper; + std::map, int64_t, std::owner_less<>> expiring_udp_sessions; - asio::steady_timer timer_expiring_wrapper; + asio::steady_timer timer_expiring_sessions; asio::steady_timer timer_find_timeout; asio::steady_timer timer_stun; asio::steady_timer timer_keep_alive; @@ -43,13 +39,12 @@ class server_mode void udp_server_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number); void udp_server_incoming_unpack(std::unique_ptr data, size_t plain_size, udp::endpoint peer, asio::ip::port_type port_number); - void udp_client_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number, std::shared_ptr>> wrapper_session); + void udp_client_incoming(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number, std::weak_ptr udp_session_ptr); void udp_server_incoming_new_connection(std::unique_ptr data, size_t data_size, udp::endpoint peer, asio::ip::port_type port_number); - bool create_new_udp_connection(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, std::shared_ptr>> wrapper, udp::endpoint peer); + bool create_new_udp_connection(std::unique_ptr data, const uint8_t *data_ptr, size_t data_size, std::shared_ptr udp_session_ptr, udp::endpoint peer); - udp::endpoint get_remote_address(std::shared_ptr>> wrapper_ptr); bool update_local_udp_target(udp_client *target_connector); void save_external_ip_address(uint32_t ipv4_address, uint16_t ipv4_port, const std::array &ipv6_address, uint16_t ipv6_port); @@ -69,7 +64,7 @@ class server_mode server_mode(asio::io_context &io_context_ref, asio::io_context &net_io, ttp::task_group_pool &seq_task_pool_local, ttp::task_group_pool &seq_task_pool_peer, size_t task_count_limit, const user_settings &settings) : io_context(io_context_ref), network_io(net_io), - timer_expiring_wrapper(io_context), + timer_expiring_sessions(io_context), timer_find_timeout(io_context), timer_stun(io_context), timer_keep_alive(io_context), @@ -86,7 +81,7 @@ class server_mode server_mode(server_mode &&existing_server) noexcept : io_context(existing_server.io_context), network_io(existing_server.network_io), - timer_expiring_wrapper(std::move(existing_server.timer_expiring_wrapper)), + timer_expiring_sessions(std::move(existing_server.timer_expiring_sessions)), timer_find_timeout(std::move(existing_server.timer_find_timeout)), timer_stun(std::move(existing_server.timer_stun)), timer_keep_alive(std::move(existing_server.timer_keep_alive)), diff --git a/src/shares/CMakeLists.txt b/src/shares/CMakeLists.txt index cec60be..822c2d3 100644 --- a/src/shares/CMakeLists.txt +++ b/src/shares/CMakeLists.txt @@ -1,6 +1,6 @@ set(THISLIB_NAME SHAREDEFINES) -add_library(${THISLIB_NAME} STATIC share_defines.cpp data_operations.cpp) +add_library(${THISLIB_NAME} STATIC configurations.cpp share_defines.cpp data_operations.cpp) #target_include_directories(${THISLIB_NAME} PUBLIC shares/ PARENT_SCOPE)