diff --git a/velox/functions/prestosql/tests/UuidFunctionsTest.cpp b/velox/functions/prestosql/tests/UuidFunctionsTest.cpp index a66a9ff5adbb2..66e3f71ef1163 100644 --- a/velox/functions/prestosql/tests/UuidFunctionsTest.cpp +++ b/velox/functions/prestosql/tests/UuidFunctionsTest.cpp @@ -63,7 +63,7 @@ TEST_F(UuidFunctionsTest, castAsVarchar) { // Verify that CAST results as the same as boost::lexical_cast. We do not use // boost::lexical_cast to implement CAST because it is too slow. auto expected = makeFlatVector(size, [&](auto row) { - const auto uuid = uuids->valueAt(row); + const auto uuid = DecimalUtil::big(uuids->valueAt(row)); boost::uuids::uuid u; memcpy(&u, &uuid, 16); diff --git a/velox/functions/prestosql/types/UuidType.cpp b/velox/functions/prestosql/types/UuidType.cpp index 5aa420113a69d..bb9fa5d58cc05 100644 --- a/velox/functions/prestosql/types/UuidType.cpp +++ b/velox/functions/prestosql/types/UuidType.cpp @@ -75,7 +75,8 @@ class UuidCastOperator : public exec::CastOperator { const auto* uuids = input.as>(); context.applyToSelectedNoThrow(rows, [&](auto row) { - const auto uuid = uuids->valueAt(row); + // Ensure UUID bytes are big endian when building the string. + const auto uuid = DecimalUtil::big(uuids->valueAt(row)); const uint8_t* uuidBytes = reinterpret_cast(&uuid); @@ -127,6 +128,9 @@ class UuidCastOperator : public exec::CastOperator { int128_t u; memcpy(&u, &uuid, 16); + // Convert a big endian value from Boost to native byte-order. + u = DecimalUtil::big(u); + flatResult->set(row, u); }); } diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 952bfa2061b92..553190094c8c5 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -23,6 +23,7 @@ #include "velox/common/base/IOUtils.h" #include "velox/common/base/RawVector.h" #include "velox/common/memory/ByteStream.h" +#include "velox/functions/prestosql/types/UuidType.h" #include "velox/vector/BiasVector.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" @@ -426,6 +427,42 @@ void readDecimalValues( if (nullCount) { checkValuesSize(values, nulls, size, offset); + vector_size_t toClear = offset; + bits::forEachSetBit( + nulls->as(), offset, offset + size, [&](vector_size_t row) { + // Set the values between the last non-null and this to type default. + for (; toClear < row; ++toClear) { + rawValues[toClear] = 0; + } + rawValues[row] = readJavaDecimal(source); + toClear = row + 1; + }); + } else { + for (vector_size_t row = 0; row < size; ++row) { + rawValues[offset + row] = readJavaDecimal(source); + } + } +} + +int128_t readUuidValue(ByteInputStream* source) { + // ByteInputStream does not support reading int128_t values. + // UUIDs are serialized as 2 uint64 values with msb value first. + auto high = folly::Endian::big(source->read()); + auto low = folly::Endian::big(source->read()); + return HugeInt::build(high, low); +} + +void readUuidValues( + ByteInputStream* source, + vector_size_t size, + vector_size_t offset, + const BufferPtr& nulls, + vector_size_t nullCount, + const BufferPtr& values) { + auto rawValues = values->asMutable(); + if (nullCount) { + checkValuesSize(values, nulls, size, offset); + int32_t toClear = offset; bits::forEachSetBit( nulls->as(), offset, offset + size, [&](int32_t row) { @@ -433,12 +470,12 @@ void readDecimalValues( for (; toClear < row; ++toClear) { rawValues[toClear] = 0; } - rawValues[row] = readJavaDecimal(source); + rawValues[row] = readUuidValue(source); toClear = row + 1; }); } else { for (int32_t row = 0; row < size; ++row) { - rawValues[offset + row] = readJavaDecimal(source); + rawValues[offset + row] = readUuidValue(source); } } } @@ -566,6 +603,16 @@ void read( values); return; } + if (isUuidType(type)) { + readUuidValues( + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); + return; + } readValues( source, numNewValues, @@ -1365,6 +1412,7 @@ class VectorStream { useLosslessTimestamp_(opts.useLosslessTimestamp), nullsFirst_(opts.nullsFirst), isLongDecimal_(type_->isLongDecimal()), + isUuid_(isUuidType(type_)), opts_(opts), encoding_(getEncoding(encoding, vector)), nulls_(streamArena, true, true), @@ -1711,6 +1759,10 @@ class VectorStream { return isLongDecimal_; } + bool isUuid() const { + return isUuid_; + } + void clear() { encoding_ = std::nullopt; initializeHeader(typeToEncodingName(type_), *streamArena_); @@ -1789,6 +1841,7 @@ class VectorStream { const bool useLosslessTimestamp_; const bool nullsFirst_; const bool isLongDecimal_; + const bool isUuid_; const SerdeOpts opts_; std::optional encoding_; int32_t nonNullCount_{0}; @@ -1846,12 +1899,21 @@ FOLLY_ALWAYS_INLINE int128_t toJavaDecimalValue(int128_t value) { return value; } +FOLLY_ALWAYS_INLINE int128_t toJavaUuidValue(int128_t value) { + // Presto Java UuidType expects 2 long values with MSB first. + // int128 will be serialized with [lower, upper], so swap here + // to adjust the order. + return DecimalUtil::big(value); +} + template <> void VectorStream::append(folly::Range values) { for (auto& value : values) { int128_t val = value; if (isLongDecimal_) { val = toJavaDecimalValue(value); + } else if (isUuid_) { + val = toJavaUuidValue(value); } values_.append(folly::Range(&val, 1)); } @@ -2396,7 +2458,8 @@ void copyWords( const int32_t* indices, int32_t numIndices, const T* values, - bool isLongDecimal = false) { + bool isLongDecimal = false, + bool isUuid = false) { if (std::is_same_v && isLongDecimal) { for (auto i = 0; i < numIndices; ++i) { reinterpret_cast(destination)[i] = toJavaDecimalValue( @@ -2404,6 +2467,13 @@ void copyWords( } return; } + if (std::is_same_v && isUuid) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaUuidValue( + reinterpret_cast(values)[indices[i]]); + } + return; + } for (auto i = 0; i < numIndices; ++i) { destination[i] = values[indices[i]]; } @@ -2416,9 +2486,10 @@ void copyWordsWithRows( const int32_t* indices, int32_t numIndices, const T* values, - bool isLongDecimal = false) { + bool isLongDecimal = false, + bool isUuid = false) { if (!indices) { - copyWords(destination, rows, numIndices, values, isLongDecimal); + copyWords(destination, rows, numIndices, values, isLongDecimal, isUuid); return; } if (std::is_same_v && isLongDecimal) { @@ -2427,6 +2498,12 @@ void copyWordsWithRows( reinterpret_cast(values)[rows[indices[i]]]); } return; + } else if (std::is_same_v && isUuid) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaUuidValue( + reinterpret_cast(values)[rows[indices[i]]]); + } + return; } for (auto i = 0; i < numIndices; ++i) { destination[i] = values[rows[indices[i]]]; @@ -2488,7 +2565,8 @@ void appendNonNull( nonNullIndices, numNonNull, values, - stream->isLongDecimal()); + stream->isLongDecimal(), + stream->isUuid()); } } @@ -2581,7 +2659,12 @@ void serializeFlatVector( AppendWindow window(stream->values(), scratch); T* output = window.get(rows.size()); copyWords( - output, rows.data(), rows.size(), rawValues, stream->isLongDecimal()); + output, + rows.data(), + rows.size(), + rawValues, + stream->isLongDecimal(), + stream->isUuid()); return; } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index da03e6f39739b..4723c6eecce72 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -15,6 +15,7 @@ */ #include "velox/serializers/PrestoSerializer.h" #include +#include #include #include #include "velox/common/base/tests/GTestUtils.h" @@ -1090,6 +1091,19 @@ TEST_P(PrestoSerializerTest, longDecimal) { testRoundTrip(vector); } +TEST_P(PrestoSerializerTest, uuid) { + auto vector = makeFlatVector( + 200, [](vector_size_t row) { return (int128_t)0xD1 << row % 120; }); + + testRoundTrip(vector); + + // Add some nulls. + for (auto i = 0; i < vector->size(); i += 7) { + vector->setNull(i, true); + } + testRoundTrip(vector); +} + // Test that hierarchically encoded columns (rows) have their encodings // preserved by the PrestoBatchVectorSerializer. TEST_P(PrestoSerializerTest, encodingsBatchVectorSerializer) { diff --git a/velox/type/DecimalUtil.cpp b/velox/type/DecimalUtil.cpp index f07b34a115283..c06dda6df90d6 100644 --- a/velox/type/DecimalUtil.cpp +++ b/velox/type/DecimalUtil.cpp @@ -125,4 +125,14 @@ int32_t DecimalUtil::maxStringViewSize(int precision, int scale) { return rowSize; } +int128_t DecimalUtil::big(int128_t value) { + if (folly::kIsLittleEndian) { + auto upper = folly::Endian::big(HugeInt::upper(value)); + auto lower = folly::Endian::big(HugeInt::lower(value)); + return HugeInt::build(lower, upper); + } else { + return value; + } +} + } // namespace facebook::velox diff --git a/velox/type/DecimalUtil.h b/velox/type/DecimalUtil.h index dc935b94cdc95..64ce28e0be0ae 100644 --- a/velox/type/DecimalUtil.h +++ b/velox/type/DecimalUtil.h @@ -479,6 +479,13 @@ class DecimalUtil { /// @return The length of out. static int32_t toByteArray(int128_t value, char* out); + /// Reverse byte order of an int128_t, if native byte-order is little endian. + /// If native byte-order is big endian, the value will be unchanged. This + /// is similar to folly::Endian::big(), which does not support int128_t. + /// + /// \return A value with reversed byte-order for little endian platforms. + static int128_t big(int128_t value); + static constexpr __uint128_t kOverflowMultiplier = ((__uint128_t)1 << 127); }; // DecimalUtil } // namespace facebook::velox