diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index 4e58f4607c2c..763e4b14adc3 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -1877,16 +1877,28 @@ void StructColumnReader::next( // children vectors. childrenVectorsPtr = &rowVector->children(); childrenVectors.clear(); + } else { + childrenVectors.resize(children_.size()); + childrenVectorsPtr = &childrenVectors; + } + + for (uint64_t i = 0; i < children_.size(); ++i) { + auto& reader = children_[i]; + if (reader) { + reader->next(numValues, (*childrenVectorsPtr)[i], nullsPtr); + } + } + + if (result) { result->setNullCount(nullCount); } else { // When read-string-as-row flag is on, string readers produce ROW(BIGINT, // BIGINT) type instead of VARCHAR or VARBINARY. In these cases, // requestedType_->type is not the right type of the final struct. - childrenVectors.resize(children_.size()); std::vector types; - types.reserve(childrenVectors.size()); - for (auto i = 0; i < childrenVectors.size(); i++) { - const auto& child = childrenVectors[i]; + types.reserve(childrenVectorsPtr->size()); + for (auto i = 0; i < childrenVectorsPtr->size(); i++) { + const auto& child = (*childrenVectorsPtr)[i]; if (child) { types.emplace_back(child->type()); } else { @@ -1894,35 +1906,13 @@ void StructColumnReader::next( } } - auto rowResult = std::make_shared( + result = std::make_shared( &memoryPool_, ROW(std::move(types)), nulls, numValues, std::move(childrenVectors), nullCount); - childrenVectorsPtr = &rowResult->children(); - result = std::move(rowResult); - } - - if (executor_) { - for (uint64_t i = 0; i < children_.size(); ++i) { - auto& reader = children_[i]; - if (reader) { - executor_->add( - [&reader, - numValues, - child = &(*childrenVectorsPtr)[i], - nullsPtr]() { reader->next(numValues, *child, nullsPtr); }); - } - } - } else { - for (uint64_t i = 0; i < children_.size(); ++i) { - auto& reader = children_[i]; - if (reader) { - reader->next(numValues, (*childrenVectorsPtr)[i], nullsPtr); - } - } } } diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 239982eaf963..107a6a69c9e7 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -24,7 +24,6 @@ namespace facebook::velox::dwrf { using dwio::common::ColumnSelector; -using dwio::common::ExecutorBarrier; using dwio::common::FileFormat; using dwio::common::InputStream; using dwio::common::ReaderOptions; @@ -35,10 +34,7 @@ DwrfRowReader::DwrfRowReader( const RowReaderOptions& opts) : StripeReaderBase(reader), options_(opts), - executorBarrier_{ - options_.getDecodingExecutor() ? std::make_unique( - options_.getDecodingExecutor()) - : nullptr}, + executor_{options_.getDecodingExecutor()}, columnSelector_{std::make_shared( ColumnSelector::apply(opts.getSelector(), reader->getSchema()))} { auto& footer = getReader().getFooter(); @@ -265,9 +261,6 @@ void DwrfRowReader::readNext( mutation == nullptr, "Mutation pushdown is only supported in selective reader"); columnReader_->next(rowsToRead, result); - if (executorBarrier_) { - executorBarrier_->waitAll(); - } auto reportDecodingTimeMsMetric = options_.getDecodingTimeMsCallback(); if (reportDecodingTimeMsMetric) { auto decodingTime = std::chrono::duration_cast( @@ -521,7 +514,7 @@ DwrfRowReader::FetchResult DwrfRowReader::fetch(uint32_t stripeIndex) { fileType, stripeStreams, streamLabels, - executorBarrier_.get(), + executor_.get(), flatMapContext); } DWIO_ENSURE( diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index bdea8ade3ce9..5cc35ac33d21 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -16,8 +16,8 @@ #pragma once +#include "folly/Executor.h" #include "folly/synchronization/Baton.h" -#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/dwio/dwrf/reader/SelectiveDwrfReader.h" @@ -146,7 +146,7 @@ class DwrfRowReader : public StrideIndexProvider, uint64_t strideIndex_; std::shared_ptr stripeDictionaryCache_; dwio::common::RowReaderOptions options_; - std::unique_ptr executorBarrier_; + std::shared_ptr executor_; struct PrefetchedStripeState { bool preloaded; diff --git a/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp b/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp index 5ab51f3d52ac..eed8b6fef37e 100644 --- a/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp +++ b/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp @@ -20,6 +20,7 @@ #include #include "velox/common/base/BitUtil.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/FlatMapHelper.h" namespace facebook::velox::dwrf { @@ -689,23 +690,13 @@ void FlatMapStructEncodingColumnReader::next( // children vectors. childrenPtr = &rowVector->children(); children.clear(); - result->setNullCount(nullCount); } else { children.resize(keyNodes_.size()); - auto rowResult = std::make_shared( - &memoryPool_, - ROW(std::vector(keyNodes_.size()), - std::vector>( - keyNodes_.size(), requestedType_->type()->asMap().valueType())), - nulls, - numValues, - std::move(children), - nullCount); - childrenPtr = &rowResult->children(); - result = std::move(rowResult); + childrenPtr = &children; } if (executor_) { + dwio::common::ExecutorBarrier barrier(*executor_); auto mergedNullsBuffers = std::make_shared< folly::Synchronized>>(); for (size_t i = 0; i < keyNodes_.size(); ++i) { @@ -713,12 +704,12 @@ void FlatMapStructEncodingColumnReader::next( auto& child = (*childrenPtr)[i]; if (node) { - executor_->add([&node, - &child, - numValues, - nonNullMaps, - nullsPtr, - mergedNullsBuffers]() { + barrier.add([&node, + &child, + numValues, + nonNullMaps, + nullsPtr, + mergedNullsBuffers]() { auto mergedNullsBuffer = getBufferForCurrentThread(*mergedNullsBuffers); node->loadAsChild( @@ -728,6 +719,7 @@ void FlatMapStructEncodingColumnReader::next( nullColumnReader_->next(numValues, child, nullsPtr); } } + barrier.waitAll(); } else { for (size_t i = 0; i < keyNodes_.size(); ++i) { auto& node = keyNodes_[i]; @@ -741,6 +733,20 @@ void FlatMapStructEncodingColumnReader::next( } } } + + if (result) { + result->setNullCount(nullCount); + } else { + result = std::make_shared( + &memoryPool_, + ROW(std::vector(keyNodes_.size()), + std::vector>( + keyNodes_.size(), requestedType_->type()->asMap().valueType())), + nulls, + numValues, + std::move(children), + nullCount); + } } inline bool isRequiringStructEncoding( diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index aa22b9cabedc..e04f3eac8550 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -21,6 +21,7 @@ #include "folly/executors/CPUThreadPoolExecutor.h" #include "folly/lang/Assume.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/dwio/dwrf/common/Common.h" @@ -79,16 +80,15 @@ class TestReaderP : public testing::TestWithParam, public VectorTestBase { protected: - ExecutorBarrier* barrier() { - if (GetParam() && !barrier_) { - barrier_ = std::make_unique( - std::make_shared(2)); + folly::Executor* executor() { + if (GetParam() && !executor_) { + std::make_shared(2); } - return barrier_.get(); + return executor_.get(); } private: - std::unique_ptr barrier_; + std::unique_ptr executor_; }; class TestReader : public testing::Test, public VectorTestBase { @@ -1801,13 +1801,10 @@ TEST_P(TestReaderP, testUpcastBoolean) { TypeWithId::create(rowType), streams, labels, - barrier()); + executor()); VectorPtr batch; reader->next(104, batch); - if (barrier()) { - barrier()->waitAll(); - } auto lv = std::dynamic_pointer_cast>( std::dynamic_pointer_cast(batch)->childAt(0)); @@ -1854,13 +1851,10 @@ TEST_P(TestReaderP, testUpcastIntDirect) { TypeWithId::create(rowType), streams, labels, - barrier()); + executor()); VectorPtr batch; reader->next(100, batch); - if (barrier()) { - barrier()->waitAll(); - } auto lv = std::dynamic_pointer_cast>( std::dynamic_pointer_cast(batch)->childAt(0)); @@ -1924,13 +1918,10 @@ TEST_P(TestReaderP, testUpcastIntDict) { TypeWithId::create(rowType), streams, labels, - barrier()); + executor()); VectorPtr batch; reader->next(100, batch); - if (barrier()) { - barrier()->waitAll(); - } auto lv = std::dynamic_pointer_cast>( std::dynamic_pointer_cast(batch)->childAt(0)); @@ -1982,13 +1973,10 @@ TEST_P(TestReaderP, testUpcastFloat) { TypeWithId::create(rowType), streams, labels, - barrier()); + executor()); VectorPtr batch; reader->next(100, batch); - if (barrier()) { - barrier()->waitAll(); - } auto lv = std::dynamic_pointer_cast>( std::dynamic_pointer_cast(batch)->childAt(0));