diff --git a/velox/common/base/BitSet.h b/velox/common/base/BitSet.h index baf2d9a6f2f05..765eb1f7073af 100644 --- a/velox/common/base/BitSet.h +++ b/velox/common/base/BitSet.h @@ -46,7 +46,7 @@ class BitSet { bits::setBit(bits_.data(), bit, true); } - bool contains(uint32_t index) { + bool contains(uint32_t index) const { uint64_t bit = index - min_; if (bit >= bits_.size() * 64) { // If index was < min_, bit will have wrapped around and will be > diff --git a/velox/common/io/Options.h b/velox/common/io/Options.h index 00d669eb8e3f1..93f89fe292f36 100644 --- a/velox/common/io/Options.h +++ b/velox/common/io/Options.h @@ -68,22 +68,6 @@ class ReaderOptions { autoPreloadLength_(DEFAULT_AUTO_PRELOAD_SIZE), prefetchMode_(PrefetchMode::PREFETCH) {} - ReaderOptions& operator=(const ReaderOptions& other) { - memoryPool_ = other.memoryPool_; - autoPreloadLength_ = other.autoPreloadLength_; - prefetchMode_ = other.prefetchMode_; - maxCoalesceDistance_ = other.maxCoalesceDistance_; - maxCoalesceBytes_ = other.maxCoalesceBytes_; - prefetchRowGroups_ = other.prefetchRowGroups_; - loadQuantum_ = other.loadQuantum_; - noCacheRetention_ = other.noCacheRetention_; - return *this; - } - - ReaderOptions(const ReaderOptions& other) { - *this = other; - } - /// Sets the memory pool for allocation. ReaderOptions& setMemoryPool(velox::memory::MemoryPool& pool) { memoryPool_ = &pool; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index a7440c4af0b71..0eea06b53d571 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -572,8 +572,7 @@ void configureRowReaderOptions( } rowReaderOptions.setScanSpec(scanSpec); rowReaderOptions.setMetadataFilter(std::move(metadataFilter)); - rowReaderOptions.select( - dwio::common::ColumnSelector::fromScanSpec(*scanSpec, rowType)); + rowReaderOptions.setRequestedType(rowType); rowReaderOptions.range(hiveSplit->start, hiveSplit->length); } diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 5e7e0a2bb81fa..229117aa6f6ff 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -138,20 +138,21 @@ void SplitReader::configureReaderOptions( hiveTableHandle_, hiveSplit_); baseReaderOpts_.setRandomSkip(std::move(randomSkip)); + baseReaderOpts_.setScanSpec(scanSpec_); } void SplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats, const std::shared_ptr& rowIndexColumn) { - createReader(); + createReader(std::move(metadataFilter), rowIndexColumn); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } - createRowReader(std::move(metadataFilter), rowIndexColumn); + createRowReader(); } uint64_t SplitReader::next(uint64_t size, VectorPtr& output) { @@ -218,7 +219,9 @@ std::string SplitReader::toString() const { static_cast(baseRowReader_.get())); } -void SplitReader::createReader() { +void SplitReader::createReader( + std::shared_ptr metadataFilter, + const std::shared_ptr& rowIndexColumn) { VELOX_CHECK_NE( baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -255,6 +258,20 @@ void SplitReader::createReader() { baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat()) ->createReader(std::move(baseFileInput), baseReaderOpts_); + + auto& fileType = baseReader_->rowType(); + auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); + auto columnNames = fileType->names(); + if (rowIndexColumn != nullptr) { + setRowIndexColumn(fileType, rowIndexColumn); + } + configureRowReaderOptions( + baseRowReaderOpts_, + hiveTableHandle_->tableParameters(), + scanSpec_, + std::move(metadataFilter), + ROW(std::move(columnNames), std::move(columnTypes)), + hiveSplit_); } bool SplitReader::checkIfSplitIsEmpty( @@ -286,23 +303,7 @@ bool SplitReader::checkIfSplitIsEmpty( return emptySplit_; } -void SplitReader::createRowReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn) { - auto& fileType = baseReader_->rowType(); - auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); - auto columnNames = fileType->names(); - if (rowIndexColumn != nullptr) { - setRowIndexColumn(fileType, rowIndexColumn); - } - - configureRowReaderOptions( - baseRowReaderOpts_, - hiveTableHandle_->tableParameters(), - scanSpec_, - std::move(metadataFilter), - ROW(std::move(columnNames), std::move(columnTypes)), - hiveSplit_); +void SplitReader::createRowReader() { // NOTE: we firstly reset the finished 'baseRowReader_' of previous split // before setting up for the next one to avoid doubling the peak memory usage. baseRowReader_.reset(); diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index acd92441c92f4..1bc021e7c6b79 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -115,7 +115,9 @@ class SplitReader { protected: /// Create the dwio::common::Reader object baseReader_, which will be used to /// read the data file's metadata and schema - void createReader(); + void createReader( + std::shared_ptr metadataFilter, + const std::shared_ptr& rowIndexColumn); /// Check if the hiveSplit_ is empty. The split is considered empty when /// 1) The data file is missing but the user chooses to ignore it @@ -127,9 +129,7 @@ class SplitReader { /// Create the dwio::common::RowReader object baseRowReader_, which owns the /// ColumnReaders that will be used to read the data - void createRowReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn); + void createRowReader(); /// Different table formats may have different meatadata columns. /// This function will be used to update the scanSpec for these columns. diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index be59e7170206a..d56347cf39bfe 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -54,14 +54,14 @@ void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats, const std::shared_ptr& rowIndexColumn) { - createReader(); + createReader(std::move(metadataFilter), rowIndexColumn); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } - createRowReader(metadataFilter, rowIndexColumn); + createRowReader(); std::shared_ptr icebergSplit = std::dynamic_pointer_cast(hiveSplit_); diff --git a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp index f66e75607166b..393e03bc110d1 100644 --- a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp @@ -205,11 +205,6 @@ TEST_F(HiveConnectorUtilTest, configureRowReaderOptions) { float_features->childByName(common::ScanSpec::kMapKeysFieldName) ->setFilter(common::createBigintValues({1, 3}, false)); float_features->setFlatMapFeatureSelection({"1", "3"}); - RowReaderOptions options; - configureRowReaderOptions(options, {}, spec, nullptr, rowType, split); - auto& nodes = options.getSelector()->getProjection(); - ASSERT_EQ(nodes.size(), 1); - ASSERT_EQ(nodes[0].expression, "[1,3]"); } } // namespace facebook::velox::connector diff --git a/velox/dwio/common/FormatData.h b/velox/dwio/common/FormatData.h index 1f0b5d4426bb8..d0d37006a2a8b 100644 --- a/velox/dwio/common/FormatData.h +++ b/velox/dwio/common/FormatData.h @@ -17,7 +17,6 @@ #pragma once #include "velox/common/memory/Memory.h" -#include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/ScanSpec.h" #include "velox/dwio/common/SeekableInputStream.h" #include "velox/dwio/common/Statistics.h" diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 57413cab5c1ee..202440953aead 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -126,6 +126,7 @@ class RowReaderOptions { bool returnFlatVector_ = false; ErrorTolerance errorTolerance_; std::shared_ptr selector_; + RowTypePtr requestedType_; std::shared_ptr scanSpec_ = nullptr; std::shared_ptr metadataFilter_; // Node id for map column to a list of keys to be projected as a struct. @@ -177,6 +178,10 @@ class RowReaderOptions { */ RowReaderOptions& select(const std::shared_ptr& selector) { selector_ = selector; + if (selector) { + VELOX_CHECK_NULL(requestedType_); + requestedType_ = selector->getSchema(); + } return *this; } @@ -295,6 +300,15 @@ class RowReaderOptions { return errorTolerance_; } + const RowTypePtr& requestedType() const { + return requestedType_; + } + + void setRequestedType(RowTypePtr requestedType) { + VELOX_CHECK_NULL(selector_); + requestedType_ = std::move(requestedType); + } + const std::shared_ptr& getScanSpec() const { return scanSpec_; } @@ -440,33 +454,6 @@ class ReaderOptions : public io::ReaderOptions { fileFormat_(FileFormat::UNKNOWN), fileSchema_(nullptr) {} - ReaderOptions& operator=(const ReaderOptions& other) { - io::ReaderOptions::operator=(other); - tailLocation_ = other.tailLocation_; - fileFormat_ = other.fileFormat_; - fileSchema_ = other.fileSchema_; - serDeOptions_ = other.serDeOptions_; - decrypterFactory_ = other.decrypterFactory_; - footerEstimatedSize_ = other.footerEstimatedSize_; - filePreloadThreshold_ = other.filePreloadThreshold_; - fileColumnNamesReadAsLowerCase_ = other.fileColumnNamesReadAsLowerCase_; - useColumnNamesForColumnMapping_ = other.useColumnNamesForColumnMapping_; - return *this; - } - - ReaderOptions(const ReaderOptions& other) - : io::ReaderOptions(other), - tailLocation_(other.tailLocation_), - fileFormat_(other.fileFormat_), - fileSchema_(other.fileSchema_), - serDeOptions_(other.serDeOptions_), - decrypterFactory_(other.decrypterFactory_), - footerEstimatedSize_(other.footerEstimatedSize_), - filePreloadThreshold_(other.filePreloadThreshold_), - fileColumnNamesReadAsLowerCase_(other.fileColumnNamesReadAsLowerCase_), - useColumnNamesForColumnMapping_(other.useColumnNamesForColumnMapping_) { - } - /// Sets the format of the file, such as "rc" or "dwrf". The default is /// "dwrf". ReaderOptions& setFileFormat(FileFormat format) { @@ -589,6 +576,14 @@ class ReaderOptions : public io::ReaderOptions { noCacheRetention_ = noCacheRetention; } + const std::shared_ptr& scanSpec() const { + return scanSpec_; + } + + void setScanSpec(std::shared_ptr scanSpec) { + scanSpec_ = std::move(scanSpec); + } + private: uint64_t tailLocation_; FileFormat fileFormat_; @@ -601,6 +596,7 @@ class ReaderOptions : public io::ReaderOptions { bool useColumnNamesForColumnMapping_{false}; std::shared_ptr ioExecutor_; std::shared_ptr randomSkip_; + std::shared_ptr scanSpec_; }; struct WriterOptions { diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 361ff1a657626..77e042d4a9402 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -19,7 +19,6 @@ #include "velox/common/memory/Memory.h" #include "velox/common/process/ProcessBase.h" #include "velox/common/process/TraceHistory.h" -#include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/FormatData.h" #include "velox/dwio/common/IntDecoder.h" #include "velox/dwio/common/Mutation.h" diff --git a/velox/dwio/common/SelectiveRepeatedColumnReader.cpp b/velox/dwio/common/SelectiveRepeatedColumnReader.cpp index 7b164245f0f95..9ff30eebd5f75 100644 --- a/velox/dwio/common/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/common/SelectiveRepeatedColumnReader.cpp @@ -189,7 +189,7 @@ RowSet SelectiveRepeatedColumnReader::applyFilter(RowSet rows) { } SelectiveListColumnReader::SelectiveListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, FormatParams& params, velox::common::ScanSpec& scanSpec) @@ -197,8 +197,7 @@ SelectiveListColumnReader::SelectiveListColumnReader( fileType->type(), params, scanSpec, - fileType), - requestedType_{requestedType} {} + fileType) {} uint64_t SelectiveListColumnReader::skip(uint64_t numValues) { numValues = formatData_->skipNulls(numValues); @@ -241,19 +240,19 @@ void SelectiveListColumnReader::read( void SelectiveListColumnReader::getValues(RowSet rows, VectorPtr* result) { VELOX_DCHECK_NOT_NULL(result); - prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_); + prepareResult(*result, requestedType_, rows.size(), &memoryPool_); auto* resultArray = result->get()->asUnchecked(); makeOffsetsAndSizes(rows, *resultArray); result->get()->setNulls(resultNulls()); if (child_ && !nestedRows_.empty()) { auto& elements = resultArray->elements(); - prepareStructResult(requestedType_->type()->childAt(0), &elements); + prepareStructResult(requestedType_->childAt(0), &elements); child_->getValues(nestedRows_, &elements); } } SelectiveMapColumnReader::SelectiveMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, FormatParams& params, velox::common::ScanSpec& scanSpec) @@ -261,8 +260,7 @@ SelectiveMapColumnReader::SelectiveMapColumnReader( fileType->type(), params, scanSpec, - fileType), - requestedType_{requestedType} {} + fileType) {} uint64_t SelectiveMapColumnReader::skip(uint64_t numValues) { numValues = formatData_->skipNulls(numValues); @@ -326,7 +324,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) { !result->get() || result->get()->type()->isMap(), "Expect MAP result vector, got {}", result->get()->type()->toString()); - prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_); + prepareResult(*result, requestedType_, rows.size(), &memoryPool_); auto* resultMap = result->get()->asUnchecked(); makeOffsetsAndSizes(rows, *resultMap); result->get()->setNulls(resultNulls()); @@ -337,7 +335,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) { if (!nestedRows_.empty()) { keyReader_->getValues(nestedRows_, &resultMap->mapKeys()); auto& values = resultMap->mapValues(); - prepareStructResult(requestedType_->type()->childAt(1), &values); + prepareStructResult(requestedType_->childAt(1), &values); elementReader_->getValues(nestedRows_, &values); } } diff --git a/velox/dwio/common/SelectiveRepeatedColumnReader.h b/velox/dwio/common/SelectiveRepeatedColumnReader.h index aae3a67e6f5e0..1d486acb47181 100644 --- a/velox/dwio/common/SelectiveRepeatedColumnReader.h +++ b/velox/dwio/common/SelectiveRepeatedColumnReader.h @@ -85,7 +85,7 @@ class SelectiveRepeatedColumnReader : public SelectiveColumnReader { class SelectiveListColumnReader : public SelectiveRepeatedColumnReader { public: SelectiveListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, FormatParams& params, velox::common::ScanSpec& scanSpec); @@ -103,13 +103,12 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader { protected: std::unique_ptr child_; - const std::shared_ptr requestedType_; }; class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader { public: SelectiveMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, FormatParams& params, velox::common::ScanSpec& scanSpec); @@ -128,7 +127,6 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader { std::unique_ptr keyReader_; std::unique_ptr elementReader_; - const std::shared_ptr requestedType_; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/SelectiveStructColumnReader.h b/velox/dwio/common/SelectiveStructColumnReader.h index 90d8fd8d84997..b74f89929eed3 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.h +++ b/velox/dwio/common/SelectiveStructColumnReader.h @@ -111,13 +111,12 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { static constexpr int32_t kConstantChildSpecSubscript = -1; SelectiveStructColumnReaderBase( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, FormatParams& params, velox::common::ScanSpec& scanSpec, bool isRoot = false) - : SelectiveColumnReader(fileType->type(), fileType, params, scanSpec), - requestedType_(requestedType), + : SelectiveColumnReader(requestedType, fileType, params, scanSpec), debugString_( getExceptionContext().message(VeloxException::Type::kSystem)), isRoot_(isRoot) {} @@ -138,8 +137,6 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { void fillOutputRowsFromMutation(vector_size_t size); - const std::shared_ptr requestedType_; - std::vector children_; // Sequence number of output batch. Checked against ColumnLoaders @@ -210,7 +207,7 @@ class SelectiveFlatMapColumnReaderHelper { reader_.children_[i] = keyNodes_[i].reader.get(); reader_.children_[i]->setIsFlatMapValue(true); } - if (auto type = reader_.requestedType_->type()->childAt(1); type->isRow()) { + if (auto type = reader_.requestedType_->childAt(1); type->isRow()) { childValues_ = BaseVector::create(type, 0, &reader_.memoryPool_); } } @@ -228,7 +225,7 @@ class SelectiveFlatMapColumnReaderHelper { } else { VLOG(1) << "Reallocating result MAP vector of size " << size; result = BaseVector::create( - reader_.requestedType_->type(), size, &reader_.memoryPool_); + reader_.requestedType_, size, &reader_.memoryPool_); } return *result->asUnchecked(); } diff --git a/velox/dwio/common/TypeWithId.cpp b/velox/dwio/common/TypeWithId.cpp index e303bd448748d..03328024ef67f 100644 --- a/velox/dwio/common/TypeWithId.cpp +++ b/velox/dwio/common/TypeWithId.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/common/TypeWithId.h" + #include "velox/dwio/common/exception/Exception.h" namespace facebook::velox::dwio::common { @@ -47,7 +48,9 @@ TypeWithId::TypeWithId( column_{column}, children_{toShared(std::move(children))} { for (auto& child : children_) { - const_cast(child->parent_) = this; + if (child) { + const_cast(child->parent_) = this; + } } } @@ -57,6 +60,35 @@ std::unique_ptr TypeWithId::create( return create(root, next, 0); } +namespace { + +int countNodes(const TypePtr& type) { + int count = 1; + for (auto& child : *type) { + count += countNodes(child); + } + return count; +} + +} // namespace + +std::unique_ptr TypeWithId::create( + const RowTypePtr& type, + const velox::common::ScanSpec& spec) { + uint32_t next = 1; + std::vector> children(type->size()); + for (int i = 0, size = type->size(); i < size; ++i) { + auto* childSpec = spec.childByName(type->nameOf(i)); + if (childSpec && !childSpec->isConstant()) { + children[i] = create(type->childAt(i), next, i); + } else { + next += countNodes(type->childAt(i)); + } + } + return std::make_unique( + type, std::move(children), 0, next - 1, 0); +} + uint32_t TypeWithId::size() const { return children_.size(); } diff --git a/velox/dwio/common/TypeWithId.h b/velox/dwio/common/TypeWithId.h index 5c5fbc5d070c7..a147cfe5066fc 100644 --- a/velox/dwio/common/TypeWithId.h +++ b/velox/dwio/common/TypeWithId.h @@ -18,6 +18,7 @@ #include #include +#include "velox/dwio/common/ScanSpec.h" #include "velox/type/Type.h" namespace facebook::velox::dwio::common { @@ -39,6 +40,13 @@ class TypeWithId : public velox::Tree> { const std::shared_ptr& root, uint32_t next = 0); + /// Create TypeWithId node but leave all the unselected children as nullptr. + /// The ids are set correctly even when some of the previous nodes are not + /// selected. + static std::unique_ptr create( + const RowTypePtr& type, + const velox::common::ScanSpec& spec); + uint32_t size() const override; const std::shared_ptr& type() const { diff --git a/velox/dwio/dwrf/reader/BinaryStreamReader.cpp b/velox/dwio/dwrf/reader/BinaryStreamReader.cpp index fe0fd4b132e33..31a2266fe09ba 100644 --- a/velox/dwio/dwrf/reader/BinaryStreamReader.cpp +++ b/velox/dwio/dwrf/reader/BinaryStreamReader.cpp @@ -37,7 +37,8 @@ BinaryStripeStreams::BinaryStripeStreams( stripeReader.fetchStripe(stripeIndex, preload_))}, stripeStreams_{ stripeReadState_, - selector, + &selector, + nullptr, options_, stripeReadState_->stripeMetadata->stripeInfo.offset(), static_cast( diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index f48aa11336752..5e847c9132f0e 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -17,7 +17,6 @@ #pragma once #include "velox/common/memory/Memory.h" -#include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/FormatData.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/dwio/common/compression/Compression.h" diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index e505be0334217..77e8be4a15fed 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -43,12 +43,14 @@ class DwrfUnit : public LoadUnit { dwio::common::ColumnReaderStatistics& columnReaderStatistics, uint32_t stripeIndex, std::shared_ptr columnSelector, + const std::shared_ptr& projectedNodes, RowReaderOptions options) : stripeReaderBase_{stripeReaderBase}, strideIndexProvider_{strideIndexProvider}, columnReaderStatistics_{columnReaderStatistics}, stripeIndex_{stripeIndex}, columnSelector_{std::move(columnSelector)}, + projectedNodes_{projectedNodes}, options_{std::move(options)}, stripeInfo_{ stripeReaderBase.getReader().getFooter().stripes(stripeIndex_)} {} @@ -86,6 +88,7 @@ class DwrfUnit : public LoadUnit { dwio::common::ColumnReaderStatistics& columnReaderStatistics_; const uint32_t stripeIndex_; const std::shared_ptr columnSelector_; + std::shared_ptr projectedNodes_; const RowReaderOptions options_; const StripeInformationWrapper stripeInfo_; @@ -140,7 +143,8 @@ void DwrfUnit::ensureDecoders() { stripeStreams_ = std::make_unique( stripeReadState_, - *columnSelector_, + columnSelector_.get(), + projectedNodes_, options_, stripeInfo_.offset(), stripeInfo_.numberOfRows(), @@ -148,7 +152,6 @@ void DwrfUnit::ensureDecoders() { stripeIndex_); auto scanSpec = options_.getScanSpec().get(); - auto requestedType = columnSelector_->getSchemaWithId(); auto fileType = stripeReaderBase_.getReader().getSchemaWithId(); FlatMapContext flatMapContext; flatMapContext.keySelectionCallback = options_.getKeySelectionCallback(); @@ -157,7 +160,7 @@ void DwrfUnit::ensureDecoders() { if (scanSpec) { selectiveColumnReader_ = SelectiveDwrfReader::build( - requestedType, + options_.requestedType() ? options_.requestedType() : fileType->type(), fileType, *stripeStreams_, streamLabels, @@ -169,6 +172,7 @@ void DwrfUnit::ensureDecoders() { selectiveColumnReader_->setFillMutatedOutputRows( options_.getRowNumberColumnInfo().has_value()); } else { + auto requestedType = columnSelector_->getSchemaWithId(); columnReader_ = ColumnReader::build( // enqueue streams requestedType, fileType, @@ -204,6 +208,17 @@ DwrfUnit* castDwrfUnit(LoadUnit* unit) { return dwrfUnit; } +void makeProjectedNodes( + const dwio::common::TypeWithId& fileType, + BitSet& projectedNodes) { + projectedNodes.insert(fileType.id()); + for (auto& child : fileType.getChildren()) { + if (child) { + makeProjectedNodes(*child, projectedNodes); + } + } +} + } // namespace DwrfRowReader::DwrfRowReader( @@ -213,8 +228,12 @@ DwrfRowReader::DwrfRowReader( strideIndex_{0}, options_(opts), decodingTimeCallback_{options_.getDecodingTimeCallback()}, - columnSelector_{std::make_shared( - ColumnSelector::apply(opts.getSelector(), reader->getSchema()))}, + columnSelector_{ + options_.getScanSpec() + ? nullptr + : std::make_shared(ColumnSelector::apply( + opts.getSelector(), + reader->getSchema()))}, currentUnit_{nullptr} { auto& fileFooter = getReader().getFooter(); uint32_t numberOfStripes = fileFooter.stripesSize(); @@ -274,8 +293,13 @@ DwrfRowReader::DwrfRowReader( return exceptionMessageContext; }; - dwio::common::typeutils::checkTypeCompatibility( - *getReader().getSchema(), *columnSelector_, createExceptionContext); + if (columnSelector_) { + dwio::common::typeutils::checkTypeCompatibility( + *getReader().getSchema(), *columnSelector_, createExceptionContext); + } else { + projectedNodes_ = std::make_shared(0); + makeProjectedNodes(*getReader().getSchemaWithId(), *projectedNodes_); + } unitLoader_ = getUnitLoader(); } @@ -301,6 +325,7 @@ std::unique_ptr DwrfRowReader::getUnitLoader() { columnReaderStatistics_, stripe, columnSelector_, + projectedNodes_, options_)); } std::shared_ptr unitLoaderFactory = @@ -623,9 +648,17 @@ void DwrfRowReader::loadCurrentStripe() { } size_t DwrfRowReader::estimatedReaderMemory() const { + VELOX_CHECK_NOT_NULL(columnSelector_); return 2 * DwrfReader::getMemoryUse(getReader(), -1, *columnSelector_); } +bool DwrfRowReader::shouldReadNode(uint32_t nodeId) const { + if (columnSelector_) { + return columnSelector_->shouldReadNode(nodeId); + } + return projectedNodes_->contains(nodeId); +} + std::optional DwrfRowReader::estimatedRowSizeHelper( const FooterWrapper& fileFooter, const dwio::common::Statistics& stats, @@ -699,7 +732,7 @@ std::optional DwrfRowReader::estimatedRowSizeHelper( // start the estimate with the offsets and hasNulls vectors sizes size_t totalEstimate = valueCount * (sizeof(uint8_t) + sizeof(uint64_t)); for (int32_t i = 0; i < t.subtypesSize(); ++i) { - if (!columnSelector_->shouldReadNode(t.subtypes(i))) { + if (!shouldReadNode(t.subtypes(i))) { continue; } auto subtypeEstimate = @@ -752,7 +785,8 @@ DwrfReader::DwrfReader( options.fileFormat() == FileFormat::ORC ? FileFormat::ORC : FileFormat::DWRF, options.fileColumnNamesReadAsLowerCase(), - options.randomSkip())), + options.randomSkip(), + options.scanSpec())), options_(options) { // If we are not using column names to map table columns to file columns, // then we use indices. In that case we need to ensure the names completely diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index 9786f804871cb..65b9abf724be3 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -146,6 +146,8 @@ class DwrfRowReader : public StrideIndexProvider, // column selector std::shared_ptr columnSelector_; + std::shared_ptr projectedNodes_; + const uint64_t* stridesToSkip_; int stridesToSkipSize_; // Record of strides to skip in each visited stripe. Used for diagnostics. @@ -167,13 +169,18 @@ class DwrfRowReader : public StrideIndexProvider, // internal methods + bool shouldReadNode(uint32_t nodeId) const; + std::optional estimatedRowSizeHelper( const FooterWrapper& fileFooter, const dwio::common::Statistics& stats, uint32_t nodeId) const; std::shared_ptr getType() const { - return columnSelector_->getSchema(); + if (columnSelector_) { + return columnSelector_->getSchema(); + } + return options_.requestedType(); } bool isEmptyFile() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index 085ce2c501f55..53d787f6ca134 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -95,14 +95,16 @@ ReaderBase::ReaderBase( uint64_t filePreloadThreshold, FileFormat fileFormat, bool fileColumnNamesReadAsLowerCase, - std::shared_ptr randomSkip) + std::shared_ptr randomSkip, + std::shared_ptr scanSpec) : pool_{pool}, arena_(std::make_unique()), decryptorFactory_(decryptorFactory), footerEstimatedSize_(footerEstimatedSize), filePreloadThreshold_(filePreloadThreshold), input_(std::move(input)), - randomSkip_(std::move(randomSkip)) { + randomSkip_(std::move(randomSkip)), + scanSpec_(std::move(scanSpec)) { process::TraceContext trace("ReaderBase::ReaderBase"); // read last bytes into buffer to get PostScript // If file is small, load the entire file. diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 2805285a74c8a..a9eb50342d821 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -72,7 +72,8 @@ class ReaderBase { dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, bool fileColumnNamesReadAsLowerCase = false, - std::shared_ptr randomSkip = nullptr); + std::shared_ptr randomSkip = nullptr, + std::shared_ptr scanSpec = nullptr); ReaderBase( memory::MemoryPool& pool, @@ -131,7 +132,11 @@ class ReaderBase { const std::shared_ptr& getSchemaWithId() const { if (!schemaWithId_) { - schemaWithId_ = dwio::common::TypeWithId::create(schema_); + if (scanSpec_) { + schemaWithId_ = dwio::common::TypeWithId::create(schema_, *scanSpec_); + } else { + schemaWithId_ = dwio::common::TypeWithId::create(schema_); + } } return schemaWithId_; } @@ -257,6 +262,7 @@ class ReaderBase { std::unique_ptr input_; const std::shared_ptr randomSkip_; + const std::shared_ptr scanSpec_; RowTypePtr schema_; // Lazily populated mutable std::shared_ptr schemaWithId_; diff --git a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h index 8684842c6fa27..0d32b412de9a7 100644 --- a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h @@ -26,13 +26,13 @@ class SelectiveByteRleColumnReader using ValueType = int8_t; SelectiveByteRleColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, DwrfParams& params, common::ScanSpec& scanSpec, bool isBool) : dwio::common::SelectiveByteRleColumnReader( - requestedType->type(), + requestedType, std::move(fileType), params, scanSpec) { diff --git a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp index 5276cf8584432..45e5e104e991f 100644 --- a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp @@ -34,7 +34,7 @@ namespace facebook::velox::dwrf { using namespace facebook::velox::dwio::common; std::unique_ptr buildIntegerReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, uint32_t numBytes, @@ -57,7 +57,7 @@ std::unique_ptr buildIntegerReader( // static std::unique_ptr SelectiveDwrfReader::build( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, @@ -66,7 +66,7 @@ std::unique_ptr SelectiveDwrfReader::build( !isRoot || fileType->type()->kind() == TypeKind::ROW, "The root object can only be a row."); dwio::common::typeutils::checkTypeCompatibility( - *fileType->type(), *requestedType->type()); + *fileType->type(), *requestedType); EncodingKey ek{fileType->id(), params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); switch (fileType->type()->kind()) { @@ -76,7 +76,7 @@ std::unique_ptr SelectiveDwrfReader::build( case TypeKind::BIGINT: if (fileType->type()->isDecimal()) { return std::make_unique>( - requestedType, params, scanSpec); + fileType, params, scanSpec); } else { return buildIntegerReader( requestedType, fileType, params, LONG_BYTE_SIZE, scanSpec); @@ -96,19 +96,19 @@ std::unique_ptr SelectiveDwrfReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); case TypeKind::REAL: - if (requestedType->type()->kind() == TypeKind::REAL) { + if (requestedType->kind() == TypeKind::REAL) { return std::make_unique< SelectiveFloatingPointColumnReader>( - requestedType->type(), fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); } else { return std::make_unique< SelectiveFloatingPointColumnReader>( - requestedType->type(), fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); } case TypeKind::DOUBLE: return std::make_unique< SelectiveFloatingPointColumnReader>( - requestedType->type(), fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); case TypeKind::ROW: return std::make_unique( requestedType, fileType, params, scanSpec, isRoot); @@ -138,7 +138,7 @@ std::unique_ptr SelectiveDwrfReader::build( case TypeKind::HUGEINT: if (fileType->type()->isDecimal()) { return std::make_unique>( - requestedType, params, scanSpec); + fileType, params, scanSpec); } [[fallthrough]]; default: diff --git a/velox/dwio/dwrf/reader/SelectiveDwrfReader.h b/velox/dwio/dwrf/reader/SelectiveDwrfReader.h index 555edc59b1284..e5e660ea05337 100644 --- a/velox/dwio/dwrf/reader/SelectiveDwrfReader.h +++ b/velox/dwio/dwrf/reader/SelectiveDwrfReader.h @@ -26,7 +26,7 @@ namespace facebook::velox::dwrf { class SelectiveDwrfReader { public: static std::unique_ptr build( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, @@ -35,7 +35,7 @@ class SelectiveDwrfReader { // Compatibility wrapper for tests. Takes the components of DwrfParams as // separate. static std::unique_ptr build( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, StripeStreams& stripe, const StreamLabels& streamLabels, @@ -55,7 +55,7 @@ class SelectiveColumnReaderFactory : public ColumnReaderFactory { : scanSpec_(scanSpec) {} std::unique_ptr buildSelective( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, StripeStreams& stripe, const StreamLabels& streamLabels, diff --git a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp index f565ffa70f545..5e4a01757729e 100644 --- a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp @@ -45,15 +45,6 @@ std::string toString(const T& x) { } } -template -dwio::common::flatmap::KeyPredicate prepareKeyPredicate( - const std::shared_ptr& requestedType, - StripeStreams& stripe) { - auto& cs = stripe.getColumnSelector(); - const auto expr = cs.getNode(requestedType->id())->getNode().expression; - return dwio::common::flatmap::prepareKeyPredicate(expr); -} - // Represent a branch of a value node in a flat map. Represent a keyed value // node. template @@ -76,7 +67,7 @@ struct KeyNode { template std::vector> getKeyNodes( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, @@ -89,7 +80,6 @@ std::vector> getKeyNodes( auto& requestedValueType = requestedType->childAt(1); auto& dataValueType = fileType->childAt(1); auto& stripe = params.stripeStreams(); - auto keyPredicate = prepareKeyPredicate(requestedType, stripe); common::ScanSpec* keysSpec = nullptr; common::ScanSpec* valuesSpec = nullptr; @@ -130,10 +120,6 @@ std::vector> getKeyNodes( EncodingKey seqEk(dataValueType->id(), sequence); const auto& keyInfo = stripe.getEncoding(seqEk).key(); auto key = extractKey(keyInfo); - // Check if we have key filter passed through read schema. - if (!keyPredicate(key)) { - return; - } common::ScanSpec* childSpec; if (auto it = childSpecs.find(key); it != childSpecs.end() && !it->second->isConstant()) { @@ -183,7 +169,7 @@ template class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase { public: SelectiveFlatMapAsStructReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) @@ -212,7 +198,7 @@ template class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { public: SelectiveFlatMapReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) @@ -241,7 +227,7 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { template std::unique_ptr createReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) { @@ -258,7 +244,7 @@ std::unique_ptr createReader( std::unique_ptr createSelectiveFlatMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) { diff --git a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.h b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.h index 292de61c18f49..9a3c3898927da 100644 --- a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.h @@ -23,7 +23,7 @@ namespace facebook::velox::dwrf { std::unique_ptr createSelectiveFlatMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams&, common::ScanSpec&); diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp index d8dec4a9e0b5f..be245ce410fc2 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp @@ -22,13 +22,13 @@ namespace facebook::velox::dwrf { using namespace dwio::common; SelectiveIntegerDictionaryColumnReader::SelectiveIntegerDictionaryColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, DwrfParams& params, common::ScanSpec& scanSpec, uint32_t numBytes) : SelectiveIntegerColumnReader( - requestedType->type(), + requestedType, params, scanSpec, std::move(fileType)) { diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.h b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.h index 5ed11f190f1db..e87a73a6dffc2 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.h @@ -28,7 +28,7 @@ class SelectiveIntegerDictionaryColumnReader using ValueType = int64_t; SelectiveIntegerDictionaryColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, DwrfParams& params, common::ScanSpec& scanSpec, diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h index 92b3aa7503860..a123ae2e86529 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h @@ -28,13 +28,13 @@ class SelectiveIntegerDirectColumnReader using ValueType = int64_t; SelectiveIntegerDirectColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, DwrfParams& params, uint32_t numBytes, common::ScanSpec& scanSpec) : SelectiveIntegerColumnReader( - requestedType->type(), + requestedType, params, scanSpec, std::move(fileType)) { diff --git a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp index 87fe5ee420549..f9c0dcb8cbc5b 100644 --- a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp @@ -45,7 +45,7 @@ FlatMapContext flatMapContextFromEncodingKey(EncodingKey& encodingKey) { } SelectiveListColumnReader::SelectiveListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) @@ -59,11 +59,7 @@ SelectiveListColumnReader::SelectiveListColumnReader( EncodingKey encodingKey{fileType_->id(), params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); // count the number of selected sub-columns - const auto& cs = stripe.getColumnSelector(); auto& childType = requestedType_->childAt(0); - VELOX_CHECK( - cs.shouldReadNode(childType->id()), - "SelectiveListColumnReader must select the values stream"); if (scanSpec_->children().empty()) { scanSpec.getOrCreateChild( common::Subfield(common::ScanSpec::kArrayElementsFieldName)); @@ -82,7 +78,7 @@ SelectiveListColumnReader::SelectiveListColumnReader( } SelectiveMapColumnReader::SelectiveMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) @@ -106,11 +102,7 @@ SelectiveMapColumnReader::SelectiveMapColumnReader( scanSpec_->children()[1]->setProjectOut(true); scanSpec_->children()[1]->setExtractValues(true); - const auto& cs = stripe.getColumnSelector(); auto& keyType = requestedType_->childAt(0); - VELOX_CHECK( - cs.shouldReadNode(keyType->id()), - "Map key must be selected in SelectiveMapColumnReader"); auto keyParams = DwrfParams( stripe, params.streamLabels(), @@ -123,9 +115,6 @@ SelectiveMapColumnReader::SelectiveMapColumnReader( *scanSpec_->children()[0].get()); auto& valueType = requestedType_->childAt(1); - VELOX_CHECK( - cs.shouldReadNode(valueType->id()), - "Map Values must be selected in SelectiveMapColumnReader"); auto elementParams = DwrfParams( stripe, params.streamLabels(), diff --git a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.h b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.h index 9456917e43dd4..7f31c425ce4b2 100644 --- a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.h @@ -28,7 +28,7 @@ class SelectiveListColumnReader : public dwio::common::SelectiveListColumnReader { public: SelectiveListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec); @@ -61,7 +61,7 @@ class SelectiveListColumnReader class SelectiveMapColumnReader : public dwio::common::SelectiveMapColumnReader { public: SelectiveMapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec); diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp index 341359b2e4f5b..992f6f5b8ac77 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp @@ -24,7 +24,7 @@ namespace facebook::velox::dwrf { using namespace dwio::common; SelectiveStructColumnReader::SelectiveStructColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, @@ -43,11 +43,11 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( proto::ColumnEncoding_Kind_DIRECT, "Unknown encoding for StructColumnReader"); - const auto& cs = stripe.getColumnSelector(); // A reader tree may be constructed while the ScanSpec is being used // for another read. This happens when the next stripe is being // prepared while the previous one is reading. auto& childSpecs = scanSpec.stableChildren(); + auto& rowType = requestedType_->asRow(); for (auto i = 0; i < childSpecs.size(); ++i) { auto childSpec = childSpecs[i]; if (isChildConstant(*childSpec)) { @@ -55,8 +55,7 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( continue; } auto childFileType = fileType_->childByName(childSpec->fieldName()); - auto childRequestedType = - requestedType_->childByName(childSpec->fieldName()); + auto childRequestedType = rowType.findChild(childSpec->fieldName()); auto labels = params.streamLabels().append(folly::to(i)); auto childParams = DwrfParams( stripe, @@ -66,7 +65,6 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( .sequence = encodingKey.sequence(), .inMapDecoder = nullptr, .keySelectionCallback = nullptr}); - VELOX_CHECK(cs.shouldReadNode(childRequestedType->id())); addChild(SelectiveDwrfReader::build( childRequestedType, childFileType, childParams, *childSpec)); childSpec->setSubscript(children_.size() - 1); diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h index e3225b406bc22..08d146cd86b28 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h @@ -25,7 +25,7 @@ class SelectiveStructColumnReaderBase : public dwio::common::SelectiveStructColumnReaderBase { public: SelectiveStructColumnReaderBase( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, @@ -80,7 +80,7 @@ class SelectiveStructColumnReaderBase struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase { SelectiveStructColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec, diff --git a/velox/dwio/dwrf/reader/StripeReaderBase.cpp b/velox/dwio/dwrf/reader/StripeReaderBase.cpp index cfb7b2796b5ab..fda09f0450fc1 100644 --- a/velox/dwio/dwrf/reader/StripeReaderBase.cpp +++ b/velox/dwio/dwrf/reader/StripeReaderBase.cpp @@ -76,8 +76,14 @@ std::unique_ptr StripeReaderBase::fetchStripe( auto streamDebugInfo = fmt::format("Stripe {} Footer ", index); - auto stripeFooter = ProtoUtils::readProto( - reader_->createDecompressedStream(std::move(stream), streamDebugInfo)); + auto arena = std::make_shared(); + auto* rawFooter = + google::protobuf::Arena::CreateMessage(arena.get()); + ProtoUtils::readProtoInto( + reader_->createDecompressedStream(std::move(stream), streamDebugInfo), + rawFooter); + std::shared_ptr stripeFooter( + rawFooter, [arena = std::move(arena)](auto*) { arena->Reset(); }); auto handler = std::make_unique( reader_->getDecryptionHandler()); diff --git a/velox/dwio/dwrf/reader/StripeStream.cpp b/velox/dwio/dwrf/reader/StripeStream.cpp index 93d361af8a90a..2e99649b7476a 100644 --- a/velox/dwio/dwrf/reader/StripeStream.cpp +++ b/velox/dwio/dwrf/reader/StripeStream.cpp @@ -17,7 +17,6 @@ #include #include -#include "velox/common/base/BitSet.h" #include "velox/dwio/common/exception/Exception.h" #include "velox/dwio/dwrf/common/DecoderUtil.h" #include "velox/dwio/dwrf/common/wrap/coded-stream-wrapper.h" @@ -33,9 +32,9 @@ namespace { template void findProjectedNodes( BitSet& projectedNodes, - const TypeWithId& expected, - const TypeWithId& actual, - IsProjected isProjected) { + const dwio::common::TypeWithId& expected, + const dwio::common::TypeWithId& actual, + IsProjected&& isProjected) { // we don't need to perform schema compatibility check since reader should // have already done that before reaching here. // if a leaf node is projected, all the intermediate node from root to the @@ -53,7 +52,7 @@ void findProjectedNodes( projectedNodes, *expected.childAt(i), *actual.childAt(i), - isProjected); + std::forward(isProjected)); } break; } @@ -62,19 +61,19 @@ void findProjectedNodes( projectedNodes, *expected.childAt(0), *actual.childAt(0), - isProjected); + std::forward(isProjected)); break; case TypeKind::MAP: { findProjectedNodes( projectedNodes, *expected.childAt(0), *actual.childAt(0), - isProjected); + std::forward(isProjected)); findProjectedNodes( projectedNodes, *expected.childAt(1), *actual.childAt(1), - isProjected); + std::forward(isProjected)); break; } default: @@ -148,24 +147,29 @@ StripeStreamsBase::getIntDictionaryInitializerForNode( void StripeStreamsImpl::loadStreams() { auto& stripeFooter = *readState_->stripeMetadata->footer; - // HACK!!! - // Column selector filters based on requested schema (ie, table schema), while - // we need filter based on file schema. As a result we cannot call - // shouldReadNode directly. Instead, build projected nodes set based on node - // id from file schema. Column selector should really be fixed to handle file - // schema properly - BitSet projectedNodes(0); - auto expected = selector_.getSchemaWithId(); - auto actual = readState_->readerBase->getSchemaWithId(); - findProjectedNodes(projectedNodes, *expected, *actual, [&](uint32_t node) { - return selector_.shouldReadNode(node); - }); + if (selector_) { + // HACK!!! + // + // Column selector filters based on requested schema (ie, table schema), + // while we need filter based on file schema. As a result we cannot call + // shouldReadNode directly. Instead, build projected nodes set based on node + // id from file schema. Column selector should really be fixed to handle + // file schema properly. + VELOX_CHECK_NULL(projectedNodes_); + projectedNodes_ = std::make_shared(0); + auto expected = selector_->getSchemaWithId(); + auto actual = readState_->readerBase->getSchemaWithId(); + findProjectedNodes( + *projectedNodes_, *expected, *actual, [&](uint32_t node) { + return selector_->shouldReadNode(node); + }); + } auto addStream = [&](auto& stream, auto& offset) { if (stream.has_offset()) { offset = stream.offset(); } - if (projectedNodes.contains(stream.node())) { + if (projectedNodes_->contains(stream.node())) { streams_[stream] = {offset, stream}; } offset += stream.length(); @@ -180,7 +184,7 @@ void StripeStreamsImpl::loadStreams() { for (uint32_t i = 0; i < stripeFooter.encoding_size(); ++i) { auto& e = stripeFooter.encoding(i); auto node = e.has_node() ? e.node() : i; - if (projectedNodes.contains(node)) { + if (projectedNodes_->contains(node)) { encodings_[{node, e.has_sequence() ? e.sequence() : 0}] = i; } } @@ -193,7 +197,10 @@ void StripeStreamsImpl::loadStreams() { stripeFooter.encryptiongroups_size()); folly::F14FastSet groupIndices; bits::forEachSetBit( - projectedNodes.bits(), 0, projectedNodes.max() + 1, [&](uint32_t node) { + projectedNodes_->bits(), + 0, + projectedNodes_->max() + 1, + [&](uint32_t node) { if (handler.isEncrypted(node)) { groupIndices.insert(handler.getEncryptionGroupIndex(node)); } @@ -214,7 +221,7 @@ void StripeStreamsImpl::loadStreams() { for (auto& encoding : groupProto->encoding()) { DWIO_ENSURE(encoding.has_node(), "node is required"); auto node = encoding.node(); - if (projectedNodes.contains(node)) { + if (projectedNodes_->contains(node)) { decryptedEncodings_[{ node, encoding.has_sequence() ? encoding.sequence() : 0}] = encoding; diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index b2c0a78663c38..60cdacd347b75 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/common/base/BitSet.h" #include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/SeekableInputStream.h" @@ -219,8 +220,11 @@ struct StripeReadState { class StripeStreamsImpl : public StripeStreamsBase { private: std::shared_ptr readState_; - const dwio::common::ColumnSelector& selector_; + const dwio::common::ColumnSelector* selector_; const dwio::common::RowReaderOptions& opts_; + // When selector_ is null, this needs to be passed in constructor; otherwise + // leave it as null and it will be populated from selector_. + std::shared_ptr projectedNodes_; const uint64_t stripeStart_; const int64_t stripeNumberOfRows_; const StrideIndexProvider& provider_; @@ -244,7 +248,8 @@ class StripeStreamsImpl : public StripeStreamsBase { StripeStreamsImpl( std::shared_ptr readState, - const dwio::common::ColumnSelector& selector, + const dwio::common::ColumnSelector* selector, + std::shared_ptr projectedNodes, const dwio::common::RowReaderOptions& opts, uint64_t stripeStart, int64_t stripeNumberOfRows, @@ -254,6 +259,7 @@ class StripeStreamsImpl : public StripeStreamsBase { readState_(std::move(readState)), selector_{selector}, opts_{opts}, + projectedNodes_{std::move(projectedNodes)}, stripeStart_{stripeStart}, stripeNumberOfRows_{stripeNumberOfRows}, provider_(provider), @@ -269,7 +275,7 @@ class StripeStreamsImpl : public StripeStreamsBase { } const dwio::common::ColumnSelector& getColumnSelector() const override { - return selector_; + return *selector_; } const dwio::common::RowReaderOptions& getRowReaderOptions() const override { diff --git a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp index fd770677b35fe..57140266a4dc4 100644 --- a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp @@ -104,7 +104,8 @@ void verifyStats( StripeStreamsImpl streams{ std::make_shared( rowReader.readerBaseShared(), std::move(stripeMetadata)), - rowReader.getColumnSelector(), + &rowReader.getColumnSelector(), + nullptr, rowReader.getRowReaderOptions(), stripeInfo.offset(), static_cast(stripeInfo.numberOfRows()), diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 22d27f0f6d77e..72abdb996b1e3 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -186,7 +186,8 @@ class E2EWriterTest : public testing::Test { dwrf::StripeStreamsImpl stripeStreams( std::make_shared( dwrfRowReader->readerBaseShared(), std::move(stripeMetadata)), - dwrfRowReader->getColumnSelector(), + &dwrfRowReader->getColumnSelector(), + nullptr, rowReaderOpts, currentStripeInfo.offset(), currentStripeInfo.numberOfRows(), diff --git a/velox/dwio/dwrf/test/TestColumnReader.cpp b/velox/dwio/dwrf/test/TestColumnReader.cpp index de11d3b4ebfaa..ef055a29f2fbd 100644 --- a/velox/dwio/dwrf/test/TestColumnReader.cpp +++ b/velox/dwio/dwrf/test/TestColumnReader.cpp @@ -145,7 +145,7 @@ class ColumnReaderTestBase { } makeFieldSpecs("", 0, rowType, scanSpec); selectiveColumnReader_ = SelectiveDwrfReader::build( - cs.getSchemaWithId(), + cs.getSchema(), fileTypeWithId, streams_, labels_, diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 766b6df0d6980..e3181816702fa 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -91,7 +91,8 @@ StripeStreamsImpl createAndLoadStripeStreams( TestProvider indexProvider; StripeStreamsImpl streams{ readState, - selector, + &selector, + nullptr, RowReaderOptions{}, 0, StripeStreamsImpl::kUnknownStripeRows, @@ -287,7 +288,8 @@ TEST_F(StripeStreamTest, zeroLength) { ColumnSelector cs{std::dynamic_pointer_cast(type)}; StripeStreamsImpl streams{ stripeReadState, - cs, + &cs, + nullptr, RowReaderOptions{}, 0, StripeStreamsImpl::kUnknownStripeRows, @@ -496,7 +498,8 @@ TEST_F(StripeStreamTest, readEncryptedStreams) { TestProvider provider; StripeStreamsImpl streams{ stripeReadState, - selector, + &selector, + nullptr, RowReaderOptions{}, 0, StripeStreamsImpl::kUnknownStripeRows, @@ -583,7 +586,8 @@ TEST_F(StripeStreamTest, schemaMismatch) { TestProvider provider; StripeStreamsImpl streams{ stripeReadState, - selector, + &selector, + nullptr, RowReaderOptions{}, 0, StripeStreamsImpl::kUnknownStripeRows, diff --git a/velox/dwio/parquet/reader/BooleanColumnReader.h b/velox/dwio/parquet/reader/BooleanColumnReader.h index 73126f4679888..465ec5adb6f91 100644 --- a/velox/dwio/parquet/reader/BooleanColumnReader.h +++ b/velox/dwio/parquet/reader/BooleanColumnReader.h @@ -25,12 +25,12 @@ class BooleanColumnReader : public dwio::common::SelectiveByteRleColumnReader { public: using ValueType = bool; BooleanColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, common::ScanSpec& scanSpec) : SelectiveByteRleColumnReader( - requestedType->type(), + requestedType, std::move(fileType), params, scanSpec) {} diff --git a/velox/dwio/parquet/reader/IntegerColumnReader.h b/velox/dwio/parquet/reader/IntegerColumnReader.h index d7b458c739534..5c0689902a0a3 100644 --- a/velox/dwio/parquet/reader/IntegerColumnReader.h +++ b/velox/dwio/parquet/reader/IntegerColumnReader.h @@ -23,12 +23,12 @@ namespace facebook::velox::parquet { class IntegerColumnReader : public dwio::common::SelectiveIntegerColumnReader { public: IntegerColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, common::ScanSpec& scanSpec) : SelectiveIntegerColumnReader( - requestedType->type(), + requestedType, params, scanSpec, std::move(fileType)) {} diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index c3816c0e960a7..f87631235601d 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -32,7 +32,7 @@ namespace facebook::velox::parquet { // static std::unique_ptr ParquetColumnReader::build( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec) { @@ -49,10 +49,10 @@ std::unique_ptr ParquetColumnReader::build( case TypeKind::REAL: return std::make_unique>( - requestedType->type(), fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); case TypeKind::DOUBLE: return std::make_unique>( - requestedType->type(), fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); case TypeKind::ROW: return std::make_unique( diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.h b/velox/dwio/parquet/reader/ParquetColumnReader.h index 516a500cd22cb..8ff0860294724 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.h +++ b/velox/dwio/parquet/reader/ParquetColumnReader.h @@ -42,7 +42,7 @@ class ParquetColumnReader { /// Builds a reader tree producing 'fileType'. The metadata is in 'params'. /// The filters and pruning are in 'scanSpec'. static std::unique_ptr build( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec); diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index daeba812f99e8..647b4e65a15c8 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -25,8 +25,6 @@ namespace facebook::velox::parquet { -using dwio::common::ColumnSelector; - /// Metadata and options for reading Parquet. class ReaderBase { public: @@ -751,7 +749,7 @@ class ParquetRowReader::Impl { "Input Table Schema (with partition columns): {}\n", readerBase_->bufferedInput().getReadFile()->getName(), readerBase_->schema()->toString(), - requestedType_->type()->toString()); + requestedType_->toString()); return exceptionMessageContext; }; @@ -760,9 +758,8 @@ class ParquetRowReader::Impl { } ParquetParams params( pool_, columnReaderStats_, readerBase_->fileMetaData()); - auto columnSelector = std::make_shared( - ColumnSelector::apply(options_.getSelector(), readerBase_->schema())); - requestedType_ = columnSelector->getSchemaWithId(); + requestedType_ = options_.requestedType() ? options_.requestedType() + : readerBase_->schema(); columnReader_ = ParquetColumnReader::build( requestedType_, readerBase_->schemaWithId(), // Id is schema id @@ -913,7 +910,7 @@ class ParquetRowReader::Impl { std::unique_ptr columnReader_; - std::shared_ptr requestedType_; + TypePtr requestedType_; dwio::common::ColumnReaderStatistics columnReaderStats_; }; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp index 9f8771d3225f5..7b33154da7f5c 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp @@ -110,7 +110,7 @@ void ensureRepDefs( } MapColumnReader::MapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec) @@ -220,7 +220,7 @@ void MapColumnReader::filterRowGroups( } ListColumnReader::ListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec) diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index 538822b148e3c..317b374b79eef 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -56,7 +56,7 @@ class RepeatedLengths { class MapColumnReader : public dwio::common::SelectiveMapColumnReader { public: MapColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec); @@ -112,7 +112,7 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader { class ListColumnReader : public dwio::common::SelectiveListColumnReader { public: ListColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec); diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 92683783bd8ff..cbff1d0716dcf 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -27,7 +27,7 @@ class ScanSpec; namespace facebook::velox::parquet { StructColumnReader::StructColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec) @@ -40,7 +40,7 @@ StructColumnReader::StructColumnReader( } auto childFileType = fileType_->childByName(childSpec->fieldName()); auto childRequestedType = - requestedType_->childByName(childSpec->fieldName()); + requestedType_->asRow().findChild(childSpec->fieldName()); addChild(ParquetColumnReader::build( childRequestedType, childFileType, params, *childSpec)); diff --git a/velox/dwio/parquet/reader/StructColumnReader.h b/velox/dwio/parquet/reader/StructColumnReader.h index 8a6833a698ecc..198d2cac0f5fc 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.h +++ b/velox/dwio/parquet/reader/StructColumnReader.h @@ -32,7 +32,7 @@ class ParquetParams; class StructColumnReader : public dwio::common::SelectiveStructColumnReader { public: StructColumnReader( - const std::shared_ptr& requestedType, + const TypePtr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, common::ScanSpec& scanSpec); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 36a997651d6f2..077ab79c4cde7 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -1672,7 +1672,7 @@ TEST_F(TableScanTest, validFileNoData) { .length(fs::file_size(filePath) / 2) .build(); - auto op = tableScanNode(rowType); + auto op = PlanBuilder().tableScan(rowType, {}, "", rowType).planNode(); assertQuery(op, split, ""); }