Skip to content

Commit

Permalink
Support scan filter for decimal in ORC
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 20, 2024
1 parent b106900 commit 3117603
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 13 deletions.
173 changes: 161 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -96,46 +97,194 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Process filter.
process(filter, rows, rawNulls);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
bool isNull,
const RowSet& rows,
const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;

auto rawDecimal = values_->asMutable<DataT>();
auto rawScale = scaleBuffer_->asMutable<int64_t>();

vector_size_t idx = 0;
if (isNull) {
for (vector_size_t i = 0; i < numValues_; i++) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
}
} else {
for (vector_size_t i = 0; i < numValues_; i++) {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawDecimal[idx] = rawDecimal[i];
rawScale[idx] = rawScale[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;

auto rawDecimal = values_->asMutable<DataT>();

vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
bool tested;
if constexpr (std::is_same_v<DataT, int64_t>) {
tested = filter->testInt64(rawDecimal[i]);
} else {
tested = filter->testInt128(rawDecimal[i]);
}

if (tested) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawDecimal[idx] = rawDecimal[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
// Treat the filter as kAlwaysTrue if any of the following conditions are met:
// 1) No filter found;
// 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type BIGINT, but found file type {}.",
actualType->toString());
}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type HUGEINT, but found file type {}.",
actualType->toString());
}
break;
}
default:
VELOX_NYI("Unsupported filter: {}.", (int)filterKind);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
const RowSet& rows,
VectorPtr* result) {
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
19 changes: 18 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {

private:
template <bool kDense>
void readHelper(RowSet rows);
void readHelper(common::Filter* filter, RowSet rows);

// Process IsNull and IsNotNull filters.
void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls);

// Process filters on decimal values.
void processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

// Dispatch to the respective filter processing based on the filter type.
void process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

void fillDecimals();

std::unique_ptr<IntDecoder<true>> valueDecoder_;
std::unique_ptr<IntDecoder<true>> scaleDecoder_;
Expand Down
121 changes: 121 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/dwio/common/CacheInputStream.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
#include "velox/dwio/orc/reader/OrcReader.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PlanNodeStats.h"
Expand All @@ -44,6 +45,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/functions/lib/IsNull.h"
#include "velox/type/Timestamp.h"
#include "velox/type/Type.h"
#include "velox/type/tests/SubfieldFiltersBuilder.h"
Expand Down Expand Up @@ -74,6 +76,7 @@ class TableScanTest : public virtual HiveConnectorTestBase {
HiveConnectorTestBase::SetUp();
exec::ExchangeSource::factories().clear();
exec::ExchangeSource::registerFactory(createLocalExchangeSource);
orc::registerOrcReaderFactory();
}

static void SetUpTestCase() {
Expand Down Expand Up @@ -1838,6 +1841,124 @@ TEST_F(TableScanTest, validFileNoData) {
assertQuery(op, split, "");
}

TEST_F(TableScanTest, shortDecimalFilter) {
functions::registerIsNotNullFunction("isnotnull");

std::vector<std::optional<int64_t>> values = {
123456789123456789L,
987654321123456L,
std::nullopt,
2000000000000000L,
5000000000000000L,
987654321987654321L,
100000000000000L,
1230000000123456L,
120000000123456L,
std::nullopt};
auto rowVector = makeRowVector({
makeNullableFlatVector<int64_t>(values, DECIMAL(18, 6)),
});
createDuckDbTable({rowVector});

auto filePath = facebook::velox::test::getDataFilePath(
"velox/exec/tests", "data/short_decimal.orc");
auto split = HiveConnectorSplitBuilder(filePath)
.start(0)
.length(fs::file_size(filePath))
.fileFormat(dwio::common::FileFormat::ORC)
.build();

auto rowType = ROW({"d"}, {DECIMAL(18, 6)});

// Is not null.
auto op =
PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode();
assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null");

// Is null.
op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode();
assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null");

// BigintRange.
op =
PlanBuilder()
.tableScan(
rowType,
{},
"d > 2000000000.0::DECIMAL(18, 6) and d < 6000000000.0::DECIMAL(18, 6)",
rowType)
.planNode();
assertQuery(
op,
split,
"SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0");

// NegatedBigintRange.
op =
PlanBuilder()
.tableScan(
rowType,
{},
"not(d between 2000000000.0::DECIMAL(18, 6) and 6000000000.0::DECIMAL(18, 6))",
rowType)
.planNode();
assertQuery(
op,
split,
"SELECT c0 FROM tmp where c0 < 2000000000.0 or c0 > 6000000000.0");
}

TEST_F(TableScanTest, longDecimalFilter) {
functions::registerIsNotNullFunction("isnotnull");

std::vector<std::optional<int128_t>> values = {
HugeInt::parse("123456789123456789123456789" + std::string(9, '0')),
HugeInt::parse("987654321123456789" + std::string(9, '0')),
std::nullopt,
HugeInt::parse("2" + std::string(37, '0')),
HugeInt::parse("5" + std::string(37, '0')),
HugeInt::parse("987654321987654321987654321" + std::string(9, '0')),
HugeInt::parse("1" + std::string(26, '0')),
HugeInt::parse("123000000012345678" + std::string(10, '0')),
HugeInt::parse("120000000123456789" + std::string(9, '0')),
HugeInt::parse("9" + std::string(37, '0'))};
auto rowVector = makeRowVector({
makeNullableFlatVector<int128_t>(values, DECIMAL(38, 18)),
});
createDuckDbTable({rowVector});

auto filePath = facebook::velox::test::getDataFilePath(
"velox/exec/tests", "data/long_decimal.orc");
auto split = HiveConnectorSplitBuilder(filePath)
.start(0)
.length(fs::file_size(filePath))
.fileFormat(dwio::common::FileFormat::ORC)
.build();

auto rowType = ROW({"d"}, {DECIMAL(38, 18)});
auto op =
PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode();
assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null");

// Is null.
op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode();
assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null");

// HugeintRange.
op =
PlanBuilder()
.tableScan(
rowType,
{},
"d > 2000000000.0::DECIMAL(38, 18) and d < 6000000000.0::DECIMAL(38, 18)",
rowType)
.planNode();
assertQuery(
op,
split,
"SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0");
}

// An invalid (size = 0) file.
TEST_F(TableScanTest, emptyFile) {
auto filePath = TempFilePath::create();
Expand Down
Binary file added velox/exec/tests/data/long_decimal.orc
Binary file not shown.
Binary file added velox/exec/tests/data/short_decimal.orc
Binary file not shown.

0 comments on commit 3117603

Please sign in to comment.