diff --git a/include/tatami_stats/grouped_sums.hpp b/include/tatami_stats/grouped_sums.hpp
index ab7853b..8d5d73f 100644
--- a/include/tatami_stats/grouped_sums.hpp
+++ b/include/tatami_stats/grouped_sums.hpp
@@ -95,21 +95,21 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
         }, dim, sopt.num_threads);
 
     } else {
-        for (size_t g = 0; g < num_groups; ++g) {
-            std::fill(output[g], output[g] + dim, static_cast<Output_>(0));
-        }
-
         // Order doesn't affect numerical precision of the outcome, as
         // addition order for each target vector is already well-defined
         // for a running calculation.
         tatami::Options opt;
         opt.sparse_ordered_index = false;
 
-        tatami::parallelize([&](int, Index_ start, Index_ len) -> void {
+        tatami::parallelize([&](size_t thread, Index_ start, Index_ len) -> void {
             std::vector<sums::RunningSparse<Output_, Value_, Index_> > runners;
             runners.reserve(num_groups);
+            std::vector<LocalOutputBuffer<Output_> > local_output;
+            local_output.reserve(num_groups);
+
             for (size_t g = 0; g < num_groups; ++g) {
-                runners.emplace_back(len, output[g] + start, sopt.skip_nan, start);
+                local_output.emplace_back(thread, start, len, output[g]);
+                runners.emplace_back(len, local_output.back().data(), sopt.skip_nan, start);
             }
 
             auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, start, len, opt);
@@ -120,6 +120,10 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
                 auto range = ext->fetch(xbuffer.data(), ibuffer.data());
                 runners[group[i]].add(range.value, range.index, range.number);
             }
+
+            for (size_t g = 0; g < num_groups; ++g) {
+                local_output[g].transfer();
+            }
         }, dim, sopt.num_threads);
     }
 
@@ -154,15 +158,15 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
         }, dim, sopt.num_threads);
 
     } else {
-        for (size_t g = 0; g < num_groups; ++g) {
-            std::fill(output[g], output[g] + dim, static_cast<Output_>(0));
-        }
-
-        tatami::parallelize([&](int, Index_ start, Index_ len) -> void {
+        tatami::parallelize([&](size_t thread, Index_ start, Index_ len) -> void {
             std::vector<sums::RunningDense<Output_, Value_, Index_> > runners;
             runners.reserve(num_groups);
+            std::vector<LocalOutputBuffer<Output_> > local_output;
+            local_output.reserve(num_groups);
+
             for (size_t g = 0; g < num_groups; ++g) {
-                runners.emplace_back(len, output[g] + start, sopt.skip_nan);
+                local_output.emplace_back(thread, start, len, output[g]);
+                runners.emplace_back(len, local_output.back().data(), sopt.skip_nan);
             }
 
             std::vector<Value_> xbuffer(len);
@@ -172,6 +176,10 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
                 auto ptr = ext->fetch(xbuffer.data());
                 runners[group[i]].add(ptr);
             }
+
+            for (size_t g = 0; g < num_groups; ++g) {
+                local_output[g].transfer();
+            }
         }, dim, sopt.num_threads);
     }
 }
diff --git a/include/tatami_stats/ranges.hpp b/include/tatami_stats/ranges.hpp
index 69e24d5..04dcc70 100644
--- a/include/tatami_stats/ranges.hpp
+++ b/include/tatami_stats/ranges.hpp
@@ -2,6 +2,7 @@
 #define TATAMI_STATS_RANGES_HPP
 
 #include "tatami/tatami.hpp"
+#include "utils.hpp"
 
 #include <vector>
 #include <algorithm>
@@ -353,12 +354,15 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
         }, dim, ropt.num_threads);
 
     } else {
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l, opt);
             std::vector<Value_> vbuffer(l);
             std::vector<Index_> ibuffer(l);
-            ranges::RunningSparse<true, Output_, Value_, Index_> runmin(l, (store_min ? min_out + s : NULL), ropt.skip_nan, s);
-            ranges::RunningSparse<false, Output_, Value_, Index_> runmax(l, (store_max ? max_out + s : NULL), ropt.skip_nan, s);
+
+            auto local_min = (store_min ? LocalOutputBuffer<Output_>(thread, s, l, min_out) : LocalOutputBuffer<Output_>());
+            auto local_max = (store_max ? LocalOutputBuffer<Output_>(thread, s, l, max_out) : LocalOutputBuffer<Output_>());
+            ranges::RunningSparse<true, Output_, Value_, Index_> runmin(l, local_min.data(), ropt.skip_nan, s);
+            ranges::RunningSparse<false, Output_, Value_, Index_> runmax(l, local_max.data(), ropt.skip_nan, s);
 
             for (Index_ x = 0; x < otherdim; ++x) {
                 auto out = ext->fetch(vbuffer.data(), ibuffer.data());
@@ -372,9 +376,11 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
 
             if (store_min) {
                 runmin.finish();
+                local_min.transfer();
             }
             if (store_max) {
                 runmax.finish();
+                local_max.transfer();
             }
         }, dim, ropt.num_threads);
     }
@@ -396,11 +402,14 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
         }, dim, ropt.num_threads);
 
     } else {
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
             std::vector<Value_> buffer(l);
-            ranges::RunningDense<true, Output_, Value_, Index_> runmin(l, (store_min ? min_out + s : NULL), ropt.skip_nan);
-            ranges::RunningDense<false, Output_, Value_, Index_> runmax(l, (store_max ? max_out + s : NULL), ropt.skip_nan);
+
+            auto local_min = (store_min ? LocalOutputBuffer<Output_>(thread, s, l, min_out) : LocalOutputBuffer<Output_>());
+            auto local_max = (store_max ? LocalOutputBuffer<Output_>(thread, s, l, max_out) : LocalOutputBuffer<Output_>());
+            ranges::RunningDense<true, Output_, Value_, Index_> runmin(l, local_min.data(), ropt.skip_nan);
+            ranges::RunningDense<false, Output_, Value_, Index_> runmax(l, local_max.data(), ropt.skip_nan);
 
             for (Index_ x = 0; x < otherdim; ++x) {
                 auto ptr = ext->fetch(buffer.data());
@@ -414,9 +423,11 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
 
             if (store_min) {
                 runmin.finish();
+                local_min.transfer();
             }
             if (store_max) {
                 runmax.finish();
+                local_max.transfer();
             }
         }, dim, ropt.num_threads);
     }
diff --git a/include/tatami_stats/sums.hpp b/include/tatami_stats/sums.hpp
index 1ba5448..b6ab8c0 100644
--- a/include/tatami_stats/sums.hpp
+++ b/include/tatami_stats/sums.hpp
@@ -2,6 +2,7 @@
 #define TATAMI_STATS__SUMS_HPP
 
 #include "tatami/tatami.hpp"
+#include "utils.hpp"
 
 #include <vector>
 #include <algorithm>
@@ -210,18 +211,21 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
     } else {
         tatami::Options opt;
         opt.sparse_ordered_index = false;
-        std::fill(output, output + dim, static_cast<Output_>(0));
 
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l, opt);
             std::vector<Value_> vbuffer(l);
             std::vector<Index_> ibuffer(l);
-            sums::RunningSparse<Output_, Value_, Index_> runner(l, output + s, sopt.skip_nan, s);
+
+            LocalOutputBuffer<Output_> local_output(thread, s, l, output);
+            sums::RunningSparse<Output_, Value_, Index_> runner(l, local_output.data(), sopt.skip_nan, s);
 
             for (Index_ x = 0; x < otherdim; ++x) {
                 auto out = ext->fetch(vbuffer.data(), ibuffer.data());
                 runner.add(out.value, out.index, out.number);
             }
+
+            local_output.transfer();
         }, dim, sopt.num_threads);
     }
 
@@ -237,17 +241,19 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
         }, dim, sopt.num_threads);
 
     } else {
-        std::fill(output, output + dim, static_cast<Output_>(0));
-
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
             std::vector<Value_> buffer(l);
-            sums::RunningDense<Output_, Value_, Index_> runner(l, output + s, sopt.skip_nan);
+
+            LocalOutputBuffer<Output_> local_output(thread, s, l, output);
+            sums::RunningDense<Output_, Value_, Index_> runner(l, local_output.data(), sopt.skip_nan);
 
             for (Index_ x = 0; x < otherdim; ++x) {
                 auto out = ext->fetch(buffer.data());
                 runner.add(out);
             }
+
+            local_output.transfer();
         }, dim, sopt.num_threads);
     }
 }
diff --git a/include/tatami_stats/utils.hpp b/include/tatami_stats/utils.hpp
index d0011eb..977e14a 100644
--- a/include/tatami_stats/utils.hpp
+++ b/include/tatami_stats/utils.hpp
@@ -56,6 +56,65 @@ std::vector<size_t> tabulate_groups(const Group_* group, Size_ n) {
     return group_sizes;
 }
 
+/**
+ * @brief Local output buffer for running calculations.
+ *
+ * A typical parallelization scenario involves dividing the set of target vectors into contiguous blocks, where each thread operates on a block at a time.
+ * However, in running calculations, an entire block's statistics are updated when its corresponding thread processes an observed vector.
+ * If these statistics are stored in a global buffer, false sharing at the boundaries of the blocks can result in performance degradation.
+ *
+ * To avoid this, the `LocalOutputBuffer` class provides thread-local storage for output statistics.
+ * Once the calculations are finished in each thread, callers should use `transfer()` to copy the local statistics into the global buffer.
+ * The exception is the first thread, which is allowed to write directly to the global output buffer.
+ *
+ * @tparam Output_ Type of the result.
+ */
+template<typename Output_>
+class LocalOutputBuffer {
+public:
+    /**
+     * @tparam Index_ Type of the start index and length.
+     * @param thread Identity of the thread, from zero up to the total number of threads.
+     * @param start Index of the first target vector in the contiguous block for this thread.
+     * @param length Number of target vectors in the contiguous block for this thread.
+     * @param[out] output Pointer to the global output buffer.
+     */
+    template<typename Index_>
+    LocalOutputBuffer(size_t thread, Index_ start, Index_ length, Output_* output) : my_output(output + start), use_local(thread > 0), my_buffer(use_local ? length : 0) {
+        if (!use_local) {
+            // Setting to zero to match the initial behavior of 'my_buffer' when 'use_local = true'.
+            std::fill_n(my_output, length, static_cast<Output_>(0));
+        }
+    }
+
+    /**
+     * Default constructor.
+     */
+    LocalOutputBuffer() = default;
+
+    /**
+     * @return Pointer to an output buffer to use for this thread.
+     * This contains at least `length` addressable elements (see the argument of the same name in the constructor).
+     */
+    Output_* data() {
+        return (use_local ? my_buffer.data() : my_output);
+    }
+
+    /**
+     * Transfer results from the local buffer to the global buffer (i.e., `output` in the constructor).
+     */
+    void transfer() {
+        if (use_local) {
+            std::copy(my_buffer.begin(), my_buffer.end(), my_output);
+        }
+    }
+
+private:
+    Output_* my_output = NULL;
+    bool use_local = false;
+    std::vector<Output_> my_buffer;
+};
+
 }
 
 #endif
diff --git a/include/tatami_stats/variances.hpp b/include/tatami_stats/variances.hpp
index 472f2b6..c3c1a7f 100644
--- a/include/tatami_stats/variances.hpp
+++ b/include/tatami_stats/variances.hpp
@@ -2,6 +2,7 @@
 #define TATAMI_STATS_VARS_HPP
 
 #include "tatami/tatami.hpp"
+#include "utils.hpp"
 
 #include <vector>
 #include <algorithm>
@@ -377,18 +378,22 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
         }, dim, vopt.num_threads);
 
     } else {
-        std::fill(output, output + dim, static_cast<Output_>(0));
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l);
             std::vector<Value_> vbuffer(l);
             std::vector<Index_> ibuffer(l);
+
             std::vector<Output_> running_means(l);
-            variances::RunningSparse<Output_, Value_, Index_> runner(l, running_means.data(), output + s, vopt.skip_nan, s);
+            LocalOutputBuffer<Output_> local_output(thread, s, l, output);
+            variances::RunningSparse<Output_, Value_, Index_> runner(l, running_means.data(), local_output.data(), vopt.skip_nan, s);
+
             for (Index_ x = 0; x < otherdim; ++x) {
                 auto out = ext->fetch(vbuffer.data(), ibuffer.data());
                 runner.add(out.value, out.index, out.number);
             }
             runner.finish();
+
+            local_output.transfer();
         }, dim, vopt.num_threads);
     }
 
@@ -404,16 +409,20 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
         }, dim, vopt.num_threads);
 
     } else {
-        std::fill(output, output + dim, static_cast<Output_>(0));
-        tatami::parallelize([&](size_t, Index_ s, Index_ l) {
+        tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
             auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
             std::vector<Value_> buffer(l);
+
             std::vector<Output_> running_means(l);
-            variances::RunningDense<Output_, Value_, Index_> runner(l, running_means.data(), output + s, vopt.skip_nan);
+            LocalOutputBuffer<Output_> local_output(thread, s, l, output);
+            variances::RunningDense<Output_, Value_, Index_> runner(l, running_means.data(), local_output.data(), vopt.skip_nan);
+
             for (Index_ x = 0; x < otherdim; ++x) {
                 runner.add(ext->fetch(buffer.data()));
             }
             runner.finish();
+
+            local_output.transfer();
        }, dim, vopt.num_threads);
    }
}
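
Note on usage: the snippet below is a minimal standalone sketch of the pattern that this patch applies throughout tatami_stats, i.e., accumulate into LocalOutputBuffer::data() and call transfer() when done. The driver here (plain std::thread, a fixed block split, and a dummy "+= 1.0" update) is purely illustrative and not part of the library; in the patch itself, blocking and threading are handled by tatami::parallelize and the per-element updates by the Running* classes shown in the hunks above.

// Hypothetical standalone demonstration of the LocalOutputBuffer pattern:
// each worker accumulates results for its contiguous block of output elements
// in thread-local storage, then copies them into the shared buffer at the end.
#include "tatami_stats/utils.hpp"

#include <algorithm>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    constexpr int num_threads = 4;
    constexpr int dim = 100;        // number of target vectors
    constexpr int otherdim = 1000;  // number of observed vectors
    std::vector<double> output(dim);

    std::vector<std::thread> workers;
    int per_thread = (dim + num_threads - 1) / num_threads;
    for (int t = 0; t < num_threads; ++t) {
        int start = t * per_thread;
        int length = std::min(per_thread, dim - start);
        workers.emplace_back([&, t, start, length]() {
            // Thread 0 writes directly to 'output'; all other threads get a local buffer.
            tatami_stats::LocalOutputBuffer<double> local(t, start, length, output.data());
            auto* buf = local.data();
            for (int x = 0; x < otherdim; ++x) {   // running calculation over observed vectors
                for (int i = 0; i < length; ++i) {
                    buf[i] += 1.0;                 // stand-in for a real per-element update
                }
            }
            local.transfer();                      // copy local results into the global buffer
        });
    }
    for (auto& w : workers) {
        w.join();
    }

    std::cout << output[0] << " " << output[dim - 1] << std::endl; // both should be 1000
    return 0;
}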