Skip to content

Commit

Permalink
FEC: faster recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Dec 2, 2023
1 parent 4bb3959 commit 57aa5af
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ int main(int argc, char *argv[])
if (argc <= 1)
{
char app_name[] = "udphop";
printf("%s version 20231126\n", app_name);
printf("%s version 20231202\n", app_name);
printf("Usage: %s config1.conf\n", app_name);
printf(" %s config1.conf config2.conf...\n", app_name);
return 0;
Expand Down
19 changes: 16 additions & 3 deletions src/networks/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ void client_mode::udp_forwarder_incoming_to_udp_unpack(std::shared_ptr<udp_mappi
original_data.second = fec_data_size;
std::copy_n(fec_data_ptr, fec_data_size, original_data.first.get());
udp_session_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data);
fec_find_missings(udp_session_ptr.get(), udp_session_ptr->fec_egress_control, fec_sn, current_settings.fec_data);
return;
}
else // original data
Expand Down Expand Up @@ -368,13 +369,25 @@ void client_mode::fec_find_missings(udp_mappings *udp_session_ptr, fec_control_d
{
++next_iter;
auto sn = iter->first;
if (fec_sn == sn)
continue;;
auto &mapped_data = iter->second;
if (mapped_data.size() < max_fec_data_count)
{
if (fec_sn - sn > FEC_WAITS)
{
fec_controllor.fec_rcv_cache.erase(iter);
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn);
rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
{
if (fec_sn - sn > FEC_WAITS)
{
fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count);
Expand All @@ -387,7 +400,7 @@ void client_mode::fec_find_missings(udp_mappings *udp_session_ptr, fec_control_d
udp_access_point->async_send_out(std::move(data), missed_data_ptr, missed_data_size, udp_session_ptr->ingress_source_endpoint);
}

fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.insert(sn);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/networks/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <tuple>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>
#include <asio.hpp>

#include "../shares/share_defines.hpp"
Expand All @@ -34,7 +35,7 @@ constexpr size_t RAW_HEADER_SIZE = 12u;
constexpr size_t RETRY_TIMES = 30u;
constexpr size_t RETRY_WAITS = 2u;
constexpr size_t CLEANUP_WAITS = 10u; // second
constexpr size_t FEC_WAITS = 3u; // second
constexpr uint16_t FEC_WAITS = 3u; // second
constexpr auto STUN_RESEND = std::chrono::seconds(30);
constexpr auto FINDER_TIMEOUT_INTERVAL = std::chrono::seconds(1);
constexpr auto CHANGEPORT_UPDATE_INTERVAL = std::chrono::seconds(1);
Expand Down Expand Up @@ -387,6 +388,7 @@ struct fec_control_data
std::atomic<uint32_t> fec_snd_sub_sn;
std::vector<std::pair<std::unique_ptr<uint8_t[]>, size_t>> fec_snd_cache;
std::map<uint32_t, std::map<uint16_t, std::pair<std::unique_ptr<uint8_t[]>, size_t>>> fec_rcv_cache; // uint32_t = snd_sn, uint16_t = sub_sn
std::unordered_set<uint32_t> fec_rcv_restored;
fecpp::fec_code fecc;
};

Expand Down
20 changes: 16 additions & 4 deletions src/networks/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ void server_mode::udp_listener_incoming_unpack(std::unique_ptr<uint8_t[]> data,
original_data.second = fec_data_size;
std::copy_n(fec_data_ptr, fec_data_size, original_data.first.get());
udp_session_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data);
fec_find_missings(udp_session_ptr.get(), udp_session_ptr->fec_ingress_control, fec_sn, current_settings.fec_data);
return;
}
else // original data
Expand Down Expand Up @@ -346,14 +347,25 @@ void server_mode::fec_find_missings(udp_mappings *udp_session_ptr, fec_control_d
{
++next_iter;
auto sn = iter->first;
if (fec_sn == sn)
continue;

auto &mapped_data = iter->second;
if (mapped_data.size() < max_fec_data_count)
{
if (fec_sn - sn > FEC_WAITS)
{
fec_controllor.fec_rcv_cache.erase(iter);
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn);
rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
{
if (fec_sn - sn > FEC_WAITS)
{
fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count);
Expand All @@ -366,7 +378,7 @@ void server_mode::fec_find_missings(udp_mappings *udp_session_ptr, fec_control_d
udp_session_ptr->local_udp->async_send_out(std::move(data), missed_data_ptr, missed_data_size, *udp_target);
}

fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.insert(sn);
}
}

Expand Down

0 comments on commit 57aa5af

Please sign in to comment.