Skip to content

Commit

Permalink
Add ParquetInt96TimestampRange
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 8, 2024
1 parent c26be68 commit dd9dea8
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 57 deletions.
144 changes: 143 additions & 1 deletion velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {
namespace {

// Range filter for Parquet Int96 Timestamp.
class ParquetInt96TimestampRange : public common::TimestampRange {
public:
// @param lower Lower end of the range, inclusive.
// @param upper Upper end of the range, inclusive.
// @param nullAllowed Null values are passing the filter if true.
ParquetInt96TimestampRange(
const Timestamp& lower,
const Timestamp& upper,
bool nullAllowed)
: TimestampRange(lower, upper, nullAllowed) {}

// Int96 is read as int128_t value and converted to Timestamp by extracting
// days and nanos.
bool testInt128(int128_t value) const final override {
const int32_t days = static_cast<int32_t>(value >> 64);
const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto ts = Timestamp::fromDaysAndNanos(days, nanos);
return ts >= this->lower() && ts <= this->upper();
}
};

} // namespace

class TimestampColumnReader : public IntegerColumnReader {
public:
Expand Down Expand Up @@ -71,14 +96,131 @@ class TimestampColumnReader : public IntegerColumnReader {
}
}

template <
typename Reader,
typename TFilter,
bool isDense,
typename ExtractValues>
void readHelperTimestamp(
velox::common::Filter* filter,
const RowSet& rows,
ExtractValues extractValues) {
if constexpr (std::is_same_v<TFilter, velox::common::TimestampRange>) {
// Convert TimestampRange to ParquetInt96TimestampRange.
auto* range = reinterpret_cast<common::TimestampRange*>(filter);
ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange(
range->lower(), range->upper(), range->nullAllowed());
reinterpret_cast<Reader*>(this)->Reader::readWithVisitor(
rows,
dwio::common::ColumnVisitor<
int128_t,
ParquetInt96TimestampRange,
ExtractValues,
isDense>(newRange, this, rows, extractValues));
} else {
reinterpret_cast<Reader*>(this)->Reader::readWithVisitor(
rows,
dwio::common::
ColumnVisitor<int128_t, TFilter, ExtractValues, isDense>(
*reinterpret_cast<TFilter*>(filter),
this,
rows,
extractValues));
}
return;
}

template <
typename Reader,
bool isDense,
bool kEncodingHasNulls,
typename ExtractValues>
void processTimestampFilter(
velox::common::Filter* filter,
ExtractValues extractValues,
const RowSet& rows) {
if (filter == nullptr) {
readHelperTimestamp<Reader, velox::common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(), rows, extractValues);
return;
}

switch (filter->kind()) {
case velox::common::FilterKind::kAlwaysTrue:
readHelperTimestamp<Reader, velox::common::AlwaysTrue, isDense>(
filter, rows, extractValues);
break;
case velox::common::FilterKind::kIsNull:
if constexpr (kEncodingHasNulls) {
filterNulls<int64_t>(
rows,
true,
!std::
is_same_v<decltype(extractValues), dwio::common::DropValues>);
} else {
readHelperTimestamp<Reader, velox::common::IsNull, isDense>(
filter, rows, extractValues);
}
break;
case velox::common::FilterKind::kIsNotNull:
if constexpr (
kEncodingHasNulls &&
std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<int64_t>(rows, false, false);
} else {
readHelperTimestamp<Reader, velox::common::IsNotNull, isDense>(
filter, rows, extractValues);
}
break;
case velox::common::FilterKind::kTimestampRange:
readHelperTimestamp<Reader, velox::common::TimestampRange, isDense>(
filter, rows, extractValues);
break;
default:
VELOX_UNREACHABLE();
}
}

template <typename Reader, bool kEncodingHasNulls>
void readTimestamp(const RowSet& rows) {
const bool isDense = rows.back() == rows.size() - 1;
velox::common::Filter* filter =
scanSpec_->filter() ? scanSpec_->filter() : &dwio::common::alwaysTrue();
if (scanSpec_->keepValues()) {
if (scanSpec_->valueHook()) {
if (isDense) {
processValueHook<Reader, true>(rows, scanSpec_->valueHook());
} else {
processValueHook<Reader, false>(rows, scanSpec_->valueHook());
}
} else {
if (isDense) {
processTimestampFilter<Reader, true, kEncodingHasNulls>(
filter, dwio::common::ExtractToReader(this), rows);
} else {
processTimestampFilter<Reader, false, kEncodingHasNulls>(
filter, dwio::common::ExtractToReader(this), rows);
}
}
} else {
if (isDense) {
processTimestampFilter<Reader, true, kEncodingHasNulls>(
filter, dwio::common::DropValues(), rows);
} else {
processTimestampFilter<Reader, false, kEncodingHasNulls>(
filter, dwio::common::DropValues(), rows);
}
}
}

void read(
int64_t offset,
const RowSet& rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader, true>(rows);
readTimestamp<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
}

Expand Down
Binary file not shown.
Binary file not shown.
102 changes: 55 additions & 47 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,12 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
}

// Write data to a parquet file on specified path.
// @param writeInt96AsTimestamp Write timestamp as Int96 if enabled.
void writeToParquetFile(
const std::string& path,
const std::vector<RowVectorPtr>& data,
bool writeInt96AsTimestamp) {
WriterOptions options) {
VELOX_CHECK_GT(data.size(), 0);

WriterOptions options;
options.writeInt96AsTimestamp = writeInt96AsTimestamp;

auto writeFile = std::make_unique<LocalWriteFile>(path, true, false);
auto sink = std::make_unique<dwio::common::WriteFileSink>(
std::move(writeFile), path);
Expand All @@ -223,43 +219,43 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
writer->close();
}

void testInt96TimestampRead(const std::string& fileName) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433187296, 0),
Timestamp(1433273696, 0),
Timestamp(981171246, 0),
Timestamp(888739266, 0),
Timestamp(1671767761, 0),
Timestamp(317521387, 0),
Timestamp(944660366, 0),
Timestamp(1682068174, 0),
Timestamp(968798189, 0),
Timestamp(1197433676, 0)});

loadData(
getExampleFilePath(fileName),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));
void testInt96TimestampRead(const WriterOptions& options) {
auto stringToTimestamp = [](std::string_view view) {
return util::fromTimestampString(
view.data(),
view.size(),
util::TimestampParseMode::kPrestoCast)
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
};
std::vector<std::string_view> views = {
"2015-06-01 19:34:56",
"2015-06-02 19:34:56",
"2001-02-03 03:34:06",
"1998-03-01 08:01:06",
"2022-12-23 03:56:01",
"1980-01-24 00:23:07",
"1999-12-08 13:39:26",
"2023-04-21 09:09:34",
"2000-09-12 22:36:29",
"2007-12-12 04:27:56",
};
std::vector<Timestamp> values;
values.reserve(views.size());
for (auto view : views) {
values.emplace_back(stringToTimestamp(view));
}

auto vector = makeRowVector(
{"t"},
{
makeFlatVector<Timestamp>(values),
});
auto schema = asRowType(vector->type());
auto file = TempFilePath::create();
writeToParquetFile(file->getPath(), {vector}, options);
loadData(file->getPath(), schema, vector);

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp");
assertSelectWithFilter(
Expand Down Expand Up @@ -820,11 +816,17 @@ TEST_F(ParquetTableScanTest, sessionTimezone) {
}

TEST_F(ParquetTableScanTest, timestampInt96Dictionary) {
testInt96TimestampRead("timestamp_int96_dictionary.parquet");
WriterOptions options;
options.writeInt96AsTimestamp = true;
options.enableDictionary = true;
testInt96TimestampRead(options);
}

TEST_F(ParquetTableScanTest, timestampInt96Plain) {
testInt96TimestampRead("timestamp_int96_plain.parquet");
WriterOptions options;
options.writeInt96AsTimestamp = true;
options.enableDictionary = false;
testInt96TimestampRead(options);
}

TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
Expand All @@ -836,7 +838,9 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
});
auto schema = asRowType(vector->type());
auto file = TempFilePath::create();
writeToParquetFile(file->getPath(), {vector}, true);
WriterOptions options;
options.writeInt96AsTimestamp = true;
writeToParquetFile(file->getPath(), {vector}, options);
auto plan = PlanBuilder().tableScan(schema).planNode();

// Read timestamp data from parquet with microsecond precision.
Expand Down Expand Up @@ -932,7 +936,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndexMixedType) {
const std::shared_ptr<exec::test::TempDirectoryPath> dataFileFolder =
exec::test::TempDirectoryPath::create();
auto filePath = dataFileFolder->getPath() + "/" + "nested_data.parquet";
writeToParquetFile(filePath, {dataFileVectors}, false);
WriterOptions options;
options.writeInt96AsTimestamp = false;
writeToParquetFile(filePath, {dataFileVectors}, options);

// Create a row type with columns having different names than in the file.
auto structType = ROW({"aa1", "bb1"}, {BIGINT(), BIGINT()});
Expand Down Expand Up @@ -1044,7 +1050,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndex) {
const std::shared_ptr<exec::test::TempDirectoryPath> dataFileFolder =
exec::test::TempDirectoryPath::create();
auto filePath = dataFileFolder->getPath() + "/" + "data.parquet";
writeToParquetFile(filePath, {dataFileVectors}, false);
WriterOptions options;
options.writeInt96AsTimestamp = false;
writeToParquetFile(filePath, {dataFileVectors}, options);

auto rowType = ROW({"c2", "c3"}, {BIGINT(), BIGINT()});

Expand Down
14 changes: 5 additions & 9 deletions velox/type/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,7 @@ class NegatedBytesRange final : public Filter {
/// Open ranges can be implemented by using the value to the left
/// or right of the end of the range, e.g. a < timestamp '2023-07-19
/// 17:00:00.777' is equivalent to a <= timestamp '2023-07-19 17:00:00.776'.
class TimestampRange final : public Filter {
class TimestampRange : public Filter {
public:
/// @param lower Lower end of the range, inclusive.
/// @param upper Upper end of the range, inclusive.
Expand Down Expand Up @@ -1824,14 +1824,6 @@ class TimestampRange final : public Filter {
nullAllowed_ ? "with nulls" : "no nulls");
}

bool testInt128(int128_t value) const final {
// Convert int128_t to Timestamp by extracting days and nanos.
const int32_t days = static_cast<int32_t>(value >> 64);
const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto ts = Timestamp::fromDaysAndNanos(days, nanos);
return ts >= lower_ && ts <= upper_;
}

bool testTimestamp(Timestamp value) const override {
return value >= lower_ && value <= upper_;
}
Expand Down Expand Up @@ -1859,6 +1851,10 @@ class TimestampRange final : public Filter {
return upper_;
}

const bool nullAllowed() const {
return nullAllowed_;
}

bool testingEquals(const Filter& other) const final;

private:
Expand Down

0 comments on commit dd9dea8

Please sign in to comment.