Support for plain and dictionary encoded INT64 timestamp in parquet files #8325

Open · wants to merge 6 commits into main
30 changes: 30 additions & 0 deletions velox/dwio/common/tests/utils/DataSetBuilder.cpp
@@ -316,6 +316,36 @@ DataSetBuilder& DataSetBuilder::makeMapStringValues(
return *this;
}

void DataSetBuilder::adjustTimestampToPrecision(
VectorPtr batch,
TimestampPrecision precision) {
auto type = batch->type();

if (type->kind() == TypeKind::TIMESTAMP) {
auto rawValues =
batch->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < batch->size(); ++i) {
if (batch->isNullAt(i)) {
continue;
}

rawValues[i].toPrecision(precision);
}
} else if (type->kind() == TypeKind::ROW) {
for (auto& child : batch->as<RowVector>()->children()) {
adjustTimestampToPrecision(child, precision);
}
}
}

DataSetBuilder& DataSetBuilder::adjustTimestampToPrecision(
TimestampPrecision precision) {
for (auto& batch : *batches_) {
adjustTimestampToPrecision(batch, precision);
}
return *this;
}

std::unique_ptr<std::vector<RowVectorPtr>> DataSetBuilder::build() {
return std::move(batches_);
}
5 changes: 5 additions & 0 deletions velox/dwio/common/tests/utils/DataSetBuilder.h
@@ -50,6 +50,11 @@ class DataSetBuilder {
// groups. Tests skipping row groups based on row group stats.
DataSetBuilder& withRowGroupSpecificData(int32_t numRowsPerGroup);

DataSetBuilder& adjustTimestampToPrecision(TimestampPrecision precision);
void adjustTimestampToPrecision(
    VectorPtr batch,
    TimestampPrecision precision);

Reviewer comment (Contributor), on the VectorPtr batch parameter: pass the vector by reference, i.e. VectorPtr& batch.

// Makes all data in 'batches_' after firstRow non-null. This finds a sampling
// of non-null values from each column and replaces nulls in the column in
// question with one of these. A column where only nulls are found in sampling
17 changes: 13 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
@@ -75,10 +75,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

-    case TypeKind::TIMESTAMP:
-      return std::make_unique<TimestampColumnReader>(
-          requestedType, fileType, params, scanSpec);
-
+    case TypeKind::TIMESTAMP: {
+      auto& parquetFileType = static_cast<const ParquetTypeWithId&>(*fileType);
+      auto logicalTypeOpt = parquetFileType.logicalType_;
+      if (logicalTypeOpt.has_value() &&
+          logicalTypeOpt.value().__isset.TIMESTAMP &&
+          parquetFileType.parquetType_ == thrift::Type::INT64) {
+        return std::make_unique<TimestampInt64ColumnReader>(
+            requestedType, fileType, params, scanSpec);
+      } else {
+        return std::make_unique<TimestampInt96ColumnReader>(
+            requestedType, fileType, params, scanSpec);
+      }
+    }
default:
VELOX_FAIL(
"buildReader unhandled type: " +
40 changes: 36 additions & 4 deletions velox/dwio/parquet/reader/ParquetReader.cpp
@@ -105,6 +105,9 @@ class ReaderBase {
TypePtr convertType(
const thrift::SchemaElement& schemaElement,
const TypePtr& requestedType) const;

thrift::LogicalType getTimestampLogicalType(
thrift::ConvertedType::type type) const;

template <typename T>
static std::shared_ptr<const RowType> createRowType(
@@ -551,13 +554,19 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
int32_t precision =
schemaElement.__isset.precision ? schemaElement.precision : 0;
int32_t scale = schemaElement.__isset.scale ? schemaElement.scale : 0;
-  int32_t type_length =
+  int32_t typeLength =
schemaElement.__isset.type_length ? schemaElement.type_length : 0;
std::vector<std::unique_ptr<dwio::common::TypeWithId>> children;
-  const std::optional<thrift::LogicalType> logicalType_ =
+  std::optional<thrift::LogicalType> logicalType =
schemaElement.__isset.logicalType
? std::optional<thrift::LogicalType>(schemaElement.logicalType)
: std::nullopt;

if (veloxType->kind() == TypeKind::TIMESTAMP &&
schemaElement.type == thrift::Type::INT64 && !logicalType.has_value()) {
logicalType = getTimestampLogicalType(schemaElement.converted_type);
}

auto leafTypePtr = std::make_unique<ParquetTypeWithId>(
veloxType,
std::move(children),
@@ -566,14 +575,14 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
columnIdx++,
name,
schemaElement.type,
-      logicalType_,
+      logicalType,
maxRepeat,
maxDefine,
isOptional,
isRepeated,
precision,
scale,
-      type_length);
+      typeLength);

if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::REPEATED) {
@@ -602,6 +611,29 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
return nullptr;
}

thrift::LogicalType ReaderBase::getTimestampLogicalType(
thrift::ConvertedType::type convertedType) const {
thrift::TimestampType timestamp;
timestamp.__set_isAdjustedToUTC(true);
thrift::TimeUnit unit;

if (convertedType == thrift::ConvertedType::TIMESTAMP_MICROS) {
thrift::MicroSeconds micros;
unit.__set_MICROS(micros);
} else if (convertedType == thrift::ConvertedType::TIMESTAMP_MILLIS) {
thrift::MilliSeconds millis;
unit.__set_MILLIS(millis);
} else {
VELOX_NYI("{} Timestamp unit not supported.", convertedType);
}

timestamp.__set_unit(unit);
thrift::LogicalType logicalType;
logicalType.__set_TIMESTAMP(timestamp);

return logicalType;
}

TypePtr ReaderBase::convertType(
const thrift::SchemaElement& schemaElement,
const TypePtr& requestedType) const {
200 changes: 185 additions & 15 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
@@ -21,9 +21,9 @@

namespace facebook::velox::parquet {

-class TimestampColumnReader : public IntegerColumnReader {
+class TimestampInt96ColumnReader : public IntegerColumnReader {
public:
-  TimestampColumnReader(
+  TimestampInt96ColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
@@ -49,19 +49,7 @@ class TimestampColumnReader : public IntegerColumnReader {
if (resultVector->isNullAt(i)) {
continue;
}
-      const auto timestamp = rawValues[i];
-      uint64_t nanos = timestamp.getNanos();
-      switch (timestampPrecision_) {
-        case TimestampPrecision::kMilliseconds:
-          nanos = nanos / 1'000'000 * 1'000'000;
-          break;
-        case TimestampPrecision::kMicroseconds:
-          nanos = nanos / 1'000 * 1'000;
-          break;
-        case TimestampPrecision::kNanoseconds:
-          break;
-      }
-      rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
+      rawValues[i].toPrecision(timestampPrecision_);
}
}

@@ -82,4 +70,186 @@ class TimestampColumnReader : public IntegerColumnReader {
TimestampPrecision timestampPrecision_;
};

class TimestampInt64ColumnReader : public IntegerColumnReader {
public:
TimestampInt64ColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {
auto& parquetFileType = static_cast<const ParquetTypeWithId&>(*fileType_);
auto logicalTypeOpt = parquetFileType.logicalType_;
VELOX_CHECK(logicalTypeOpt.has_value());

auto logicalType = logicalTypeOpt.value();
VELOX_CHECK(logicalType.__isset.TIMESTAMP);

if (!logicalType.TIMESTAMP.isAdjustedToUTC) {
VELOX_NYI("Only UTC adjusted Timestamp is supported.");
}

if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
parquetTimestampPrecision_ = TimestampPrecision::kMicroseconds;
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
parquetTimestampPrecision_ = TimestampPrecision::kMilliseconds;
} else {
VELOX_NYI("Nano Timestamp unit is not supported.");
}
}

bool hasBulkPath() const override {
return true;
}

void
processNulls(const bool isNull, const RowSet rows, const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
auto rawTs = values_->asMutable<Timestamp>();

returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;
vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (isNull) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
} else {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawTs[idx] = rawTs[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

void processFilter(
const common::Filter* filter,
const RowSet rows,
const uint64_t* rawNulls) {
auto rawTs = values_->asMutable<Timestamp>();

returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;
vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
if (filter->testTimestamp(rawTs[i])) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawTs[idx] = rawTs[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
}

template <bool isDense>
void readHelper(common::Filter* filter, RowSet rows) {
dwio::common::ExtractToReader extractValues(this);
common::AlwaysTrue alwaysTrue;
dwio::common::ColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
isDense>
visitor(alwaysTrue, this, rows, extractValues);
readWithVisitor(rows, visitor);

auto tsValues =
AlignedBuffer::allocate<Timestamp>(numValues_, &memoryPool_);

Reviewer comment (Contributor), on the AlignedBuffer::allocate call above: Can we do it in-place instead of allocating a new buffer for each batch? (A hedged sketch of one possible in-place approach follows this file's diff.)

auto rawTs = tsValues->asMutable<Timestamp>();
auto rawTsInt64 = values_->asMutable<int64_t>();

const auto rawNulls =
resultNulls() ? resultNulls()->as<uint64_t>() : nullptr;

Timestamp timestamp;
for (vector_size_t i = 0; i < numValues_; i++) {
if (!rawNulls || !bits::isBitNull(rawNulls, i)) {
if (parquetTimestampPrecision_ == TimestampPrecision::kMicroseconds) {
timestamp = Timestamp::fromMicros(rawTsInt64[i]);
} else {
timestamp = Timestamp::fromMillis(rawTsInt64[i]);
}
rawTs[i] = timestamp;
rawTs[i].toPrecision(timestampPrecision_);
}
}

values_ = tsValues;
rawValues_ = values_->asMutable<char>();
outputRows_.clear();

switch (
!filter ||
(filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind()) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);

Reviewer comment (Contributor), on the addOutputRow call above: I don't think you need to do this; we use inputRows_ when there is no filter.

}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kTimestampRange:
case common::FilterKind::kMultiRange:
processFilter(filter, rows, rawNulls);
break;
default:
VELOX_UNSUPPORTED("Unsupported filter.");
}
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
prepareRead<int64_t>(offset, rows, nullptr);

bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(scanSpec_->filter(), rows);
}

readOffset_ += rows.back() + 1;
}

private:
TimestampPrecision parquetTimestampPrecision_;
TimestampPrecision timestampPrecision_;
};
} // namespace facebook::velox::parquet
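
On the reviewer's question about converting in place: below is a minimal, hypothetical sketch of one way an in-place conversion could work, assuming the reader's values buffer is sized for Timestamp (16 bytes per slot) rather than int64_t up front. The Timestamp stand-in, the helper name widenInt64ToTimestampInPlace, and that buffer-sizing assumption are illustrative only; they are not part of this PR or the Velox API.

#include <cstdint>

// Stand-in for velox::Timestamp (seconds + nanos, 16 bytes); illustrative only.
struct Timestamp {
  int64_t seconds;
  uint64_t nanos;

  static Timestamp fromMicros(int64_t micros) {
    int64_t seconds = micros / 1'000'000;
    int64_t rem = micros % 1'000'000;
    if (rem < 0) { // keep nanos non-negative for pre-epoch values
      --seconds;
      rem += 1'000'000;
    }
    return {seconds, static_cast<uint64_t>(rem) * 1'000};
  }

  static Timestamp fromMillis(int64_t millis) {
    int64_t seconds = millis / 1'000;
    int64_t rem = millis % 1'000;
    if (rem < 0) {
      --seconds;
      rem += 1'000;
    }
    return {seconds, static_cast<uint64_t>(rem) * 1'000'000};
  }
};

// Widen decoded int64 epoch values to Timestamps in the same buffer.
// Precondition (the key assumption): 'buffer' has capacity for 'count'
// Timestamps, with the first count * sizeof(int64_t) bytes holding the
// decoded int64 values. Walking from the last slot to the first guarantees
// the 16-byte write at slot i never overwrites an int64 at a slot j < i that
// has not been read yet (8 * j + 8 <= 8 * i <= 16 * i for j < i).
void widenInt64ToTimestampInPlace(void* buffer, int32_t count, bool isMicros) {
  auto* rawInt64 = reinterpret_cast<int64_t*>(buffer);
  auto* rawTimestamp = reinterpret_cast<Timestamp*>(buffer);
  for (int32_t i = count - 1; i >= 0; --i) {
    const int64_t value = rawInt64[i]; // read before the slot is overwritten
    rawTimestamp[i] =
        isMicros ? Timestamp::fromMicros(value) : Timestamp::fromMillis(value);
  }
}

Whether this actually saves the allocation depends on how prepareRead sizes values_ for a TIMESTAMP column; if it only reserves sizeof(int64_t) per value, the buffer would still need to be grown, or allocated once and reused across batches, rather than reallocated per batch.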
5 binary files not shown.