From 0e97516db7dc7dfc7f134d4e23cca4413f4a2200 Mon Sep 17 00:00:00 2001 From: LTLA Date: Mon, 26 Aug 2024 12:42:36 -0700 Subject: [PATCH] Switch to subpar for tatami::parallelize's implementation. This centralizes the definition of the parallelization mechanism so that it can be easily re-used in other (non-tatami) libraries. --- .github/workflows/doxygenate.yaml | 5 + .gitignore | 1 + CMakeLists.txt | 10 ++ README.md | 4 +- cmake/Config.cmake.in | 3 + docs/Doxyfile | 2 +- extern/CMakeLists.txt | 9 ++ include/tatami/dense/convert_to_dense.hpp | 6 +- .../sparse/convert_to_compressed_sparse.hpp | 18 ++-- .../sparse/convert_to_fragmented_sparse.hpp | 8 +- include/tatami/utils/parallelize.hpp | 81 ++-------------- tests/CMakeLists.txt | 49 +--------- tests/src/utils/custom_parallel.h | 47 --------- tests/src/utils/parallelize.cpp | 97 ++++++------------- 14 files changed, 89 insertions(+), 251 deletions(-) create mode 100644 extern/CMakeLists.txt delete mode 100644 tests/src/utils/custom_parallel.h diff --git a/.github/workflows/doxygenate.yaml b/.github/workflows/doxygenate.yaml index 5464fffc..58515ee7 100644 --- a/.github/workflows/doxygenate.yaml +++ b/.github/workflows/doxygenate.yaml @@ -16,6 +16,11 @@ jobs: with: args: -O docs/doxygen-awesome.css https://raw.githubusercontent.com/jothepro/doxygen-awesome-css/main/doxygen-awesome.css + - name: Add subpar tagfile + uses: wei/wget@v1 + with: + args: -O docs/subpar.tag https://ltla.github.io/subpar/subpar.tag + - name: Doxygen Action uses: mattnotmitt/doxygen-action@v1 with: diff --git a/.gitignore b/.gitignore index 3a204e95..ee8a406d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ build/ *.swp docs/html docs/latex +docs/*.tag _downstream diff --git a/CMakeLists.txt b/CMakeLists.txt index f77ad4f5..d738a383 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,16 @@ target_include_directories(tatami INTERFACE "$" ) +# Dependencies +option(TATAMI_FETCH_EXTERN "Automatically fetch tatami's external 
dependencies." ON) +if(TATAMI_FETCH_EXTERN) + add_subdirectory(extern) +else() + find_package(ltla_subpar 0.2.0 CONFIG REQUIRED) +endif() + +target_link_libraries(tatami INTERFACE ltla::subpar) + # Building the test-related machinery, if we are compiling this library directly. if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) option(TATAMI_TESTS "Build tatami's test suite." ON) diff --git a/README.md b/README.md index d2d23ea3..22302d77 100644 --- a/README.md +++ b/README.md @@ -213,8 +213,8 @@ With OpenMP, this looks like: ``` Users may also consider using the `tatami::parallelize()` function, which accepts a function with the range of jobs (in this case, rows) to be processed in each thread. -By default, this uses the standard `` library and so will work in environments without OpenMP. -If this is not sufficient, applications can change the parallelization scheme in all `tatami::parallelize()` calls by setting the `TATAMI_CUSTOM_PARALLEL` macro. +This automatically falls back to the standard `` library if OpenMP is not available. +Applications can also set the `TATAMI_CUSTOM_PARALLEL` macro to override the parallelization scheme in all `tatami::parallelize()` calls. ```cpp tatami::parallelize([&](int thread, int start, int length) -> void { diff --git a/cmake/Config.cmake.in b/cmake/Config.cmake.in index 9f917af9..de76c68b 100644 --- a/cmake/Config.cmake.in +++ b/cmake/Config.cmake.in @@ -1,3 +1,6 @@ @PACKAGE_INIT@ +include(CMakeFindDependencyMacro) +find_dependency(ltla_subpar 0.2.0 CONFIG REQUIRED) + include("${CMAKE_CURRENT_LIST_DIR}/tatami_tatamiTargets.cmake") diff --git a/docs/Doxyfile b/docs/Doxyfile index 0bbd88d9..6655667e 100644 --- a/docs/Doxyfile +++ b/docs/Doxyfile @@ -2327,7 +2327,7 @@ SKIP_FUNCTION_MACROS = YES # the path). If a tag file is not located in the directory in which doxygen is # run, you must also specify the path to the tagfile here. 
-TAGFILES = +TAGFILES = subpar.tag=https://ltla.github.io/subpar # When a file name is specified after GENERATE_TAGFILE, doxygen will create a # tag file that is based on the input files it reads. See section "Linking to diff --git a/extern/CMakeLists.txt b/extern/CMakeLists.txt new file mode 100644 index 00000000..824be9ff --- /dev/null +++ b/extern/CMakeLists.txt @@ -0,0 +1,9 @@ +include(FetchContent) + +FetchContent_Declare( + subpar + GIT_REPOSITORY https://github.com/LTLA/subpar + GIT_TAG master # ^0.2.0 +) + +FetchContent_MakeAvailable(subpar) diff --git a/include/tatami/dense/convert_to_dense.hpp b/include/tatami/dense/convert_to_dense.hpp index 9bca19a2..15e56b2c 100644 --- a/include/tatami/dense/convert_to_dense.hpp +++ b/include/tatami/dense/convert_to_dense.hpp @@ -41,7 +41,7 @@ void convert_to_dense(const Matrix* matrix, bool row_m if (row_major == pref_rows) { constexpr bool same_type = std::is_same::value; - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector temp(same_type ? 0 : secondary); auto store_copy = store + static_cast(start) * secondary; // cast to size_t to avoid overflow. auto wrk = consecutive_extractor(matrix, pref_rows, start, length); @@ -66,7 +66,7 @@ void convert_to_dense(const Matrix* matrix, bool row_m // reduce false sharing across threads during writes, as locations // for simultaneous writes in the transposed matrix will be // separated by around 'secondary * length' elements. 
- parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { auto store_copy = store; auto wrk = consecutive_extractor(matrix, pref_rows, 0, primary, start, length); @@ -89,7 +89,7 @@ void convert_to_dense(const Matrix* matrix, bool row_m // Same logic as described for the sparse case; we iterate along the // preferred dimension but split into threads along the non-preferred // dimension to reduce false sharing. - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { auto store_copy = store + static_cast(start) * primary; // cast to size_t to avoid overflow. auto wrk = consecutive_extractor(matrix, pref_rows, 0, primary, start, length); diff --git a/include/tatami/sparse/convert_to_compressed_sparse.hpp b/include/tatami/sparse/convert_to_compressed_sparse.hpp index ef33a491..93851cd8 100644 --- a/include/tatami/sparse/convert_to_compressed_sparse.hpp +++ b/include/tatami/sparse/convert_to_compressed_sparse.hpp @@ -30,7 +30,7 @@ void count_compressed_sparse_non_zeros_consistent(const tatami::Matrix void { + parallelize([&](int, Index_ start, Index_ length) -> void { auto wrk = consecutive_extractor(matrix, row, start, length, opt); for (Index_ x = 0; x < length; ++x) { auto range = wrk->fetch(NULL, NULL); @@ -39,7 +39,7 @@ void count_compressed_sparse_non_zeros_consistent(const tatami::Matrix void { + parallelize([&](int, Index_ start, Index_ length) -> void { std::vector buffer_v(secondary); auto wrk = consecutive_extractor(matrix, row, start, length); for (Index_ p = start, pe = start + length; p < pe; ++p) { @@ -70,7 +70,7 @@ void count_compressed_sparse_non_zeros_inconsistent(const tatami::Matrix void { + parallelize([&](int t, Index_ start, Index_ length) -> void { std::vector buffer_i(primary); auto wrk = consecutive_extractor(matrix, !row, start, length, opt); auto my_counts = (t > 0 ? 
nz_counts[t - 1].data() : output); @@ -87,7 +87,7 @@ void count_compressed_sparse_non_zeros_inconsistent(const tatami::Matrix void { + parallelize([&](int t, Index_ start, Index_ length) -> void { auto wrk = consecutive_extractor(matrix, !row, start, length); std::vector buffer_v(primary); auto my_counts = (t > 0 ? nz_counts[t - 1].data() : output); @@ -126,7 +126,7 @@ void fill_compressed_sparse_matrix_consistent( Options opt; opt.sparse_ordered_index = false; - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(secondary); std::vector buffer_i(secondary); auto wrk = consecutive_extractor(matrix, row, start, length, opt); @@ -145,7 +145,7 @@ void fill_compressed_sparse_matrix_consistent( }, primary, threads); } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(secondary); auto wrk = consecutive_extractor(matrix, row, start, length); @@ -180,7 +180,7 @@ void fill_compressed_sparse_matrix_inconsistent( Options opt; opt.sparse_ordered_index = false; - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(length); std::vector buffer_i(length); auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length, opt); @@ -198,7 +198,7 @@ void fill_compressed_sparse_matrix_inconsistent( }, primary, threads); } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(length); auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length); std::vector offset_copy(pointers + start, pointers + start + length); @@ -340,7 +340,7 @@ struct CompressedSparseContents { * 
The behavior of this function can be replicated by manually calling `count_compressed_sparse_non_zeros()` followed by `fill_compressed_sparse_contents()`. * This may be desirable for users who want to put the compressed sparse contents into pre-existing memory allocations. */ -template +template CompressedSparseContents retrieve_compressed_sparse_contents(const Matrix* matrix, bool row, bool two_pass, int threads = 1) { CompressedSparseContents output; auto& output_v = output.value; diff --git a/include/tatami/sparse/convert_to_fragmented_sparse.hpp b/include/tatami/sparse/convert_to_fragmented_sparse.hpp index 7483fe1a..00234de3 100644 --- a/include/tatami/sparse/convert_to_fragmented_sparse.hpp +++ b/include/tatami/sparse/convert_to_fragmented_sparse.hpp @@ -75,7 +75,7 @@ FragmentedSparseContents retrieve_fragmented_sparse_ if (row == matrix->prefer_rows()) { if (matrix->is_sparse()) { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(secondary); std::vector buffer_i(secondary); auto wrk = consecutive_extractor(matrix, row, start, length); @@ -97,7 +97,7 @@ FragmentedSparseContents retrieve_fragmented_sparse_ }, primary, threads); } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(secondary); auto wrk = consecutive_extractor(matrix, row, start, length); @@ -125,7 +125,7 @@ FragmentedSparseContents retrieve_fragmented_sparse_ // into the output buffers. 
if (matrix->is_sparse()) { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { std::vector buffer_v(primary); std::vector buffer_i(primary); auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length); @@ -142,7 +142,7 @@ FragmentedSparseContents retrieve_fragmented_sparse_ }, primary, threads); } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + parallelize([&](int, InputIndex_ start, InputIndex_ length) -> void { auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length); std::vector buffer_v(length); diff --git a/include/tatami/utils/parallelize.hpp b/include/tatami/utils/parallelize.hpp index 576ffbb8..c5eec69c 100644 --- a/include/tatami/utils/parallelize.hpp +++ b/include/tatami/utils/parallelize.hpp @@ -5,11 +5,7 @@ #include #ifndef TATAMI_CUSTOM_PARALLEL -#ifndef _OPENMP -#include -#endif -#include -#include +#include "subpar/subpar.hpp" #endif /** @@ -21,13 +17,9 @@ namespace tatami { /** - * Apply a function to a set of tasks in parallel, usually for iterating over a `Matrix`. - * This can be done using: - * - * - OpenMP, if available and enabled by the compiler. - * - Using a custom parallelization scheme, by defining a `TATAMI_CUSTOM_PARALLEL` function-like macro. - * This should accept the `fun`, `tasks` and `threads` arguments as below. - * - ``, otherwise. + * Apply a function to a set of tasks in parallel, usually for iterating over a dimension of a `Matrix`. + * By default, this uses `subpar::parallelize()` internally but can be overridden by defining a `TATAMI_CUSTOM_PARALLEL` function-like macro. + * The macro should accept the `fun`, `tasks` and `threads` arguments as described below. * * @tparam parallel_ Whether the tasks should be run in parallel. * If `false`, no parallelization is performed and all tasks are run on the current thread. 
@@ -46,69 +38,16 @@ namespace tatami { * @param threads Number of threads. */ template -void parallelize(Function_ fun, Index_ tasks, size_t threads) { +void parallelize(Function_ fun, Index_ tasks, int threads) { if constexpr(parallel_) { - if (threads > 1) { -#ifndef TATAMI_CUSTOM_PARALLEL - Index_ worker_size = (tasks / threads) + (tasks % threads > 0); // Ceiling of an integer division. - threads = (tasks / worker_size) + (tasks % worker_size > 0); // Set the actual number of required threads. - std::vector errors(threads); - -#if defined(_OPENMP) - #pragma omp parallel for num_threads(threads) - for (size_t t = 0; t < threads; ++t) { - Index_ start = worker_size * t; // Will not overflow due to the above recomputation of 'threads'. - Index_ remaining = tasks - start; // Must be positive, as otherwise 'tasks % worker_size = 0' and the iteration wouldn't even get here. - - try { - fun(t, start, std::min(remaining, worker_size)); // Use 'remaining' to avoid potential overflow from computing 'end = start + worker_size'. - } catch (std::exception& e) { - errors[t] = e.what(); - } catch (...) { - errors[t] = "unknown error in thread " + std::to_string(t); - } - } - +#ifdef TATAMI_CUSTOM_PARALLEL + TATAMI_CUSTOM_PARALLEL(std::move(fun), tasks, threads); #else - Index_ first = 0; - std::vector workers; - workers.reserve(threads); - - for (size_t t = 0; t < threads && first < tasks; ++t) { - Index_ remaining = tasks - first; - Index_ len = std::min(remaining, worker_size); - workers.emplace_back([&fun,&errors](int t, Index_ first, Index_ len) -> void { - try { - fun(t, first, len); - } catch (std::exception& e) { - errors[t] = e.what(); - } catch (...) 
{ - errors[t] = "unknown error in thread " + std::to_string(t); - } - }, t, first, len); - first += len; - } - - for (auto& wrk : workers) { - wrk.join(); - } -#endif - - for (const auto& e : errors) { - if (!e.empty()) { - throw std::runtime_error(e); - } - } - -#else - TATAMI_CUSTOM_PARALLEL(std::move(fun), tasks, threads); + subpar::parallelize(threads, tasks, std::move(fun)); #endif - return; - } + } else { + fun(0, 0, tasks); } - - fun(0, 0, tasks); - return; } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 353ed8b5..1d6b6ff0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -126,48 +126,9 @@ add_executable( ) decorate_executable(utils_test) -# Test custom parallelization during apply. -macro(create_partest target) - add_executable( - ${target} - src/utils/parallelize.cpp - ) - - target_link_libraries( - ${target} - gtest_main - tatami - ) - - target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic -Werror) - - if(DO_COVERAGE) - target_compile_options(${target} PRIVATE -O0 -g --coverage) - target_link_options(${target} PRIVATE --coverage) - endif() - - gtest_discover_tests(${target}) -endmacro() - -create_partest(cuspar_test) +add_executable( + cuspar_test + src/utils/parallelize.cpp +) +decorate_executable(cuspar_test) target_compile_definitions(cuspar_test PRIVATE CUSTOM_PARALLEL_TEST=1) - -find_package(OpenMP) -if(OpenMP_FOUND) - create_partest(omp_test) - target_link_libraries(omp_test OpenMP::OpenMP_CXX) - - create_isometric_unary_test(omp_isometric_unary_test) - target_link_libraries(omp_isometric_unary_test OpenMP::OpenMP_CXX) - - create_isometric_binary_test(omp_isometric_binary_test) - target_link_libraries(omp_isometric_binary_test OpenMP::OpenMP_CXX) - - add_executable( - omp_sparse_test - src/sparse/convert_to_compressed_sparse.cpp - src/sparse/CompressedSparseMatrix.cpp - src/sparse/FragmentedSparseMatrix.cpp - ) - decorate_executable(omp_sparse_test) -endif() diff --git a/tests/src/utils/custom_parallel.h 
b/tests/src/utils/custom_parallel.h deleted file mode 100644 index b89ace4f..00000000 --- a/tests/src/utils/custom_parallel.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef CUSTOM_PARALLEL_H -#define CUSTOM_PARALLEL_H - -#include -#include -#include -#include - -template -static void custom_parallelize(Function_ f, size_t ntasks, size_t nworkers) { - size_t start = 0; - std::vector jobs; - jobs.reserve(nworkers); - size_t jobs_per_worker = std::ceil(static_cast(ntasks) / static_cast(nworkers)); - std::vector errors(nworkers); - - for (size_t w = 0; w < nworkers; ++w) { - size_t end = std::min(ntasks, start + jobs_per_worker); - if (start >= end) { - break; - } - jobs.emplace_back([&f,&errors](size_t t, size_t start, size_t len) -> void { - try { - f(t, start, len); - } catch (std::exception& e) { - errors[t] = e.what(); - } catch (...) { - errors[t] = "unknown error"; - } - }, w, start, end - start); - start += jobs_per_worker; - } - - for (auto& job : jobs) { - job.join(); - } - - for (auto& e : errors) { - if (!e.empty()) { - throw std::runtime_error(e); - } - } -} - -#define TATAMI_CUSTOM_PARALLEL custom_parallelize - -#endif diff --git a/tests/src/utils/parallelize.cpp b/tests/src/utils/parallelize.cpp index 81c8233c..e04e20c4 100644 --- a/tests/src/utils/parallelize.cpp +++ b/tests/src/utils/parallelize.cpp @@ -3,8 +3,21 @@ #include #ifdef CUSTOM_PARALLEL_TEST +#include + +template +void foo_parallel(Function_ fun, Index_ ntasks, int nthreads) { + Index_ tasks_per_thread = ntasks / nthreads + (ntasks % nthreads > 0); + Index_ start = 0; + for (int t = 0; t < nthreads; ++t) { + Index_ length = std::min(tasks_per_thread, ntasks - start); + fun(t, start, length); + start += length; + } +} + // Put this before any tatami imports. 
-#include "custom_parallel.h" +#define TATAMI_CUSTOM_PARALLEL foo_parallel #endif #include "tatami/utils/parallelize.hpp" @@ -12,78 +25,22 @@ TEST(ParallelizeTest, BasicCheck) { std::vector start(3, -1), length(3, -1); - tatami::parallelize([&](size_t t, int s, int l) -> void { + tatami::parallelize([&](int t, int s, int l) -> void { start[t] = s; length[t] = l; }, 100, 3); - - EXPECT_EQ(start[0], 0); - EXPECT_EQ(start[1], 34); - EXPECT_EQ(start[2], 68); - - EXPECT_EQ(length[0], 34); - EXPECT_EQ(length[1], 34); - EXPECT_EQ(length[2], 32); -} - -TEST(ParallelizeTest, TypeCheck1) { - // Checking that the interval calculation is done correctly - // when we're close to the overflow boundary. - std::vector start(2, -1), length(2, -1); - tatami::parallelize([&](size_t t, unsigned char s, unsigned char l) -> void { + EXPECT_EQ(start.front(), 0); + int last = length.front(); + for (int t = 1; t < 3; ++t) { + EXPECT_EQ(last, start[t]); + last += length[t]; + } + EXPECT_EQ(last, 100); + + tatami::parallelize([&](int t, int s, int l) -> void { start[t] = s; length[t] = l; - }, static_cast(255), 2); - - EXPECT_EQ(start[0], 0); - EXPECT_EQ(start[1], 128); - - EXPECT_EQ(length[0], 128); - EXPECT_EQ(length[1], 127); -} - -TEST(ParallelizeTest, TypeCheck2) { - // No overflow in the number of jobs. 
- std::vector start(1000, -1), length(1000, -1); - tatami::parallelize([&](size_t t, unsigned char s, unsigned char l) -> void { - start[t] = s; - length[t] = l; - }, static_cast(2), 1000); - - EXPECT_EQ(start[0], 0); - EXPECT_EQ(start[1], 1); - - EXPECT_EQ(length[0], 1); - EXPECT_EQ(length[1], 1); - - start[0] = -1; - length[0] = -1; - start[1] = -1; - length[1] = -1; - EXPECT_EQ(start, std::vector(1000, -1)); - EXPECT_EQ(length, std::vector(1000, -1)); -} - -TEST(ParallelizeTest, ErrorChecks) { - EXPECT_ANY_THROW({ - try { - tatami::parallelize([&](size_t, int, int) -> void { - throw std::runtime_error("WHEE"); - }, 255, 2); - } catch (std::exception& e) { - EXPECT_TRUE(std::string(e.what()).find("WHEE") != std::string::npos); - throw; - } - }); - - EXPECT_ANY_THROW({ - try { - tatami::parallelize([&](size_t, int, int) -> void { - throw 123; - }, 255, 2); - } catch (std::exception& e) { - EXPECT_TRUE(std::string(e.what()).find("unknown error") != std::string::npos); - throw; - } - }); + }, 100, 3); + EXPECT_EQ(start.front(), 0); + EXPECT_EQ(length.front(), 100); }