Skip to content

Commit

Permalink
Support timestamp reader
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Feb 20, 2024
1 parent d55c48b commit 15d35cc
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 10 deletions.
13 changes: 8 additions & 5 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ HiveDataSource::HiveDataSource(
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
auto remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
auto remainingFilter = hiveTableHandle_->remainingFilter();
if (hiveTableHandle_->isFilterPushdownEnabled()) {
remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
}

std::vector<common::Subfield> remainingFilterSubfields;
if (remainingFilter) {
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
46 changes: 45 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,49 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
uint64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(uint64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
sizeof(int32_t));
int64_t seconds = (days - Timestamp::kJulianToUnixEpochDays) *
Timestamp::kSecondsPerDay;
if (nanos > Timestamp::kMaxNanos) {
seconds += nanos / Timestamp::kNanosInSecond;
nanos -=
(nanos / Timestamp::kNanosInSecond) * Timestamp::kNanosInSecond;
}
values[i] = Timestamp(seconds, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -456,7 +499,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -483,6 +525,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/dwio/parquet/reader/RepeatedColumnReader.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -74,6 +75,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT64:
return BIGINT();
case thrift::Type::type::INT96:
return DOUBLE(); // TODO: Lose precision
return TIMESTAMP();
case thrift::Type::type::FLOAT:
return REAL();
case thrift::Type::type::DOUBLE:
Expand Down
49 changes: 49 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec) {}

bool hasBulkPath() const override {
return false;
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader>(rows);
readOffset_ += rows.back() + 1;
}
};

} // namespace facebook::velox::parquet
Binary file not shown.
106 changes: 106 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
assertQuery(plan, splits_, sql);
}

void assertSelectWithFilter(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const std::string& sql,
bool isFilterPushdownEnabled) {
auto rowType = getRowType(std::move(outputColumnNames));
parse::ParseOptions options;
options.parseDecimalAsDouble = false;

auto plan = PlanBuilder(pool_.get())
.setParseOptions(options)
// Function extractFiltersFromRemainingFilter will extract
// filters to subfield filters, but for some types, filter
// pushdown is not supported.
.tableScan(
"hive_table",
rowType,
{},
subfieldFilters,
remainingFilter,
nullptr,
isFilterPushdownEnabled)
.planNode();

assertQuery(plan, splits_, sql);
}

void assertSelectWithAgg(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -443,6 +471,84 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) {
result.second, {makeRowVector({"a"}, {makeFlatVector<int64_t>({0, 1})})});
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433116800, 70496000000000),
Timestamp(1433203200, 70496000000000),
Timestamp(981158400, 12846000000000),
Timestamp(888710400, 28866000000000),
Timestamp(1671753600, 14161000000000),
Timestamp(317520000, 1387000000000),
Timestamp(944611200, 49166000000000),
Timestamp(1682035200, 32974000000000),
Timestamp(968716800, 81389000000000),
Timestamp(1197417600, 16076000000000)});

loadData(
getExampleFilePath("timestamp_int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false);
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'",
false);
VELOX_ASSERT_THROW(
assertSelectWithFilter(
{"t"},
{"t < TIMESTAMP '2000-09-12 22:36:29'"},
"",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
"testInt128() is not supported");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ PlanBuilder& PlanBuilder::tableScan(
const std::unordered_map<std::string, std::string>& columnAliases,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const RowTypePtr& dataColumns) {
const RowTypePtr& dataColumns,
bool isFilterPushdownEnabled) {
return TableScanBuilder(*this)
.tableName(tableName)
.outputType(outputType)
.columnAliases(columnAliases)
.filterPushdown(isFilterPushdownEnabled)
.subfieldFilters(subfieldFilters)
.remainingFilter(remainingFilter)
.dataColumns(dataColumns)
Expand Down Expand Up @@ -200,7 +202,7 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
tableHandle_ = std::make_shared<HiveTableHandle>(
connectorId_,
tableName_,
true,
isFilterPushdownEnabled_,
std::move(filters),
remainingFilterExpr,
dataColumns_);
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class PlanBuilder {
const std::unordered_map<std::string, std::string>& columnAliases = {},
const std::vector<std::string>& subfieldFilters = {},
const std::string& remainingFilter = "",
const RowTypePtr& dataColumns = nullptr);
const RowTypePtr& dataColumns = nullptr,
bool isFilterPushdownEnabled = true);

/// Add a TableScanNode to scan a TPC-H table.
///
Expand Down Expand Up @@ -176,6 +177,12 @@ class PlanBuilder {
return *this;
}

/// @param isFilterPushdownEnabled Whether filter push-down is enabled.
TableScanBuilder& filterPushdown(bool isFilterPushdownEnabled) {
isFilterPushdownEnabled_ = isFilterPushdownEnabled;
return *this;
}

/// @param outputType List of column names and types to read from the table.
TableScanBuilder& outputType(RowTypePtr outputType) {
outputType_ = std::move(outputType);
Expand Down Expand Up @@ -261,6 +268,7 @@ class PlanBuilder {
PlanBuilder& planBuilder_;
std::string tableName_{"hive_table"};
std::string connectorId_{"test-hive"};
bool isFilterPushdownEnabled_ = true;
RowTypePtr outputType_;
std::vector<std::string> subfieldFilters_;
std::string remainingFilter_;
Expand Down
4 changes: 4 additions & 0 deletions velox/type/Timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct Timestamp {
static constexpr int64_t kMicrosecondsInMillisecond = 1'000;
static constexpr int64_t kNanosecondsInMicrosecond = 1'000;
static constexpr int64_t kNanosecondsInMillisecond = 1'000'000;
static constexpr int64_t kNanosInSecond =
kNanosecondsInMillisecond * kMillisecondsInSecond;
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
static constexpr int64_t kSecondsPerDay = 86400LL;

// Limit the range of seconds to avoid some problems. Seconds should be
// in the range [INT64_MIN/1000 - 1, INT64_MAX/1000].
Expand Down
5 changes: 5 additions & 0 deletions velox/type/Type.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ namespace facebook::velox {

using int128_t = __int128_t;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

/// Velox type system supports a small set of SQL-compatible composeable types:
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW
Expand Down

0 comments on commit 15d35cc

Please sign in to comment.