Skip to content

Commit

Permalink
Run copy file tasks with lower priority
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Dec 2, 2024
1 parent ae12226 commit 6eff633
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
7 changes: 6 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,13 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
return destination->CloseAsync();
};

// Spawn the copy_one_file tasks with lower priority than the default, so that
// background_writes are done with higher priority. Otherwise copy_one_file will keep
// buffering more data in memory without giving the background_writes any chance to
// upload the data and drop it from memory. Without this, large copies would cause OOMs.
TaskHints hints{10};
auto future = ::arrow::internal::OptionalParallelForAsync(
use_threads, sources, std::move(copy_one_file), io_context.executor());
use_threads, sources, std::move(copy_one_file), io_context.executor(), hints);

// Wait for all the copy_one_file instances to complete.
ARROW_ASSIGN_OR_RAISE(auto copy_close_async_future, future.result());
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/util/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,

template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> ParallelForAsync(
std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
Future<std::vector<R>> ParallelForAsync(std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool(),
TaskHints hints = TaskHints{}) {
std::vector<Future<R>> futures(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
ARROW_ASSIGN_OR_RAISE(futures[i],
executor->Submit(hints, func, i, std::move(inputs[i])));
}
return All(std::move(futures))
.Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
Expand Down Expand Up @@ -86,9 +87,10 @@ template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> OptionalParallelForAsync(
bool use_threads, std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
Executor* executor = internal::GetCpuThreadPool(), TaskHints hints = TaskHints{}) {
if (use_threads) {
return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor,
hints);
} else {
std::vector<R> result(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
Expand Down

0 comments on commit 6eff633

Please sign in to comment.