Skip to content

Commit

Permalink
Fixed UUID serde to match Presto Java
Browse files Browse the repository at this point in the history
Summary:
This fixes the PrestoSerializer to put UUID values in the correct format that
is expected by Presto Java so that the values will match those from a Java
worker. First, when converting UUID to/from string, the values are no longer in
big endian format (as taken from boost::uuid) and are instead stored in
little endian order in an int128_t. Secondly, Presto Java will read UUID values from
an Int128ArrayBlock with the first value as the most significant bits. To
correct this, the upper/lower parts of the int128_t are swapped during
serialization/deserialization.

A unit test for checking roundtrip UUID serialization was added, and manual
testing of Presto with a native worker was done to verify that the problem
from the issue description is fixed.

From prestodb/presto#23311
  • Loading branch information
BryanCutler committed Nov 13, 2024
1 parent aba7ba8 commit da0e6cb
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 9 deletions.
2 changes: 1 addition & 1 deletion velox/functions/prestosql/tests/UuidFunctionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST_F(UuidFunctionsTest, castAsVarchar) {
// Verify that CAST results are the same as boost::lexical_cast. We do not use
// boost::lexical_cast to implement CAST because it is too slow.
auto expected = makeFlatVector<std::string>(size, [&](auto row) {
const auto uuid = uuids->valueAt(row);
const auto uuid = DecimalUtil::big(uuids->valueAt(row));

boost::uuids::uuid u;
memcpy(&u, &uuid, 16);
Expand Down
6 changes: 5 additions & 1 deletion velox/functions/prestosql/types/UuidType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class UuidCastOperator : public exec::CastOperator {
const auto* uuids = input.as<SimpleVector<int128_t>>();

context.applyToSelectedNoThrow(rows, [&](auto row) {
const auto uuid = uuids->valueAt(row);
// Ensure UUID bytes are big endian when building the string.
const auto uuid = DecimalUtil::big(uuids->valueAt(row));

const uint8_t* uuidBytes = reinterpret_cast<const uint8_t*>(&uuid);

Expand Down Expand Up @@ -127,6 +128,9 @@ class UuidCastOperator : public exec::CastOperator {
int128_t u;
memcpy(&u, &uuid, 16);

// Convert a big endian value from Boost to native byte-order.
u = DecimalUtil::big(u);

flatResult->set(row, u);
});
}
Expand Down
97 changes: 90 additions & 7 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/common/base/IOUtils.h"
#include "velox/common/base/RawVector.h"
#include "velox/common/memory/ByteStream.h"
#include "velox/functions/prestosql/types/UuidType.h"
#include "velox/vector/BiasVector.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
Expand Down Expand Up @@ -426,19 +427,55 @@ void readDecimalValues(
if (nullCount) {
checkValuesSize<int128_t>(values, nulls, size, offset);

vector_size_t toClear = offset;
bits::forEachSetBit(
nulls->as<uint64_t>(), offset, offset + size, [&](vector_size_t row) {
// Set the values between the last non-null and this to type default.
for (; toClear < row; ++toClear) {
rawValues[toClear] = 0;
}
rawValues[row] = readJavaDecimal(source);
toClear = row + 1;
});
} else {
for (vector_size_t row = 0; row < size; ++row) {
rawValues[offset + row] = readJavaDecimal(source);
}
}
}

/// Reads a single UUID from 'source' and returns it as an int128_t.
///
/// The stream carries each UUID as two consecutive 64-bit words, most
/// significant word first. Each word is passed through folly::Endian::big()
/// before the two halves are combined.
int128_t readUuidValue(ByteInputStream* source) {
  // ByteInputStream has no int128_t reader, so the value is assembled from
  // two uint64_t reads. The order of the reads matters: upper half first.
  const auto mostSignificant = folly::Endian::big(source->read<uint64_t>());
  const auto leastSignificant = folly::Endian::big(source->read<uint64_t>());
  return HugeInt::build(mostSignificant, leastSignificant);
}

/// Reads UUID values from 'source' into 'values', starting at row 'offset'.
///
/// @param source Stream positioned at the first serialized UUID.
/// @param size Number of rows to fill.
/// @param offset Index of the first row to fill in 'values'.
/// @param nulls Bitmap consulted when 'nullCount' is non-zero; a value is
/// read from the stream only for rows whose bit is set.
/// @param nullCount Number of null rows; zero means every row has a value.
/// @param values Buffer of int128_t that receives the decoded UUIDs.
void readUuidValues(
    ByteInputStream* source,
    vector_size_t size,
    vector_size_t offset,
    const BufferPtr& nulls,
    vector_size_t nullCount,
    const BufferPtr& values) {
  auto rawValues = values->asMutable<int128_t>();
  if (nullCount) {
    checkValuesSize<int128_t>(values, nulls, size, offset);

    vector_size_t toClear = offset;
    bits::forEachSetBit(
        nulls->as<uint64_t>(), offset, offset + size, [&](vector_size_t row) {
          // Set the values between the last non-null and this to type default.
          for (; toClear < row; ++toClear) {
            rawValues[toClear] = 0;
          }
          // Exactly one UUID is consumed from the stream per non-null row.
          // Reading anything else here (e.g. a Java decimal) would advance
          // the stream twice and misalign every following value.
          rawValues[row] = readUuidValue(source);
          toClear = row + 1;
        });
  } else {
    // No nulls: every row has a serialized value, read them back to back.
    for (vector_size_t row = 0; row < size; ++row) {
      rawValues[offset + row] = readUuidValue(source);
    }
  }
}
Expand Down Expand Up @@ -566,6 +603,16 @@ void read(
values);
return;
}
if (isUuidType(type)) {
readUuidValues(
source,
numNewValues,
resultOffset,
flatResult->nulls(),
nullCount,
values);
return;
}
readValues<T>(
source,
numNewValues,
Expand Down Expand Up @@ -1365,6 +1412,7 @@ class VectorStream {
useLosslessTimestamp_(opts.useLosslessTimestamp),
nullsFirst_(opts.nullsFirst),
isLongDecimal_(type_->isLongDecimal()),
isUuid_(isUuidType(type_)),
opts_(opts),
encoding_(getEncoding(encoding, vector)),
nulls_(streamArena, true, true),
Expand Down Expand Up @@ -1711,6 +1759,10 @@ class VectorStream {
return isLongDecimal_;
}

/// Returns the flag computed once at construction from isUuidType(type_).
/// Callers use it to decide whether int128_t values need the UUID-specific
/// conversion before being written.
bool isUuid() const {
return isUuid_;
}

void clear() {
encoding_ = std::nullopt;
initializeHeader(typeToEncodingName(type_), *streamArena_);
Expand Down Expand Up @@ -1789,6 +1841,7 @@ class VectorStream {
const bool useLosslessTimestamp_;
const bool nullsFirst_;
const bool isLongDecimal_;
const bool isUuid_;
const SerdeOpts opts_;
std::optional<VectorEncoding::Simple> encoding_;
int32_t nonNullCount_{0};
Expand Down Expand Up @@ -1846,12 +1899,21 @@ FOLLY_ALWAYS_INLINE int128_t toJavaDecimalValue(int128_t value) {
return value;
}

/// Converts a UUID held as an int128_t into the layout written to the
/// serialized stream for Presto Java.
FOLLY_ALWAYS_INLINE int128_t toJavaUuidValue(int128_t value) {
  // Presto Java's UuidType expects two long values with the most significant
  // one first, whereas an int128_t would otherwise be written as
  // [lower, upper]. DecimalUtil::big() reverses the byte order on
  // little-endian hosts, which puts the most significant bytes first.
  const int128_t javaOrdered = DecimalUtil::big(value);
  return javaOrdered;
}

template <>
void VectorStream::append(folly::Range<const int128_t*> values) {
for (auto& value : values) {
int128_t val = value;
if (isLongDecimal_) {
val = toJavaDecimalValue(value);
} else if (isUuid_) {
val = toJavaUuidValue(value);
}
values_.append<int128_t>(folly::Range(&val, 1));
}
Expand Down Expand Up @@ -2396,14 +2458,22 @@ void copyWords(
const int32_t* indices,
int32_t numIndices,
const T* values,
bool isLongDecimal = false) {
bool isLongDecimal = false,
bool isUuid = false) {
if (std::is_same_v<T, int128_t> && isLongDecimal) {
for (auto i = 0; i < numIndices; ++i) {
reinterpret_cast<int128_t*>(destination)[i] = toJavaDecimalValue(
reinterpret_cast<const int128_t*>(values)[indices[i]]);
}
return;
}
if (std::is_same_v<T, int128_t> && isUuid) {
for (auto i = 0; i < numIndices; ++i) {
reinterpret_cast<int128_t*>(destination)[i] = toJavaUuidValue(
reinterpret_cast<const int128_t*>(values)[indices[i]]);
}
return;
}
for (auto i = 0; i < numIndices; ++i) {
destination[i] = values[indices[i]];
}
Expand All @@ -2416,9 +2486,10 @@ void copyWordsWithRows(
const int32_t* indices,
int32_t numIndices,
const T* values,
bool isLongDecimal = false) {
bool isLongDecimal = false,
bool isUuid = false) {
if (!indices) {
copyWords(destination, rows, numIndices, values, isLongDecimal);
copyWords(destination, rows, numIndices, values, isLongDecimal, isUuid);
return;
}
if (std::is_same_v<T, int128_t> && isLongDecimal) {
Expand All @@ -2427,6 +2498,12 @@ void copyWordsWithRows(
reinterpret_cast<const int128_t*>(values)[rows[indices[i]]]);
}
return;
} else if (std::is_same_v<T, int128_t> && isUuid) {
for (auto i = 0; i < numIndices; ++i) {
reinterpret_cast<int128_t*>(destination)[i] = toJavaUuidValue(
reinterpret_cast<const int128_t*>(values)[rows[indices[i]]]);
}
return;
}
for (auto i = 0; i < numIndices; ++i) {
destination[i] = values[rows[indices[i]]];
Expand Down Expand Up @@ -2488,7 +2565,8 @@ void appendNonNull(
nonNullIndices,
numNonNull,
values,
stream->isLongDecimal());
stream->isLongDecimal(),
stream->isUuid());
}
}

Expand Down Expand Up @@ -2581,7 +2659,12 @@ void serializeFlatVector(
AppendWindow<T> window(stream->values(), scratch);
T* output = window.get(rows.size());
copyWords(
output, rows.data(), rows.size(), rawValues, stream->isLongDecimal());
output,
rows.data(),
rows.size(),
rawValues,
stream->isLongDecimal(),
stream->isUuid());
return;
}

Expand Down
14 changes: 14 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/serializers/PrestoSerializer.h"
#include <folly/Random.h>
#include <functions/prestosql/types/UuidType.h>
#include <gtest/gtest.h>
#include <vector>
#include "velox/common/base/tests/GTestUtils.h"
Expand Down Expand Up @@ -1090,6 +1091,19 @@ TEST_P(PrestoSerializerTest, longDecimal) {
testRoundTrip(vector);
}

// Round-trips a flat int128_t vector through the serializer, first without
// nulls and then with every 7th row set to null.
TEST_P(PrestoSerializerTest, uuid) {
  constexpr vector_size_t kNumRows = 200;
  auto vector = makeFlatVector<int128_t>(kNumRows, [](vector_size_t row) {
    // Slide a fixed bit pattern across the 128-bit value so that both the
    // lower and the upper halves get exercised.
    return static_cast<int128_t>(0xD1) << (row % 120);
  });

  testRoundTrip(vector);

  // Add some nulls.
  for (vector_size_t row = 0; row < vector->size(); row += 7) {
    vector->setNull(row, true);
  }
  testRoundTrip(vector);
}

// Test that hierarchically encoded columns (rows) have their encodings
// preserved by the PrestoBatchVectorSerializer.
TEST_P(PrestoSerializerTest, encodingsBatchVectorSerializer) {
Expand Down
10 changes: 10 additions & 0 deletions velox/type/DecimalUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,14 @@ int32_t DecimalUtil::maxStringViewSize(int precision, int scale) {
return rowSize;
}

// Returns 'value' with its 16 bytes in reversed order on little-endian
// hosts; on big-endian hosts 'value' is returned as is.
int128_t DecimalUtil::big(int128_t value) {
  if (!folly::kIsLittleEndian) {
    return value;
  }
  // Byte-swap each 64-bit half, then exchange the halves: together this
  // reverses all 16 bytes.
  const auto swappedUpper = folly::Endian::big(HugeInt::upper(value));
  const auto swappedLower = folly::Endian::big(HugeInt::lower(value));
  return HugeInt::build(swappedLower, swappedUpper);
}

} // namespace facebook::velox
7 changes: 7 additions & 0 deletions velox/type/DecimalUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,13 @@ class DecimalUtil {
/// @return The length of out.
static int32_t toByteArray(int128_t value, char* out);

/// Reverses the byte order of an int128_t when the native byte order is
/// little endian. On big-endian platforms the value is returned unchanged.
/// This is similar to folly::Endian::big(), which does not support int128_t.
///
/// @param value The 128-bit value to convert.
/// @return 'value' with its bytes reversed on little-endian platforms,
/// otherwise 'value' unchanged.
static int128_t big(int128_t value);

static constexpr __uint128_t kOverflowMultiplier = ((__uint128_t)1 << 127);
}; // DecimalUtil
} // namespace facebook::velox

0 comments on commit da0e6cb

Please sign in to comment.