Skip to content

Commit

Permalink
HashProbe should only reuse nonSpillInputIndicesBuffer_ if it's mutable
Browse files Browse the repository at this point in the history
Summary:
When some partitions are spilled and others are not, HashProbe wraps the input columns with a
DictionaryVector to only select the rows that belong to unspilled partitions.  HashProbe uses the
Buffer in nonSpillInputIndicesBuffer_ for the indices of this DictionaryVector.  If these input columns
are selected in the output these DictionaryVectors are propagated in the output.

This can lead to concurrency problems, e.g. if the Join is followed by a LocalExchange.  In this
case, the next time addInput is called on HashProbe, the output Vectors from a previous call to
getOutput may still be used by other Driver threads.

We need to check if nonSpillInputIndicesBuffer_ is mutable to ensure no other threads are holding
references to it before overwriting it.

Unfortunately, I can't add any unit tests for this.  Any situation where TSAN might catch such an
issue will also catch the issue that HashProbe holds onto output_, which leads to a data race if
another thread reads from that Vector and the thread with the HashProbe happens to hold the last
reference to the Vector causing it to eventually delete it leading TSAN to flag it as an unsafe write.
In practice this is fine since it will only write if it's holding the last reference.

Reviewed By: xiaoxmeng

Differential Revision: D56795760

fbshipit-source-id: fa574b30ce2b9361d97853132222ff2f25096ded
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed May 2, 2024
1 parent 3f3895d commit d7c4f88
Showing 1 changed file with 1 addition and 0 deletions.
1 change: 1 addition & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ void HashProbe::prepareInputIndicesBuffers(
VELOX_DCHECK(spillEnabled());
const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
if (nonSpillInputIndicesBuffer_ == nullptr ||
!nonSpillInputIndicesBuffer_->isMutable() ||
nonSpillInputIndicesBuffer_->size() < maxIndicesBufferBytes) {
nonSpillInputIndicesBuffer_ = allocateIndices(numInput, pool());
rawNonSpillInputIndicesBuffer_ =
Expand Down

0 comments on commit d7c4f88

Please sign in to comment.