Skip to content

Commit

Permalink
stability update, enhancing stability during multi-connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Sep 16, 2023
1 parent 3983403 commit 6af4374
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 478 deletions.
38 changes: 24 additions & 14 deletions src/3rd_party/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace ttp
}

[[nodiscard]]
size_t get_task_count(size_t number) const
size_t get_task_count() const
{
return tasks_total.load();
}
Expand Down Expand Up @@ -109,8 +109,8 @@ namespace ttp
{
std::scoped_lock tasks_lock(tasks_mutex);
tasks.push_back({ task_function, std::move(data) });
++tasks_total;
}
++tasks_total;
task_available_cv.notify_one();
}

Expand Down Expand Up @@ -198,10 +198,13 @@ namespace ttp
*/
void wait_for_tasks()
{
waiting = true;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); });
waiting = false;
if (!waiting)
{
waiting = true;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); });
waiting = false;
}
}

private:
Expand All @@ -227,7 +230,10 @@ namespace ttp
void destroy_threads()
{
running = false;
task_available_cv.notify_all();
{
const std::scoped_lock tasks_lock(tasks_mutex);
task_available_cv.notify_all();
}
for (concurrency_t i = 0; i < thread_count; ++i)
{
threads[i].join();
Expand Down Expand Up @@ -423,8 +429,8 @@ namespace ttp
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
task_queue_of_threads[thread_number].push_back({ task_function, std::move(data) });
++tasks_total_of_threads[thread_number];
}
++tasks_total_of_threads[thread_number];
task_available_cv[thread_number].notify_one();
}

Expand All @@ -439,8 +445,8 @@ namespace ttp
task_function(std::move(data));
};
task_queue_of_threads[thread_number].push_back({ task_func, std::move(data) });
++tasks_total_of_threads[thread_number];
}
++tasks_total_of_threads[thread_number];
task_available_cv[thread_number].notify_one();
}

Expand Down Expand Up @@ -527,13 +533,16 @@ namespace ttp
*/
void wait_for_tasks()
{
waiting = true;
for (concurrency_t i = 0; i < thread_count; ++i)
if (!waiting)
{
std::unique_lock<std::mutex> tasks_lock(tasks_mutex_of_threads[i]);
task_done_cv.wait(tasks_lock, [this, i] { return (tasks_total_of_threads[i].load() == 0); });
waiting = true;
for (concurrency_t i = 0; i < thread_count; ++i)
{
std::unique_lock<std::mutex> tasks_lock(tasks_mutex_of_threads[i]);
task_done_cv.wait(tasks_lock, [this, i] { return (tasks_total_of_threads[i].load() == 0); });
}
waiting = false;
}
waiting = false;
}

private:
Expand Down Expand Up @@ -561,6 +570,7 @@ namespace ttp
running = false;
for (concurrency_t i = 0; i < thread_count; ++i)
{
const std::scoped_lock tasks_lock(tasks_mutex_of_threads[i]);
task_available_cv[i].notify_all();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ int main(int argc, char *argv[])
if (argc <= 1)
{
char app_name[] = "udphop";
printf("%s version 20230427\n", app_name);
printf("%s version 20230916\n", app_name);
printf("Usage: %s config1.conf\n", app_name);
printf(" %s config1.conf config2.conf...\n", app_name);
return 0;
Expand Down
Loading

0 comments on commit 6af4374

Please sign in to comment.