diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 68f60098ea73e..f2d2083a7a0b6 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -660,6 +660,7 @@ bool applyPartitionFilter( VELOX_FAIL( "Bad type {} for partition value: {}", type->kind(), partitionValue); } + return true; } } // namespace diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index bd930bbffd4f8..cbb9838702ef1 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -215,7 +215,10 @@ std::unique_ptr HiveDataSource::createSplitReader() { ioStats_, fileHandleFactory_, executor_, - scanSpec_); + scanSpec_, + remainingFilterExprSet_, + expressionEvaluator_, + totalRemainingFilterTime_); } std::unique_ptr HiveDataSource::setupBucketConversion() { @@ -315,8 +318,10 @@ void HiveDataSource::addSplit(std::shared_ptr split) { } splitReader_ = createSplitReader(); + // Split reader subclasses may need to use the reader options in prepareSplit // so we initialize it beforehand. + splitReader_->configureReaderOptions(randomSkip_); splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } @@ -583,6 +588,7 @@ std::shared_ptr HiveDataSource::toWaveDataSource() { void HiveDataSource::registerWaveDelegateHook(WaveDelegateHookFunction hook) { waveDelegateHook_ = hook; } + std::shared_ptr toWaveDataSource(); } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index a870966603cab..6bcde10fecf42 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -164,7 +164,7 @@ class HiveDataSource : public DataSource { subfields_; SubfieldFilters filters_; std::shared_ptr metadataFilter_; - std::unique_ptr remainingFilterExprSet_; + std::shared_ptr remainingFilterExprSet_; RowVectorPtr emptyOutput_; dwio::common::RuntimeStatistics runtimeStats_; std::atomic totalRemainingFilterTime_{0}; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index bfa9e52828c40..fe7c908b6ad5c 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -75,7 +75,10 @@ std::unique_ptr SplitReader::create( const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) { + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime) { // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] if (hiveSplit->customSplitInfo.count("table_format") > 0 && hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") { @@ -89,7 +92,10 @@ std::unique_ptr SplitReader::create( ioStats, fileHandleFactory, executor, - scanSpec); + scanSpec, + remainingFilterExprSet, + expressionEvaluator, + totalRemainingFilterTime); } else { return std::unique_ptr(new SplitReader( hiveSplit, @@ -179,6 +185,11 @@ void SplitReader::resetSplit() { hiveSplit_.reset(); } +std::shared_ptr SplitReader::baseFileSchema() { + VELOX_CHECK_NOT_NULL(baseReader_.get()); + return baseReader_->typeWithId(); +} + int64_t SplitReader::estimatedRowSize() const { if (!baseRowReader_) { return DataSource::kUnknownRowSize; diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index b8ab6e10fd040..1fe3ccd13a608 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -64,7 +64,10 @@ class SplitReader { const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime); virtual ~SplitReader() = default; @@ -87,6 +90,8 @@ class SplitReader { void resetSplit(); + std::shared_ptr baseFileSchema(); + int64_t estimatedRowSize() const; void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const; diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index bc78005c91bb1..c2721f6c07050 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp - IcebergSplit.cpp PositionalDeleteFileReader.cpp) +velox_add_library( + velox_hive_iceberg_splitreader + IcebergSplitReader.cpp + IcebergSplit.cpp + PositionalDeleteFileReader.cpp + EqualityDeleteFileReader.cpp + FilterUtil.cpp) velox_link_libraries(velox_hive_iceberg_splitreader velox_connector Folly::folly) diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp new file mode 100644 index 0000000000000..b6f146765972c --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" + +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/iceberg/FilterUtil.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/common/TypeUtils.h" + +using namespace facebook::velox::common; +using namespace facebook::velox::core; +using namespace facebook::velox::exec; + +namespace facebook::velox::connector::hive::iceberg { + +static constexpr const int kMaxBatchRows = 10'000; + +EqualityDeleteFileReader::EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + std::shared_ptr baseFileSchema, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFileSchema_(baseFileSchema), + connectorQueryCtx_(connectorQueryCtx), + hiveConfig_(hiveConfig), + deleteSplit_(nullptr), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + pool_(connectorQueryCtx->memoryPool()), + ioStats_(ioStats), + deleteRowReader_(nullptr) { + VELOX_CHECK(deleteFile_.content == FileContent::kEqualityDeletes); + + if (deleteFile_.recordCount == 0) { + return; + } + + std::unordered_set equalityFieldIds( + deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end()); + auto deleteFieldSelector = [&equalityFieldIds](size_t index) { + return equalityFieldIds.find(static_cast(index)) != + equalityFieldIds.end(); + }; + auto deleteFileSchema = dwio::common::typeutils::buildSelectedType( + baseFileSchema_, deleteFieldSelector); + + rowType_ = std::static_pointer_cast(deleteFileSchema->type()); + + // TODO: push down filter if previous delete file contains this one. E.g. + // previous equality delete file has a=1, and this file also contains + // columns a, then a!=1 can be pushed as a filter when reading this delete + // file. + + auto scanSpec = std::make_shared(""); + scanSpec->addAllChildFields(rowType_->asRow()); + + deleteSplit_ = std::make_shared( + connectorId, + deleteFile_.filePath, + deleteFile_.fileFormat, + 0, + deleteFile_.fileSizeInBytes); + + // Create the Reader and RowReader + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + deleteReaderOpts, + hiveConfig_, + connectorQueryCtx_, + rowType_, + deleteSplit_); + + auto deleteFileHandleCachePtr = + fileHandleFactory_->generate(deleteFile_.filePath); + auto deleteFileInput = createBufferedInput( + *deleteFileHandleCachePtr, + deleteReaderOpts, + connectorQueryCtx_, + ioStats_, + executor_); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.fileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + {}, + scanSpec, + nullptr, + rowType_, + deleteSplit_, + hiveConfig_, + connectorQueryCtx_->sessionProperties(), + deleteRowReaderOpts); + + deleteRowReader_.reset(); + deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts); +} + +void EqualityDeleteFileReader::readDeleteValues( + SubfieldFilters& subfieldFilters, + std::vector& expressionInputs) { + VELOX_CHECK(deleteRowReader_); + VELOX_CHECK(deleteSplit_); + + if (!deleteValuesOutput_) { + deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_); + } + + // TODO:: verfiy if the field is a sub-field. Velox currently doesn't support + // pushing down filters to sub-fields + if (rowType_->size() == 1) { + // Construct the IN list filter that can be pushed down to the base file + // readers, then update the baseFileScanSpec. + readSingleColumnDeleteValues(subfieldFilters); + } else { + readMultipleColumnDeleteValues(expressionInputs); + } + + deleteSplit_.reset(); +} + +void EqualityDeleteFileReader::readSingleColumnDeleteValues( + SubfieldFilters& subfieldFilters) { + std::unique_ptr filter = std::make_unique(); + while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) { + if (deleteValuesOutput_->size() == 0) { + continue; + } + + deleteValuesOutput_->loadedVector(); + auto vector = + std::dynamic_pointer_cast(deleteValuesOutput_)->childAt(0); + auto name = rowType_->nameOf(0); + + auto typeKind = vector->type()->kind(); + VELOX_CHECK( + typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL, + "Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}", + name, + typeKind); + + auto notExistsFilter = + createNotExistsFilter(vector, 0, deleteValuesOutput_->size(), typeKind); + filter = filter->mergeWith(notExistsFilter.get()); + } + + if (filter->kind() != FilterKind::kAlwaysTrue) { + if (subfieldFilters.find(common::Subfield(rowType_->nameOf(0))) != + subfieldFilters.end()) { + subfieldFilters[common::Subfield(rowType_->nameOf(0))] = + subfieldFilters[common::Subfield(rowType_->nameOf(0))]->mergeWith( + filter.get()); + } else { + subfieldFilters[common::Subfield(rowType_->nameOf(0))] = + std::move(filter); + } + } +} + +void EqualityDeleteFileReader::readMultipleColumnDeleteValues( + std::vector& expressionInputs) { + auto numDeleteFields = rowType_->size(); + VELOX_CHECK_GT( + numDeleteFields, + 0, + "Iceberg equality delete file should have at least one field."); + + // TODO: logical expression simplifications + while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) { + if (deleteValuesOutput_->size() == 0) { + continue; + } + + deleteValuesOutput_->loadedVector(); + auto rowVector = std::dynamic_pointer_cast(deleteValuesOutput_); + auto numDeletedValues = rowVector->childAt(0)->size(); + + for (int i = 0; i < numDeletedValues; i++) { + std::vector disconjunctInputs; + + for (int j = 0; j < numDeleteFields; j++) { + auto type = rowType_->childAt(j); + auto name = rowType_->nameOf(j); + auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j)); + + std::vector isNotEqualInputs; + isNotEqualInputs.push_back( + std::make_shared(type, name)); + isNotEqualInputs.push_back(std::make_shared(value)); + auto isNotEqualExpr = + std::make_shared(BOOLEAN(), isNotEqualInputs, "neq"); + + disconjunctInputs.push_back(isNotEqualExpr); + } + + auto disconjunctNotEqualExpr = + std::make_shared(BOOLEAN(), disconjunctInputs, "or"); + expressionInputs.push_back(disconjunctNotEqualExpr); + } + } +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h new file mode 100644 index 0000000000000..29160b56a38f4 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Reader.h" +#include "velox/expression/Expr.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +using SubfieldFilters = + std::unordered_map>; + +class EqualityDeleteFileReader { + public: + EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + std::shared_ptr baseFileSchema, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId); + + void readDeleteValues( + SubfieldFilters& subfieldFilters, + std::vector& typedExpressions); + + private: + void readSingleColumnDeleteValues(SubfieldFilters& subfieldFilters); + + void readMultipleColumnDeleteValues( + std::vector& expressionInputs); + + const IcebergDeleteFile& deleteFile_; + const std::shared_ptr baseFileSchema_; + const ConnectorQueryCtx* const connectorQueryCtx_; + const std::shared_ptr hiveConfig_; + std::shared_ptr deleteSplit_; + + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + memory::MemoryPool* const pool_; + const std::shared_ptr ioStats_; + + RowTypePtr rowType_; + std::unique_ptr deleteRowReader_; + VectorPtr deleteValuesOutput_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/FilterUtil.cpp b/velox/connectors/hive/iceberg/FilterUtil.cpp new file mode 100644 index 0000000000000..e1d988f7e1792 --- /dev/null +++ b/velox/connectors/hive/iceberg/FilterUtil.cpp @@ -0,0 +1,122 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/connectors/hive/iceberg/FilterUtil.h" + +namespace facebook::velox::connector::hive::iceberg { + +using namespace facebook::velox::exec; +using namespace facebook::velox::core; + +// Read 'size' values from 'valuesVector' starting at 'offset', de-duplicate +// remove nulls and sort. Return a list of unique non-null values sorted in +// ascending order and a boolean indicating whether there were any null values. +template +std::pair, bool> toValues( + const VectorPtr& valuesVector, + vector_size_t offset, + vector_size_t size) { + auto simpleValues = valuesVector->as>(); + + bool nullAllowed = false; + std::vector values; + values.reserve(size); + + for (auto i = offset; i < offset + size; i++) { + if (simpleValues->isNullAt(i)) { + nullAllowed = true; + } else { + if constexpr (std::is_same_v) { + values.emplace_back(simpleValues->valueAt(i).toMillis()); + } else { + values.emplace_back(simpleValues->valueAt(i)); + } + } + } + + // In-place sort, remove duplicates, and later std::move to save memory + std::sort(values.begin(), values.end()); + auto last = std::unique(values.begin(), values.end()); + values.resize(std::distance(values.begin(), last)); + + return {std::move(values), nullAllowed}; +} + +template +std::unique_ptr createNegatedBigintValuesFilter( + const VectorPtr& valuesVector, + vector_size_t offset, + vector_size_t size) { + auto valuesPair = toValues(valuesVector, offset, size); + + const auto& values = valuesPair.first; + bool hasNull = valuesPair.second; + + return common::createNegatedBigintValues(values, !hasNull); +} + +std::unique_ptr createNotExistsFilter( + const VectorPtr& elements, + vector_size_t offset, + vector_size_t size, + const TypeKind& type) { + std::unique_ptr filter; + switch (type) { + case TypeKind::HUGEINT: + // TODO: createNegatedHugeintValuesFilter is not implemented yet. + VELOX_NYI("createNegatedHugeintValuesFilter is not implemented yet"); + case TypeKind::BIGINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::INTEGER: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::SMALLINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::TINYINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::BOOLEAN: + // Hack: using BIGINT filter for bool, which is essentially "int1_t". + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::TIMESTAMP: + filter = + createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + // TODO: createNegatedBytesValuesFilter is not implemented yet. + VELOX_NYI("createNegatedBytesValuesFilter is not implemented yet"); + case TypeKind::REAL: + case TypeKind::DOUBLE: + VELOX_USER_FAIL( + "Iceberg equality delete column cannot be DOUBLE or FLOAT"); + case TypeKind::UNKNOWN: + [[fallthrough]]; + case TypeKind::ARRAY: + [[fallthrough]]; + case TypeKind::MAP: + [[fallthrough]]; + case TypeKind::ROW: + [[fallthrough]]; + default: + VELOX_UNSUPPORTED( + "Unsupported in-list type {} for NOT EXIST predicate", type); + } + return filter; +} +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/FilterUtil.h b/velox/connectors/hive/iceberg/FilterUtil.h new file mode 100644 index 0000000000000..90e446f297a13 --- /dev/null +++ b/velox/connectors/hive/iceberg/FilterUtil.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/expression/Expr.h" +#include "velox/type/Filter.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector::hive::iceberg { +std::unique_ptr createNotExistsFilter( + const VectorPtr& elements, + vector_size_t offset, + vector_size_t size, + const TypeKind& type); + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 1923837d112cd..72568eb342ff5 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -16,11 +16,13 @@ #include "velox/connectors/hive/iceberg/IcebergSplitReader.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/dwio/common/BufferUtil.h" using namespace facebook::velox::dwio::common; +using namespace facebook::velox::exec; namespace facebook::velox::connector::hive::iceberg { @@ -35,7 +37,10 @@ IcebergSplitReader::IcebergSplitReader( const std::shared_ptr& ioStats, FileHandleFactory* const fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime) : SplitReader( hiveSplit, hiveTableHandle, @@ -47,30 +52,100 @@ IcebergSplitReader::IcebergSplitReader( fileHandleFactory, executor, scanSpec), + originalScanSpec_(nullptr), baseReadOffset_(0), splitOffset_(0), deleteBitmap_(nullptr), - deleteBitmapBitOffset_(0) {} + deleteBitmapBitOffset_(0), + deleteExprSet_(nullptr), + expressionEvaluator_(expressionEvaluator), + totalRemainingFilterTime_(totalRemainingFilterTime) {} + +IcebergSplitReader::~IcebergSplitReader() {} void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { + // The base file reader needs to be created before checking if the split is + // emtpy. createReader(std::move(metadataFilter)); + std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(hiveSplit_); + const auto& deleteFiles = icebergSplit->deleteFiles; + std::unordered_set equalityFieldIds; + for (const auto& deleteFile : deleteFiles) { + if (deleteFile.content == FileContent::kEqualityDeletes && + deleteFile.recordCount > 0) { + equalityFieldIds.insert( + deleteFile.equalityFieldIds.begin(), + deleteFile.equalityFieldIds.end()); + } + } + + // checkIfSplitIsEmpty needs to use the base reader's schemaWithId_. For that + // we need to update the ScanSpec first. + for (int32_t id : equalityFieldIds) { + scanSpec_->getOrCreateChild(baseReader_->rowType()->nameOf(id - 1), true); + } + if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } + // Process the equality delete files to update the scan spec and remaining + // filters. It needs to be done after creating the Reader and before creating + // the RowReader. + + SubfieldFilters subfieldFilters; + std::vector conjunctInputs; + + for (const auto& deleteFile : deleteFiles) { + if (deleteFile.content == FileContent::kEqualityDeletes && + deleteFile.recordCount > 0) { + // TODO: build cache of to avoid repeating file + // parsing across partitions. Within a single partition, the splits should + // be with the same equality delete files and only need to be parsed once. + auto equalityDeleteReader = std::make_unique( + deleteFile, + baseFileSchema(), + fileHandleFactory_, + executor_, + connectorQueryCtx_, + hiveConfig_, + ioStats_, + runtimeStats, + hiveSplit_->connectorId); + equalityDeleteReader->readDeleteValues(subfieldFilters, conjunctInputs); + } + } + + if (!subfieldFilters.empty()) { + for (auto iter = subfieldFilters.begin(); iter != subfieldFilters.end(); + iter++) { + auto childSpec = scanSpec_->getOrCreateChild(iter->first, true); + childSpec->addFilter(*iter->second); + childSpec->setHasTempFilter(true); + childSpec->setSubscript(scanSpec_->children().size() - 1); + } + } + + if (!conjunctInputs.empty()) { + core::TypedExprPtr expression = + std::make_shared(BOOLEAN(), conjunctInputs, "and"); + deleteExprSet_ = expressionEvaluator_->compile(expression); + VELOX_CHECK_EQ(deleteExprSet_->size(), 1); + } + createRowReader(); - std::shared_ptr icebergSplit = - std::dynamic_pointer_cast(hiveSplit_); baseReadOffset_ = 0; splitOffset_ = baseRowReader_->nextRowNumber(); - positionalDeleteFileReaders_.clear(); - const auto& deleteFiles = icebergSplit->deleteFiles; + // Create the positional deletes file readers. They need to be created after + // the RowReader is created. + positionalDeleteFileReaders_.clear(); for (const auto& deleteFile : deleteFiles) { if (deleteFile.content == FileContent::kPositionalDeletes) { if (deleteFile.recordCount > 0) { @@ -87,8 +162,6 @@ void IcebergSplitReader::prepareSplit( splitOffset_, hiveSplit_->connectorId)); } - } else { - VELOX_NYI(); } } } @@ -145,9 +218,36 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { : nullptr; auto rowsScanned = baseRowReader_->next(size, output, &mutation); + + // Evaluate the remaining filter deleteExprSet_ for every batch and update the + // output vector if it reduces any rows. + if (deleteExprSet_) { + auto filterStartMicros = getCurrentTimeMicro(); + + filterRows_.resize(output->size()); + auto rowVector = std::dynamic_pointer_cast(output); + expressionEvaluator_->evaluate( + deleteExprSet_.get(), filterRows_, *rowVector, filterResult_); + auto numRemainingRows = exec::processFilterResults( + filterResult_, filterRows_, filterEvalCtx_, pool_); + + if (numRemainingRows < output->size()) { + output = exec::wrap( + numRemainingRows, filterEvalCtx_.selectedIndices, rowVector); + } + + totalRemainingFilterTime_.fetch_add( + (getCurrentTimeMicro() - filterStartMicros) * 1000, + std::memory_order_relaxed); + } + baseReadOffset_ += rowsScanned; deleteBitmapBitOffset_ = rowsScanned; + if (rowsScanned == 0) { + scanSpec_->deleteTempNodes(); + } + return rowsScanned; } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 5f1196038f96d..852ef4480e53f 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" +#include "velox/exec/OperatorUtils.h" namespace facebook::velox::connector::hive::iceberg { @@ -37,9 +38,12 @@ class IcebergSplitReader : public SplitReader { const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime); - ~IcebergSplitReader() override = default; + ~IcebergSplitReader() override; void prepareSplit( std::shared_ptr metadataFilter, @@ -48,6 +52,11 @@ class IcebergSplitReader : public SplitReader { uint64_t next(uint64_t size, VectorPtr& output) override; private: + // The ScanSpec may need to be updated for different partitions if the split + // comes with single column equality delete files. So we need to keep a copy + // of the original ScanSpec. + std::shared_ptr originalScanSpec_; + // The read offset to the beginning of the split in number of rows for the // current batch for the base data file uint64_t baseReadOffset_; @@ -59,5 +68,14 @@ class IcebergSplitReader : public SplitReader { // The offset in bits of the deleteBitmap_ starting from where the bits shall // be consumed uint64_t deleteBitmapBitOffset_; + + std::unique_ptr deleteExprSet_; + core::ExpressionEvaluator* expressionEvaluator_; + std::atomic& totalRemainingFilterTime_; + + // Reusable memory for remaining filter evaluation. + VectorPtr filterResult_; + SelectivityVector filterRows_; + exec::FilterEvalCtx filterEvalCtx_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index d79e21b733439..5735a3da4b724 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -100,53 +100,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0); } - void assertMultipleSplits( - const std::vector& deletePositions, - int32_t splitCount, - int32_t numPrefetchSplits) { - std::map> rowGroupSizesForFiles; - for (int32_t i = 0; i < splitCount; i++) { - std::string dataFileName = fmt::format("data_file_{}", i); - rowGroupSizesForFiles[dataFileName] = {rowCount}; - } - - std::unordered_map< - std::string, - std::multimap>> - deleteFilesForBaseDatafiles; - for (int i = 0; i < splitCount; i++) { - std::string deleteFileName = fmt::format("delete_file_{}", i); - deleteFilesForBaseDatafiles[deleteFileName] = { - {fmt::format("data_file_{}", i), deletePositions}}; - } - - assertPositionalDeletes( - rowGroupSizesForFiles, deleteFilesForBaseDatafiles, numPrefetchSplits); - } - - std::vector makeRandomIncreasingValues(int64_t begin, int64_t end) { - VELOX_CHECK(begin < end); - - std::mt19937 gen{0}; - std::vector values; - values.reserve(end - begin); - for (int i = begin; i < end; i++) { - if (folly::Random::rand32(0, 10, gen) > 8) { - values.push_back(i); - } - } - return values; - } - - std::vector makeContinuousIncreasingValues( - int64_t begin, - int64_t end) { - std::vector values; - values.resize(end - begin); - std::iota(values.begin(), values.end(), begin); - return values; - } - /// @rowGroupSizesForFiles The key is the file name, and the value is a vector /// of RowGroup sizes /// @deleteFilesForBaseDatafiles The key is the delete file name, and the @@ -198,14 +151,14 @@ class HiveIcebergTest : public HiveConnectorTestBase { // add it to the split auto deleteFilePath = deleteFilePaths[deleteFileName].second->getPath(); - IcebergDeleteFile deleteFile( + IcebergDeleteFile icebergDeleteFile( FileContent::kPositionalDeletes, deleteFilePath, fileFomat_, deleteFilePaths[deleteFileName].first, testing::internal::GetFileSize( std::fopen(deleteFilePath.c_str(), "r"))); - deleteFiles.push_back(deleteFile); + deleteFiles.push_back(icebergDeleteFile); } } @@ -214,7 +167,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::string duckdbSql = getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles); - auto plan = tableScanNode(); + auto plan = tableScanNode(rowType_); auto task = HiveConnectorTestBase::assertQuery( plan, splits, duckdbSql, numPrefetchSplits); @@ -225,9 +178,202 @@ class HiveIcebergTest : public HiveConnectorTestBase { ASSERT_TRUE(it->second.peakMemoryBytes > 0); } + void assertEqualityDeletes( + const std::unordered_map>>& + equalityDeleteVectorMap, + const std::unordered_map>& + equalityFieldIdsMap, + std::string duckDbSql = "") { + VELOX_CHECK_EQ(equalityDeleteVectorMap.size(), equalityFieldIdsMap.size()); + // We will create data vectors with numColumns number of columns that is the + // max field Id in equalityFieldIds + int32_t numDataColumns = 0; + + for (auto it = equalityFieldIdsMap.begin(); it != equalityFieldIdsMap.end(); + ++it) { + auto equalityFieldIds = it->second; + auto currentMax = + *std::max_element(equalityFieldIds.begin(), equalityFieldIds.end()); + numDataColumns = std::max(numDataColumns, currentMax); + } + + // VELOX_CHECK_GT(numDataColumns, 0); + // VELOX_CHECK_GE(numDataColumns, equalityDeleteVector.size()); + // VELOX_CHECK_GT(equalityDeleteVector.size(), 0); + // + // VELOX_CHECK_LE(equalityFieldIds.size(), numDataColumns); + + std::shared_ptr dataFilePath = + writeDataFiles(rowCount, numDataColumns)[0]; + + std::vector deleteFiles; + std::string predicates = ""; + unsigned long numDeletedValues = 0; + + std::vector> deleteFilePaths; + for (auto it = equalityFieldIdsMap.begin(); + it != equalityFieldIdsMap.end();) { + auto equalityFieldIds = it->second; + auto equalityDeleteVector = equalityDeleteVectorMap.at(it->first); + VELOX_CHECK_GT(equalityDeleteVector.size(), 0); + numDeletedValues = + std::max(numDeletedValues, equalityDeleteVector[0].size()); + deleteFilePaths.push_back(writeEqualityDeleteFile(equalityDeleteVector)); + IcebergDeleteFile deleteFile( + FileContent::kEqualityDeletes, + deleteFilePaths.back()->getPath(), + fileFomat_, + equalityDeleteVector[0].size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePaths.back()->getPath().c_str(), "r")), + equalityFieldIds); + deleteFiles.push_back(deleteFile); + predicates += makePredicates(equalityDeleteVector, equalityFieldIds); + ++it; + if (it != equalityFieldIdsMap.end()) { + predicates += " AND "; + } + } + + auto icebergSplit = makeIcebergSplit(dataFilePath->getPath(), deleteFiles); + + // If the caller passed in a query, use that. + if (duckDbSql == "") { + // Select all columns + duckDbSql = "SELECT * FROM tmp "; + if (numDeletedValues > 0) { + duckDbSql += fmt::format("WHERE {}", predicates); + } + } + + assertEqualityDeletes(icebergSplit, rowType_, duckDbSql); + + // Select a column that's not in the filter columns + if (numDataColumns > 1 && + equalityDeleteVectorMap.at(0).size() < numDataColumns) { + // if (inputDuckDbSql.empty()) { + std::string duckDbSql1 = "SELECT c0 FROM tmp"; + if (numDeletedValues > 0) { + duckDbSql1 += fmt::format(" WHERE {}", predicates); + } + + std::vector names({"c0"}); + std::vector types(1, BIGINT()); + assertEqualityDeletes( + icebergSplit, + std::make_shared(std::move(names), std::move(types)), + duckDbSql1); + } + } + + IcebergDeleteFile* getEqualityDeleteFile( + const std::vector>& equalityDeleteVector, + const std::vector& equalityFieldIds, + std::shared_ptr& deleteFilePath, + std::string duckDbSql = "") { + deleteFilePath = writeEqualityDeleteFile(equalityDeleteVector); + IcebergDeleteFile* deleteFile = new IcebergDeleteFile( + FileContent::kEqualityDeletes, + deleteFilePath->getPath(), + fileFomat_, + equalityDeleteVector[0].size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePath->getPath().c_str(), "r")), + equalityFieldIds); + + return deleteFile; + } + + void assertMultipleSplits( + const std::vector& deletePositions, + int32_t splitCount, + int32_t numPrefetchSplits) { + std::map> rowGroupSizesForFiles; + for (int32_t i = 0; i < splitCount; i++) { + std::string dataFileName = fmt::format("data_file_{}", i); + rowGroupSizesForFiles[dataFileName] = {rowCount}; + } + + std::unordered_map< + std::string, + std::multimap>> + deleteFilesForBaseDatafiles; + for (int i = 0; i < splitCount; i++) { + std::string deleteFileName = fmt::format("delete_file_{}", i); + deleteFilesForBaseDatafiles[deleteFileName] = { + {fmt::format("data_file_{}", i), deletePositions}}; + } + + assertPositionalDeletes( + rowGroupSizesForFiles, deleteFilesForBaseDatafiles, numPrefetchSplits); + } + + std::vector makeRandomIncreasingValues(int64_t begin, int64_t end) { + VELOX_CHECK(begin < end); + + std::mt19937 gen{0}; + std::vector values; + values.reserve(end - begin); + for (int i = begin; i < end; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + values.push_back(i); + } + } + return values; + } + + std::vector makeContinuousIncreasingValues( + int64_t begin, + int64_t end) { + std::vector values; + values.resize(end - begin); + std::iota(values.begin(), values.end(), begin); + return values; + } + + std::vector makeSequenceValues(int32_t numRows, int8_t repeat = 1) { + VELOX_CHECK_GT(repeat, 0); + + auto maxValue = std::ceil((double)numRows / repeat); + std::vector values; + values.reserve(numRows); + for (int32_t i = 0; i < maxValue; i++) { + for (int8_t j = 0; j < repeat; j++) { + values.push_back(i); + } + } + values.resize(numRows); + return values; + } + + std::vector makeRandomDeleteValues(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + const static int rowCount = 20000; private: + void assertEqualityDeletes( + std::shared_ptr split, + RowTypePtr outputRowType, + const std::string& duckDbSql) { + auto plan = tableScanNode(outputRowType); + auto task = OperatorTestBase::assertQuery(plan, {split}, duckDbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + std::map> writeDataFiles( std::map> rowGroupSizesForFiles) { std::map> dataFilePaths; @@ -258,6 +404,24 @@ class HiveIcebergTest : public HiveConnectorTestBase { return dataFilePaths; } + std::vector> writeDataFiles( + uint64_t numRows, + int32_t numColumns = 1, + int32_t splitCount = 1) { + auto dataVectors = makeVectors(splitCount, numRows, numColumns); + VELOX_CHECK_EQ(dataVectors.size(), splitCount); + + std::vector> dataFilePaths; + dataFilePaths.reserve(splitCount); + for (auto i = 0; i < splitCount; i++) { + dataFilePaths.emplace_back(TempFilePath::create()); + writeToFile(dataFilePaths.back()->getPath(), dataVectors[i]); + } + + createDuckDbTable(dataVectors); + return dataFilePaths; + } + /// Input is like <"deleteFile1", <"dataFile1", {pos_RG1, pos_RG2,..}>, /// <"dataFile2", {pos_RG1, pos_RG2,..}> std::unordered_map< @@ -335,6 +499,44 @@ class HiveIcebergTest : public HiveConnectorTestBase { return vectors; } + std::vector + makeVectors(int32_t count, int32_t rowsPerVector, int32_t numColumns = 1) { + std::vector types(numColumns, BIGINT()); + std::vector names; + for (int j = 0; j < numColumns; j++) { + names.push_back(fmt::format("c{}", j)); + } + + std::vector rowVectors; + for (int i = 0; i < count; i++) { + std::vector vectors; + + // Create the column values like below: + // c0 c1 c2 + // 0 0 0 + // 1 0 0 + // 2 1 0 + // 3 1 1 + // 4 2 1 + // 5 2 1 + // 6 3 2 + // ... + // In the first column c0, the values are continuously increasing and not + // repeating. In the second column c1, the values are continuously + // increasing and each value repeats once. And so on. + for (int j = 0; j < numColumns; j++) { + auto data = makeSequenceValues(rowsPerVector, j + 1); + vectors.push_back(vectorMaker_.flatVector(data)); + } + + rowVectors.push_back(makeRowVector(names, vectors)); + } + + rowType_ = std::make_shared(std::move(names), std::move(types)); + + return rowVectors; + } + std::shared_ptr makeIcebergSplit( const std::string& dataFilePath, const std::vector& deleteFiles = {}) { @@ -472,8 +674,83 @@ class HiveIcebergTest : public HiveConnectorTestBase { }); } - core::PlanNodePtr tableScanNode() { - return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + core::PlanNodePtr tableScanNode(RowTypePtr outputRowType) { + return PlanBuilder(pool_.get()).tableScan(outputRowType).planNode(); + } + + std::shared_ptr writeEqualityDeleteFile( + const std::vector>& equalityDeleteVector) { + std::vector names; + std::vector vectors; + for (int i = 0; i < equalityDeleteVector.size(); i++) { + names.push_back(fmt::format("c{}", i)); + vectors.push_back( + vectorMaker_.flatVector(equalityDeleteVector[i])); + } + + RowVectorPtr deleteFileVectors = makeRowVector(names, vectors); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->getPath(), deleteFileVectors); + + return deleteFilePath; + } + + std::string makePredicates( + const std::vector>& equalityDeleteVector, + const std::vector& equalityFieldIds) { + std::string predicates(""); + int32_t numDataColumns = + *std::max_element(equalityFieldIds.begin(), equalityFieldIds.end()); + + VELOX_CHECK_GT(numDataColumns, 0); + VELOX_CHECK_GE(numDataColumns, equalityDeleteVector.size()); + VELOX_CHECK_GT(equalityDeleteVector.size(), 0); + + auto numDeletedValues = equalityDeleteVector[0].size(); + + if (numDeletedValues == 0) { + return predicates; + } + + // If all values for a column are deleted, just return an always-false + // predicate + for (auto i = 0; i < equalityDeleteVector.size(); i++) { + auto equalityFieldId = equalityFieldIds[i]; + auto deleteValues = equalityDeleteVector[i]; + + auto lastIter = std::unique(deleteValues.begin(), deleteValues.end()); + auto numDistinctValues = lastIter - deleteValues.begin(); + auto minValue = 1; + auto maxValue = *std::max_element(deleteValues.begin(), lastIter); + if (maxValue - minValue + 1 == numDistinctValues && + maxValue == (rowCount - 1) / equalityFieldId) { + return "1 = 0"; + } + } + + if (equalityDeleteVector.size() == 1) { + std::string name = fmt::format("c{}", equalityFieldIds[0] - 1); + predicates = fmt::format( + "{} NOT IN ({})", name, makeNotInList({equalityDeleteVector[0]})); + } else { + for (int i = 0; i < numDeletedValues; i++) { + std::string oneRow(""); + for (int j = 0; j < equalityFieldIds.size(); j++) { + std::string name = fmt::format("c{}", equalityFieldIds[j] - 1); + std::string predicate = + fmt::format("({} <> {})", name, equalityDeleteVector[j][i]); + + oneRow = oneRow == "" ? predicate + : fmt::format("({} OR {})", oneRow, predicate); + } + + predicates = predicates == "" + ? oneRow + : fmt::format("{} AND {}", predicates, oneRow); + } + } + return predicates; } dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; @@ -659,4 +936,181 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) { assertMultipleSplits({}, 10, 3); } +// Delete values from a single column file +TEST_F(HiveIcebergTest, equalityDeletesSingleFileColumn1) { + folly::SingletonVault::singleton()->registrationComplete(); + + std::unordered_map> equalityFieldIdsMap; + std::unordered_map>> + equalityDeleteVectorMap; + equalityFieldIdsMap.insert({0, {1}}); + + // Delete row 0, 1, 2, 3 from the first batch out of two. + equalityDeleteVectorMap.insert({0, {{0, 1, 2, 3}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete the first and last row in each batch (10000 rows per batch) + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{0, 9999, 10000, 19999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete several rows in the second batch (10000 rows per batch) + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{10000, 10002, 19999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete random rows + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {makeRandomDeleteValues(rowCount)}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete 0 rows + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete all rows + equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount)}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete rows that don't exist + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{20000, 29999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); +} + +// Delete values from the second column in a 2-column file +// +// c1 c2 +// 0 0 +// 1 0 +// 2 1 +// 3 1 +// 4 2 +// ... ... +// 19999 9999 +TEST_F(HiveIcebergTest, equalityDeletesSingleFileColumn2) { + folly::SingletonVault::singleton()->registrationComplete(); + + std::unordered_map> equalityFieldIdsMap; + std::unordered_map>> + equalityDeleteVectorMap; + equalityFieldIdsMap.insert({0, {2}}); + + // Delete values 0, 1, 2, 3 from the second column + equalityDeleteVectorMap.insert({0, {{0, 1, 2, 3}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete the smallest value 0 and the largest value 9999 from the second + // column, which has the range [0, 9999] + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{0, 9999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete non-existent values from the second column + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{10000, 10002, 19999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete random rows from the second column + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount)}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete 0 values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete all values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount / 2)}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); +} + +// Delete values from 2 columns with the following data: +// +// c1 c2 +// 0 0 +// 1 0 +// 2 1 +// 3 1 +// 4 2 +// ... ... +// 19999 9999 +TEST_F(HiveIcebergTest, equalityDeletesSingleFileMultipleColumns) { + folly::SingletonVault::singleton()->registrationComplete(); + + std::unordered_map> equalityFieldIdsMap; + std::unordered_map>> + equalityDeleteVectorMap; + equalityFieldIdsMap.insert({0, {1, 2}}); + + // Delete rows 0, 1 + equalityDeleteVectorMap.insert({0, {{0, 1}, {0, 0}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete rows 0, 2, 4, 6 + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{0, 2, 4, 6}, {0, 1, 2, 3}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete the last row + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{19999}, {9999}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete non-existent values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{20000, 30000}, {10000, 1500}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete 0 values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({0, {{}, {}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete all values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert( + {0, {makeSequenceValues(rowCount), makeSequenceValues(rowCount, 2)}}); + assertEqualityDeletes( + equalityDeleteVectorMap, + equalityFieldIdsMap, + "SELECT * FROM tmp WHERE 1 = 0"); +} + +TEST_F(HiveIcebergTest, equalityDeletesMultipleFiles) { + folly::SingletonVault::singleton()->registrationComplete(); + + std::unordered_map> equalityFieldIdsMap; + std::unordered_map>> + equalityDeleteVectorMap; + equalityFieldIdsMap.insert({{0, {1}}, {1, {2}}}); + + // Delete rows {0, 1} from c0, {2, 3} from c1, with two equality delete files + equalityDeleteVectorMap.insert({{0, {{0, 1}}}, {1, {{2, 3}}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete using 3 equality delete files + equalityFieldIdsMap.insert({{2, {3}}}); + equalityDeleteVectorMap.insert({{0, {{0, 1}}}, {1, {{2, 3}}}, {2, {{4, 5}}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete 0 values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert({{0, {{}}}, {1, {{}}}, {2, {{}}}}); + assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap); + + // Delete all values + equalityDeleteVectorMap.clear(); + equalityDeleteVectorMap.insert( + {{0, {makeSequenceValues(rowCount)}}, + {1, {makeSequenceValues(rowCount)}}, + {2, {makeSequenceValues(rowCount)}}}); + assertEqualityDeletes( + equalityDeleteVectorMap, + equalityFieldIdsMap, + "SELECT * FROM tmp WHERE 1 = 0"); +} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index e46880c5fe238..cf8f66edfc76e 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -15,8 +15,13 @@ */ #include "velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.h" + #include +#include + +#include "velox/vector/tests/utils/VectorMaker.h" + using namespace facebook::velox; using namespace facebook::velox::dwio; using namespace facebook::velox::dwio::common; @@ -327,9 +332,14 @@ void IcebergSplitReaderBenchmark::readSingleColumn( suspender.dismiss(); + auto ioExecutor = std::make_unique(3); + std::shared_ptr remainingFilterExprSet{nullptr}; + std::atomic totalRemainingFilterTime; + uint64_t resultSize = 0; for (std::shared_ptr split : splits) { scanSpec->resetCachedValues(true); + std::unique_ptr icebergSplitReader = std::make_unique( split, @@ -340,8 +350,11 @@ void IcebergSplitReaderBenchmark::readSingleColumn( rowType, ioStats, &fileHandleFactory, - nullptr, - scanSpec); + ioExecutor.get(), + scanSpec, + remainingFilterExprSet, + connectorQueryCtx_->expressionEvaluator(), + totalRemainingFilterTime); std::shared_ptr randomSkip; icebergSplitReader->configureReaderOptions(randomSkip); diff --git a/velox/dwio/common/ScanSpec.cpp b/velox/dwio/common/ScanSpec.cpp index 8f950ff32334a..18ceafdaa1581 100644 --- a/velox/dwio/common/ScanSpec.cpp +++ b/velox/dwio/common/ScanSpec.cpp @@ -21,25 +21,27 @@ namespace facebook::velox::common { -ScanSpec* ScanSpec::getOrCreateChild(const std::string& name) { +ScanSpec* ScanSpec::getOrCreateChild(const std::string& name, bool isTempNode) { if (auto it = this->childByFieldName_.find(name); it != this->childByFieldName_.end()) { return it->second; } - this->children_.push_back(std::make_unique(name)); + this->children_.push_back(std::make_unique(name, isTempNode)); auto* child = this->children_.back().get(); this->childByFieldName_[child->fieldName()] = child; return child; } -ScanSpec* ScanSpec::getOrCreateChild(const Subfield& subfield) { +ScanSpec* ScanSpec::getOrCreateChild( + const Subfield& subfield, + bool isTempNode) { auto* container = this; const auto& path = subfield.path(); for (size_t depth = 0; depth < path.size(); ++depth) { const auto element = path[depth].get(); VELOX_CHECK_EQ(element->kind(), kNestedField); auto* nestedField = static_cast(element); - container = container->getOrCreateChild(nestedField->name()); + container = container->getOrCreateChild(nestedField->name(), isTempNode); } return container; } @@ -54,15 +56,15 @@ bool ScanSpec::compareTimeToDropValue( } // Integer filters are before other filters if there is no // history data. - if (left->filter_ && right->filter_) { - return left->filter_->kind() < right->filter_->kind(); + if (!left->filters_.empty() && !right->filters_.empty()) { + return left->filters_.back()->kind() < right->filters_.back()->kind(); } // If hasFilter() is true but 'filter_' is nullptr, we have a filter // on complex type members. The simple type filter goes first. - if (left->filter_) { + if (!left->filters_.empty()) { return true; } - if (right->filter_) { + if (!right->filters_.empty()) { return false; } return left->fieldName_ < right->fieldName_; @@ -101,6 +103,19 @@ void ScanSpec::enableFilterInSubTree(bool value) { } } +void ScanSpec::deleteTempNodes() { + for (auto it = children_.begin(); it != children_.end();) { + if ((*it)->isTempNode()) { + it = children_.erase(it); + } else { + if ((*it)->hasTempFilter()) { + (*it)->restoreFilter(); + } + ++it; + } + } +} + const std::vector& ScanSpec::stableChildren() { std::lock_guard l(mutex_); if (stableChildren_.empty()) { @@ -160,7 +175,7 @@ void ScanSpec::moveAdaptationFrom(ScanSpec& other) { // constant will have been evaluated at split start time. If // 'child' is constant there is no adaptation that can be // received. - child->filter_ = std::move(otherChild->filter_); + child->filters_ = std::move(otherChild->filters_); child->selectivity_ = otherChild->selectivity_; } } @@ -363,8 +378,8 @@ std::string ScanSpec::toString() const { std::stringstream out; if (!fieldName_.empty()) { out << fieldName_; - if (filter_) { - out << " filter " << filter_->toString(); + if (!filters_.empty()) { + out << " filter " << filters_.back()->toString(); } if (isConstant()) { out << " constant"; @@ -384,7 +399,7 @@ std::string ScanSpec::toString() const { } void ScanSpec::addFilter(const Filter& filter) { - filter_ = filter_ ? filter_->mergeWith(&filter) : filter.clone(); + updateFilter(filter.clone()); } ScanSpec* ScanSpec::addField(const std::string& name, column_index_t channel) { diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index 794b335e988bc..100db3f6b65af 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -52,19 +52,33 @@ class ScanSpec { static constexpr const char* kMapValuesFieldName = "values"; static constexpr const char* kArrayElementsFieldName = "elements"; - explicit ScanSpec(const std::string& name) : fieldName_(name) {} + explicit ScanSpec(const std::string& name, bool isTempNode = false) + : fieldName_(name), isTempNode_(isTempNode) {} // Filter to apply. If 'this' corresponds to a struct/list/map, this // can only be isNull or isNotNull, other filtering is given by // 'children'. common::Filter* filter() const { - return filterDisabled_ ? nullptr : filter_.get(); + return filterDisabled_ + ? nullptr + : (filters_.empty() ? nullptr : filters_.back().get()); } // Sets 'filter_'. May be used at initialization or when adding a // pushed down filter, e.g. top k cutoff. void setFilter(std::unique_ptr filter) { - filter_ = std::move(filter); + filters_.push_back(std::move(filter)); + } + + void updateFilter(std::unique_ptr newFilter) { + if (!filters_.empty()) { + newFilter = newFilter->mergeWith(filters_.back().get()); + } + filters_.push_back(std::move(newFilter)); + } + + void restoreFilter() { + filters_.pop_back(); } void addFilter(const Filter&); @@ -198,13 +212,15 @@ class ScanSpec { /// Returns the ScanSpec corresponding to 'name'. Creates it if needed without /// any intermediate level. - ScanSpec* getOrCreateChild(const std::string& name); + ScanSpec* getOrCreateChild(const std::string& name, bool isTempNode = false); // Returns the ScanSpec corresponding to 'subfield'. Creates it if // needed, including any intermediate levels. This is used at // TableScan initialization to create the ScanSpec tree that // corresponds to the ColumnReader tree. - ScanSpec* getOrCreateChild(const Subfield& subfield); + ScanSpec* getOrCreateChild(const Subfield& subfield, bool isTempNode = false); + + void deleteTempNodes(); ScanSpec* childByName(const std::string& name) const { auto it = childByFieldName_.find(name); @@ -361,6 +377,18 @@ class ScanSpec { isFlatMapAsStruct_ = value; } + bool isTempNode() const { + return isTempNode_; + } + + void setHasTempFilter(bool hasTempFilter) { + hasTempFilter_ = hasTempFilter; + } + + bool hasTempFilter() const { + return hasTempFilter_; + } + private: void reorder(); @@ -399,9 +427,12 @@ class ScanSpec { // True if a string dictionary or flat map in this field should be // returned as flat. bool makeFlat_ = false; - std::unique_ptr filter_; + //<<<<<<< HEAD + // std::unique_ptr filter_; bool filterDisabled_ = false; dwio::common::DeltaColumnUpdater* deltaUpdate_ = nullptr; + //======= + std::vector> filters_; // Filters that will be only used for row group filtering based on metadata. // The conjunctions among these filters are tracked in MetadataFilter, with @@ -438,6 +469,11 @@ class ScanSpec { // This node represents a flat map column that need to be read as struct, // i.e. in table schema it is a MAP, but in result vector it is ROW. bool isFlatMapAsStruct_ = false; + + // This node is temporary, will be used and deleted after intermediate + // processing stages, like Iceberg equality deletes. + bool isTempNode_ = false; + bool hasTempFilter_ = false; }; template diff --git a/velox/dwio/common/TypeWithId.cpp b/velox/dwio/common/TypeWithId.cpp index 03328024ef67f..b50451098bcfd 100644 --- a/velox/dwio/common/TypeWithId.cpp +++ b/velox/dwio/common/TypeWithId.cpp @@ -78,6 +78,7 @@ std::unique_ptr TypeWithId::create( uint32_t next = 1; std::vector> children(type->size()); for (int i = 0, size = type->size(); i < size; ++i) { + // There is no guarantee that the spec contains auto* childSpec = spec.childByName(type->nameOf(i)); if (childSpec && !childSpec->isConstant()) { children[i] = create(type->childAt(i), next, i); diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index d4d0404cea1bb..ce38f7a0d1b9a 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -1776,6 +1776,27 @@ ExprSet::ExprSet( } } +ExprSet::ExprSet( + const std::vector>& sources, + core::ExecCtx* execCtx) + : execCtx_(execCtx) { + clear(); + exprs_ = sources; + std::vector allDistinctFields; + for (auto& expr : exprs_) { + Expr::mergeFields( + distinctFields_, multiplyReferencedFields_, expr->distinctFields()); + } +} + +void ExprSet::operator=(const ExprSet& another) { + exprs_ = another.exprs_; + distinctFields_ = another.distinctFields_; + multiplyReferencedFields_ = another.multiplyReferencedFields_; + toReset_ = another.toReset_; + memoizingExprs_ = another.memoizingExprs_; +} + namespace { void addStats( const exec::Expr& expr, diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index 863759914b37a..eee411cd854fd 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -45,7 +45,9 @@ DECLARE_string(velox_save_input_on_expression_system_failure_path); namespace facebook::velox::exec { class ExprSet; + class FieldReference; + class VectorFunction; struct ExprStats { @@ -722,6 +724,12 @@ class ExprSet { core::ExecCtx* execCtx, bool enableConstantFolding = true); + ExprSet( + const std::vector>& sources, + core::ExecCtx* execCtx); + + void operator=(const ExprSet& another); + virtual ~ExprSet(); // Initialize and evaluate all expressions available in this ExprSet. diff --git a/velox/expression/tests/SimpleFunctionTest.cpp b/velox/expression/tests/SimpleFunctionTest.cpp index b800cf1a712e0..e3de1a8c0ad12 100644 --- a/velox/expression/tests/SimpleFunctionTest.cpp +++ b/velox/expression/tests/SimpleFunctionTest.cpp @@ -999,7 +999,7 @@ VectorPtr testVariadicArgReuse( // Create a dummy EvalCtx. SelectivityVector rows(inputs[0]->size()); - exec::ExprSet exprSet({}, execCtx); + exec::ExprSet exprSet(std::vector{}, execCtx); RowVectorPtr inputRows = vectorMaker.rowVector({}); exec::EvalCtx evalCtx(execCtx, &exprSet, inputRows.get()); diff --git a/velox/functions/prestosql/tests/ElementAtTest.cpp b/velox/functions/prestosql/tests/ElementAtTest.cpp index 64b7f70855432..e98a7826a3e93 100644 --- a/velox/functions/prestosql/tests/ElementAtTest.cpp +++ b/velox/functions/prestosql/tests/ElementAtTest.cpp @@ -134,7 +134,7 @@ class ElementAtTest : public FunctionBaseTest { test::assertEqualVectors(expected, result); } // case 4: Verify NaNs are identified when employing caching. - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); @@ -1060,7 +1060,7 @@ TEST_F(ElementAtTest, testCachingOptimization) { } // Make a dummy eval context. - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); @@ -1197,7 +1197,7 @@ TEST_F(ElementAtTest, testCachingOptimizationComplexKey) { } // Make a dummy eval context. - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); @@ -1370,7 +1370,7 @@ TEST_F(ElementAtTest, timestampWithTimeZone) { TEST_F(ElementAtTest, timestampWithTimeZoneWithCaching) { auto testCaching = [&](std::vector&& args, const VectorPtr& expected) { - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); const auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); diff --git a/velox/functions/prestosql/tests/StringFunctionsTest.cpp b/velox/functions/prestosql/tests/StringFunctionsTest.cpp index 39555e5ed2ce3..583891c6b08ee 100644 --- a/velox/functions/prestosql/tests/StringFunctionsTest.cpp +++ b/velox/functions/prestosql/tests/StringFunctionsTest.cpp @@ -1416,7 +1416,7 @@ void StringFunctionsTest::testReplaceInPlace( : exec::getVectorFunction( "replace", {VARCHAR(), VARCHAR()}, {}, config); SelectivityVector rows(tests.size()); - ExprSet exprSet({}, &execCtx_); + ExprSet exprSet(std::vector{}, &execCtx_); RowVectorPtr inputRows = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputRows.get()); replaceFunction->apply(rows, functionInputs, VARCHAR(), evalCtx, resultPtr); diff --git a/velox/type/Filter.cpp b/velox/type/Filter.cpp index d0f6932b4aad9..f474694fcf34f 100644 --- a/velox/type/Filter.cpp +++ b/velox/type/Filter.cpp @@ -1180,7 +1180,7 @@ std::unique_ptr createBigintValuesFilter( std::unique_ptr createBigintValues( const std::vector& values, bool nullAllowed) { - return createBigintValuesFilter(values, nullAllowed, false); + return common::createBigintValuesFilter(values, nullAllowed, false); } std::unique_ptr createHugeintValues( @@ -1196,7 +1196,7 @@ std::unique_ptr createHugeintValues( std::unique_ptr createNegatedBigintValues( const std::vector& values, bool nullAllowed) { - return createBigintValuesFilter(values, nullAllowed, true); + return common::createBigintValuesFilter(values, nullAllowed, true); } BigintMultiRange::BigintMultiRange(