Skip to content

Commit

Permalink
Use local buffers to avoid false sharing in running calculations.
Browse files Browse the repository at this point in the history
  • Loading branch information
LTLA committed May 21, 2024
1 parent 47c4f73 commit 231cda1
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 31 deletions.
32 changes: 20 additions & 12 deletions include/tatami_stats/grouped_sums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,21 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
}, dim, sopt.num_threads);

} else {
for (size_t g = 0; g < num_groups; ++g) {
std::fill(output[g], output[g] + dim, static_cast<Output_>(0));
}

// Order doesn't affect numerical precision of the outcome, as
// addition order for each target vector is already well-defined
// for a running calculation.
tatami::Options opt;
opt.sparse_ordered_index = false;

tatami::parallelize([&](int, Index_ start, Index_ len) -> void {
tatami::parallelize([&](size_t thread, Index_ start, Index_ len) -> void {
std::vector<sums::RunningSparse<Output_, Value_, Index_> > runners;
runners.reserve(num_groups);
std::vector<LocalOutputBuffer<Output_> > local_output;
local_output.reserve(num_groups);

for (size_t g = 0; g < num_groups; ++g) {
runners.emplace_back(len, output[g] + start, sopt.skip_nan, start);
local_output.emplace_back(thread, start, len, output[g]);
runners.emplace_back(len, local_output.back().data(), sopt.skip_nan, start);
}

auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, start, len, opt);
Expand All @@ -120,6 +120,10 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
auto range = ext->fetch(xbuffer.data(), ibuffer.data());
runners[group[i]].add(range.value, range.index, range.number);
}

for (size_t g = 0; g < num_groups; ++g) {
local_output[g].transfer();
}
}, dim, sopt.num_threads);
}

Expand Down Expand Up @@ -154,15 +158,15 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
}, dim, sopt.num_threads);

} else {
for (size_t g = 0; g < num_groups; ++g) {
std::fill(output[g], output[g] + dim, static_cast<Output_>(0));
}

tatami::parallelize([&](int, Index_ start, Index_ len) -> void {
tatami::parallelize([&](size_t thread, Index_ start, Index_ len) -> void {
std::vector<sums::RunningDense<Output_, Value_, Index_> > runners;
runners.reserve(num_groups);
std::vector<LocalOutputBuffer<Output_> > local_output;
local_output.reserve(num_groups);

for (size_t g = 0; g < num_groups; ++g) {
runners.emplace_back(len, output[g] + start, sopt.skip_nan);
local_output.emplace_back(thread, start, len, output[g]);
runners.emplace_back(len, local_output.back().data(), sopt.skip_nan);
}

std::vector<double> xbuffer(len);
Expand All @@ -172,6 +176,10 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, const Group_* grou
auto ptr = ext->fetch(xbuffer.data());
runners[group[i]].add(ptr);
}

for (size_t g = 0; g < num_groups; ++g) {
local_output[g].transfer();
}
}, dim, sopt.num_threads);
}
}
Expand Down
23 changes: 17 additions & 6 deletions include/tatami_stats/ranges.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define TATAMI_STATS_RANGES_HPP

#include "tatami/tatami.hpp"
#include "utils.hpp"

#include <vector>
#include <algorithm>
Expand Down Expand Up @@ -353,12 +354,15 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
}, dim, ropt.num_threads);

} else {
tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l, opt);
std::vector<Value_> vbuffer(l);
std::vector<Index_> ibuffer(l);
ranges::RunningSparse<true, Output_, Value_, Index_> runmin(l, (store_min ? min_out + s : NULL), ropt.skip_nan, s);
ranges::RunningSparse<false, Output_, Value_, Index_> runmax(l, (store_max ? max_out + s : NULL), ropt.skip_nan, s);

auto local_min = (store_min ? LocalOutputBuffer<Output_>(thread, s, l, min_out) : LocalOutputBuffer<Output_>());
auto local_max = (store_max ? LocalOutputBuffer<Output_>(thread, s, l, max_out) : LocalOutputBuffer<Output_>());
ranges::RunningSparse<true, Output_, Value_, Index_> runmin(l, local_min.data(), ropt.skip_nan, s);
ranges::RunningSparse<false, Output_, Value_, Index_> runmax(l, local_max.data(), ropt.skip_nan, s);

for (Index_ x = 0; x < otherdim; ++x) {
auto out = ext->fetch(vbuffer.data(), ibuffer.data());
Expand All @@ -372,9 +376,11 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,

if (store_min) {
runmin.finish();
local_min.transfer();
}
if (store_max) {
runmax.finish();
local_max.transfer();
}
}, dim, ropt.num_threads);
}
Expand All @@ -396,11 +402,14 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,
}, dim, ropt.num_threads);

} else {
tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
std::vector<Value_> buffer(l);
ranges::RunningDense<true, Output_, Value_, Index_> runmin(l, (store_min ? min_out + s : NULL), ropt.skip_nan);
ranges::RunningDense<false, Output_, Value_, Index_> runmax(l, (store_max ? max_out + s : NULL), ropt.skip_nan);

auto local_min = (store_min ? LocalOutputBuffer<Output_>(thread, s, l, min_out) : LocalOutputBuffer<Output_>());
auto local_max = (store_max ? LocalOutputBuffer<Output_>(thread, s, l, max_out) : LocalOutputBuffer<Output_>());
ranges::RunningDense<true, Output_, Value_, Index_> runmin(l, local_min.data(), ropt.skip_nan);
ranges::RunningDense<false, Output_, Value_, Index_> runmax(l, local_max.data(), ropt.skip_nan);

for (Index_ x = 0; x < otherdim; ++x) {
auto ptr = ext->fetch(buffer.data());
Expand All @@ -414,9 +423,11 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* min_out,

if (store_min) {
runmin.finish();
local_min.transfer();
}
if (store_max) {
runmax.finish();
local_max.transfer();
}
}, dim, ropt.num_threads);
}
Expand Down
20 changes: 13 additions & 7 deletions include/tatami_stats/sums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define TATAMI_STATS__SUMS_HPP

#include "tatami/tatami.hpp"
#include "utils.hpp"

#include <vector>
#include <numeric>
Expand Down Expand Up @@ -210,18 +211,21 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
} else {
tatami::Options opt;
opt.sparse_ordered_index = false;
std::fill(output, output + dim, static_cast<Output_>(0));

tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l, opt);
std::vector<Value_> vbuffer(l);
std::vector<Index_> ibuffer(l);
sums::RunningSparse<Output_, Value_, Index_> runner(l, output + s, sopt.skip_nan, s);

LocalOutputBuffer<Output_> local_output(thread, s, l, output);
sums::RunningSparse<Output_, Value_, Index_> runner(l, local_output.data(), sopt.skip_nan, s);

for (Index_ x = 0; x < otherdim; ++x) {
auto out = ext->fetch(vbuffer.data(), ibuffer.data());
runner.add(out.value, out.index, out.number);
}

local_output.transfer();
}, dim, sopt.num_threads);
}

Expand All @@ -237,17 +241,19 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
}, dim, sopt.num_threads);

} else {
std::fill(output, output + dim, static_cast<Output_>(0));

tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
std::vector<Value_> buffer(l);
sums::RunningDense<Output_, Value_, Index_> runner(l, output + s, sopt.skip_nan);

LocalOutputBuffer<Output_> local_output(thread, s, l, output);
sums::RunningDense<Output_, Value_, Index_> runner(l, local_output.data(), sopt.skip_nan);

for (Index_ x = 0; x < otherdim; ++x) {
auto out = ext->fetch(buffer.data());
runner.add(out);
}

local_output.transfer();
}, dim, sopt.num_threads);
}
}
Expand Down
59 changes: 59 additions & 0 deletions include/tatami_stats/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,65 @@ std::vector<Size_> tabulate_groups(const Group_* group, Size_ n) {
return group_sizes;
}

/**
 * @brief Local output buffer for running calculations.
 *
 * A typical parallelization scenario involves dividing the set of target vectors into contiguous blocks, where each thread operates on a block at a time.
 * However, in running calculations, an entire block's statistics are updated when its corresponding thread processes an observed vector.
 * If these statistics are stored in a global buffer, false sharing at the boundaries of the blocks can result in performance degradation.
 *
 * To avoid this, the `LocalOutputBuffer` class provides thread-local storage for output statistics.
 * Once the calculations are finished per thread, callers should use `transfer()` to copy the local statistics to the global buffer.
 * The exception is the first thread, which is allowed to write directly to the global output buffer;
 * this avoids an unnecessary allocation and copy in the single-threaded case.
 *
 * @tparam Output_ Type of the result.
 */
template<typename Output_>
class LocalOutputBuffer {
public:
    /**
     * @tparam Index_ Type of the start index and length.
     * @param thread Identity of the thread, starting from zero up to the total number of threads.
     * @param start Index of the first target vector in the contiguous block for this thread.
     * @param length Number of target vectors in the contiguous block for this thread.
     * @param[out] output Pointer to the global output buffer.
     */
    template<typename Index_>
    LocalOutputBuffer(size_t thread, Index_ start, Index_ length, Output_* output) :
        my_output(output + start), use_local(thread > 0), my_buffer(use_local ? length : 0)
    {
        if (!use_local) {
            // Thread 0 writes directly into the global buffer, so its segment
            // is zeroed here to match the value-initialized state of
            // 'my_buffer' in the 'use_local = true' case.
            std::fill_n(my_output, length, static_cast<Output_>(0));
        }
    }

    /**
     * Default constructor.
     * The resulting object has no associated global buffer, and `data()` returns a null pointer.
     */
    LocalOutputBuffer() = default;

    /**
     * @return Pointer to an output buffer to use for this thread.
     * This contains at least `length` addressable elements (see the argument of the same name in the constructor).
     */
    Output_* data() {
        return (use_local ? my_buffer.data() : my_output);
    }

    /**
     * @return Const pointer to an output buffer to use for this thread.
     * This contains at least `length` addressable elements (see the argument of the same name in the constructor).
     */
    const Output_* data() const {
        return (use_local ? my_buffer.data() : my_output);
    }

    /**
     * Transfer results from the local buffer to the global buffer (i.e., `output` in the constructor).
     * For the first thread, this is a no-op as results were already written directly to the global buffer.
     */
    void transfer() {
        if (use_local) {
            std::copy(my_buffer.begin(), my_buffer.end(), my_output);
        }
    }

private:
    Output_* my_output = nullptr;
    bool use_local = false;
    std::vector<Output_> my_buffer;
};

}

#endif
21 changes: 15 additions & 6 deletions include/tatami_stats/variances.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define TATAMI_STATS_VARS_HPP

#include "tatami/tatami.hpp"
#include "utils.hpp"

#include <vector>
#include <cmath>
Expand Down Expand Up @@ -377,18 +378,22 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
}, dim, vopt.num_threads);

} else {
std::fill(output, output + dim, static_cast<Output_>(0));
tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<true>(p, !row, 0, otherdim, s, l);
std::vector<Value_> vbuffer(l);
std::vector<Index_> ibuffer(l);

std::vector<Output_> running_means(l);
variances::RunningSparse<Output_, Value_, Index_> runner(l, running_means.data(), output + s, vopt.skip_nan, s);
LocalOutputBuffer<Output_> local_output(thread, s, l, output);
variances::RunningSparse<Output_, Value_, Index_> runner(l, running_means.data(), local_output.data(), vopt.skip_nan, s);

for (Index_ x = 0; x < otherdim; ++x) {
auto out = ext->fetch(vbuffer.data(), ibuffer.data());
runner.add(out.value, out.index, out.number);
}
runner.finish();

local_output.transfer();
}, dim, vopt.num_threads);
}

Expand All @@ -404,16 +409,20 @@ void apply(bool row, const tatami::Matrix<Value_, Index_>* p, Output_* output, c
}, dim, vopt.num_threads);

} else {
std::fill(output, output + dim, static_cast<Output_>(0));
tatami::parallelize([&](size_t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t thread, Index_ s, Index_ l) {
auto ext = tatami::consecutive_extractor<false>(p, !row, 0, otherdim, s, l);
std::vector<Value_> buffer(l);

std::vector<Output_> running_means(l);
variances::RunningDense<Output_, Value_, Index_> runner(l, running_means.data(), output + s, vopt.skip_nan);
LocalOutputBuffer<Output_> local_output(thread, s, l, output);
variances::RunningDense<Output_, Value_, Index_> runner(l, running_means.data(), local_output.data(), vopt.skip_nan);

for (Index_ x = 0; x < otherdim; ++x) {
runner.add(ext->fetch(buffer.data()));
}
runner.finish();

local_output.transfer();
}, dim, vopt.num_threads);
}
}
Expand Down

0 comments on commit 231cda1

Please sign in to comment.