From d7c4f88f606a99134ef988feb94acc905a90b638 Mon Sep 17 00:00:00 2001
From: Kevin Wilfong
Date: Thu, 2 May 2024 10:15:06 -0700
Subject: [PATCH] HashProbe should only reuse nonSpillInputIndicesBuffer_ if it's mutable

Summary:
When some partitions are spilled and others are not, HashProbe wraps the input columns with a DictionaryVector to select only the rows that belong to unspilled partitions. HashProbe uses the Buffer in nonSpillInputIndicesBuffer_ for the indices of this DictionaryVector.

If these input columns are selected in the output, these DictionaryVectors are propagated in the output. This can lead to concurrency problems, e.g. if the Join is followed by a LocalExchange. In this case, the next time addInput is called on HashProbe, the output Vectors from a previous call to getOutput may still be in use by other Driver threads.

Before overwriting nonSpillInputIndicesBuffer_, we need to check that it is mutable, which ensures no other threads are holding references to it.

Unfortunately, I can't add any unit tests for this. Any situation where TSAN might catch such an issue will also catch the issue that HashProbe holds onto output_. That leads to a reported data race if another thread reads from that Vector and the thread with the HashProbe happens to hold the last reference to the Vector: that thread eventually deletes the Vector, and TSAN flags the deletion as an unsafe write. In practice this is fine since it will only write if it's holding the last reference.
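
The reuse rule above can be illustrated with a minimal sketch. It assumes a std::shared_ptr stand-in rather than Velox's Buffer, and the names IndicesBuffer, isSolelyOwned, and prepareIndices are hypothetical, introduced only for this illustration. It shows the shape of the check: reuse the buffer only when nothing else references it, otherwise allocate a fresh one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a reference-counted indices buffer.
// This is NOT Velox's Buffer class; it only models shared ownership.
using IndicesBuffer = std::vector<int32_t>;
using IndicesBufferPtr = std::shared_ptr<IndicesBuffer>;

// Returns true when the caller holds the only reference to the buffer,
// which plays the role of the mutability check in this sketch.
bool isSolelyOwned(const IndicesBufferPtr& buffer) {
  return buffer != nullptr && buffer.use_count() == 1;
}

// Mirrors the shape of the condition in the patch: allocate a new buffer
// if there is none, if someone else still references the old one, or if
// the old one is too small. Otherwise reuse the existing allocation.
int32_t* prepareIndices(IndicesBufferPtr& buffer, std::size_t numInput) {
  if (buffer == nullptr || !isSolelyOwned(buffer) ||
      buffer->size() < numInput) {
    buffer = std::make_shared<IndicesBuffer>(numInput);
  }
  return buffer->data();
}
```

If an earlier output still holds a reference, prepareIndices leaves that old buffer untouched for its readers and hands the caller a new one. The sole-ownership test is only meaningful here because the owning thread holds the last reference, so no other thread can be creating new references at the same time.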
Reviewed By: xiaoxmeng

Differential Revision: D56795760

fbshipit-source-id: fa574b30ce2b9361d97853132222ff2f25096ded
---
 velox/exec/HashProbe.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp
index 0c6369715173..66cc6b7801e9 100644
--- a/velox/exec/HashProbe.cpp
+++ b/velox/exec/HashProbe.cpp
@@ -478,6 +478,7 @@ void HashProbe::prepareInputIndicesBuffers(
   VELOX_DCHECK(spillEnabled());
   const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
   if (nonSpillInputIndicesBuffer_ == nullptr ||
+      !nonSpillInputIndicesBuffer_->isMutable() ||
       nonSpillInputIndicesBuffer_->size() < maxIndicesBufferBytes) {
     nonSpillInputIndicesBuffer_ = allocateIndices(numInput, pool());
     rawNonSpillInputIndicesBuffer_ =