From 7ac1be166d223901432f2d72c02e1102ee169693 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 11 Nov 2024 10:51:03 -0800 Subject: [PATCH] feat(dwio): Delta update support in selective readers (#11501) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11501 Differential Revision: D65757116 --- velox/dwio/common/ColumnLoader.cpp | 184 ++++++++++++++---- velox/dwio/common/ColumnLoader.h | 27 ++- velox/dwio/common/FileSink.cpp | 4 +- velox/dwio/common/Mutation.h | 13 ++ velox/dwio/common/Options.h | 7 +- velox/dwio/common/Reader.cpp | 163 ++++++++++------ velox/dwio/common/Reader.h | 4 + velox/dwio/common/ScanSpec.cpp | 180 ++++++++++++----- velox/dwio/common/ScanSpec.h | 45 +++-- velox/dwio/common/SelectiveColumnReader.h | 9 +- .../common/SelectiveStructColumnReader.cpp | 77 ++++---- .../dwio/common/SelectiveStructColumnReader.h | 18 ++ velox/dwio/dwrf/reader/DwrfReader.cpp | 93 +-------- .../dwrf/test/utils/E2EWriterTestUtil.cpp | 4 +- velox/type/Type.cpp | 8 - velox/type/Type.h | 9 +- 16 files changed, 536 insertions(+), 309 deletions(-) diff --git a/velox/dwio/common/ColumnLoader.cpp b/velox/dwio/common/ColumnLoader.cpp index 8fd859f7a2b83..6b9985684ea5b 100644 --- a/velox/dwio/common/ColumnLoader.cpp +++ b/velox/dwio/common/ColumnLoader.cpp @@ -20,11 +20,54 @@ namespace facebook::velox::dwio::common { +namespace { + +RowSet read( + SelectiveStructColumnReaderBase* structReader, + SelectiveColumnReader* fieldReader, + uint64_t version, + RowSet rows, + raw_vector& selectedRows, + ValueHook* hook) { + VELOX_CHECK_EQ( + version, + structReader->numReads(), + "Loading LazyVector after the enclosing reader has moved"); + const auto offset = structReader->lazyVectorReadOffset(); + const auto* incomingNulls = structReader->nulls(); + const auto outputRows = structReader->outputRows(); + RowSet effectiveRows; + + if (rows.size() == outputRows.size()) { + // All the rows planned at creation are accessed. + effectiveRows = outputRows; + } else { + // rows is a set of indices into outputRows. There has been a + // selection between creation and loading. + selectedRows.resize(rows.size()); + VELOX_DCHECK(!selectedRows.empty()); + for (auto i = 0; i < rows.size(); ++i) { + selectedRows[i] = outputRows[rows[i]]; + } + effectiveRows = selectedRows; + } + + structReader->advanceFieldReader(fieldReader, offset); + fieldReader->scanSpec()->setValueHook(hook); + fieldReader->read(offset, effectiveRows, incomingNulls); + if (fieldReader->fileType().type()->isRow()) { + // 'fieldReader_' may itself produce LazyVectors. For this it must have its + // result row numbers set. + static_cast(fieldReader) + ->setLoadableRows(effectiveRows); + } + return effectiveRows; +} + // Wraps '*result' in a dictionary to make the contiguous values // appear at the indices i 'rows'. Used when loading a LazyVector for // a sparse set of rows in conditional exprs. -namespace { -static void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) { +void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) { VELOX_CHECK_GE(resultSize, rows.back() + 1); // Initialize the indices to 0 to make the dictionary safely @@ -40,6 +83,62 @@ static void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) { result->get()->disableMemo(); *result = BaseVector::wrapInDictionary(nullptr, indices, resultSize, *result); } + +template +void addToHookImpl( + const DecodedVector& decoded, + const RowSet& rows, + ValueHook& hook) { + if (decoded.isIdentityMapping()) { + auto* values = decoded.data(); + hook.addValues(rows.data(), values, rows.size()); + return; + } + for (auto i : rows) { + if (!decoded.isNullAt(i)) { + hook.addValueTyped(i, decoded.valueAt(i)); + } else if (hook.acceptsNulls()) { + hook.addNull(i); + } + } +} + +void addToHook( + const DecodedVector& decoded, + const RowSet& rows, + ValueHook& hook) { + switch (decoded.base()->typeKind()) { + case TypeKind::BOOLEAN: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::TINYINT: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::SMALLINT: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::INTEGER: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::BIGINT: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::REAL: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::DOUBLE: + addToHookImpl(decoded, rows, hook); + break; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + addToHookImpl(decoded, rows, hook); + break; + default: + VELOX_FAIL( + "Unsupported type kind for hook: {}", decoded.base()->typeKind()); + } +} + } // namespace void ColumnLoader::loadInternal( @@ -48,48 +147,19 @@ void ColumnLoader::loadInternal( vector_size_t resultSize, VectorPtr* result) { process::TraceContext trace("ColumnLoader::loadInternal"); - VELOX_CHECK_EQ( - version_, - structReader_->numReads(), - "Loading LazyVector after the enclosing reader has moved"); - const auto offset = structReader_->lazyVectorReadOffset(); - const auto* incomingNulls = structReader_->nulls(); - const auto outputRows = structReader_->outputRows(); - raw_vector selectedRows; - RowSet effectiveRows; ExceptionContextSetter exceptionContext( {[](VeloxException::Type /*exceptionType*/, auto* reader) { return static_cast(reader) ->debugString(); }, structReader_}); - - if (rows.size() == outputRows.size()) { - // All the rows planned at creation are accessed. - effectiveRows = outputRows; - } else { - // rows is a set of indices into outputRows. There has been a - // selection between creation and loading. - selectedRows.resize(rows.size()); - assert(!selectedRows.empty()); - for (auto i = 0; i < rows.size(); ++i) { - selectedRows[i] = outputRows[rows[i]]; - } - effectiveRows = RowSet(selectedRows); - } - - structReader_->advanceFieldReader(fieldReader_, offset); - fieldReader_->scanSpec()->setValueHook(hook); - fieldReader_->read(offset, effectiveRows, incomingNulls); - if (fieldReader_->fileType().type()->kind() == TypeKind::ROW) { - // 'fieldReader_' may itself produce LazyVectors. For this it must have its - // result row numbers set. - static_cast(fieldReader_) - ->setLoadableRows(effectiveRows); - } + raw_vector selectedRows; + auto effectiveRows = + read(structReader_, fieldReader_, version_, rows, selectedRows, hook); if (!hook) { fieldReader_->getValues(effectiveRows, result); - if (((rows.back() + 1) < resultSize) || rows.size() != outputRows.size()) { + if (((rows.back() + 1) < resultSize) || + rows.size() != structReader_->outputRows().size()) { // We read sparsely. The values that were read should appear // at the indices in the result vector that were given by // 'rows'. @@ -98,4 +168,46 @@ void ColumnLoader::loadInternal( } } +void DeltaUpdateColumnLoader::loadInternal( + RowSet rows, + ValueHook* hook, + vector_size_t resultSize, + VectorPtr* result) { + process::TraceContext trace("DeltaUpdateColumnLoader::loadInternal"); + ExceptionContextSetter exceptionContext( + {[](VeloxException::Type /*exceptionType*/, auto* reader) { + return static_cast(reader) + ->debugString(); + }, + structReader_}); + auto* scanSpec = fieldReader_->scanSpec(); + VELOX_CHECK( + scanSpec->columnType() == velox::common::ScanSpec::ColumnType::kRegular); + scanSpec->setValueHook(nullptr); + if (scanSpec->isConstant()) { + SelectiveStructColumnReaderBase::setConstantField( + scanSpec->constantValue(), resultSize, *result); + scanSpec->deltaUpdate()->update(rows, *result); + return; + } + raw_vector selectedRows; + RowSet effectiveRows; + // Filters on delta updated columns need to be converted into clauses in + // remaining filter and evaluated there. Here we should just read everything + // in effectiveRows. + effectiveRows = + read(structReader_, fieldReader_, version_, rows, selectedRows, nullptr); + fieldReader_->getValues(effectiveRows, result); + scanSpec->deltaUpdate()->update(effectiveRows, *result); + if (hook) { + VELOX_CHECK(!scanSpec->hasFilter()); + DecodedVector decoded(**result); + addToHook(decoded, effectiveRows, *hook); + } else if ( + rows.back() + 1 < resultSize || + rows.size() != structReader_->outputRows().size()) { + scatter(rows, resultSize, result); + } +} + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ColumnLoader.h b/velox/dwio/common/ColumnLoader.h index ed883d23a143f..2a2109f35cf02 100644 --- a/velox/dwio/common/ColumnLoader.h +++ b/velox/dwio/common/ColumnLoader.h @@ -20,7 +20,7 @@ namespace facebook::velox::dwio::common { -class ColumnLoader : public velox::VectorLoader { +class ColumnLoader : public VectorLoader { public: ColumnLoader( SelectiveStructColumnReaderBase* structReader, @@ -30,14 +30,13 @@ class ColumnLoader : public velox::VectorLoader { fieldReader_(fieldReader), version_(version) {} - protected: + private: void loadInternal( RowSet rows, ValueHook* hook, vector_size_t resultSize, VectorPtr* result) override; - private: SelectiveStructColumnReaderBase* const structReader_; SelectiveColumnReader* const fieldReader_; // This is checked against the version of 'structReader' on load. If @@ -46,4 +45,26 @@ class ColumnLoader : public velox::VectorLoader { const uint64_t version_; }; +class DeltaUpdateColumnLoader : public VectorLoader { + public: + DeltaUpdateColumnLoader( + SelectiveStructColumnReaderBase* structReader, + SelectiveColumnReader* fieldReader, + uint64_t version) + : structReader_(structReader), + fieldReader_(fieldReader), + version_(version) {} + + private: + void loadInternal( + RowSet rows, + ValueHook* hook, + vector_size_t resultSize, + VectorPtr* result) override; + + SelectiveStructColumnReaderBase* const structReader_; + SelectiveColumnReader* const fieldReader_; + const uint64_t version_; +}; + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/FileSink.cpp b/velox/dwio/common/FileSink.cpp index 45f5362d0e521..35fb65c1b7a3a 100644 --- a/velox/dwio/common/FileSink.cpp +++ b/velox/dwio/common/FileSink.cpp @@ -128,8 +128,8 @@ void WriteFileSink::write(std::vector>& buffers) { } void WriteFileSink::doClose() { - LOG(INFO) << "closing file: " << name() - << ", total size: " << succinctBytes(size_); + VLOG(1) << "closing file: " << name() + << ", total size: " << succinctBytes(size_); if (writeFile_ != nullptr) { writeFile_->close(); } diff --git a/velox/dwio/common/Mutation.h b/velox/dwio/common/Mutation.h index de5ff469c71a7..7a1c0d03a0d82 100644 --- a/velox/dwio/common/Mutation.h +++ b/velox/dwio/common/Mutation.h @@ -17,11 +17,13 @@ #pragma once #include "velox/common/base/RandomUtil.h" +#include "velox/vector/LazyVector.h" #include namespace facebook::velox::dwio::common { +/// Top row level mutations. struct Mutation { /// Bit masks for row numbers to be deleted. const uint64_t* deletedRows = nullptr; @@ -33,4 +35,15 @@ inline bool hasDeletion(const Mutation* mutation) { return mutation && (mutation->deletedRows || mutation->randomSkip); } +class DeltaColumnUpdater { + public: + virtual ~DeltaColumnUpdater() = default; + + /// Update the values in `result' to reflect the delta updates on `baseRows'. + /// `baseRows' are the rows starting from the beginning of the current scan + /// (so that the delta readers can use this to choose which lines to read as + /// well), not based on the positions in `result'. + virtual void update(const RowSet& baseRows, VectorPtr& result) = 0; +}; + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 569d30ffd77fc..f785e701b1ce9 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -448,9 +448,10 @@ class ReaderOptions : public io::ReaderOptions { return *this; } - /// Sets the schema of the file (a Type tree). For "dwrf" format, a default - /// schema is derived from the file. For "rc" format, there is no default - /// schema. + /// Sets the current table schema of the file (a Type tree). This could be + /// different from the actual schema in file if schema evolution happened. + /// For "dwrf" format, a default schema is derived from the file. For "rc" + /// format, there is no default schema. ReaderOptions& setFileSchema(const RowTypePtr& schema) { fileSchema_ = schema; return *this; diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index 3fd7241ecb9e4..eb21b67b1882e 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -20,67 +20,6 @@ namespace facebook::velox::dwio::common { using namespace velox::common; -namespace { - -template -bool filterSimpleVectorRow( - const BaseVector& vector, - Filter& filter, - vector_size_t index) { - using T = typename TypeTraits::NativeType; - auto* simpleVector = vector.asUnchecked>(); - return applyFilter(filter, simpleVector->valueAt(index)); -} - -bool filterRow(const BaseVector& vector, Filter& filter, vector_size_t index) { - if (vector.isNullAt(index)) { - return filter.testNull(); - } - switch (vector.typeKind()) { - case TypeKind::ARRAY: - case TypeKind::MAP: - case TypeKind::ROW: - VELOX_USER_CHECK( - filter.kind() == FilterKind::kIsNull || - filter.kind() == FilterKind::kIsNotNull, - "Complex type can only take null filter, got {}", - filter.toString()); - return filter.testNonNull(); - default: - return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - filterSimpleVectorRow, vector.typeKind(), vector, filter, index); - } -} - -void applyFilter( - const BaseVector& vector, - const ScanSpec& spec, - uint64_t* result) { - if (spec.filter()) { - bits::forEachSetBit(result, 0, vector.size(), [&](auto i) { - if (!filterRow(vector, *spec.filter(), i)) { - bits::clearBit(result, i); - } - }); - } - if (!vector.type()->isRow()) { - // Filter on MAP or ARRAY children are pruning, and won't affect correctness - // of the result. - return; - } - auto& rowType = vector.type()->asRow(); - auto* rowVector = vector.as(); - // Should not have any lazy from non-selective reader. - VELOX_CHECK_NOT_NULL(rowVector); - for (auto& childSpec : spec.children()) { - auto child = - rowVector->childAt(rowType.getChildIdx(childSpec->fieldName())); - applyFilter(*child, *childSpec, result); - } -} - -} // namespace - VectorPtr RowReader::projectColumns( const VectorPtr& input, const ScanSpec& spec, @@ -117,7 +56,7 @@ VectorPtr RowReader::projectColumns( } else { child = inputRow->childAt(inputRowType.getChildIdx(childSpec->fieldName())); - applyFilter(*child, *childSpec, passed.data()); + childSpec->applyFilter(*child, passed.data()); } if (!childSpec->projectOut()) { continue; @@ -238,4 +177,104 @@ void RowReader::readWithRowNumber( std::move(children)); } +namespace { + +void logTypeInequality( + const Type& fileType, + const Type& tableType, + const std::string& fileFieldName, + const std::string& tableFieldName) { + VLOG(1) << "Type of the File field '" << fileFieldName + << "' does not match the type of the Table field '" << tableFieldName + << "': [" << fileType.toString() << "] vs [" << tableType.toString() + << "]"; +} + +// Forward declaration for general type tree recursion function. +TypePtr updateColumnNamesImpl( + const TypePtr& fileType, + const TypePtr& tableType, + const std::string& fileFieldName, + const std::string& tableFieldName); + +// Non-primitive type tree recursion function. +template +TypePtr updateColumnNamesImpl( + const TypePtr& fileType, + const TypePtr& tableType) { + const auto fileRowType = std::dynamic_pointer_cast(fileType); + const auto tableRowType = std::dynamic_pointer_cast(tableType); + + std::vector newFileFieldNames; + newFileFieldNames.reserve(fileRowType->size()); + std::vector newFileFieldTypes; + newFileFieldTypes.reserve(fileRowType->size()); + + for (auto childIdx = 0; childIdx < tableRowType->size(); ++childIdx) { + if (childIdx >= fileRowType->size()) { + break; + } + + newFileFieldTypes.push_back(updateColumnNamesImpl( + fileRowType->childAt(childIdx), + tableRowType->childAt(childIdx), + fileRowType->nameOf(childIdx), + tableRowType->nameOf(childIdx))); + + newFileFieldNames.push_back(tableRowType->nameOf(childIdx)); + } + + for (auto childIdx = tableRowType->size(); childIdx < fileRowType->size(); + ++childIdx) { + newFileFieldTypes.push_back(fileRowType->childAt(childIdx)); + newFileFieldNames.push_back(fileRowType->nameOf(childIdx)); + } + + return std::make_shared( + std::move(newFileFieldNames), std::move(newFileFieldTypes)); +} + +// General type tree recursion function. +TypePtr updateColumnNamesImpl( + const TypePtr& fileType, + const TypePtr& tableType, + const std::string& fileFieldName, + const std::string& tableFieldName) { + // Check type kind equality. If not equal, no point to continue down the + // tree. + if (fileType->kind() != tableType->kind()) { + logTypeInequality(*fileType, *tableType, fileFieldName, tableFieldName); + return fileType; + } + + // For leaf types we return type as is. + if (fileType->isPrimitiveType()) { + return fileType; + } + + if (fileType->isRow()) { + return updateColumnNamesImpl(fileType, tableType); + } + + if (fileType->isMap()) { + return updateColumnNamesImpl(fileType, tableType); + } + + if (fileType->isArray()) { + return updateColumnNamesImpl(fileType, tableType); + } + + // We should not be here. + VLOG(1) << "Unexpected table type during column names update for File field '" + << fileFieldName << "': [" << fileType->toString() << "]"; + return fileType; +} +} // namespace + +TypePtr Reader::updateColumnNames( + const TypePtr& fileType, + const TypePtr& tableType) { + return updateColumnNamesImpl(fileType, tableType, "", ""); +} + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Reader.h b/velox/dwio/common/Reader.h index 0c3da77d29ce8..9dddfaeaca082 100644 --- a/velox/dwio/common/Reader.h +++ b/velox/dwio/common/Reader.h @@ -206,6 +206,10 @@ class Reader { */ virtual std::unique_ptr createRowReader( const RowReaderOptions& options = {}) const = 0; + + static TypePtr updateColumnNames( + const TypePtr& fileType, + const TypePtr& tableType); }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ScanSpec.cpp b/velox/dwio/common/ScanSpec.cpp index a37cbc98ec6ab..8f950ff32334a 100644 --- a/velox/dwio/common/ScanSpec.cpp +++ b/velox/dwio/common/ScanSpec.cpp @@ -15,6 +15,8 @@ */ #include "velox/dwio/common/ScanSpec.h" + +#include "velox/core/Expressions.h" #include "velox/dwio/common/Statistics.h" namespace facebook::velox::common { @@ -42,20 +44,43 @@ ScanSpec* ScanSpec::getOrCreateChild(const Subfield& subfield) { return container; } +bool ScanSpec::compareTimeToDropValue( + const std::shared_ptr& left, + const std::shared_ptr& right) { + if (left->hasFilter() && right->hasFilter()) { + if (left->selectivity_.numIn() || right->selectivity_.numIn()) { + return left->selectivity_.timeToDropValue() < + right->selectivity_.timeToDropValue(); + } + // Integer filters are before other filters if there is no + // history data. + if (left->filter_ && right->filter_) { + return left->filter_->kind() < right->filter_->kind(); + } + // If hasFilter() is true but 'filter_' is nullptr, we have a filter + // on complex type members. The simple type filter goes first. + if (left->filter_) { + return true; + } + if (right->filter_) { + return false; + } + return left->fieldName_ < right->fieldName_; + } + if (left->hasFilter()) { + return true; + } + if (right->hasFilter()) { + return false; + } + return left->fieldName_ < right->fieldName_; +} + uint64_t ScanSpec::newRead() { - if (numReads_ == 0) { + if (numReads_ == 0 || + !std::is_sorted( + children_.begin(), children_.end(), compareTimeToDropValue)) { reorder(); - } else if (enableFilterReorder_) { - for (auto i = 1; i < children_.size(); ++i) { - if (!children_[i]->filter_) { - break; - } - if (children_[i - 1]->selectivity_.timeToDropValue() > - children_[i]->selectivity_.timeToDropValue()) { - reorder(); - break; - } - } } return ++numReads_; } @@ -64,44 +89,16 @@ void ScanSpec::reorder() { if (children_.empty()) { return; } - // Make sure 'stableChildren_' is initialized. stableChildren(); - std::sort( - children_.begin(), - children_.end(), - [this]( - const std::shared_ptr& left, - const std::shared_ptr& right) { - if (left->hasFilter() && right->hasFilter()) { - if (enableFilterReorder_ && - (left->selectivity_.numIn() || right->selectivity_.numIn())) { - return left->selectivity_.timeToDropValue() < - right->selectivity_.timeToDropValue(); - } - // Integer filters are before other filters if there is no - // history data. - if (left->filter_ && right->filter_) { - return left->filter_->kind() < right->filter_->kind(); - } - // If hasFilter() is true but 'filter_' is nullptr, we have a filter - // on complex type members. The simple type filter goes first. - if (left->filter_) { - return true; - } - if (right->filter_) { - return false; - } - return left->fieldName_ < right->fieldName_; - } - if (left->hasFilter()) { - return true; - } - if (right->hasFilter()) { - return false; - } - return left->fieldName_ < right->fieldName_; - }); + std::sort(children_.begin(), children_.end(), compareTimeToDropValue); +} + +void ScanSpec::enableFilterInSubTree(bool value) { + filterDisabled_ = !value; + for (auto& child : children_) { + child->enableFilterInSubTree(value); + } } const std::vector& ScanSpec::stableChildren() { @@ -116,10 +113,13 @@ const std::vector& ScanSpec::stableChildren() { } bool ScanSpec::hasFilter() const { + if (filterDisabled_) { + return false; + } if (hasFilter_.has_value()) { return hasFilter_.value(); } - if (!isConstant() && filter_) { + if (!isConstant() && filter()) { hasFilter_ = true; return true; } @@ -134,7 +134,8 @@ bool ScanSpec::hasFilter() const { } bool ScanSpec::testNull() const { - if (filter_ && !filter_->testNull()) { + auto* filter = this->filter(); + if (filter && !filter->testNull()) { return false; } for (auto& child : children_) { @@ -146,6 +147,7 @@ bool ScanSpec::testNull() const { } void ScanSpec::moveAdaptationFrom(ScanSpec& other) { + VELOX_CHECK(!filterDisabled_); // moves the filters and filter order from 'other'. for (auto& child : children_) { auto it = other.childByFieldName_.find(child->fieldName_); @@ -458,4 +460,82 @@ void ScanSpec::addAllChildFields(const Type& type) { } } +namespace { + +template +void filterSimpleVectorRows( + const BaseVector& vector, + Filter& filter, + vector_size_t size, + uint64_t* result) { + using T = typename TypeTraits::NativeType; + auto* simpleVector = vector.asChecked>(); + VELOX_CHECK_NOT_NULL(simpleVector); + bits::forEachSetBit(result, 0, size, [&](auto i) { + if (simpleVector->isNullAt(i)) { + if (!filter.testNull()) { + bits::clearBit(result, i); + } + } else if (!applyFilter(filter, simpleVector->valueAt(i))) { + bits::clearBit(result, i); + } + }); +} + +void filterRows( + const BaseVector& vector, + Filter& filter, + vector_size_t size, + uint64_t* result) { + switch (vector.typeKind()) { + case TypeKind::ARRAY: + case TypeKind::MAP: + case TypeKind::ROW: + VELOX_CHECK( + filter.kind() == FilterKind::kIsNull || + filter.kind() == FilterKind::kIsNotNull, + "Complex type can only take null filter, got {}", + filter.toString()); + bits::forEachSetBit(result, 0, size, [&](auto i) { + bool pass = + vector.isNullAt(i) ? filter.testNull() : filter.testNonNull(); + if (!pass) { + bits::clearBit(result, i); + } + }); + break; + default: + return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + filterSimpleVectorRows, + vector.typeKind(), + vector, + filter, + size, + result); + } +} + +} // namespace + +void ScanSpec::applyFilter(const BaseVector& vector, uint64_t* result) const { + if (!hasFilter()) { + return; + } + if (auto* filter = this->filter()) { + filterRows(vector, *filter, vector.size(), result); + } + if (!vector.type()->isRow()) { + // Filter on MAP or ARRAY children are pruning, and won't affect correctness + // of the result. + return; + } + auto& rowType = vector.type()->asRow(); + auto* rowVector = vector.asChecked(); + for (int i = 0; i < rowType.size(); ++i) { + if (auto* child = childByName(rowType.nameOf(i))) { + child->applyFilter(*rowVector->childAt(i), result); + } + } +} + } // namespace facebook::velox::common diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index 25bbc543045f3..794b335e988bc 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -18,6 +18,7 @@ #include "velox/common/base/SelectivityInfo.h" #include "velox/dwio/common/MetadataFilter.h" +#include "velox/dwio/common/Mutation.h" #include "velox/type/Filter.h" #include "velox/type/Subfield.h" #include "velox/vector/BaseVector.h" @@ -57,7 +58,7 @@ class ScanSpec { // can only be isNull or isNotNull, other filtering is given by // 'children'. common::Filter* filter() const { - return filter_.get(); + return filterDisabled_ ? nullptr : filter_.get(); } // Sets 'filter_'. May be used at initialization or when adding a @@ -83,7 +84,7 @@ class ScanSpec { } int numMetadataFilters() const { - return metadataFilters_.size(); + return filterDisabled_ ? 0 : metadataFilters_.size(); } const MetadataFilter::LeafNode* metadataFilterNodeAt(int i) const { @@ -231,11 +232,11 @@ class ScanSpec { // apply to Nimble format leaf nodes, because nulls are mixed in the encoding // with actual values. bool readsNullsOnly() const { - if (filter_) { - if (filter_->kind() == FilterKind::kIsNull) { + if (auto* filter = this->filter()) { + if (filter->kind() == FilterKind::kIsNull) { return true; } - if (filter_->kind() == FilterKind::kIsNotNull && !projectOut_) { + if (filter->kind() == FilterKind::kIsNotNull && !projectOut_) { return true; } } @@ -275,10 +276,6 @@ class ScanSpec { } } - void setEnableFilterReorder(bool enableFilterReorder) { - enableFilterReorder_ = enableFilterReorder; - } - // Returns the child which produces values for 'channel'. Throws if not found. ScanSpec& getChildByChannel(column_index_t channel); @@ -336,6 +333,26 @@ class ScanSpec { template void visit(const Type& type, F&& f); + dwio::common::DeltaColumnUpdater* deltaUpdate() const { + return deltaUpdate_; + } + + void setDeltaUpdate(dwio::common::DeltaColumnUpdater* update) { + deltaUpdate_ = update; + enableFilterInSubTree(update == nullptr); + } + + void resetDeltaUpdates() { + for (auto& child : children_) { + // Only top level columns can have delta updates. + if (child->deltaUpdate_) { + setDeltaUpdate(nullptr); + } + } + } + + void applyFilter(const BaseVector& vector, uint64_t* result) const; + bool isFlatMapAsStruct() const { return isFlatMapAsStruct_; } @@ -347,6 +364,12 @@ class ScanSpec { private: void reorder(); + void enableFilterInSubTree(bool value); + + static bool compareTimeToDropValue( + const std::shared_ptr& x, + const std::shared_ptr& y); + // Serializes stableChildren(). std::mutex mutex_; @@ -377,6 +400,8 @@ class ScanSpec { // returned as flat. bool makeFlat_ = false; std::unique_ptr filter_; + bool filterDisabled_ = false; + dwio::common::DeltaColumnUpdater* deltaUpdate_ = nullptr; // Filters that will be only used for row group filtering based on metadata. // The conjunctions among these filters are tracked in MetadataFilter, with @@ -387,8 +412,6 @@ class ScanSpec { metadataFilters_; SelectivityInfo selectivity_; - // Sort children by filtering efficiency. - bool enableFilterReorder_ = true; std::vector> children_; // Read-only copy of children, not subject to reordering. Used when diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index b5219763b4473..c4b6acfe20a2b 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -183,10 +183,7 @@ class SelectiveColumnReader { // read(). If 'this' has no filter, returns 'rows' passed to last // read(). const RowSet outputRows() const { - if (scanSpec_->hasFilter() || hasDeletion()) { - return outputRows_; - } - return inputRows_; + return useOutputRows() ? outputRows_ : inputRows_; } // Advances to 'offset', so that the next item to be read is the @@ -593,6 +590,10 @@ class SelectiveColumnReader { : resultNulls_; } + bool useOutputRows() const { + return scanSpec_->hasFilter() || hasDeletion(); + } + memory::MemoryPool* const memoryPool_; // The requested data type diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index dba62caa0322c..a18cca7435d02 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -95,18 +95,6 @@ void prepareResult(VectorPtr& result) { result->pool(), result->type(), nullptr, 0, std::move(children)); } -void setConstantField( - const VectorPtr& constant, - vector_size_t size, - VectorPtr& field) { - if (field && field->isConstantEncoding() && field.use_count() == 1 && - field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { - field->resize(size); - } else { - field = BaseVector::wrapInConstant(size, 0, constant); - } -} - void setNullField( vector_size_t size, VectorPtr& field, @@ -164,7 +152,8 @@ void setCompositeField( switch (child->columnType()) { case velox::common::ScanSpec::ColumnType::kRegular: VELOX_CHECK(child->isConstant()); - setConstantField(child->constantValue(), rows.size(), childField); + SelectiveStructColumnReaderBase::setConstantField( + child->constantValue(), rows.size(), childField); break; case velox::common::ScanSpec::ColumnType::kRowIndex: setRowNumberField(offset, rows, pool, childField); @@ -176,6 +165,20 @@ void setCompositeField( } } +void setLazyField( + std::unique_ptr loader, + const TypePtr& type, + vector_size_t size, + memory::MemoryPool* pool, + VectorPtr& result) { + if (result && result->isLazy() && result.use_count() == 1) { + static_cast(*result).reset(std::move(loader), size); + } else { + result = std::make_shared( + pool, type, size, std::move(loader), std::move(result)); + } +} + } // namespace void SelectiveStructColumnReaderBase::filterRowGroups( @@ -345,6 +348,12 @@ void SelectiveStructColumnReaderBase::read( for (size_t i = 0; i < childSpecs.size(); ++i) { const auto& childSpec = childSpecs[i]; VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str()); + + if (childSpec->deltaUpdate()) { + // Will make LazyVector. + continue; + } + if (isChildConstant(*childSpec)) { if (!testFilterOnConstant(*childSpec)) { activeRows = {}; @@ -461,7 +470,6 @@ void SelectiveStructColumnReaderBase::getValues( } setComplexNulls(rows, *result); - bool lazyPrepared = false; for (const auto& childSpec : scanSpec_->children()) { VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str()); if (!childSpec->projectOut()) { @@ -469,7 +477,21 @@ void SelectiveStructColumnReaderBase::getValues( } const auto channel = childSpec->channel(); + const auto index = childSpec->subscript(); auto& childResult = resultRow->childAt(channel); + + if (childSpec->deltaUpdate()) { + setOutputRowsForLazy(rows); + setLazyField( + std::make_unique( + this, children_[index], numReads_), + resultRow->type()->childAt(channel), + rows.size(), + memoryPool_, + childResult); + continue; + } + if (childSpec->isConstant()) { setConstantField(childSpec->constantValue(), rows.size(), childResult); continue; @@ -489,7 +511,6 @@ void SelectiveStructColumnReaderBase::getValues( continue; } - const auto index = childSpec->subscript(); // Set missing fields to be null constant, if we're in the top level struct // missing columns should already be a null constant from the check above. if (index == kConstantChildSpecSubscript) { @@ -504,25 +525,13 @@ void SelectiveStructColumnReaderBase::getValues( } // LazyVector result. - if (!lazyPrepared) { - if (rows.size() != outputRows_.size()) { - setOutputRows(rows); - } - lazyPrepared = true; - } - auto lazyLoader = - std::make_unique(this, children_[index], numReads_); - if (childResult && childResult->isLazy() && childResult.use_count() == 1) { - static_cast(*childResult) - .reset(std::move(lazyLoader), rows.size()); - } else { - childResult = std::make_shared( - memoryPool_, - resultRow->type()->childAt(channel), - rows.size(), - std::move(lazyLoader), - std::move(childResult)); - } + setOutputRowsForLazy(rows); + setLazyField( + std::make_unique(this, children_[index], numReads_), + resultRow->type()->childAt(channel), + rows.size(), + memoryPool_, + childResult); } resultRow->updateContainsLazyNotLoaded(); } diff --git a/velox/dwio/common/SelectiveStructColumnReader.h b/velox/dwio/common/SelectiveStructColumnReader.h index 8b95a68385eec..833e8d46af956 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.h +++ b/velox/dwio/common/SelectiveStructColumnReader.h @@ -102,6 +102,18 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { currentRowNumber_ = value; } + static void setConstantField( + const VectorPtr& constant, + vector_size_t size, + VectorPtr& field) { + if (field && field->isConstantEncoding() && field.use_count() == 1 && + field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { + field->resize(size); + } else { + field = BaseVector::wrapInConstant(size, 0, constant); + } + } + protected: template friend class SelectiveFlatMapColumnReaderHelper; @@ -146,6 +158,12 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { /// forward within the row group. void recordParentNullsInChildren(int64_t offset, const RowSet& rows); + void setOutputRowsForLazy(const RowSet& rows) { + if (useOutputRows() && rows.size() != outputRows_.size()) { + setOutputRows(rows); + } + } + // Context information obtained from ExceptionContext. Stored here // so that LazyVector readers under this can add this to their // ExceptionContext. Allows contextualizing reader errors to split diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 2f63deef5961a..ed9dede68142a 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -800,102 +800,11 @@ DwrfReader::DwrfReader( } } -namespace { -void logTypeInequality( - const Type& fileType, - const Type& tableType, - const std::string& fileFieldName, - const std::string& tableFieldName) { - VLOG(1) << "Type of the File field '" << fileFieldName - << "' does not match the type of the Table field '" << tableFieldName - << "': [" << fileType.toString() << "] vs [" << tableType.toString() - << "]"; -} - -// Forward declaration for general type tree recursion function. -TypePtr updateColumnNames( - const TypePtr& fileType, - const TypePtr& tableType, - const std::string& fileFieldName, - const std::string& tableFieldName); - -// Non-primitive type tree recursion function. -template -TypePtr updateColumnNames(const TypePtr& fileType, const TypePtr& tableType) { - const auto fileRowType = std::dynamic_pointer_cast(fileType); - const auto tableRowType = std::dynamic_pointer_cast(tableType); - - std::vector newFileFieldNames; - newFileFieldNames.reserve(fileRowType->size()); - std::vector newFileFieldTypes; - newFileFieldTypes.reserve(fileRowType->size()); - - for (auto childIdx = 0; childIdx < tableRowType->size(); ++childIdx) { - if (childIdx >= fileRowType->size()) { - break; - } - - newFileFieldTypes.push_back(updateColumnNames( - fileRowType->childAt(childIdx), - tableRowType->childAt(childIdx), - fileRowType->nameOf(childIdx), - tableRowType->nameOf(childIdx))); - - newFileFieldNames.push_back(tableRowType->nameOf(childIdx)); - } - - for (auto childIdx = tableRowType->size(); childIdx < fileRowType->size(); - ++childIdx) { - newFileFieldTypes.push_back(fileRowType->childAt(childIdx)); - newFileFieldNames.push_back(fileRowType->nameOf(childIdx)); - } - - return std::make_shared( - std::move(newFileFieldNames), std::move(newFileFieldTypes)); -} - -// General type tree recursion function. -TypePtr updateColumnNames( - const TypePtr& fileType, - const TypePtr& tableType, - const std::string& fileFieldName, - const std::string& tableFieldName) { - // Check type kind equality. If not equal, no point to continue down the - // tree. - if (fileType->kind() != tableType->kind()) { - logTypeInequality(*fileType, *tableType, fileFieldName, tableFieldName); - return fileType; - } - - // For leaf types we return type as is. - if (fileType->isPrimitiveType()) { - return fileType; - } - - if (fileType->isRow()) { - return updateColumnNames(fileType, tableType); - } - - if (fileType->isMap()) { - return updateColumnNames(fileType, tableType); - } - - if (fileType->isArray()) { - return updateColumnNames(fileType, tableType); - } - - // We should not be here. - VLOG(1) << "Unexpected table type during column names update for File field '" - << fileFieldName << "': [" << fileType->toString() << "]"; - return fileType; -} -} // namespace - void DwrfReader::updateColumnNamesFromTableSchema() { const auto& tableSchema = readerBase_->readerOptions().fileSchema(); const auto& fileSchema = readerBase_->schema(); readerBase_->setSchema(std::dynamic_pointer_cast( - updateColumnNames(fileSchema, tableSchema, "", ""))); + updateColumnNames(fileSchema, tableSchema))); } std::unique_ptr DwrfReader::getStripe( diff --git a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp index 1504cd1f6185d..fcad82f8a6830 100644 --- a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp +++ b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp @@ -57,8 +57,8 @@ namespace facebook::velox::dwrf { } writer->close(); - LOG(INFO) << "writer root pool usage: " - << writer->getContext().testingGetWriterMemoryStats(); + VLOG(1) << "writer root pool usage: " + << writer->getContext().testingGetWriterMemoryStats(); return writer; } diff --git a/velox/type/Type.cpp b/velox/type/Type.cpp index fe1cf1f47ff50..3b1f56465f910 100644 --- a/velox/type/Type.cpp +++ b/velox/type/Type.cpp @@ -376,14 +376,6 @@ std::unique_ptr> RowType::makeParameters() const { createTypeParameters(children_)); } -uint32_t RowType::size() const { - return children_.size(); -} - -const TypePtr& RowType::childAt(uint32_t idx) const { - return children_.at(idx); -} - namespace { template std::string makeFieldNotFoundErrorMessage( diff --git a/velox/type/Type.h b/velox/type/Type.h index 51ba5958406aa..962c389e13b23 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -993,9 +993,14 @@ class RowType : public TypeBase { ~RowType() override; - uint32_t size() const override; + uint32_t size() const final { + return children_.size(); + } - const std::shared_ptr& childAt(uint32_t idx) const override; + const TypePtr& childAt(uint32_t idx) const final { + VELOX_CHECK_LT(idx, children_.size()); + return children_[idx]; + } const std::vector>& children() const { return children_;