Skip to content

Commit

Permalink
Add ParquetInt96TimestampRange
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 6, 2024
1 parent ffab4d8 commit fdfc09b
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 10 deletions.
144 changes: 143 additions & 1 deletion velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {
// Range filter for Parquet Int96 Timestamp.
//
// Declared directly in the enclosing namespace rather than in an unnamed
// namespace: this is a header, and an unnamed namespace would give every
// including translation unit its own distinct copy of this type while the
// inline members of TimestampColumnReader that name it are shared across
// translation units.
//
// The class is a leaf filter, so it is marked final; that lets calls made
// through the concrete type be devirtualized.
class ParquetInt96TimestampRange final : public common::TimestampRange {
 public:
  // @param lower Lower end of the range, inclusive.
  // @param upper Upper end of the range, inclusive.
  // @param nullAllowed Null values pass the filter if true.
  ParquetInt96TimestampRange(
      const Timestamp& lower,
      const Timestamp& upper,
      bool nullAllowed)
      : TimestampRange(lower, upper, nullAllowed) {}

  // Int96 is read as int128_t value and converted to Timestamp by extracting
  // days and nanos. The day count is taken from the bits above bit 63 and the
  // nanoseconds from the low 64 bits.
  //
  // @param value Raw 128-bit value holding the Int96 payload.
  // @return True if the decoded timestamp lies within [lower, upper].
  bool testInt128(int128_t value) const override {
    const auto days = static_cast<int32_t>(value >> 64);
    // Converting to uint64_t keeps exactly the low 64 bits, which is what
    // masking with an all-ones 64-bit constant would produce.
    const auto nanos = static_cast<uint64_t>(value);
    const auto ts = Timestamp::fromDaysAndNanos(days, nanos);
    return ts >= this->lower() && ts <= this->upper();
  }
};

class TimestampColumnReader : public IntegerColumnReader {
public:
Expand Down Expand Up @@ -71,14 +96,131 @@ class TimestampColumnReader : public IntegerColumnReader {
}
}

// Reads 'rows' by calling Reader::readWithVisitor() with a ColumnVisitor over
// int128_t values.
//
// When TFilter is TimestampRange, 'filter' is re-wrapped in a
// ParquetInt96TimestampRange built from the same lower bound, upper bound and
// null policy, and the visitor is instantiated with that type instead. Any
// other TFilter is handed to the visitor as is.
//
// @tparam Reader Type whose readWithVisitor() is invoked on this object.
// @tparam TFilter Static type that 'filter' points to.
// @tparam isDense Forwarded to ColumnVisitor.
// @param filter Filter to apply; must point to an instance of TFilter.
// @param rows Rows to read.
// @param extractValues Value extraction policy forwarded to the visitor.
template <
    typename Reader,
    typename TFilter,
    bool isDense,
    typename ExtractValues>
void readHelperTimestamp(
    velox::common::Filter* filter,
    const RowSet& rows,
    ExtractValues extractValues) {
  auto* reader = reinterpret_cast<Reader*>(this);
  if constexpr (std::is_same_v<TFilter, velox::common::TimestampRange>) {
    // TimestampRange derives from Filter, so the downcast is a static_cast;
    // reinterpret_cast performs no pointer adjustment and is not the right
    // tool for moving within a class hierarchy.
    auto* range = static_cast<common::TimestampRange*>(filter);
    // The wrapper only needs to live for the duration of the read below.
    ParquetInt96TimestampRange int96Range(
        range->lower(), range->upper(), range->nullAllowed());
    reader->Reader::readWithVisitor(
        rows,
        dwio::common::ColumnVisitor<
            int128_t,
            ParquetInt96TimestampRange,
            ExtractValues,
            isDense>(int96Range, this, rows, extractValues));
  } else {
    reader->Reader::readWithVisitor(
        rows,
        dwio::common::
            ColumnVisitor<int128_t, TFilter, ExtractValues, isDense>(
                *reinterpret_cast<TFilter*>(filter),
                this,
                rows,
                extractValues));
  }
}

// Picks the read path that matches the kind of 'filter' and runs it over
// 'rows'.
//
// A null 'filter' is read with the shared AlwaysTrue filter. kIsNull and
// kIsNotNull are answered from the null information alone via filterNulls()
// when that shortcut applies (see the branches below); otherwise, and for
// kAlwaysTrue and kTimestampRange, the read goes through
// readHelperTimestamp(). Any other filter kind is treated as unreachable.
//
// @tparam Reader Forwarded to readHelperTimestamp().
// @tparam isDense Forwarded to readHelperTimestamp().
// @tparam kEncodingHasNulls Enables the filterNulls() shortcuts.
// @param filter Filter to apply, or nullptr for no filtering.
// @param extractValues Value extraction policy.
// @param rows Rows to read.
template <
    typename Reader,
    bool isDense,
    bool kEncodingHasNulls,
    typename ExtractValues>
void processTimestampFilter(
    velox::common::Filter* filter,
    ExtractValues extractValues,
    const RowSet& rows) {
  using velox::common::FilterKind;

  // True when the extraction policy is DropValues.
  constexpr bool kDropsValues =
      std::is_same_v<ExtractValues, dwio::common::DropValues>;

  if (!filter) {
    readHelperTimestamp<Reader, velox::common::AlwaysTrue, isDense>(
        &dwio::common::alwaysTrue(), rows, extractValues);
    return;
  }

  const auto kind = filter->kind();
  if (kind == FilterKind::kAlwaysTrue) {
    readHelperTimestamp<Reader, velox::common::AlwaysTrue, isDense>(
        filter, rows, extractValues);
  } else if (kind == FilterKind::kIsNull) {
    if constexpr (kEncodingHasNulls) {
      // NOTE(review): read() prepares the value buffer with
      // prepareRead<int128_t>; confirm int64_t is the intended element type
      // for filterNulls() here.
      filterNulls<int64_t>(rows, true, !kDropsValues);
    } else {
      readHelperTimestamp<Reader, velox::common::IsNull, isDense>(
          filter, rows, extractValues);
    }
  } else if (kind == FilterKind::kIsNotNull) {
    // The shortcut is only taken when no values have to be produced.
    if constexpr (kEncodingHasNulls && kDropsValues) {
      filterNulls<int64_t>(rows, false, false);
    } else {
      readHelperTimestamp<Reader, velox::common::IsNotNull, isDense>(
          filter, rows, extractValues);
    }
  } else if (kind == FilterKind::kTimestampRange) {
    readHelperTimestamp<Reader, velox::common::TimestampRange, isDense>(
        filter, rows, extractValues);
  } else {
    VELOX_UNREACHABLE();
  }
}

template <typename Reader, bool kEncodingHasNulls>
void readTimestamp(const RowSet& rows) {
const bool isDense = rows.back() == rows.size() - 1;
velox::common::Filter* filter =
scanSpec_->filter() ? scanSpec_->filter() : &dwio::common::alwaysTrue();
if (scanSpec_->keepValues()) {
if (scanSpec_->valueHook()) {
if (isDense) {
processValueHook<Reader, true>(rows, scanSpec_->valueHook());
} else {
processValueHook<Reader, false>(rows, scanSpec_->valueHook());
}
} else {
if (isDense) {
processTimestampFilter<Reader, true, kEncodingHasNulls>(
filter, dwio::common::ExtractToReader(this), rows);
} else {
processTimestampFilter<Reader, false, kEncodingHasNulls>(
filter, dwio::common::ExtractToReader(this), rows);
}
}
} else {
if (isDense) {
processTimestampFilter<Reader, true, kEncodingHasNulls>(
filter, dwio::common::DropValues(), rows);
} else {
processTimestampFilter<Reader, false, kEncodingHasNulls>(
filter, dwio::common::DropValues(), rows);
}
}
}

// Reads the values at 'rows' and advances the read offset past the last
// requested row.
//
// @param offset Forwarded to prepareRead().
// @param rows Rows to read; must not be empty since rows.back() is read.
// @param incomingNulls Unused; nullptr is passed to prepareRead() instead.
void read(
    int64_t offset,
    const RowSet& rows,
    const uint64_t* /*incomingNulls*/) override {
  // NOTE(review): 'data' is not referenced below. It is kept because
  // as<ParquetData>() is not visible here — confirm it has no side effects
  // and drop the local.
  auto& data = formatData_->as<ParquetData>();
  // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
  prepareRead<int128_t>(offset, rows, nullptr);
  // Decode exactly once, through the timestamp-aware path. Running the
  // generic readCommon() in addition would process the same rows a second
  // time.
  readTimestamp<IntegerColumnReader, true>(rows);
  readOffset_ += rows.back() + 1;
}

Expand Down
14 changes: 5 additions & 9 deletions velox/type/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,7 @@ class NegatedBytesRange final : public Filter {
/// Open ranges can be implemented by using the value to the left
/// or right of the end of the range, e.g. a < timestamp '2023-07-19
/// 17:00:00.777' is equivalent to a <= timestamp '2023-07-19 17:00:00.776'.
class TimestampRange final : public Filter {
class TimestampRange : public Filter {
public:
/// @param lower Lower end of the range, inclusive.
/// @param upper Upper end of the range, inclusive.
Expand Down Expand Up @@ -1824,14 +1824,6 @@ class TimestampRange final : public Filter {
nullAllowed_ ? "with nulls" : "no nulls");
}

/// Tests a raw 128-bit value against the range. The value is decoded into a
/// Timestamp first: the day count comes from the bits above bit 63, the
/// nanoseconds from the low 64 bits.
bool testInt128(int128_t value) const final {
  const auto dayCount = static_cast<int32_t>(value >> 64);
  // Converting to uint64_t keeps exactly the low 64 bits.
  const auto nanosOfDay = static_cast<uint64_t>(value);
  const auto decoded = Timestamp::fromDaysAndNanos(dayCount, nanosOfDay);
  return decoded >= lower_ && decoded <= upper_;
}

/// Returns true if 'value' lies within [lower_, upper_], both ends inclusive.
bool testTimestamp(Timestamp value) const override {
  if (value >= lower_) {
    return value <= upper_;
  }
  return false;
}
Expand Down Expand Up @@ -1859,6 +1851,10 @@ class TimestampRange final : public Filter {
return upper_;
}

const bool nullAllowed() const {
return nullAllowed_;
}

bool testingEquals(const Filter& other) const final;

private:
Expand Down

0 comments on commit fdfc09b

Please sign in to comment.