Skip to content

Commit

Permalink
Switch to subpar for tatami::parallelize's implementation.
Browse files Browse the repository at this point in the history
This centralizes the definition of the parallelization mechanism so that
it can be easily re-used in other (non-tatami) libraries.
  • Loading branch information
LTLA committed Aug 26, 2024
1 parent 80f7ba4 commit 0e97516
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 251 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/doxygenate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ jobs:
with:
args: -O docs/doxygen-awesome.css https://raw.githubusercontent.com/jothepro/doxygen-awesome-css/main/doxygen-awesome.css

- name: Add subpar tagfile
uses: wei/wget@v1
with:
args: -O docs/subpar.tag https://ltla.github.io/subpar/subpar.tag

- name: Doxygen Action
uses: mattnotmitt/doxygen-action@v1
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ build/
*.swp
docs/html
docs/latex
docs/*.tag
_downstream
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ target_include_directories(tatami INTERFACE
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}/tatami_tatami>"
)

# Dependencies
option(TATAMI_FETCH_EXTERN "Automatically fetch tatami's external dependencies." ON)
if(TATAMI_FETCH_EXTERN)
add_subdirectory(extern)
else()
find_package(ltla_subpar 0.2.0 CONFIG REQUIRED)
endif()

target_link_libraries(tatami INTERFACE ltla::subpar)

# Building the test-related machinery, if we are compiling this library directly.
if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
option(TATAMI_TESTS "Build tatami's test suite." ON)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ With OpenMP, this looks like:
```

Users may also consider using the `tatami::parallelize()` function, which accepts a function with the range of jobs (in this case, rows) to be processed in each thread.
By default, this uses the standard `<thread>` library and so will work in environments without OpenMP.
If this is not sufficient, applications can change the parallelization scheme in all `tatami::parallelize()` calls by setting the `TATAMI_CUSTOM_PARALLEL` macro.
This automatically falls back to the standard `<thread>` library if OpenMP is not available.
Applications can also set the `TATAMI_CUSTOM_PARALLEL` macro to override the parallelization scheme in all `tatami::parallelize()` calls.

```cpp
tatami::parallelize([&](int thread, int start, int length) -> void {
Expand Down
3 changes: 3 additions & 0 deletions cmake/Config.cmake.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
@PACKAGE_INIT@

include(CMakeFindDependencyMacro)
find_dependency(ltla_subpar 0.2.0 CONFIG REQUIRED)

include("${CMAKE_CURRENT_LIST_DIR}/tatami_tatamiTargets.cmake")
2 changes: 1 addition & 1 deletion docs/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -2327,7 +2327,7 @@ SKIP_FUNCTION_MACROS = YES
# the path). If a tag file is not located in the directory in which doxygen is
# run, you must also specify the path to the tagfile here.

TAGFILES =
TAGFILES = subpar.tag=https://ltla.github.io/subpar

# When a file name is specified after GENERATE_TAGFILE, doxygen will create a
# tag file that is based on the input files it reads. See section "Linking to
Expand Down
9 changes: 9 additions & 0 deletions extern/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(FetchContent)

FetchContent_Declare(
subpar
GIT_REPOSITORY https://github.com/LTLA/subpar
GIT_TAG master # ^0.2.0
)

FetchContent_MakeAvailable(subpar)
6 changes: 3 additions & 3 deletions include/tatami/dense/convert_to_dense.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void convert_to_dense(const Matrix<InputValue_, InputIndex_>* matrix, bool row_m

if (row_major == pref_rows) {
constexpr bool same_type = std::is_same<InputValue_, StoredValue_>::value;
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> temp(same_type ? 0 : secondary);
auto store_copy = store + static_cast<size_t>(start) * secondary; // cast to size_t to avoid overflow.
auto wrk = consecutive_extractor<false>(matrix, pref_rows, start, length);
Expand All @@ -66,7 +66,7 @@ void convert_to_dense(const Matrix<InputValue_, InputIndex_>* matrix, bool row_m
// reduce false sharing across threads during writes, as locations
// for simultaneous writes in the transposed matrix will be
// separated by around 'secondary * length' elements.
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
auto store_copy = store;

auto wrk = consecutive_extractor<true, InputValue_, InputIndex_>(matrix, pref_rows, 0, primary, start, length);
Expand All @@ -89,7 +89,7 @@ void convert_to_dense(const Matrix<InputValue_, InputIndex_>* matrix, bool row_m
// Same logic as described for the sparse case; we iterate along the
// preferred dimension but split into threads along the non-preferred
// dimension to reduce false sharing.
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
auto store_copy = store + static_cast<size_t>(start) * primary; // cast to size_t to avoid overflow.

auto wrk = consecutive_extractor<false, InputValue_, InputIndex_>(matrix, pref_rows, 0, primary, start, length);
Expand Down
18 changes: 9 additions & 9 deletions include/tatami/sparse/convert_to_compressed_sparse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void count_compressed_sparse_non_zeros_consistent(const tatami::Matrix<Value_, I
opt.sparse_extract_index = false;
opt.sparse_ordered_index = false;

parallelize([&](size_t, Index_ start, Index_ length) -> void {
parallelize([&](int, Index_ start, Index_ length) -> void {
auto wrk = consecutive_extractor<true>(matrix, row, start, length, opt);
for (Index_ x = 0; x < length; ++x) {
auto range = wrk->fetch(NULL, NULL);
Expand All @@ -39,7 +39,7 @@ void count_compressed_sparse_non_zeros_consistent(const tatami::Matrix<Value_, I
}, primary, threads);

} else {
parallelize([&](size_t, Index_ start, Index_ length) -> void {
parallelize([&](int, Index_ start, Index_ length) -> void {
std::vector<Value_> buffer_v(secondary);
auto wrk = consecutive_extractor<false>(matrix, row, start, length);
for (Index_ p = start, pe = start + length; p < pe; ++p) {
Expand Down Expand Up @@ -70,7 +70,7 @@ void count_compressed_sparse_non_zeros_inconsistent(const tatami::Matrix<Value_,
opt.sparse_extract_value = false;
opt.sparse_ordered_index = false;

parallelize([&](size_t t, Index_ start, Index_ length) -> void {
parallelize([&](int t, Index_ start, Index_ length) -> void {
std::vector<Index_> buffer_i(primary);
auto wrk = consecutive_extractor<true>(matrix, !row, start, length, opt);
auto my_counts = (t > 0 ? nz_counts[t - 1].data() : output);
Expand All @@ -87,7 +87,7 @@ void count_compressed_sparse_non_zeros_inconsistent(const tatami::Matrix<Value_,
}, secondary, threads);

} else {
parallelize([&](size_t t, Index_ start, Index_ length) -> void {
parallelize([&](int t, Index_ start, Index_ length) -> void {
auto wrk = consecutive_extractor<false>(matrix, !row, start, length);
std::vector<Value_> buffer_v(primary);
auto my_counts = (t > 0 ? nz_counts[t - 1].data() : output);
Expand Down Expand Up @@ -126,7 +126,7 @@ void fill_compressed_sparse_matrix_consistent(
Options opt;
opt.sparse_ordered_index = false;

parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(secondary);
std::vector<InputIndex_> buffer_i(secondary);
auto wrk = consecutive_extractor<true>(matrix, row, start, length, opt);
Expand All @@ -145,7 +145,7 @@ void fill_compressed_sparse_matrix_consistent(
}, primary, threads);

} else {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(secondary);
auto wrk = consecutive_extractor<false>(matrix, row, start, length);

Expand Down Expand Up @@ -180,7 +180,7 @@ void fill_compressed_sparse_matrix_inconsistent(
Options opt;
opt.sparse_ordered_index = false;

parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(length);
std::vector<InputIndex_> buffer_i(length);
auto wrk = consecutive_extractor<true>(matrix, !row, static_cast<InputIndex_>(0), secondary, start, length, opt);
Expand All @@ -198,7 +198,7 @@ void fill_compressed_sparse_matrix_inconsistent(
}, primary, threads);

} else {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(length);
auto wrk = consecutive_extractor<false>(matrix, !row, static_cast<InputIndex_>(0), secondary, start, length);
std::vector<Pointer_> offset_copy(pointers + start, pointers + start + length);
Expand Down Expand Up @@ -340,7 +340,7 @@ struct CompressedSparseContents {
* The behavior of this function can be replicated by manually calling `count_compressed_sparse_non_zeros()` followed by `fill_compressed_sparse_contents()`.
* This may be desirable for users who want to put the compressed sparse contents into pre-existing memory allocations.
*/
template<typename StoredValue_, typename StoredIndex_, typename StoredPointer_ = size_t, typename InputValue_, typename InputIndex_>
template<typename StoredValue_, typename StoredIndex_, typename StoredPointer_ = size_t, typename InputValue_, typename InputIndex_>
CompressedSparseContents<StoredValue_, StoredIndex_, StoredPointer_> retrieve_compressed_sparse_contents(const Matrix<InputValue_, InputIndex_>* matrix, bool row, bool two_pass, int threads = 1) {
CompressedSparseContents<StoredValue_, StoredIndex_, StoredPointer_> output;
auto& output_v = output.value;
Expand Down
8 changes: 4 additions & 4 deletions include/tatami/sparse/convert_to_fragmented_sparse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ FragmentedSparseContents<StoredValue_, StoredIndex_> retrieve_fragmented_sparse_

if (row == matrix->prefer_rows()) {
if (matrix->is_sparse()) {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(secondary);
std::vector<InputIndex_> buffer_i(secondary);
auto wrk = consecutive_extractor<true>(matrix, row, start, length);
Expand All @@ -97,7 +97,7 @@ FragmentedSparseContents<StoredValue_, StoredIndex_> retrieve_fragmented_sparse_
}, primary, threads);

} else {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(secondary);
auto wrk = consecutive_extractor<false>(matrix, row, start, length);

Expand Down Expand Up @@ -125,7 +125,7 @@ FragmentedSparseContents<StoredValue_, StoredIndex_> retrieve_fragmented_sparse_
// into the output buffers.

if (matrix->is_sparse()) {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
std::vector<InputValue_> buffer_v(primary);
std::vector<InputIndex_> buffer_i(primary);
auto wrk = consecutive_extractor<true>(matrix, !row, static_cast<InputIndex_>(0), secondary, start, length);
Expand All @@ -142,7 +142,7 @@ FragmentedSparseContents<StoredValue_, StoredIndex_> retrieve_fragmented_sparse_
}, primary, threads);

} else {
parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void {
parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void {
auto wrk = consecutive_extractor<false>(matrix, !row, static_cast<InputIndex_>(0), secondary, start, length);
std::vector<InputValue_> buffer_v(length);

Expand Down
81 changes: 10 additions & 71 deletions include/tatami/utils/parallelize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
#include <cmath>

#ifndef TATAMI_CUSTOM_PARALLEL
#ifndef _OPENMP
#include <thread>
#endif
#include <string>
#include <stdexcept>
#include "subpar/subpar.hpp"
#endif

/**
Expand All @@ -21,13 +17,9 @@
namespace tatami {

/**
* Apply a function to a set of tasks in parallel, usually for iterating over a `Matrix`.
* This can be done using:
*
* - OpenMP, if available and enabled by the compiler.
* - Using a custom parallelization scheme, by defining a `TATAMI_CUSTOM_PARALLEL` function-like macro.
* This should accept the `fun`, `tasks` and `threads` arguments as below.
* - `<thread>`, otherwise.
* Apply a function to a set of tasks in parallel, usually for iterating over a dimension of a `Matrix`.
* By default, this uses `subpar::parallelize()` internally but can be overridden by defining a `TATAMI_CUSTOM_PARALLEL` function-like macro.
* The macro should accept the `fun`, `tasks` and `threads` arguments as described below.
*
* @tparam parallel_ Whether the tasks should be run in parallel.
* If `false`, no parallelization is performed and all tasks are run on the current thread.
Expand All @@ -46,69 +38,16 @@ namespace tatami {
* @param threads Number of threads.
*/
template<bool parallel_ = true, class Function_, typename Index_>
void parallelize(Function_ fun, Index_ tasks, size_t threads) {
void parallelize(Function_ fun, Index_ tasks, int threads) {
if constexpr(parallel_) {
if (threads > 1) {
#ifndef TATAMI_CUSTOM_PARALLEL
Index_ worker_size = (tasks / threads) + (tasks % threads > 0); // Ceiling of an integer division.
threads = (tasks / worker_size) + (tasks % worker_size > 0); // Set the actual number of required threads.
std::vector<std::string> errors(threads);

#if defined(_OPENMP)
#pragma omp parallel for num_threads(threads)
for (size_t t = 0; t < threads; ++t) {
Index_ start = worker_size * t; // Will not overflow due to the above recomputation of 'threads'.
Index_ remaining = tasks - start; // Must be positive, as otherwise 'tasks % worker_size = 0' and the iteration wouldn't even get here.

try {
fun(t, start, std::min(remaining, worker_size)); // Use 'remaining' to avoid potential overflow from computing 'end = start + worker_size'.
} catch (std::exception& e) {
errors[t] = e.what();
} catch (...) {
errors[t] = "unknown error in thread " + std::to_string(t);
}
}

#ifdef TATAMI_CUSTOM_PARALLEL
TATAMI_CUSTOM_PARALLEL(std::move(fun), tasks, threads);
#else
Index_ first = 0;
std::vector<std::thread> workers;
workers.reserve(threads);

for (size_t t = 0; t < threads && first < tasks; ++t) {
Index_ remaining = tasks - first;
Index_ len = std::min(remaining, worker_size);
workers.emplace_back([&fun,&errors](int t, Index_ first, Index_ len) -> void {
try {
fun(t, first, len);
} catch (std::exception& e) {
errors[t] = e.what();
} catch (...) {
errors[t] = "unknown error in thread " + std::to_string(t);
}
}, t, first, len);
first += len;
}

for (auto& wrk : workers) {
wrk.join();
}
#endif

for (const auto& e : errors) {
if (!e.empty()) {
throw std::runtime_error(e);
}
}

#else
TATAMI_CUSTOM_PARALLEL(std::move(fun), tasks, threads);
subpar::parallelize(threads, tasks, std::move(fun));
#endif
return;
}
} else {
fun(0, 0, tasks);
}

fun(0, 0, tasks);
return;
}

}
Expand Down
49 changes: 5 additions & 44 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,48 +126,9 @@ add_executable(
)
decorate_executable(utils_test)

# Test custom parallelization during apply.
macro(create_partest target)
add_executable(
${target}
src/utils/parallelize.cpp
)

target_link_libraries(
${target}
gtest_main
tatami
)

target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic -Werror)

if(DO_COVERAGE)
target_compile_options(${target} PRIVATE -O0 -g --coverage)
target_link_options(${target} PRIVATE --coverage)
endif()

gtest_discover_tests(${target})
endmacro()

create_partest(cuspar_test)
add_executable(
cuspar_test
src/utils/parallelize.cpp
)
decorate_executable(cuspar_test)
target_compile_definitions(cuspar_test PRIVATE CUSTOM_PARALLEL_TEST=1)

find_package(OpenMP)
if(OpenMP_FOUND)
create_partest(omp_test)
target_link_libraries(omp_test OpenMP::OpenMP_CXX)

create_isometric_unary_test(omp_isometric_unary_test)
target_link_libraries(omp_isometric_unary_test OpenMP::OpenMP_CXX)

create_isometric_binary_test(omp_isometric_binary_test)
target_link_libraries(omp_isometric_binary_test OpenMP::OpenMP_CXX)

add_executable(
omp_sparse_test
src/sparse/convert_to_compressed_sparse.cpp
src/sparse/CompressedSparseMatrix.cpp
src/sparse/FragmentedSparseMatrix.cpp
)
decorate_executable(omp_sparse_test)
endif()
Loading

0 comments on commit 0e97516

Please sign in to comment.