Skip to content

Commit

Permalink
Revert D64775045: add executorBarrier to nimble parallel writer
Browse files Browse the repository at this point in the history
Differential Revision:
D64775045

Original commit changeset: 435e565fd181

Original Phabricator Diff: D64775045

fbshipit-source-id: 27385cc9afbd5ec55af5e9657ef8002a42408915
  • Loading branch information
Sai Nikhit Gulla authored and facebook-github-bot committed Dec 5, 2024
1 parent 0e6d7b6 commit 86e1158
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 89 deletions.
78 changes: 20 additions & 58 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,8 @@ class SimpleFieldWriter : public FieldWriter {
valuesStream_{context.createNullableContentStreamData<TargetType>(
typeBuilder_->asScalar().scalarDescriptor())} {}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor*) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
auto size = ranges.size();
auto& buffer = context_.stringBuffer();
auto& data = valuesStream_.mutableData();
Expand Down Expand Up @@ -377,10 +375,8 @@ class RowFieldWriter : public FieldWriter {
}
}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor* executor = nullptr) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
auto size = ranges.size();
OrderedRanges childRanges;
const OrderedRanges* childRangesPtr;
Expand Down Expand Up @@ -413,19 +409,8 @@ class RowFieldWriter : public FieldWriter {
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
}

if (executor) {
for (auto i = 0; i < fields_.size(); ++i) {
executor->add([&field = fields_[i],
&childVector = row->childAt(i),
childRanges = *childRangesPtr]() {
field->write(childVector, childRanges);
});
}
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}

Expand Down Expand Up @@ -525,10 +510,8 @@ class ArrayFieldWriter : public MultiValueFieldWriter {
typeBuilder_->asArray().setChildren(elements_->typeBuilder());
}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor*) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childRanges;
auto array = ingestLengths<velox::ArrayVector>(vector, ranges, childRanges);
if (childRanges.size() > 0) {
Expand Down Expand Up @@ -567,10 +550,8 @@ class MapFieldWriter : public MultiValueFieldWriter {
keys_->typeBuilder(), values_->typeBuilder());
}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor*) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childRanges;
auto map = ingestLengths<velox::MapVector>(vector, ranges, childRanges);
if (childRanges.size() > 0) {
Expand Down Expand Up @@ -617,10 +598,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
type->type(), 1, context.bufferMemoryPool.get());
}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor*) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
auto map = ingestOffsetsAndLengthsDeduplicated(
vector, ranges, childFilteredRanges);
Expand Down Expand Up @@ -845,14 +824,11 @@ class FlatMapFieldWriter : public FieldWriter {
typeBuilder_->asFlatMap().nullsDescriptor())},
valueType_{type->childAt(1)} {}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor* executor = nullptr) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
// Check if the vector received is already flattened
const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
isFlatMap ? ingestFlattenedMap(vector, ranges)
: ingestMap(vector, ranges, executor);
isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges);
}

FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
Expand Down Expand Up @@ -921,10 +897,7 @@ class FlatMapFieldWriter : public FieldWriter {
}
}

void ingestMap(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor* executor = nullptr) {
void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
NIMBLE_ASSERT(
currentPassthroughFields_.empty(),
"Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
Expand Down Expand Up @@ -1006,15 +979,8 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();

if (executor) {
for (auto& pair : currentValueFields_) {
executor->add([&]() { pair.second->write(values, nonNullCount); });
}
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
nonNullCount_ += nonNullCount;
Expand Down Expand Up @@ -1064,8 +1030,6 @@ class FlatMapFieldWriter : public FieldWriter {
// check whether the typebuilder for this key is already present
auto flatFieldIt = allValueFields_.find(key);
if (flatFieldIt == allValueFields_.end()) {
std::scoped_lock<std::mutex> lock{context_.flatMapSchemaMutex};

auto valueFieldWriter = FieldWriter::create(context_, valueType_);
const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild(
stringKey, valueFieldWriter->typeBuilder());
Expand Down Expand Up @@ -1165,10 +1129,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
type->type(), 1, context.bufferMemoryPool.get());
}

void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor*) override {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
const velox::ArrayVector* array;
// To unwrap the dictionaryVector we need to cast into ComplexType before
Expand Down
4 changes: 1 addition & 3 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ struct FieldWriterContext {
}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::mutex flatMapSchemaMutex;
SchemaBuilder schemaBuilder;

folly::F14FastSet<uint32_t> flatMapNodeIds;
Expand Down Expand Up @@ -167,8 +166,7 @@ class FieldWriter {
// Writes the vector to internal buffers.
virtual void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
folly::Executor* executor = nullptr) = 0;
const OrderedRanges& ranges) = 0;

// Clears internal state and any accumulated data in internal buffers.
virtual void reset() = 0;
Expand Down
11 changes: 2 additions & 9 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
NIMBLE_CHECK(file_, "Writer is already closed");
try {
auto size = vector->size();
if (context_->options.writeExecutor) {
velox::dwio::common::ExecutorBarrier barrier{
*context_->options.writeExecutor};
root_->write(vector, OrderedRanges::of(0, size), &barrier);
barrier.waitAll();
} else {
root_->write(vector, OrderedRanges::of(0, size));
}
root_->write(vector, OrderedRanges::of(0, size));

uint64_t memoryUsed = 0;
for (const auto& stream : context_->streams()) {
Expand Down Expand Up @@ -712,7 +705,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {

if (context_->options.encodingExecutor) {
velox::dwio::common::ExecutorBarrier barrier{
*context_->options.encodingExecutor};
context_->options.encodingExecutor};
for (auto& streamData : context_->streams()) {
processStream(
*streamData, [&](StreamData& innerStreamData, bool isNullStream) {
Expand Down
2 changes: 0 additions & 2 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal ingestion operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;

Expand Down
21 changes: 4 additions & 17 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2430,10 +2430,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
nimble::VeloxWriterOptions writerOptions;
if (parallelismFactor > 0) {
auto executor =
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -2467,14 +2465,6 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
{"a", velox::REAL()},
{"b", velox::INTEGER()},
})},
{"row",
velox::ROW(
{{"nested_row",
velox::ROW(
{{"nested_nested_row", velox::ROW({{"a", velox::INTEGER()}})},
{"b", velox::INTEGER()}})}})},
{"map",
velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))},
{"nested",
velox::ARRAY(velox::ROW({
{"a", velox::INTEGER()},
Expand Down Expand Up @@ -2527,12 +2517,9 @@ TEST_F(VeloxReaderTests, FuzzComplex) {

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
if (parallelismFactor > 0) {
auto executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
}
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
Expand Down

0 comments on commit 86e1158

Please sign in to comment.