diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 09cff0677e3e..1a58750f8b11 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1948,7 +1948,8 @@ void serializeFlatVector( void serializeColumn( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream); + VectorStream* stream, + Scratch& scratch); void serializeColumn( const BaseVector* vector, @@ -1959,7 +1960,8 @@ void serializeColumn( void serializeWrapped( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { std::vector newRanges; const bool mayHaveNulls = vector->mayHaveNulls(); const BaseVector* wrapped = vector->wrappedVector(); @@ -1969,7 +1971,7 @@ void serializeWrapped( if (mayHaveNulls && vector->isNullAt(offset)) { // The wrapper added a null. if (!newRanges.empty()) { - serializeColumn(wrapped, newRanges, stream); + serializeColumn(wrapped, newRanges, stream, scratch); newRanges.clear(); } stream->appendNull(); @@ -1980,7 +1982,7 @@ void serializeWrapped( } } if (!newRanges.empty()) { - serializeColumn(wrapped, newRanges, stream); + serializeColumn(wrapped, newRanges, stream, scratch); } } @@ -2006,7 +2008,8 @@ void serializeTimestampWithTimeZone( void serializeRowVector( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { auto rowVector = dynamic_cast(vector); if (isTimestampWithTimeZoneType(vector->type())) { @@ -2030,14 +2033,15 @@ void serializeRowVector( } for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { serializeColumn( - rowVector->childAt(i).get(), childRanges, stream->childAt(i)); + rowVector->childAt(i).get(), childRanges, stream->childAt(i), scratch); } } void serializeArrayVector( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { auto arrayVector = dynamic_cast(vector); auto rawSizes = arrayVector->rawSizes(); auto rawOffsets = arrayVector->rawOffsets(); @@ -2060,13 +2064,14 @@ void serializeArrayVector( } } serializeColumn( - arrayVector->elements().get(), childRanges, stream->childAt(0)); + arrayVector->elements().get(), childRanges, stream->childAt(0), scratch); } void serializeMapVector( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { auto mapVector = dynamic_cast(vector); auto rawSizes = mapVector->rawSizes(); auto rawOffsets = mapVector->rawOffsets(); @@ -2088,9 +2093,10 @@ void serializeMapVector( } } } - serializeColumn(mapVector->mapKeys().get(), childRanges, stream->childAt(0)); serializeColumn( - mapVector->mapValues().get(), childRanges, stream->childAt(1)); + mapVector->mapKeys().get(), childRanges, stream->childAt(0), scratch); + serializeColumn( + mapVector->mapValues().get(), childRanges, stream->childAt(1), scratch); } static inline int32_t rangesTotalSize( @@ -2106,28 +2112,68 @@ template void serializeDictionaryVector( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { // Cannot serialize dictionary as PrestoPage dictionary if it has nulls. // Also check if the stream was set up for dictionary (we had to know the // encoding type when creating VectorStream for that). if (vector->nulls() || !stream->isDictionaryStream()) { - serializeWrapped(vector, ranges, stream); + serializeWrapped(vector, ranges, stream, scratch); return; } using T = typename KindToFlatVector::WrapperType; auto dictionaryVector = vector->as>(); - std::vector childRanges; - childRanges.push_back({0, dictionaryVector->valueVector()->size()}); + // Create a bit set to track which values in the Dictionary are used. + ScratchPtr usedIndicesHolder(scratch); + auto usedIndicesCapacity = + bits::nwords(dictionaryVector->valueVector()->size()); + auto* usedIndices = usedIndicesHolder.get(usedIndicesCapacity); + simd::memset(usedIndices, 0, usedIndicesCapacity * sizeof(uint64_t)); + + auto* indices = dictionaryVector->indices()->template as(); + vector_size_t numRows = 0; + for (const auto& range : ranges) { + numRows += range.size; + for (auto i = 0; i < range.size; ++i) { + bits::setBit(usedIndices, indices[range.begin + i]); + } + } + + // Convert the bitset to a list of the used indices. + ScratchPtr selectedIndicesHolder(scratch); + auto* mutableSelectedIndices = + selectedIndicesHolder.get(dictionaryVector->valueVector()->size()); + auto numUsed = simd::indicesOfSetBits( + usedIndices, + 0, + dictionaryVector->valueVector()->size(), + mutableSelectedIndices); + + // Serialize the used elements from the Dictionary. serializeColumn( - dictionaryVector->valueVector().get(), childRanges, stream->childAt(0)); + dictionaryVector->valueVector().get(), + folly::Range(mutableSelectedIndices, numUsed), + stream->childAt(0), + scratch); + + // Create a mapping from the original indices to the indices in the shrunk + // Dictionary of just used values. + ScratchPtr updatedIndicesHolder(scratch); + auto* updatedIndices = + updatedIndicesHolder.get(dictionaryVector->valueVector()->size()); + vector_size_t curIndex = 0; + for (vector_size_t i = 0; i < numUsed; ++i) { + updatedIndices[mutableSelectedIndices[i]] = curIndex++; + } - const BufferPtr& indices = dictionaryVector->indices(); - auto* rawIndices = indices->as(); + // Write out the indices, translating them using the above mapping. + stream->appendNonNull(numRows); for (const auto& range : ranges) { - stream->appendNonNull(range.size); - stream->append(folly::Range(&rawIndices[range.begin], range.size)); + for (auto i = 0; i < range.size; ++i) { + stream->appendOne(updatedIndices[indices[range.begin + i]]); + } } } @@ -2135,11 +2181,12 @@ template void serializeConstantVectorImpl( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { using T = typename KindToFlatVector::WrapperType; auto constVector = dynamic_cast*>(vector); if (constVector->valueVector() != nullptr) { - serializeWrapped(constVector, ranges, stream); + serializeWrapped(constVector, ranges, stream, scratch); return; } @@ -2162,7 +2209,8 @@ template void serializeConstantVector( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { if (stream->isConstantStream()) { for (const auto& range : ranges) { stream->appendNonNull(range.size); @@ -2170,9 +2218,10 @@ void serializeConstantVector( std::vector newRanges; newRanges.push_back({0, 1}); - serializeConstantVectorImpl(vector, newRanges, stream->childAt(0)); + serializeConstantVectorImpl( + vector, newRanges, stream->childAt(0), scratch); } else { - serializeConstantVectorImpl(vector, ranges, stream); + serializeConstantVectorImpl(vector, ranges, stream, scratch); } } @@ -2208,7 +2257,8 @@ void serializeBiasVector( void serializeColumn( const BaseVector* vector, const folly::Range& ranges, - VectorStream* stream) { + VectorStream* stream, + Scratch& scratch) { switch (vector->encoding()) { case VectorEncoding::Simple::FLAT: VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( @@ -2216,7 +2266,12 @@ void serializeColumn( break; case VectorEncoding::Simple::CONSTANT: VELOX_DYNAMIC_TYPE_DISPATCH_ALL( - serializeConstantVector, vector->typeKind(), vector, ranges, stream); + serializeConstantVector, + vector->typeKind(), + vector, + ranges, + stream, + scratch); break; case VectorEncoding::Simple::DICTIONARY: VELOX_DYNAMIC_TYPE_DISPATCH_ALL( @@ -2224,7 +2279,8 @@ void serializeColumn( vector->typeKind(), vector, ranges, - stream); + stream, + scratch); break; case VectorEncoding::Simple::BIASED: switch (vector->typeKind()) { @@ -2244,19 +2300,19 @@ void serializeColumn( } break; case VectorEncoding::Simple::ROW: - serializeRowVector(vector, ranges, stream); + serializeRowVector(vector, ranges, stream, scratch); break; case VectorEncoding::Simple::ARRAY: - serializeArrayVector(vector, ranges, stream); + serializeArrayVector(vector, ranges, stream, scratch); break; case VectorEncoding::Simple::MAP: - serializeMapVector(vector, ranges, stream); + serializeMapVector(vector, ranges, stream, scratch); break; case VectorEncoding::Simple::LAZY: - serializeColumn(vector->loadedVector(), ranges, stream); + serializeColumn(vector->loadedVector(), ranges, stream, scratch); break; default: - serializeWrapped(vector, ranges, stream); + serializeWrapped(vector, ranges, stream, scratch); } } @@ -2752,7 +2808,8 @@ void serializeArrayVector( serializeColumn( arrayVector->elements().get(), folly::Range(rangesHolder.get(), numRanges), - stream->childAt(0)); + stream->childAt(0), + scratch); } void serializeMapVector( @@ -2779,11 +2836,13 @@ void serializeMapVector( serializeColumn( mapVector->mapKeys().get(), folly::Range(rangesHolder.get(), numRanges), - stream->childAt(0)); + stream->childAt(0), + scratch); serializeColumn( mapVector->mapValues().get(), folly::Range(rangesHolder.get(), numRanges), - stream->childAt(1)); + stream->childAt(1), + scratch); } template @@ -3562,7 +3621,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { void serialize( const RowVectorPtr& vector, const folly::Range& ranges, - Scratch& /* scratch */, + Scratch& scratch, OutputStream* stream) override { const auto numRows = rangesTotalSize(ranges); const auto rowType = vector->type(); @@ -3579,7 +3638,8 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { numRows, opts_); - serializeColumn(vector->childAt(i).get(), ranges, streams[i].get()); + serializeColumn( + vector->childAt(i).get(), ranges, streams[i].get(), scratch); } flushStreams(streams, numRows, arena, *codec_, stream); @@ -3620,7 +3680,8 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } numRows_ += numNewRows; for (int32_t i = 0; i < vector->childrenSize(); ++i) { - serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get()); + serializeColumn( + vector->childAt(i).get(), ranges, streams_[i].get(), scratch); } } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 5a35ddb5607c..5c1046d75fcf 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -590,6 +590,79 @@ class PrestoSerializerTest }); } + void testMinimalDictionaryEncoding( + vector_size_t numRows, + vector_size_t alphabetSize) { + auto base = makeFlatVector( + alphabetSize, [](vector_size_t row) { return row; }); + auto allIndices = makeIndices( + numRows, [alphabetSize](auto row) { return row % alphabetSize; }); + auto evenIndices = makeIndices( + numRows, [alphabetSize](auto row) { return (row * 2) % alphabetSize; }); + auto oddIndices = makeIndices(numRows, [alphabetSize](auto row) { + return (row * 2 + 1) % alphabetSize; + }); + auto prefixIndices = makeIndices( + numRows, [alphabetSize](auto row) { return row % (alphabetSize / 2); }); + auto suffixIndices = makeIndices(numRows, [alphabetSize](auto row) { + return row % (alphabetSize / 2) + (alphabetSize / 2); + }); + + auto rows = makeRowVector({ + BaseVector::wrapInDictionary(nullptr, allIndices, numRows, base), + BaseVector::wrapInDictionary(nullptr, evenIndices, numRows, base), + BaseVector::wrapInDictionary(nullptr, oddIndices, numRows, base), + BaseVector::wrapInDictionary(nullptr, prefixIndices, numRows, base), + BaseVector::wrapInDictionary(nullptr, suffixIndices, numRows, base), + }); + + std::ostringstream out; + serializeBatch(rows, &out, /*serdeOptions=*/nullptr); + const auto serialized = out.str(); + + auto rowType = asRowType(rows->type()); + auto deserialized = + deserialize(rowType, serialized, /*serdeOptions=*/nullptr); + + assertEqualVectors(rows, deserialized); + assertEqualEncoding(rows, deserialized); + + // If the dictionary is larger than numRows, we'll still get distinct + // indices after applying modulus. + auto expectedSmallerAlphabetSize = std::min(alphabetSize / 2, numRows); + + ASSERT_EQ( + deserialized->childAt(0) + ->as>() + ->valueVector() + ->size(), + std::min(alphabetSize, numRows)); + ASSERT_EQ( + deserialized->childAt(1) + ->as>() + ->valueVector() + ->size(), + expectedSmallerAlphabetSize); + ASSERT_EQ( + deserialized->childAt(2) + ->as>() + ->valueVector() + ->size(), + expectedSmallerAlphabetSize); + ASSERT_EQ( + deserialized->childAt(3) + ->as>() + ->valueVector() + ->size(), + expectedSmallerAlphabetSize); + ASSERT_EQ( + deserialized->childAt(4) + ->as>() + ->valueVector() + ->size(), + expectedSmallerAlphabetSize); + } + std::unique_ptr serde_; }; @@ -848,6 +921,17 @@ TEST_P(PrestoSerializerTest, encodingsMapValuesBatchVectorSerializer) { testBatchVectorSerializerRoundTrip(encodingsMapValuesTestVector()); } +// Test that dictionary encoding is preserved and that only the minimum number +// of entries from the alphabet are serialized. +TEST_P(PrestoSerializerTest, minimalDictionaryEncodings) { + // Dictionary same size as rows. + testMinimalDictionaryEncoding(32, 32); + // Dictionary smaller than rows. + testMinimalDictionaryEncoding(32, 16); + // Dictionary larger than rows. + testMinimalDictionaryEncoding(32, 64); +} + TEST_P(PrestoSerializerTest, lazy) { constexpr int kSize = 1000; auto rowVector = makeTestVector(kSize);