-
Notifications
You must be signed in to change notification settings - Fork 184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add classes for Task and SharedTask in threadpool #5391
Conversation
411e836
to
e3aff67
Compare
0b11b03
to
2d0a155
Compare
49dfadf
to
7c9acb3
Compare
Passes CI, the MANYLINUX failure is a known issue. |
7c9acb3
to
bdd4b48
Compare
@@ -432,7 +432,7 @@ Status DenseReader::dense_read() { | |||
// prevent using too much memory when the budget is small and doesn't allow | |||
// to process more than one batch at a time. | |||
if (wait_compute_task_before_read && compute_task.valid()) { | |||
throw_if_not_ok(resources_.compute_tp().wait(compute_task)); | |||
throw_if_not_ok(resources_.compute_tp().wait(&compute_task)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the plan to replace all calls to tp.wait(task)
with task.wait()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I can do it already if you think it should be in this PR, or in the following work as I was planning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, on the one hand I prefer the existing APIs because the action primarily happens on the thread pool instead of the task, but on the other hand there is already a wait()
method on std::future
that we must be careful not to call because of deadlocks. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, and also having to pass around a reference to the threadpool object everywhere is quite cumbersome.
e30927b
to
c92a83c
Compare
} | ||
|
||
/** | ||
* Is this task valid. Wait can only be called on vaid tasks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: valid
} | ||
|
||
/** | ||
* The encapsulated std::shared_future |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description mentions that it is not safe to use the wait
method of this but that doesn't live in any of these comments. I think it probably should. Either that or std::future
isn't really the right thing to use (in the hypothetical future state, for now it makes sense transiently)
*/ | ||
std::future_status wait_for( | ||
const std::chrono::milliseconds timeout_duration) const { | ||
return f_.wait_for(timeout_duration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This blocks, doesn't it?
So it will prevent other tasks in the thread pool from running on this thread until the duration has elapsed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But from what I've read our thread pool doesn't really have much support for event-based task wake-ups right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it blocks. The current thread pool is pretty basic. Ideally we should evolve it (maybe transitioning to coroutines?) to be able to put back in the queue a task that is blocked waiting. This will also solve the deadlocks that are very much possible today in our system with that stacking of blocked tasks. Let's discuss more in a call if you'd like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll follow up in 2025 about coroutines. They're definitely great; but hopefully we can avoid inventing a new runtime
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, any more incremental a change would be welcome. I thought about multiple task queues and work stealing as well, or a way to suspend a thread when it's blocked and put it back to the single task queue that we have today, but I am not sure if/how the latter could be done without coroutines. Let's discuss further and we could also put together some document with ideas.
@@ -368,7 +368,7 @@ Status DenseReader::dense_read() { | |||
// This is as far as we should go before implementing this properly in a task | |||
// graph, where the start and end of every piece of work can clearly be | |||
// identified. | |||
ThreadPool::Task compute_task; | |||
ThreadPool::SharedTask compute_task; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's easy to see that this is waited upon multiple times - but if it worked fine before, why is it necessary for it to be shared now?
Currently
ThreadPool::Task
is a typedef tostd:: future<Status>
. This PR:wait
for our async processes to go through our ThreadPool's wait method. In that way, we never use astd::future
'swait
method directly and we avoid possible deadlocks because only the ThreadPool'swait
will yield.SharedTask
, that is encapsulatingstd::shared_future
which allows multiple threads to wait on an async operation result to become available. This is needed for the upcoming work on parallelizing IO and compute operations in the codebase even further.This is heavily influenced from similar work by @Shelnutt2.
TYPE: IMPROVEMENT
DESC: Add classes for Task and SharedTask in threadpool