Skip to content

Commit

Permalink
PR comments
Browse files — browse the repository at this point in the history
  • Loading branch information
mskapilks committed Feb 7, 2024
1 parent b1570f1 commit 630f96c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 39 deletions.
8 changes: 4 additions & 4 deletions velox/dwio/common/TimestampDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

namespace facebook::velox::dwio::common {

enum TIMESTAMP_PRECISION { MILLIS, MICROS };
/// Unit of the raw 64-bit integer that TimestampDecoder reads before turning
/// it into a Timestamp.
enum TimestampPrecision {
  /// Decoded value is passed to util::fromUTCMillis.
  kMillis,
  /// Decoded value is passed to util::fromUTCMicros.
  kMicros
};

class TimestampDecoder : public DirectDecoder<false> {
public:
TimestampDecoder(
TIMESTAMP_PRECISION precision,
TimestampPrecision precision,
std::unique_ptr<dwio::common::SeekableInputStream> input,
bool useVInts,
uint32_t numBytes,
Expand Down Expand Up @@ -62,7 +62,7 @@ class TimestampDecoder : public DirectDecoder<false> {
}
if constexpr (std::is_same_v<typename Visitor::DataType, int128_t>) {
auto units = IntDecoder<false>::template readInt<int64_t>();
Timestamp timestamp = precision_ == TIMESTAMP_PRECISION::MILLIS
Timestamp timestamp = precision_ == TimestampPrecision::kMillis
? util::fromUTCMillis(units)
: util::fromUTCMicros(units);

Expand All @@ -85,6 +85,6 @@ class TimestampDecoder : public DirectDecoder<false> {
}

private:
TIMESTAMP_PRECISION precision_;
TimestampPrecision precision_;
};
} // namespace facebook::velox::dwio::common
58 changes: 24 additions & 34 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,10 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
? sizeof(float)
: sizeof(double);
auto numBytes = dictionary_.numValues * typeSize;
if (type_->type()->isShortDecimal() &&
parquetType == thrift::Type::INT32) {
if ((type_->type()->isShortDecimal() &&
parquetType == thrift::Type::INT32) ||
(type_->type()->isTimestamp() &&
parquetType == thrift::Type::INT64)) {
auto veloxTypeLength = type_->type()->cppSizeInBytes();
auto numVeloxBytes = dictionary_.numValues * veloxTypeLength;
dictionary_.values =
Expand Down Expand Up @@ -365,13 +367,13 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
values[i] = parquetValues[i];
}
} else if (
parquetType == thrift::Type::INT64 &&
type_->logicalType_.has_value()) {
type_->type()->isTimestamp() && parquetType == thrift::Type::INT64) {
VELOX_CHECK(type_->logicalType_.has_value());
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported");
"Only UTC adjusted Timestamp is supported.");
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
int64_t units;
Expand All @@ -387,7 +389,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
values[i] = util::fromUTCMillis(units);
}
} else {
VELOX_NYI("Unsupported timestamp unit");
VELOX_NYI("Nano Timestamp unit is not supported.");
}
}
}
Expand Down Expand Up @@ -654,39 +656,27 @@ void PageReader::makeDecoder() {
true);
break;
case thrift::Type::INT64:
if (type_->logicalType_.has_value()) {
if (type_->logicalType_.has_value() &&
type_->logicalType_.value().__isset.TIMESTAMP) {
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported");
if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
timestampDecoder_ = std::make_unique<
dwio::common::TimestampDecoder>(
dwio::common::TIMESTAMP_PRECISION::MICROS,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
timestampDecoder_ = std::make_unique<
dwio::common::TimestampDecoder>(
dwio::common::TIMESTAMP_PRECISION::MILLIS,

VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported.");
VELOX_CHECK(
!logicalType.TIMESTAMP.unit.__isset.NANOS,
"Nano Timestamp unit not supported.");

auto precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS
? dwio::common::TimestampPrecision::kMicros
: dwio::common::TimestampPrecision::kMillis;
timestampDecoder_ =
std::make_unique<dwio::common::TimestampDecoder>(
precisionUnit,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else {
VELOX_NYI("Timestamp unit not supported");
}
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
}
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
unit.__set_MILLIS(millis);
} else {
VELOX_NYI(
"{} Timestamp unit not supported", schemaElement.converted_type);
"{} Timestamp unit not supported.", schemaElement.converted_type);
}

timestamp.__set_unit(unit);
Expand Down

0 comments on commit 630f96c

Please sign in to comment.