-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix std::future status query #596
base: branch-25.04
Are you sure you want to change the base?
Changes from all commits
a81c252
08072fe
8ed7da1
d1ccb2f
a4d6963
45f934b
c5254f5
83ece06
c0a203b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
#include <atomic> | ||
#include <cassert> | ||
#include <future> | ||
#include <memory> | ||
#include <numeric> | ||
#include <system_error> | ||
#include <type_traits> | ||
#include <utility> | ||
#include <vector> | ||
|
||
|
@@ -32,6 +34,29 @@ namespace kvikio { | |
|
||
namespace detail { | ||
|
||
/** | ||
* @brief Utility function to create a copyable callable from a move-only callable. | ||
* | ||
* The underlying thread pool uses `std::function` (until C++23) or `std::move_only_function` | ||
* (since C++23) as the element type of the task queue. For the former case that currently applies, | ||
* the `std::function` requires its "target" (associated callable) to be copy-constructible. This | ||
* utility function is a workaround for those move-only callables. | ||
* | ||
* @tparam F Callable type. F shall be move-only. | ||
* @param op Callable. | ||
* @return A new callable that satisfies the copy-constructible condition. | ||
*/ | ||
template <typename F> | ||
auto make_copyable_lambda(F op) | ||
{ | ||
// Create the callable on the heap by moving from op. Use a shared pointer to manage its lifetime. | ||
auto sp = std::make_shared<F>(std::move(op)); | ||
|
||
// Use the copyable closure as the proxy of the move-only callable. | ||
return | ||
[sp](auto&&... args) -> decltype(auto) { return (*sp)(std::forward<decltype(args)>(args)...); }; | ||
} | ||
|
||
/** | ||
* @brief Determine the NVTX color and call index. They are used to identify tasks from different | ||
* pread/pwrite calls. Tasks from the same pread/pwrite call are given the same color and call | ||
|
@@ -50,6 +75,11 @@ inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and | |
return {nvtx_color, call_idx}; | ||
} | ||
|
||
/** | ||
* @brief Submit the task callable to the underlying thread pool. | ||
* | ||
* Both the callable and arguments shall satisfy copy-constructible. | ||
*/ | ||
template <typename F, typename T> | ||
std::future<std::size_t> submit_task(F op, | ||
T buf, | ||
|
@@ -59,12 +89,40 @@ std::future<std::size_t> submit_task(F op, | |
std::uint64_t nvtx_payload = 0ull, | ||
nvtx_color_type nvtx_color = NvtxManager::default_color()) | ||
{ | ||
static_assert(std::is_invocable_r_v<std::size_t, | ||
decltype(op), | ||
decltype(buf), | ||
decltype(size), | ||
decltype(file_offset), | ||
decltype(devPtr_offset)>); | ||
|
||
return defaults::thread_pool().submit_task([=] { | ||
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color); | ||
return op(buf, size, file_offset, devPtr_offset); | ||
}); | ||
} | ||
|
||
/** | ||
* @brief Submit the move-only task callable to the underlying thread pool. | ||
* | ||
* @tparam F Callable type. F shall be move-only and have no argument. | ||
* @param op Callable. | ||
* @return A future to be used later to check if the operation has finished its execution. | ||
*/ | ||
template <typename F> | ||
std::future<std::size_t> submit_move_only_task( | ||
F op_move_only, | ||
std::uint64_t nvtx_payload = 0ull, | ||
nvtx_color_type nvtx_color = NvtxManager::default_color()) | ||
{ | ||
static_assert(std::is_invocable_r_v<std::size_t, F>); | ||
auto op_copyable = make_copyable_lambda(std::move(op_move_only)); | ||
return defaults::thread_pool().submit_task([=] { | ||
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color); | ||
return op_copyable(); | ||
}); | ||
} | ||
|
||
} // namespace detail | ||
|
||
/** | ||
|
@@ -90,40 +148,40 @@ std::future<std::size_t> parallel_io(F op, | |
nvtx_color_type nvtx_color = NvtxManager::default_color()) | ||
{ | ||
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument); | ||
static_assert(std::is_invocable_r_v<std::size_t, | ||
decltype(op), | ||
decltype(buf), | ||
decltype(size), | ||
decltype(file_offset), | ||
decltype(devPtr_offset)>); | ||
|
||
// Single-task guard | ||
if (task_size >= size || page_size >= size) { | ||
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color); | ||
} | ||
|
||
// We know an upper bound of the total number of tasks | ||
std::vector<std::future<std::size_t>> tasks; | ||
tasks.reserve(size / task_size + 2); | ||
tasks.reserve(size / task_size); | ||
|
||
// 1) Submit `task_size` sized tasks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about (possibly simpler):
But, I have a question, why not also submit the final read task and just gather all the results in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR initially made that implementation as you described: we submit the sequence of N tasks, and instead of using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PS: The suggested code is much more concise. Replaced as suggested. Thanks! |
||
while (size >= task_size) { | ||
// 1) Submit all tasks but the last one. These are all `task_size` sized tasks. | ||
while (size > task_size) { | ||
tasks.push_back( | ||
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color)); | ||
file_offset += task_size; | ||
devPtr_offset += task_size; | ||
size -= task_size; | ||
} | ||
|
||
// 2) Submit a task for the remainder | ||
if (size > 0) { | ||
tasks.push_back( | ||
detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color)); | ||
} | ||
|
||
// Finally, we sum the result of all tasks. | ||
auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t { | ||
std::size_t ret = 0; | ||
// 2) Submit the last task, which consists of performing the last I/O and waiting the previous | ||
// tasks. | ||
auto last_task = [=, tasks = std::move(tasks)]() mutable -> std::size_t { | ||
auto ret = op(buf, size, file_offset, devPtr_offset); | ||
for (auto& task : tasks) { | ||
ret += task.get(); | ||
} | ||
return ret; | ||
}; | ||
return std::async(std::launch::deferred, gather_tasks, std::move(tasks)); | ||
return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color); | ||
} | ||
|
||
} // namespace kvikio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is tempting to add the "can't copy" part to the code to improve type safety:
But this does not work.
For the non-copyable
gather_tasks
below, trying to copy construct results in compile error, but the type trait gives unexpected result.A simpler example:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good writeup: https://quuxplusone.github.io/blog/2020/02/05/vector-is-copyable-except-when-its-not/