Skip to content

Commit

Permalink
Remove timestamp decoder
Browse files (browse the repository at this point in the history)
  • Loading branch information
mskapilks committed Jul 16, 2024
1 parent e364315 commit d6c65cf
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 122 deletions.
27 changes: 24 additions & 3 deletions velox/dwio/common/DirectDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ class DirectDecoder : public IntDecoder<isSigned> {
std::unique_ptr<dwio::common::SeekableInputStream> input,
bool useVInts,
uint32_t numBytes,
bool bigEndian = false)
: IntDecoder<isSigned>{std::move(input), useVInts, numBytes, bigEndian} {}
bool bigEndian = false,
std::optional<TimestampPrecision> precision = std::nullopt)
: IntDecoder<isSigned>{std::move(input), useVInts, numBytes, bigEndian},
precision_(precision) {}

void seekToRowGroup(dwio::common::PositionProvider&) override;

Expand Down Expand Up @@ -92,7 +94,24 @@ class DirectDecoder : public IntDecoder<isSigned> {
} else if constexpr (std::is_same_v<
typename Visitor::DataType,
int128_t>) {
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
if (precision_.has_value()) {
auto units = super::template readInt<int64_t>();
Timestamp timestamp;
if (precision_.value() == TimestampPrecision::kMilliseconds) {
timestamp = Timestamp::fromMillis(units);
} else if (precision_.value() == TimestampPrecision::kMicroseconds) {
timestamp = Timestamp::fromMicros(units);
} else {
VELOX_NYI(
"Unsupported timestamp unit. Only kMillis and kMicros supported.");
}

int128_t value;
memcpy(&value, &timestamp, sizeof(int128_t));
toSkip = visitor.process(value, atEnd);
} else {
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
}
} else {
toSkip = visitor.process(super::template readInt<int64_t>(), atEnd);
}
Expand All @@ -111,6 +130,8 @@ class DirectDecoder : public IntDecoder<isSigned> {
private:
using super = IntDecoder<isSigned>;

const std::optional<TimestampPrecision> precision_;

float readFloat() {
float temp;
auto buffer = readFixed(sizeof(float), &temp);
Expand Down
30 changes: 12 additions & 18 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ void PageReader::makeDecoder() {
true);
}
break;
case thrift::Type::INT64:
case thrift::Type::INT64: {
std::optional<TimestampPrecision> precisionUnit;
if (type_->logicalType_.has_value() &&
type_->logicalType_.value().__isset.TIMESTAMP) {
auto logicalType = type_->logicalType_.value();
Expand All @@ -677,24 +678,19 @@ void PageReader::makeDecoder() {
!logicalType.TIMESTAMP.unit.__isset.NANOS,
"Nano Timestamp unit not supported.");

auto precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS
precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS
? TimestampPrecision::kMicroseconds
: TimestampPrecision::kMilliseconds;
timestampDecoder_ = std::make_unique<TimestampDecoder>(
precisionUnit,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
}
break;

directDecoder_ = std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()),
false,
precisionUnit);
} break;
default: {
directDecoder_ = std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
Expand Down Expand Up @@ -749,8 +745,6 @@ void PageReader::skip(int64_t numRows) {
booleanDecoder_->skip(toSkip);
} else if (deltaBpDecoder_) {
deltaBpDecoder_->skip(toSkip);
} else if (timestampDecoder_) {
timestampDecoder_->skip(toSkip);
} else {
VELOX_FAIL("No decoder to skip");
}
Expand Down
8 changes: 0 additions & 8 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"
#include "velox/dwio/parquet/reader/TimestampDecoder.h"

#include <arrow/util/rle_encoding.h>

Expand Down Expand Up @@ -277,9 +276,6 @@ class PageReader {
} else if (encoding_ == thrift::Encoding::DELTA_BINARY_PACKED) {
nullsFromFastPath = false;
deltaBpDecoder_->readWithVisitor<true>(nulls, visitor);
} else if (timestampDecoder_) {
timestampDecoder_->readWithVisitor<true>(
nulls, visitor, nullsFromFastPath);
} else {
directDecoder_->readWithVisitor<true>(
nulls, visitor, nullsFromFastPath);
Expand All @@ -290,9 +286,6 @@ class PageReader {
dictionaryIdDecoder_->readWithVisitor<false>(nullptr, dictVisitor);
} else if (encoding_ == thrift::Encoding::DELTA_BINARY_PACKED) {
deltaBpDecoder_->readWithVisitor<false>(nulls, visitor);
} else if (timestampDecoder_) {
timestampDecoder_->readWithVisitor<false>(
nulls, visitor, nullsFromFastPath);
} else {
directDecoder_->readWithVisitor<false>(
nulls, visitor, !this->type_->type()->isShortDecimal());
Expand Down Expand Up @@ -496,7 +489,6 @@ class PageReader {
std::unique_ptr<StringDecoder> stringDecoder_;
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<TimestampDecoder> timestampDecoder_;
// Add decoders for other encodings here.
};

Expand Down
93 changes: 0 additions & 93 deletions velox/dwio/parquet/reader/TimestampDecoder.h

This file was deleted.

0 comments on commit d6c65cf

Please sign in to comment.