Skip to content

Commit

Permalink
Add option to disable flattening optimization in PrestoSerializer (fa…
Browse files Browse the repository at this point in the history
…cebookincubator#11465)

Summary:
Pull Request resolved: facebookincubator#11465

This change introduces an option, "preserveEncodings", in the SerDe
options to disable the flattening optimization in the
PrestoBatchVectorSerializer, enabling support for use cases
where encodings need to be preserved between the serialization
and deserialization steps.

Note:
There are still two cases where encodings can be lost:
1. Nulls in the dictionary layer: the Presto page format does not support
nulls in this layer, so such a vector gets flattened.
2. Multiple levels of dictionary, or a constant underneath a dictionary:
currently, the Presto serializer preserves only the topmost layer and
flattens everything underneath it.

Reviewed By: kevinwilfong, kagamiori

Differential Revision: D65569946

fbshipit-source-id: 5e38dce14d8741b504d8adc6159551d89af663c9
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Nov 8, 2024
1 parent b146dee commit 5544d0a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 58 deletions.
46 changes: 26 additions & 20 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ class VectorStream {
// For fix width types that are smaller than int32_t (the type for
// indexes into the dictionary) dictionary encoding increases the
// size, so we should flatten it.
if (type->isFixedWidth() &&
if (!preserveEncodings() && type->isFixedWidth() &&
type->cppSizeInBytes() <= sizeof(int32_t)) {
encoding_ = std::nullopt;
break;
Expand Down Expand Up @@ -1596,6 +1596,10 @@ class VectorStream {
return isConstantStream_;
}

/// Reports the 'preserveEncodings' setting carried by this stream's serde
/// options (opts_). Callers consult it before applying flattening
/// optimizations to dictionary-encoded input.
bool preserveEncodings() const {
  const bool keepEncodings = opts_.preserveEncodings;
  return keepEncodings;
}

/// Returns a non-owning pointer to the child stream held at position 'index'
/// of children_. 'index' is used as-is to subscript children_; no range
/// validation happens in this accessor.
VectorStream* childAt(int32_t index) {
  VectorStream* const child = children_[index].get();
  return child;
}
Expand Down Expand Up @@ -2128,31 +2132,33 @@ void serializeDictionaryVector(
auto numUsed = computeSelectedIndices(
dictionaryVector, ranges, scratch, mutableSelectedIndices);

// If the values are fixed width and we aren't getting enough reuse to justify
// the dictionary, flatten it.
// For variable width types, rather than iterate over them computing their
// size, we simply assume we'll get a benefit.
if constexpr (TypeTraits<Kind>::isFixedWidth) {
// This calculation admittedly ignores some constants, but if they really
// make a difference, they're small so there's not much difference either
// way.
if (numUsed * vector->type()->cppSizeInBytes() +
numRows * sizeof(int32_t) >=
numRows * vector->type()->cppSizeInBytes()) {
if (!stream->preserveEncodings()) {
// If the values are fixed width and we aren't getting enough reuse to
// justify the dictionary, flatten it. For variable width types, rather than
// iterate over them computing their size, we simply assume we'll get a
// benefit.
if constexpr (TypeTraits<Kind>::isFixedWidth) {
// This calculation admittedly ignores some constants, but if they really
// make a difference, they're small so there's not much difference either
// way.
if (numUsed * vector->type()->cppSizeInBytes() +
numRows * sizeof(int32_t) >=
numRows * vector->type()->cppSizeInBytes()) {
stream->flattenStream(vector, numRows);
serializeWrapped(vector, ranges, stream, scratch);
return;
}
}

// If every element is unique the dictionary isn't giving us any benefit,
// flatten it.
if (numUsed == numRows) {
stream->flattenStream(vector, numRows);
serializeWrapped(vector, ranges, stream, scratch);
return;
}
}

// If every element is unique the dictionary isn't giving us any benefit,
// flatten it.
if (numUsed == numRows) {
stream->flattenStream(vector, numRows);
serializeWrapped(vector, ranges, stream, scratch);
return;
}

// Serialize the used elements from the Dictionary.
serializeColumn(
dictionaryVector->valueVector(),
Expand Down
11 changes: 9 additions & 2 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ class PrestoVectorSerde : public VectorSerde {
PrestoOptions(
bool _useLosslessTimestamp,
common::CompressionKind _compressionKind,
bool _nullsFirst = false)
bool _nullsFirst = false,
bool _preserveEncodings = false)
: VectorSerde::Options(_compressionKind),
useLosslessTimestamp(_useLosslessTimestamp),
nullsFirst(_nullsFirst) {}
nullsFirst(_nullsFirst),
preserveEncodings(_preserveEncodings) {}

/// Currently presto only supports millisecond precision and the serializer
/// converts velox native timestamp to that resulting in loss of precision.
Expand All @@ -72,6 +74,11 @@ class PrestoVectorSerde : public VectorSerde {
/// than this causes subsequent compression attempts to be skipped. The more
/// times compression misses the target the less frequently it is tried.
float minCompressionRatio{0.8};

/// If true, the serializer will not employ any optimizations that can
/// affect the encoding of the input vectors (for example, flattening a
/// dictionary that is not expected to save space). This is only relevant
/// when using BatchVectorSerializer.
///
/// NOTE: Encodings can still be lost in two cases even when this is set:
/// 1. Nulls in the dictionary layer: the Presto page format does not
///    support them, so the vector gets flattened.
/// 2. Multiple levels of dictionary, or a constant underneath a dictionary:
///    only the topmost layer is preserved and everything underneath it is
///    flattened.
bool preserveEncodings{false};
};

/// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to
Expand Down
91 changes: 55 additions & 36 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ class PrestoSerializerTest
common::CompressionKind kind = GetParam();
const bool nullsFirst =
serdeOptions == nullptr ? false : serdeOptions->nullsFirst;
const bool preserveEncodings =
serdeOptions == nullptr ? false : serdeOptions->preserveEncodings;
serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions{
useLosslessTimestamp, kind, nullsFirst};
useLosslessTimestamp, kind, nullsFirst, preserveEncodings};

return paramOptions;
}
Expand Down Expand Up @@ -1157,41 +1159,58 @@ TEST_P(PrestoSerializerTest, dictionaryEncodingTurnedOff) {
BaseVector::wrapInDictionary(nullptr, allIndices, 32, stringBase),
});

std::ostringstream out;
serializeBatch(rows, &out, /*serdeOptions=*/nullptr);
const auto serialized = out.str();

auto rowType = asRowType(rows->type());
auto deserialized =
deserialize(rowType, serialized, /*serdeOptions=*/nullptr);

assertEqualVectors(rows, deserialized);

// smallInt + one index
ASSERT_EQ(deserialized->childAt(0)->encoding(), VectorEncoding::Simple::FLAT);
// int + one index
ASSERT_EQ(deserialized->childAt(1)->encoding(), VectorEncoding::Simple::FLAT);
// bigint + one index
ASSERT_EQ(
deserialized->childAt(2)->encoding(), VectorEncoding::Simple::DICTIONARY);
// bigint + quarter indices
ASSERT_EQ(
deserialized->childAt(3)->encoding(), VectorEncoding::Simple::DICTIONARY);
// bigint + all but one indices
ASSERT_EQ(deserialized->childAt(4)->encoding(), VectorEncoding::Simple::FLAT);
// bigint + all indices
ASSERT_EQ(deserialized->childAt(5)->encoding(), VectorEncoding::Simple::FLAT);
// string + one index
ASSERT_EQ(
deserialized->childAt(6)->encoding(), VectorEncoding::Simple::DICTIONARY);
// string + quarter indices
ASSERT_EQ(
deserialized->childAt(7)->encoding(), VectorEncoding::Simple::DICTIONARY);
// string + all but one indices
ASSERT_EQ(
deserialized->childAt(8)->encoding(), VectorEncoding::Simple::DICTIONARY);
// string + all indices
ASSERT_EQ(deserialized->childAt(9)->encoding(), VectorEncoding::Simple::FLAT);
for (bool preserveEncodings : {false, true}) {
SCOPED_TRACE(fmt::format("preserveEncodings: {}", preserveEncodings));
auto exptectedTransformedEncoding = preserveEncodings
? VectorEncoding::Simple::DICTIONARY
: VectorEncoding::Simple::FLAT;
serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions;
serdeOptions.preserveEncodings = preserveEncodings;
std::ostringstream out;
serializeBatch(rows, &out, &serdeOptions);
const auto serialized = out.str();

auto rowType = asRowType(rows->type());
auto deserialized = deserialize(rowType, serialized, &serdeOptions);

assertEqualVectors(rows, deserialized);

// smallInt + one index
ASSERT_EQ(
deserialized->childAt(0)->encoding(), exptectedTransformedEncoding);
// int + one index
ASSERT_EQ(
deserialized->childAt(1)->encoding(), exptectedTransformedEncoding);
// bigint + one index
ASSERT_EQ(
deserialized->childAt(2)->encoding(),
VectorEncoding::Simple::DICTIONARY);
// bigint + quarter indices
ASSERT_EQ(
deserialized->childAt(3)->encoding(),
VectorEncoding::Simple::DICTIONARY);
// bigint + all but one indices
ASSERT_EQ(
deserialized->childAt(4)->encoding(), exptectedTransformedEncoding);
// bigint + all indices
ASSERT_EQ(
deserialized->childAt(5)->encoding(), exptectedTransformedEncoding);
// string + one index
ASSERT_EQ(
deserialized->childAt(6)->encoding(),
VectorEncoding::Simple::DICTIONARY);
// string + quarter indices
ASSERT_EQ(
deserialized->childAt(7)->encoding(),
VectorEncoding::Simple::DICTIONARY);
// string + all but one indices
ASSERT_EQ(
deserialized->childAt(8)->encoding(),
VectorEncoding::Simple::DICTIONARY);
// string + all indices
ASSERT_EQ(
deserialized->childAt(9)->encoding(), exptectedTransformedEncoding);
}
}

TEST_P(PrestoSerializerTest, emptyVectorBatchVectorSerializer) {
Expand Down

0 comments on commit 5544d0a

Please sign in to comment.