Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mskapilks committed Sep 4, 2024
1 parent 74fce2d commit f4764d3
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 43 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
: sizeof(double);
auto numBytes = dictionary_.numValues * typeSize;
if (type_->type()->isShortDecimal() &&
parquetType == thrift::Type::INT32) {
parquetType == thrift::Type::INT32) {
auto veloxTypeLength = type_->type()->cppSizeInBytes();
auto numVeloxBytes = dictionary_.numValues * veloxTypeLength;
dictionary_.values =
Expand Down
15 changes: 9 additions & 6 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(

case TypeKind::TIMESTAMP: {
auto& parquetFileType = static_cast<const ParquetTypeWithId&>(*fileType);
auto logicalTypeOpt = parquetFileType.logicalType_;
if (logicalTypeOpt.has_value() && logicalTypeOpt.value().__isset.TIMESTAMP) {return std::make_unique<TimestampINT64ColumnReader>(
requestedType, fileType, params, scanSpec);}
else {
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);
auto logicalTypeOpt = parquetFileType.logicalType_;
if (logicalTypeOpt.has_value() &&
logicalTypeOpt.value().__isset.TIMESTAMP &&
parquetFileType.parquetType_ == thrift::Type::INT64) {
return std::make_unique<TimestampINT64ColumnReader>(
requestedType, fileType, params, scanSpec);
} else {
return std::make_unique<TimestampINT96ColumnReader>(
requestedType, fileType, params, scanSpec);
}
}
default:
Expand Down
169 changes: 135 additions & 34 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
class TimestampINT96ColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
TimestampINT96ColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
Expand Down Expand Up @@ -89,7 +89,8 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(BIGINT(), fileType, params, scanSpec) {
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {
auto& parquetFileType = static_cast<const ParquetTypeWithId&>(*fileType_);
auto logicalTypeOpt = parquetFileType.logicalType_;
VELOX_CHECK(logicalTypeOpt.has_value());
Expand All @@ -102,9 +103,9 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
}

if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
sourcePrecision_ = TimestampPrecision::kMicroseconds;
parquetTimestampPrecision_ = TimestampPrecision::kMicroseconds;
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
sourcePrecision_ = TimestampPrecision::kMilliseconds;
parquetTimestampPrecision_ = TimestampPrecision::kMilliseconds;
} else {
VELOX_NYI("Nano Timestamp unit is not supported.");
}
Expand All @@ -114,55 +115,155 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
return false;
}

void getValues(RowSet rows, VectorPtr* result) override {
// Upcast to int128_t here so the vector already has enough memory to hold a
// Timestamp (16 bytes) instead of an int64_t (8 bytes).
getFlatValues<int64_t, int128_t>(rows, result, requestedType_);

VectorPtr resultVector = *result;
auto intValues = resultVector->asUnchecked<FlatVector<int128_t>>();
// Applies an IS NULL (isNull == true) or IS NOT NULL (isNull == false)
// selection to the values currently held in values_.
//
// For each position i in [0, numValues_), the row rows[i] is kept when
// bits::isBitNull(rawNulls, i) matches the requested selection. Kept rows are
// passed to addOutputRow, and their null flag is written at the next output
// position in rawResultNulls_. In the IS NOT NULL case the kept Timestamp is
// also moved forward so that the retained values are contiguous.
//
// Returns immediately, without emitting any row, when rawNulls is nullptr.
void
processNulls(const bool isNull, const RowSet rows, const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
auto rawTs = values_->asMutable<Timestamp>();

// NOTE(review): the next statement refers to resultVector, which is not
// declared in this function; it looks like a removed line left over from the
// diff rendering -- confirm against the committed file.
auto rawValues =
resultVector->asUnchecked<FlatVector<int128_t>>()->mutableRawValues();
returnReaderNulls_ = false;
// NOTE(review): anyNulls_ becomes false when only null rows are kept and true
// when only non-null rows are kept -- confirm this is the intended meaning.
anyNulls_ = !isNull;
allNull_ = isNull;
// Next free position in the compacted output.
vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (isNull) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
} else {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
// idx never exceeds i, so this only moves values toward the front.
rawTs[idx] = rawTs[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

Timestamp timestamp;
for (vector_size_t i = 0; i < numValues_; ++i) {
if (intValues->isNullAt(i))
continue;
// Evaluates `filter` against the Timestamp values currently held in values_
// and keeps only the passing rows.
//
// For each position i in [0, numValues_):
//  - if rawNulls is set and flags position i as null, the row is kept only
//    when filter->testNull() is true;
//  - otherwise the row is kept only when filter->testTimestamp(rawTs[i]) is
//    true, and its value is moved to the next output position.
// Kept rows are passed to addOutputRow. anyNulls_ ends up true if at least one
// null row was kept, and allNull_ ends up false if at least one non-null row
// was kept.
//
// NOTE(review): this hunk is a rendered diff. The statements that mention
// timestampInt, intValues, sourcePrecision_ and std::cout are not consistent
// with the rest of the function and look like removed lines from the previous
// getValues implementation -- confirm against the committed file.
void processFilter(
const common::Filter* filter,
const RowSet rows,
const uint64_t* rawNulls) {
auto rawTs = values_->asMutable<Timestamp>();

const auto timestampInt = intValues->valueAt(i);
std::cout << static_cast<int64_t>(timestampInt) << std::endl;
Timestamp timestamp;
if (sourcePrecision_ == TimestampPrecision::kMicroseconds) {
timestamp = Timestamp::fromMicros(timestampInt);
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;
// Next free position in the compacted output.
vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
timestamp = Timestamp::fromMillis(timestampInt);
if (filter->testTimestamp(rawTs[i])) {
// The result null buffer is only written when the input carried nulls.
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawTs[idx] = rawTs[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

// Produces the result vector for `rows` by delegating to getFlatValues with
// Timestamp as both template arguments and requestedType_ as the result type.
void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
}

memcpy(&rawValues[i], &timestamp, sizeof(int128_t));
// Returns a copy of `timestamp` whose nanosecond component is truncated to
// `precision`; the seconds component is carried over unchanged.
//  - kMilliseconds: nanos are reduced to a multiple of 1'000'000.
//  - kMicroseconds: nanos are reduced to a multiple of 1'000.
//  - kNanoseconds:  nanos are left as they are.
// The truncation uses integer division, so it discards the remainder rather
// than rounding to nearest.
Timestamp adjustToPrecision(
const Timestamp& timestamp,
TimestampPrecision precision) {
auto nano = timestamp.getNanos();
switch (precision) {
case TimestampPrecision::kMilliseconds:
nano = nano / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nano = nano / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}

// NOTE(review): the assignment to *result below uses names (result,
// intValues) that are not parameters or locals of this function; it looks
// like a removed line from the diff rendering -- confirm against the
// committed file.
*result = std::make_shared<FlatVector<Timestamp>>(
&memoryPool_,
TIMESTAMP(),
resultNulls(),
numValues_,
intValues->values(),
std::move(stringBuffers_));
return Timestamp(timestamp.getSeconds(), nano);
}

// Reads the rows selected by `rows` from an INT64-encoded Parquet timestamp
// column and leaves them in values_ as Timestamp values.
//
// Steps:
//  1. Decode the column as int64_t with the scan-spec filter temporarily
//     detached, because the filter must be evaluated on Timestamp values, not
//     on the raw integers.
//  2. Convert every non-null integer to a Timestamp according to the file's
//     unit (parquetTimestampPrecision_) and truncate it to the precision
//     requested by the reader (timestampPrecision_).
//  3. Apply the detached filter to the converted values and register the
//     passing rows through addOutputRow.
//
// The filter is put back on scanSpec_ before returning (also when decoding
// throws), so that later calls still see it. A scan spec without a filter is
// handled as "keep every row".
//
// @param offset Forwarded to prepareRead.
// @param rows Rows to read; must be non-empty because rows.back() is used to
//     advance readOffset_.
// Throws via VELOX_UNSUPPORTED for filter kinds other than IS NULL,
// IS NOT NULL, timestamp range and multi-range.
void read(
    vector_size_t offset,
    RowSet rows,
    const uint64_t* /*incomingNulls*/) override {
  // The physical values are 8-byte integers; the 16-byte Timestamp values are
  // written to a separately allocated buffer below.
  prepareRead<int64_t>(offset, rows, nullptr);

  // Take a private copy of the filter (if any) and detach it from the scan
  // spec so that readCommon decodes every row unfiltered.
  auto filter = scanSpec_->filter() ? scanSpec_->filter()->clone() : nullptr;
  const common::Filter* const activeFilter = filter.get();
  scanSpec_->setFilter(nullptr);
  try {
    readCommon<IntegerColumnReader, true>(rows);
  } catch (...) {
    // Do not leave the scan spec without its filter on failure.
    scanSpec_->setFilter(std::move(filter));
    throw;
  }
  // Hand the filter back; activeFilter keeps pointing at the same object,
  // which is now owned by the scan spec again.
  scanSpec_->setFilter(std::move(filter));

  auto tsValues = AlignedBuffer::allocate<Timestamp>(numValues_, &memoryPool_);
  auto* rawTs = tsValues->asMutable<Timestamp>();
  const auto* rawTsInt64 = values_->as<int64_t>();
  const auto rawNulls =
      resultNulls() ? resultNulls()->as<uint64_t>() : nullptr;

  const bool fileUnitIsMicros =
      parquetTimestampPrecision_ == TimestampPrecision::kMicroseconds;
  for (vector_size_t i = 0; i < numValues_; i++) {
    if (rawNulls && bits::isBitNull(rawNulls, i)) {
      // Null slots keep whatever the freshly allocated buffer contains.
      continue;
    }
    const Timestamp timestamp = fileUnitIsMicros
        ? Timestamp::fromMicros(rawTsInt64[i])
        : Timestamp::fromMillis(rawTsInt64[i]);
    rawTs[i] = adjustToPrecision(timestamp, timestampPrecision_);
  }
  values_ = tsValues;
  rawValues_ = values_->asMutable<char>();

  // No filter, or IS NOT NULL over data without nulls, keeps every row.
  const bool keepAllRows = !activeFilter ||
      (activeFilter->kind() == common::FilterKind::kIsNotNull && !rawNulls);
  switch (keepAllRows ? common::FilterKind::kAlwaysTrue
                      : activeFilter->kind()) {
    case common::FilterKind::kAlwaysTrue:
      // Simply add all rows to output.
      for (vector_size_t i = 0; i < numValues_; i++) {
        addOutputRow(rows[i]);
      }
      break;
    case common::FilterKind::kIsNull:
      processNulls(true, rows, rawNulls);
      break;
    case common::FilterKind::kIsNotNull:
      processNulls(false, rows, rawNulls);
      break;
    case common::FilterKind::kTimestampRange:
    case common::FilterKind::kMultiRange:
      processFilter(activeFilter, rows, rawNulls);
      break;
    default:
      VELOX_UNSUPPORTED("Unsupported filter.");
  }

  readOffset_ += rows.back() + 1;
}

private:
TimestampPrecision sourcePrecision_;
TimestampPrecision parquetTimestampPrecision_;
TimestampPrecision timestampPrecision_;
};
} // namespace facebook::velox::parquet
76 changes: 74 additions & 2 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,8 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
// Runs the testTimestampINT64 helper (defined outside this view) with
// TimestampPrecision::kMilliseconds on two example files; judging by their
// names, one is dictionary-encoded and the other plain-encoded.
TEST_F(ParquetTableScanTest, timestampINT64millis) {
testTimestampINT64(
"int64_millis_dictionary.parquet", TimestampPrecision::kMilliseconds);
//testTimestampINT64(
// "int64_millis_plain.parquet", TimestampPrecision::kMilliseconds);
testTimestampINT64(
"int64_millis_plain.parquet", TimestampPrecision::kMilliseconds);
}

TEST_F(ParquetTableScanTest, timestampINT64micros) {
Expand All @@ -932,6 +932,78 @@ TEST_F(ParquetTableScanTest, timestampINT64micros) {
"int64_micros_plain.parquet", TimestampPrecision::kMicroseconds);
}

// Checks that comparison filters on an INT64 (millisecond) timestamp column
// return the same rows as DuckDB does on an in-memory copy of the data.
TEST_F(ParquetTableScanTest, timestampINT64Filter) {
  // Distinct values used to build the reference table; presumably these
  // mirror the contents of the example file.
  const std::vector<Timestamp> distinctValues = {
      Timestamp(0, 0),
      Timestamp(0, 1000000),
      Timestamp(-1, 999000000),
      Timestamp(1, 0),
      Timestamp(-1, 0),
      Timestamp(1, 1000000),
      Timestamp(-2, 999000000),
      Timestamp(0, 999000000),
      Timestamp(-1, 1000000),
      Timestamp(1000, 0),
      Timestamp(-1000, 0),
      Timestamp(1000, 1000000),
      Timestamp(-1001, 999000000),
      Timestamp(99, 999000000),
      Timestamp(-100, 1000000)};

  // 60 reference rows: every distinct value appears four times in a row.
  auto timeColumn = makeFlatVector<Timestamp>(
      60, [&](auto row) { return distinctValues[row / 4]; });
  createDuckDbTable("expected", {makeRowVector({"time"}, {timeColumn})});

  // Same single-empty-array vector that the original test hands to loadData.
  auto placeholder = makeArrayVector<Timestamp>({{}});
  loadData(
      getExampleFilePath("int64_millis_dictionary.parquet"),
      ROW({"time"}, {TIMESTAMP()}),
      makeRowVector({"time"}, {placeholder}));

  // Each entry pairs the filter given to the scan with the DuckDB query that
  // yields the expected rows.
  struct FilterCase {
    const char* filter;
    const char* duckDbSql;
  };
  const FilterCase filterCases[] = {
      {"time < TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time < TIMESTAMP '1970-01-01 00:00:00'"},
      {"time <= TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time <= TIMESTAMP '1970-01-01 00:00:00'"},
      {"time > TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time > TIMESTAMP '1970-01-01 00:00:00'"},
      {"time >= TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time >= TIMESTAMP '1970-01-01 00:00:00'"},
      {"time == TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time == TIMESTAMP '1970-01-01 00:00:00'"},
      {"time != TIMESTAMP '1970-01-01 00:00:00'",
       "SELECT time from expected where time != TIMESTAMP '1970-01-01 00:00:00'"},
  };

  for (const auto& filterCase : filterCases) {
    // Identifies the failing filter, since all cases share one call site.
    SCOPED_TRACE(filterCase.filter);
    assertSelectWithFilter(
        {"time"}, {}, filterCase.filter, filterCase.duckDbSql);
  }
}

TEST_F(ParquetTableScanTest, timestampINT64BackwardCompatible) {
auto a = makeFlatVector<Timestamp>(
3, [](auto row) { return Timestamp(0, 10 * 1000000L); });
Expand Down

0 comments on commit f4764d3

Please sign in to comment.