diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp
index 754a7f6..437d481 100644
--- a/dwio/nimble/velox/FieldWriter.cpp
+++ b/dwio/nimble/velox/FieldWriter.cpp
@@ -284,10 +284,8 @@ class SimpleFieldWriter : public FieldWriter {
         valuesStream_{context.createNullableContentStreamData(
             typeBuilder_->asScalar().scalarDescriptor())} {}
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor*) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     auto size = ranges.size();
     auto& buffer = context_.stringBuffer();
     auto& data = valuesStream_.mutableData();
@@ -377,10 +375,8 @@ class RowFieldWriter : public FieldWriter {
     }
   }
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor* executor = nullptr) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     auto size = ranges.size();
     OrderedRanges childRanges;
     const OrderedRanges* childRangesPtr;
@@ -413,19 +409,8 @@ class RowFieldWriter : public FieldWriter {
           Decoded{decoded},
          [&](auto offset) { childRanges.add(offset, 1); });
     }
-
-    if (executor) {
-      for (auto i = 0; i < fields_.size(); ++i) {
-        executor->add([&field = fields_[i],
-                       &childVector = row->childAt(i),
-                       childRanges = *childRangesPtr]() {
-          field->write(childVector, childRanges);
-        });
-      }
-    } else {
-      for (auto i = 0; i < fields_.size(); ++i) {
-        fields_[i]->write(row->childAt(i), *childRangesPtr);
-      }
+    for (auto i = 0; i < fields_.size(); ++i) {
+      fields_[i]->write(row->childAt(i), *childRangesPtr);
     }
   }
 
@@ -525,10 +510,8 @@ class ArrayFieldWriter : public MultiValueFieldWriter {
     typeBuilder_->asArray().setChildren(elements_->typeBuilder());
   }
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor*) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     OrderedRanges childRanges;
     auto array = ingestLengths(vector, ranges, childRanges);
     if (childRanges.size() > 0) {
@@ -567,10 +550,8 @@ class MapFieldWriter : public MultiValueFieldWriter {
         keys_->typeBuilder(), values_->typeBuilder());
   }
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor*) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     OrderedRanges childRanges;
     auto map = ingestLengths(vector, ranges, childRanges);
     if (childRanges.size() > 0) {
@@ -617,10 +598,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
         type->type(), 1, context.bufferMemoryPool.get());
   }
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor*) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     OrderedRanges childFilteredRanges;
     auto map = ingestOffsetsAndLengthsDeduplicated(
         vector, ranges, childFilteredRanges);
@@ -845,14 +824,11 @@ class FlatMapFieldWriter : public FieldWriter {
             typeBuilder_->asFlatMap().nullsDescriptor())},
         valueType_{type->childAt(1)} {}
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor* executor = nullptr) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     // Check if the vector received is already flattened
     const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
-    isFlatMap ? ingestFlattenedMap(vector, ranges)
-              : ingestMap(vector, ranges, executor);
+    isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges);
   }
 
   FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
@@ -921,10 +897,7 @@ class FlatMapFieldWriter : public FieldWriter {
     }
   }
 
-  void ingestMap(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor* executor = nullptr) {
+  void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
     NIMBLE_ASSERT(
         currentPassthroughFields_.empty(),
         "Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
@@ -1006,15 +979,8 @@ class FlatMapFieldWriter : public FieldWriter {
     // Now actually ingest the map values
     if (nonNullCount > 0) {
       auto& values = map->mapValues();
-
-      if (executor) {
-        for (auto& pair : currentValueFields_) {
-          executor->add([&]() { pair.second->write(values, nonNullCount); });
-        }
-      } else {
-        for (auto& pair : currentValueFields_) {
-          pair.second->write(values, nonNullCount);
-        }
+      for (auto& pair : currentValueFields_) {
+        pair.second->write(values, nonNullCount);
       }
     }
     nonNullCount_ += nonNullCount;
@@ -1064,8 +1030,6 @@ class FlatMapFieldWriter : public FieldWriter {
     // check whether the typebuilder for this key is already present
     auto flatFieldIt = allValueFields_.find(key);
     if (flatFieldIt == allValueFields_.end()) {
-      std::scoped_lock lock{context_.flatMapSchemaMutex};
-
       auto valueFieldWriter = FieldWriter::create(context_, valueType_);
       const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild(
           stringKey, valueFieldWriter->typeBuilder());
@@ -1165,10 +1129,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
         type->type(), 1, context.bufferMemoryPool.get());
   }
 
-  void write(
-      const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor*) override {
+  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
+      override {
     OrderedRanges childFilteredRanges;
     const velox::ArrayVector* array;
     // To unwrap the dictionaryVector we need to cast into ComplexType before
diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h
index 98212c4..04d776b 100644
--- a/dwio/nimble/velox/FieldWriter.h
+++ b/dwio/nimble/velox/FieldWriter.h
@@ -92,7 +92,6 @@ struct FieldWriterContext {
   }
 
   std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
-  std::mutex flatMapSchemaMutex;
   SchemaBuilder schemaBuilder;
 
   folly::F14FastSet<uint32_t> flatMapNodeIds;
@@ -167,8 +166,7 @@ class FieldWriter {
   // Writes the vector to internal buffers.
   virtual void write(
       const velox::VectorPtr& vector,
-      const OrderedRanges& ranges,
-      folly::Executor* executor = nullptr) = 0;
+      const OrderedRanges& ranges) = 0;
 
   // Clears interanl state and any accumulated data in internal buffers.
   virtual void reset() = 0;
diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp
index f381bbb..0c5ea32 100644
--- a/dwio/nimble/velox/VeloxWriter.cpp
+++ b/dwio/nimble/velox/VeloxWriter.cpp
@@ -505,14 +505,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
   NIMBLE_CHECK(file_, "Writer is already closed");
   try {
     auto size = vector->size();
-    if (context_->options.writeExecutor) {
-      velox::dwio::common::ExecutorBarrier barrier{
-          *context_->options.writeExecutor};
-      root_->write(vector, OrderedRanges::of(0, size), &barrier);
-      barrier.waitAll();
-    } else {
-      root_->write(vector, OrderedRanges::of(0, size));
-    }
+    root_->write(vector, OrderedRanges::of(0, size));
 
     uint64_t memoryUsed = 0;
     for (const auto& stream : context_->streams()) {
@@ -712,7 +705,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
 
     if (context_->options.encodingExecutor) {
       velox::dwio::common::ExecutorBarrier barrier{
-          *context_->options.encodingExecutor};
+          context_->options.encodingExecutor};
       for (auto& streamData : context_->streams()) {
         processStream(
             *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h
index 62c6dce..8e4386a 100644
--- a/dwio/nimble/velox/VeloxWriterOptions.h
+++ b/dwio/nimble/velox/VeloxWriterOptions.h
@@ -129,8 +129,6 @@ struct VeloxWriterOptions {
   // If provided, internal encoding operations will happen in parallel using
   // this executor.
   std::shared_ptr<folly::Executor> encodingExecutor;
-  // If provided, internal ingestion operations will happen in parallel
-  std::shared_ptr<folly::Executor> writeExecutor;
 
   bool enableChunking = false;
 
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index 8b0cf02..72fee6a 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2430,10 +2430,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
     nimble::VeloxWriterOptions writerOptions;
     if (parallelismFactor > 0) {
-      auto executor =
+      writerOptions.encodingExecutor =
           std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
-      writerOptions.encodingExecutor = executor;
-      writerOptions.writeExecutor = executor;
     }
 
     for (auto i = 0; i < iterations; ++i) {
@@ -2467,14 +2465,6 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
            {"a", velox::REAL()},
            {"b", velox::INTEGER()},
        })},
-      {"row",
-       velox::ROW(
-           {{"nested_row",
-             velox::ROW(
-                 {{"nested_nested_row", velox::ROW({{"a", velox::INTEGER()}})},
-                  {"b", velox::INTEGER()}})}})},
-      {"map",
-       velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))},
       {"nested",
        velox::ARRAY(velox::ROW({
            {"a", velox::INTEGER()},
@@ -2527,12 +2517,9 @@
 
   for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
-    if (parallelismFactor > 0) {
-      auto executor =
-          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
-      writerOptions.encodingExecutor = executor;
-      writerOptions.writeExecutor = executor;
-    }
+    writerOptions.encodingExecutor = parallelismFactor > 0
+        ? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
+        : nullptr;
 
     for (auto i = 0; i < iterations; ++i) {
       writeAndVerify(