Skip to content

Commit

Permalink
Fix ColumnReader (#7459)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #7459

D50459051 (#7151) and D50472518 (#7155) broke the reader for cases where children vectors had different types than the ones that would be read.

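This change undoes those changes and instead parallelizes inside FlatMapStructEncodingColumnReader::next, which now submits its per-key work through a local ExecutorBarrier and waits for it to finish before building the result vector; StructColumnReader::next goes back to reading its children sequentially.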

Reviewed By: helfman

Differential Revision: D51088935

fbshipit-source-id: c76e0647047f264927f9fbead4f3c8f1213da122
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Nov 8, 2023
1 parent fb34b0f commit d498bd4
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 78 deletions.
44 changes: 17 additions & 27 deletions velox/dwio/dwrf/reader/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1877,52 +1877,42 @@ void StructColumnReader::next(
// children vectors.
childrenVectorsPtr = &rowVector->children();
childrenVectors.clear();
} else {
childrenVectors.resize(children_.size());
childrenVectorsPtr = &childrenVectors;
}

for (uint64_t i = 0; i < children_.size(); ++i) {
auto& reader = children_[i];
if (reader) {
reader->next(numValues, (*childrenVectorsPtr)[i], nullsPtr);
}
}

if (result) {
result->setNullCount(nullCount);
} else {
// When read-string-as-row flag is on, string readers produce ROW(BIGINT,
// BIGINT) type instead of VARCHAR or VARBINARY. In these cases,
// requestedType_->type is not the right type of the final struct.
childrenVectors.resize(children_.size());
std::vector<TypePtr> types;
types.reserve(childrenVectors.size());
for (auto i = 0; i < childrenVectors.size(); i++) {
const auto& child = childrenVectors[i];
types.reserve(childrenVectorsPtr->size());
for (auto i = 0; i < childrenVectorsPtr->size(); i++) {
const auto& child = (*childrenVectorsPtr)[i];
if (child) {
types.emplace_back(child->type());
} else {
types.emplace_back(requestedType_->type()->childAt(i));
}
}

auto rowResult = std::make_shared<RowVector>(
result = std::make_shared<RowVector>(
&memoryPool_,
ROW(std::move(types)),
nulls,
numValues,
std::move(childrenVectors),
nullCount);
childrenVectorsPtr = &rowResult->children();
result = std::move(rowResult);
}

if (executor_) {
for (uint64_t i = 0; i < children_.size(); ++i) {
auto& reader = children_[i];
if (reader) {
executor_->add(
[&reader,
numValues,
child = &(*childrenVectorsPtr)[i],
nullsPtr]() { reader->next(numValues, *child, nullsPtr); });
}
}
} else {
for (uint64_t i = 0; i < children_.size(); ++i) {
auto& reader = children_[i];
if (reader) {
reader->next(numValues, (*childrenVectorsPtr)[i], nullsPtr);
}
}
}
}

Expand Down
11 changes: 2 additions & 9 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
namespace facebook::velox::dwrf {

using dwio::common::ColumnSelector;
using dwio::common::ExecutorBarrier;
using dwio::common::FileFormat;
using dwio::common::InputStream;
using dwio::common::ReaderOptions;
Expand All @@ -35,10 +34,7 @@ DwrfRowReader::DwrfRowReader(
const RowReaderOptions& opts)
: StripeReaderBase(reader),
options_(opts),
executorBarrier_{
options_.getDecodingExecutor() ? std::make_unique<ExecutorBarrier>(
options_.getDecodingExecutor())
: nullptr},
executor_{options_.getDecodingExecutor()},
columnSelector_{std::make_shared<ColumnSelector>(
ColumnSelector::apply(opts.getSelector(), reader->getSchema()))} {
auto& footer = getReader().getFooter();
Expand Down Expand Up @@ -265,9 +261,6 @@ void DwrfRowReader::readNext(
mutation == nullptr,
"Mutation pushdown is only supported in selective reader");
columnReader_->next(rowsToRead, result);
if (executorBarrier_) {
executorBarrier_->waitAll();
}
auto reportDecodingTimeMsMetric = options_.getDecodingTimeMsCallback();
if (reportDecodingTimeMsMetric) {
auto decodingTime = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -521,7 +514,7 @@ DwrfRowReader::FetchResult DwrfRowReader::fetch(uint32_t stripeIndex) {
fileType,
stripeStreams,
streamLabels,
executorBarrier_.get(),
executor_.get(),
flatMapContext);
}
DWIO_ENSURE(
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#pragma once

#include "folly/Executor.h"
#include "folly/synchronization/Baton.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/dwrf/reader/SelectiveDwrfReader.h"

Expand Down Expand Up @@ -146,7 +146,7 @@ class DwrfRowReader : public StrideIndexProvider,
uint64_t strideIndex_;
std::shared_ptr<StripeDictionaryCache> stripeDictionaryCache_;
dwio::common::RowReaderOptions options_;
std::unique_ptr<dwio::common::ExecutorBarrier> executorBarrier_;
std::shared_ptr<folly::Executor> executor_;

struct PrefetchedStripeState {
bool preloaded;
Expand Down
42 changes: 24 additions & 18 deletions velox/dwio/dwrf/reader/FlatMapColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/json.h>

#include "velox/common/base/BitUtil.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/FlatMapHelper.h"

namespace facebook::velox::dwrf {
Expand Down Expand Up @@ -689,36 +690,26 @@ void FlatMapStructEncodingColumnReader<T>::next(
// children vectors.
childrenPtr = &rowVector->children();
children.clear();
result->setNullCount(nullCount);
} else {
children.resize(keyNodes_.size());
auto rowResult = std::make_shared<RowVector>(
&memoryPool_,
ROW(std::vector<std::string>(keyNodes_.size()),
std::vector<std::shared_ptr<const Type>>(
keyNodes_.size(), requestedType_->type()->asMap().valueType())),
nulls,
numValues,
std::move(children),
nullCount);
childrenPtr = &rowResult->children();
result = std::move(rowResult);
childrenPtr = &children;
}

if (executor_) {
dwio::common::ExecutorBarrier barrier(*executor_);
auto mergedNullsBuffers = std::make_shared<
folly::Synchronized<std::unordered_map<std::thread::id, BufferPtr>>>();
for (size_t i = 0; i < keyNodes_.size(); ++i) {
auto& node = keyNodes_[i];
auto& child = (*childrenPtr)[i];

if (node) {
executor_->add([&node,
&child,
numValues,
nonNullMaps,
nullsPtr,
mergedNullsBuffers]() {
barrier.add([&node,
&child,
numValues,
nonNullMaps,
nullsPtr,
mergedNullsBuffers]() {
auto mergedNullsBuffer =
getBufferForCurrentThread(*mergedNullsBuffers);
node->loadAsChild(
Expand All @@ -728,6 +719,7 @@ void FlatMapStructEncodingColumnReader<T>::next(
nullColumnReader_->next(numValues, child, nullsPtr);
}
}
barrier.waitAll();
} else {
for (size_t i = 0; i < keyNodes_.size(); ++i) {
auto& node = keyNodes_[i];
Expand All @@ -741,6 +733,20 @@ void FlatMapStructEncodingColumnReader<T>::next(
}
}
}

if (result) {
result->setNullCount(nullCount);
} else {
result = std::make_shared<RowVector>(
&memoryPool_,
ROW(std::vector<std::string>(keyNodes_.size()),
std::vector<std::shared_ptr<const Type>>(
keyNodes_.size(), requestedType_->type()->asMap().valueType())),
nulls,
numValues,
std::move(children),
nullCount);
}
}

inline bool isRequiringStructEncoding(
Expand Down
32 changes: 10 additions & 22 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/lang/Assume.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/dwio/dwrf/common/Common.h"
Expand Down Expand Up @@ -79,16 +80,15 @@ class TestReaderP
: public testing::TestWithParam</* parallel decoding = */ bool>,
public VectorTestBase {
protected:
ExecutorBarrier* barrier() {
if (GetParam() && !barrier_) {
barrier_ = std::make_unique<ExecutorBarrier>(
std::make_shared<folly::CPUThreadPoolExecutor>(2));
// Returns the executor used for parallel decoding, creating it lazily on first
// use. Returns nullptr when the test parameter (parallel decoding) is false, so
// readers fall back to their sequential path.
folly::Executor* executor() {
  if (GetParam() && !executor_) {
    // Keep the pool alive in executor_. A discarded temporary would leave
    // executor_ null and silently disable the parallel code path under test.
    executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(2);
  }
  return executor_.get();
}

private:
std::unique_ptr<ExecutorBarrier> barrier_;
std::unique_ptr<folly::Executor> executor_;
};

class TestReader : public testing::Test, public VectorTestBase {
Expand Down Expand Up @@ -1801,13 +1801,10 @@ TEST_P(TestReaderP, testUpcastBoolean) {
TypeWithId::create(rowType),
streams,
labels,
barrier());
executor());

VectorPtr batch;
reader->next(104, batch);
if (barrier()) {
barrier()->waitAll();
}

auto lv = std::dynamic_pointer_cast<FlatVector<int32_t>>(
std::dynamic_pointer_cast<RowVector>(batch)->childAt(0));
Expand Down Expand Up @@ -1854,13 +1851,10 @@ TEST_P(TestReaderP, testUpcastIntDirect) {
TypeWithId::create(rowType),
streams,
labels,
barrier());
executor());

VectorPtr batch;
reader->next(100, batch);
if (barrier()) {
barrier()->waitAll();
}

auto lv = std::dynamic_pointer_cast<FlatVector<int64_t>>(
std::dynamic_pointer_cast<RowVector>(batch)->childAt(0));
Expand Down Expand Up @@ -1924,13 +1918,10 @@ TEST_P(TestReaderP, testUpcastIntDict) {
TypeWithId::create(rowType),
streams,
labels,
barrier());
executor());

VectorPtr batch;
reader->next(100, batch);
if (barrier()) {
barrier()->waitAll();
}

auto lv = std::dynamic_pointer_cast<FlatVector<int64_t>>(
std::dynamic_pointer_cast<RowVector>(batch)->childAt(0));
Expand Down Expand Up @@ -1982,13 +1973,10 @@ TEST_P(TestReaderP, testUpcastFloat) {
TypeWithId::create(rowType),
streams,
labels,
barrier());
executor());

VectorPtr batch;
reader->next(100, batch);
if (barrier()) {
barrier()->waitAll();
}

auto lv = std::dynamic_pointer_cast<FlatVector<double>>(
std::dynamic_pointer_cast<RowVector>(batch)->childAt(0));
Expand Down

0 comments on commit d498bd4

Please sign in to comment.