From 2645613d4a0047f55b705f878917d029729ff1a7 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Fri, 19 Jan 2024 09:58:27 -0800 Subject: [PATCH] Scatter struct nulls when deserializing Presto wire format (#8318) Summary: When reading spill serialization, struct nulls are written before the struct columns and the reading can proceed in a single pass. Like this, nulls from enclosing structs are passed down when reading. These are combined with nulls of the contained column so that the contained column also has a null for rows where the enclosing struct is null. When reading Presto Pages, struct nulls come after the child columns. A separate pass scatters the child column values so as to create a null gap for the rows where the containing struct is null. Adds a test for encoding preserving round trips. Adds a test for concatenating different encodings in a message, e.g. constant, dictionary, flat in all combinations of same/different encoding/value domain. This functionality only applies to nulls first representations. This will apply to Presto pages when the struct nulls are read before constructing the struct. See PR 8152 for the end state. 
Pull Request resolved: https://github.com/facebookincubator/velox/pull/8318 Reviewed By: xiaoxmeng Differential Revision: D52682198 Pulled By: oerling fbshipit-source-id: 4253727392ecae2caca92e79799710703370a287 --- velox/exec/SpillFile.cpp | 4 +- velox/serializers/PrestoSerializer.cpp | 552 ++++++++++++------ velox/serializers/PrestoSerializer.h | 23 +- .../tests/PrestoSerializerTest.cpp | 196 +++++-- 4 files changed, 525 insertions(+), 250 deletions(-) diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index a04b34ca92e2..28181f48d779 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -181,7 +181,7 @@ uint64_t SpillWriter::write( MicrosecondTimer timer(&timeUs); if (batch_ == nullptr) { serializer::presto::PrestoVectorSerde::PrestoOptions options = { - kDefaultUseLosslessTimestamp, compressionKind_}; + kDefaultUseLosslessTimestamp, compressionKind_, true}; batch_ = std::make_unique(pool_); batch_->createStreamTree( std::static_pointer_cast(rows->type()), @@ -297,7 +297,7 @@ SpillReadFile::SpillReadFile( numSortKeys_(numSortKeys), sortCompareFlags_(sortCompareFlags), compressionKind_(compressionKind), - readOptions_{kDefaultUseLosslessTimestamp, compressionKind_}, + readOptions_{kDefaultUseLosslessTimestamp, compressionKind_, true}, pool_(pool) { constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize; // 1MB - padding. 
diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index f57c649bd75b..fbcf592c7de1 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -25,6 +25,9 @@ #include "velox/vector/VectorTypeUtils.h" namespace facebook::velox::serializer::presto { + +using SerdeOpts = PrestoVectorSerde::PrestoOptions; + namespace { constexpr int8_t kCompressedBitMask = 1; constexpr int8_t kEncryptedBitMask = 2; @@ -300,21 +303,53 @@ void readDecimalValues( } } +/// When deserializing vectors under row vectors that introduce +/// nulls, the child vector must have a gap at the place where a +/// parent RowVector has a null. So, if there is a parent RowVector +/// that adds a null, 'incomingNulls' is the bitmap where a null +/// denotes a null in the parent RowVector(s). 'numIncomingNulls' is +/// the number of bits in this bitmap, i.e. the number of rows in +/// the parentRowVector. 'size' is the size of the child vector +/// being deserialized. This size does not include rows where a +/// parent RowVector has nulls. +vector_size_t sizeWithIncomingNulls( + vector_size_t size, + int32_t numIncomingNulls) { + return numIncomingNulls == 0 ? size : numIncomingNulls; +} + +// Fills the nulls of 'result' from the serialized nulls in +// 'source'. Adds nulls from 'incomingNulls' so that the null flags +// gets padded with extra nulls where a parent RowVector has a +// null. Returns the number of nulls in the result. vector_size_t readNulls( ByteInputStream* source, vector_size_t size, - BaseVector& result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + BaseVector& result) { + VELOX_DCHECK_LE( + result.size(), resultOffset + (incomingNulls ? 
numIncomingNulls : size)); if (source->readByte() == 0) { - result.clearNulls(resultOffset, resultOffset + size); - return 0; + if (incomingNulls) { + auto* rawNulls = result.mutableRawNulls(); + bits::copyBits( + incomingNulls, 0, rawNulls, resultOffset, numIncomingNulls); + } else { + result.clearNulls(resultOffset, resultOffset + size); + } + return incomingNulls + ? numIncomingNulls - bits::countBits(incomingNulls, 0, numIncomingNulls) + : 0; } - const bool noPriorNulls = (result.rawNulls() == nullptr); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + const bool noPriorNulls = (result.rawNulls() == nullptr); // Allocate one extra byte in case we cannot use bits from the current last // partial byte. - BufferPtr& nulls = result.mutableNulls(resultOffset + size + 8); + BufferPtr& nulls = result.mutableNulls(resultOffset + numNewValues + 8); if (noPriorNulls) { bits::fillBits( nulls->asMutable(), 0, resultOffset, bits::kNotNull); @@ -326,6 +361,15 @@ vector_size_t readNulls( source->readBytes(rawNulls, numBytes); bits::reverseBits(rawNulls, numBytes); bits::negate(reinterpret_cast(rawNulls), numBytes * 8); + // Add incoming nulls if any. + if (incomingNulls) { + bits::scatterBits( + size, + numIncomingNulls, + reinterpret_cast(rawNulls), + incomingNulls, + reinterpret_cast(rawNulls)); + } // Shift bits if needed. 
if (bits::nbytes(resultOffset) * 8 > resultOffset) { @@ -334,64 +378,93 @@ vector_size_t readNulls( bits::nbytes(resultOffset) * 8, nulls->asMutable(), resultOffset, - size); + numNewValues); } - return BaseVector::countNulls(nulls, resultOffset, resultOffset + size); + return BaseVector::countNulls( + nulls, resultOffset, resultOffset + numNewValues); } template void read( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { const int32_t size = source->read(); - result->resize(resultOffset + size); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + result->resize(resultOffset + numNewValues); auto flatResult = result->asFlatVector(); - auto nullCount = readNulls(source, size, *flatResult, resultOffset); + auto nullCount = readNulls( + source, size, resultOffset, incomingNulls, numIncomingNulls, *flatResult); - BufferPtr values = flatResult->mutableValues(resultOffset + size); + BufferPtr values = flatResult->mutableValues(resultOffset + numNewValues); if constexpr (std::is_same_v) { - if (useLosslessTimestamp) { + if (opts.useLosslessTimestamp) { readLosslessTimestampValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } } if (type->isLongDecimal()) { readDecimalValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } readValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); } template <> void read( ByteInputStream* source, const TypePtr& 
type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { const int32_t size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); auto flatResult = result->as>(); BufferPtr values = flatResult->mutableValues(resultOffset + size); auto rawValues = values->asMutable(); - for (int32_t i = 0; i < size; ++i) { + int32_t lastOffset = 0; + for (int32_t i = 0; i < numNewValues; ++i) { // Set the first int32_t of each StringView to be the offset. - *reinterpret_cast(&rawValues[resultOffset + i]) = - source->read(); + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + *reinterpret_cast(&rawValues[resultOffset + i]) = lastOffset; + continue; + } + lastOffset = source->read(); + *reinterpret_cast(&rawValues[resultOffset + i]) = lastOffset; } - readNulls(source, size, *flatResult, resultOffset); + readNulls( + source, size, resultOffset, incomingNulls, numIncomingNulls, *flatResult); const int32_t dataSize = source->read(); if (dataSize == 0) { @@ -404,7 +477,7 @@ void read( source->readBytes(rawStrings, dataSize); int32_t previousOffset = 0; auto rawChars = reinterpret_cast(rawStrings); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { int32_t offset = rawValues[resultOffset + i].size(); rawValues[resultOffset + i] = StringView(rawChars + previousOffset, offset - previousOffset); @@ -414,84 +487,127 @@ void read( void readColumns( ByteInputStream* source, - velox::memory::MemoryPool* pool, const std::vector& types, - std::vector& result, vector_size_t resultOffset, - bool useLosslessTimestamp); + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const 
SerdeOpts& opts, + std::vector& result); void readConstantVector( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns(source, childTypes, 0, nullptr, 0, pool, opts, children); VELOX_CHECK_EQ(1, children[0]->size()); - auto constantVector = BaseVector::wrapInConstant(size, 0, children[0]); - if (resultOffset == 0) { + auto constantVector = + BaseVector::wrapInConstant(numNewValues, 0, children[0]); + if (resultOffset == 0 && !incomingNulls) { result = std::move(constantVector); } else { - result->resize(resultOffset + size); + if (!incomingNulls && + opts.nullsFirst && // TODO remove when removing scatter nulls pass. 
+ result->encoding() == VectorEncoding::Simple::CONSTANT && + constantVector->equalValueAt(result.get(), 0, 0)) { + result->resize(resultOffset + numNewValues); + return; + } + result->resize(resultOffset + numNewValues); - SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + SelectivityVector rows(resultOffset + numNewValues, false); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(constantVector.get(), resultOffset, 0, size); + result->copy(constantVector.get(), resultOffset, 0, numNewValues); + if (incomingNulls) { + bits::forEachUnsetBit(incomingNulls, 0, numNewValues, [&](auto row) { + result->setNull(resultOffset + row, true); + }); + } } } void readDictionaryVector( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns(source, childTypes, 0, nullptr, 0, pool, opts, children); // Read indices. 
- BufferPtr indices = allocateIndices(size, pool); - source->readBytes(indices->asMutable(), size * sizeof(int32_t)); + BufferPtr indices = allocateIndices(numNewValues, pool); + if (incomingNulls) { + auto rawIndices = indices->asMutable(); + for (auto i = 0; i < numNewValues; ++i) { + if (bits::isBitNull(incomingNulls, i)) { + rawIndices[i] = 0; + } else { + rawIndices[i] = source->read(); + } + } + } else { + source->readBytes( + indices->asMutable(), numNewValues * sizeof(int32_t)); + } // Skip 3 * 8 bytes of 'instance id'. Velox doesn't use 'instance id' for // dictionary vectors. source->skip(24); - auto dictionaryVector = - BaseVector::wrapInDictionary(nullptr, indices, size, children[0]); + BufferPtr incomingNullsBuffer = nullptr; + if (incomingNulls) { + incomingNullsBuffer = AlignedBuffer::allocate(numIncomingNulls, pool); + memcpy( + incomingNullsBuffer->asMutable(), + incomingNulls, + bits::nbytes(numIncomingNulls)); + } + auto dictionaryVector = BaseVector::wrapInDictionary( + incomingNullsBuffer, indices, numNewValues, children[0]); if (resultOffset == 0) { result = std::move(dictionaryVector); } else { - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); - SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + SelectivityVector rows(resultOffset + numNewValues, false); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(dictionaryVector.get(), resultOffset, 0, size); + result->copy(dictionaryVector.get(), resultOffset, 0, numNewValues); } } void readArrayVector( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + 
VectorPtr& result) { ArrayVector* arrayVector = result->as(); const auto resultElementsOffset = arrayVector->elements()->size(); @@ -500,49 +616,67 @@ void readArrayVector( std::vector children{arrayVector->elements()}; readColumns( source, - pool, childTypes, - children, resultElementsOffset, - useLosslessTimestamp); + nullptr, + 0, + pool, + opts, + children); - vector_size_t size = source->read(); - arrayVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + arrayVector->resize(resultOffset + numNewValues); arrayVector->setElements(children[0]); - BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = arrayVector->mutableSizes(resultOffset + size); + BufferPtr sizes = arrayVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *arrayVector, resultOffset); + readNulls( + source, + size, + resultOffset, + incomingNulls, + numIncomingNulls, + *arrayVector); } void readMapVector( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { MapVector* mapVector = result->as(); const auto resultElementsOffset 
= mapVector->mapKeys()->size(); std::vector childTypes = {type->childAt(0), type->childAt(1)}; std::vector children{mapVector->mapKeys(), mapVector->mapValues()}; readColumns( source, - pool, childTypes, - children, resultElementsOffset, - useLosslessTimestamp); + nullptr, + 0, + pool, + opts, + children); int32_t hashTableSize = source->read(); if (hashTableSize != -1) { @@ -550,23 +684,31 @@ void readMapVector( source->skip(hashTableSize * sizeof(int32_t)); } - vector_size_t size = source->read(); - mapVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const vector_size_t numNewValues = + sizeWithIncomingNulls(size, numIncomingNulls); + mapVector->resize(resultOffset + numNewValues); mapVector->setKeysAndValues(children[0], children[1]); - BufferPtr offsets = mapVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = mapVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = mapVector->mutableSizes(resultOffset + size); + BufferPtr sizes = mapVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *mapVector, resultOffset); + readNulls( + source, size, resultOffset, incomingNulls, numIncomingNulls, *mapVector); } int64_t packTimestampWithTimeZone(int64_t timestamp, int16_t timezone) { @@ -585,9 +727,21 @@ void readTimestampWithTimeZone( ByteInputStream* source, velox::memory::MemoryPool* pool, RowVector* result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* 
incomingNulls, + int32_t numIncomingNulls) { + SerdeOpts opts; + opts.useLosslessTimestamp = false; auto& timestamps = result->childAt(0); - read(source, BIGINT(), pool, timestamps, resultOffset, false); + read( + source, + BIGINT(), + resultOffset, + incomingNulls, + numIncomingNulls, + pool, + opts, + timestamps); auto rawTimestamps = timestamps->asFlatVector()->mutableRawValues(); @@ -916,32 +1070,64 @@ void scatterStructNulls( void readRowVector( ByteInputStream* source, const TypePtr& type, - velox::memory::MemoryPool* pool, - VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { - auto* row = result->as(); + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result) { + auto* row = result->asUnchecked(); if (isTimestampWithTimeZoneType(type)) { - readTimestampWithTimeZone(source, pool, row, resultOffset); + readTimestampWithTimeZone( + source, pool, row, resultOffset, incomingNulls, numIncomingNulls); return; } - + BufferPtr combinedNulls; + const uint64_t* childNulls = incomingNulls; + int32_t numChildNulls = numIncomingNulls; + if (opts.nullsFirst) { + const auto size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + row->resize(resultOffset + numNewValues); + readNulls( + source, size, resultOffset, incomingNulls, numIncomingNulls, *result); + if (row->rawNulls()) { + combinedNulls = AlignedBuffer::allocate(numNewValues, pool); + bits::copyBits( + row->rawNulls(), + resultOffset, + combinedNulls->asMutable(), + 0, + numNewValues); + childNulls = combinedNulls->as(); + numChildNulls = numNewValues; + } + } const int32_t numChildren = source->read(); auto& children = row->children(); const auto& childTypes = type->asRow().children(); readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); - - auto size = source->read(); - // Set the size of the row but do not alter the size of 
the - // children. The children get adjusted in a separate pass over the - // data. The parent and child size MUST be separate until the second pass. - row->BaseVector::resize(resultOffset + size); - for (int32_t i = 0; i <= size; ++i) { - source->read(); + source, + childTypes, + resultOffset, + childNulls, + numChildNulls, + pool, + opts, + children); + if (!opts.nullsFirst) { + const auto size = source->read(); + // Set the size of the row but do not alter the size of the + // children. The children get adjusted in a separate pass over the + // data. The parent and child size MUST be separate until the second pass. + row->BaseVector::resize(resultOffset + size); + // Read and discard the offsets. The number of offsets is not affected by + // incomingNulls. + source->skip((size + 1) * sizeof(int32_t)); + readNulls( + source, size, resultOffset, incomingNulls, numIncomingNulls, *result); } - readNulls(source, size, *result, resultOffset); } std::string readLengthPrefixedString(ByteInputStream* source) { @@ -964,20 +1150,24 @@ void checkTypeEncoding(std::string_view encoding, const TypePtr& type) { void readColumns( ByteInputStream* source, - velox::memory::MemoryPool* pool, const std::vector& types, - std::vector& results, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + std::vector& results) { static const std::unordered_map< TypeKind, std::function> + const uint64_t* incomingNulls, + int32_t numIncomingNulls, + velox::memory::MemoryPool* pool, + const SerdeOpts& opts, + VectorPtr& result)>> readers = { {TypeKind::BOOLEAN, &read}, {TypeKind::TINYINT, &read}, @@ -1006,20 +1196,30 @@ void readColumns( readConstantVector( source, columnType, - pool, - columnResult, resultOffset, - useLosslessTimestamp); + incomingNulls, + numIncomingNulls, + pool, + opts, + columnResult); } else if (encoding == kDictionary) { readDictionaryVector( 
source, columnType, - pool, - columnResult, resultOffset, - useLosslessTimestamp); + incomingNulls, + numIncomingNulls, + pool, + opts, + columnResult); } else { checkTypeEncoding(encoding, columnType); + if (columnResult != nullptr && + (columnResult->encoding() == VectorEncoding::Simple::CONSTANT || + columnResult->encoding() == VectorEncoding::Simple::DICTIONARY)) { + BaseVector::ensureWritable( + SelectivityVector::empty(), types[i], pool, columnResult); + } const auto it = readers.find(columnType->kind()); VELOX_CHECK( it != readers.end(), @@ -1029,10 +1229,12 @@ void readColumns( it->second( source, columnType, - pool, - columnResult, resultOffset, - useLosslessTimestamp); + incomingNulls, + numIncomingNulls, + pool, + opts, + columnResult); } } } @@ -1089,11 +1291,12 @@ class VectorStream { std::optional encoding, StreamArena* streamArena, int32_t initialNumRows, - bool useLosslessTimestamp) + const SerdeOpts& opts) : type_(type), isLongDecimal_(type_->isLongDecimal()), encoding_(encoding), - useLosslessTimestamp_(useLosslessTimestamp), + useLosslessTimestamp_(opts.useLosslessTimestamp), + nullsFirst_(opts.nullsFirst), nulls_(streamArena, true, true), lengths_(streamArena), values_(streamArena) { @@ -1107,22 +1310,14 @@ class VectorStream { case VectorEncoding::Simple::CONSTANT: { initializeHeader(kRLE, *streamArena); children_.emplace_back(std::make_unique( - type_, - std::nullopt, - streamArena, - initialNumRows, - useLosslessTimestamp)); + type_, std::nullopt, streamArena, initialNumRows, opts)); return; } case VectorEncoding::Simple::DICTIONARY: { initializeHeader(kDictionary, *streamArena); values_.startWrite(initialNumRows * 4); children_.emplace_back(std::make_unique( - type_, - std::nullopt, - streamArena, - initialNumRows, - useLosslessTimestamp)); + type_, std::nullopt, streamArena, initialNumRows, opts)); return; } default: @@ -1152,7 +1347,7 @@ class VectorStream { std::nullopt, streamArena, initialNumRows, - useLosslessTimestamp); + opts); } // 
The first element in the offsets in the wire format is always 0 for // nested types. @@ -1335,13 +1530,19 @@ class VectorStream { return; } + if (nullsFirst_) { + writeInt32(out, nullCount_ + nonNullCount_); + flushNulls(out); + } writeInt32(out, children_.size()); for (auto& child : children_) { child->flush(out); } - writeInt32(out, nullCount_ + nonNullCount_); - lengths_.flush(out); - flushNulls(out); + if (!nullsFirst_) { + writeInt32(out, nullCount_ + nonNullCount_); + lengths_.flush(out); + flushNulls(out); + } return; case TypeKind::ARRAY: @@ -1402,7 +1603,7 @@ class VectorStream { /// If false, they are serialized with millisecond precision which is /// compatible with presto. const bool useLosslessTimestamp_; - + const bool nullsFirst_; int32_t nonNullCount_{0}; int32_t nullCount_{0}; int32_t totalLength_{0}; @@ -3030,10 +3231,9 @@ class PrestoVectorSerializer : public VectorSerializer { std::vector encodings, int32_t numRows, StreamArena* streamArena, - bool useLosslessTimestamp, - common::CompressionKind compressionKind) + const SerdeOpts& opts) : streamArena_(streamArena), - codec_(common::compressionKindToCodec(compressionKind)) { + codec_(common::compressionKindToCodec(opts.compressionKind)) { const auto types = rowType->children(); const auto numTypes = types.size(); streams_.resize(numTypes); @@ -3043,7 +3243,7 @@ class PrestoVectorSerializer : public VectorSerializer { encoding = encodings[i]; } streams_[i] = std::make_unique( - types[i], encoding, streamArena, numRows, useLosslessTimestamp); + types[i], encoding, streamArena, numRows, opts); } } @@ -3079,14 +3279,18 @@ class PrestoVectorSerializer : public VectorSerializer { void appendEncoded( const RowVectorPtr& vector, const folly::Range& ranges) { - const auto numNewRows = rangesTotalSize(ranges); - if (numNewRows == 0) { - return; - } - numRows_ += numNewRows; - for (int32_t i = 0; i < vector->childrenSize(); ++i) { - serializeEncodedColumn( - vector->childAt(i).get(), ranges, 
streams_[i].get()); + const auto newRows = rangesTotalSize(ranges); + if (newRows > 0) { + numRows_ += newRows; + for (int32_t i = 0; i < vector->childrenSize(); ++i) { + auto child = vector->childAt(i).get(); + if (child->encoding() == VectorEncoding::Simple::DICTIONARY && + child->rawNulls()) { + serializeColumn(child, ranges, streams_[i].get()); + } else { + serializeEncodedColumn(child, ranges, streams_[i].get()); + } + } } } @@ -3281,12 +3485,7 @@ std::unique_ptr PrestoVectorSerde::createSerializer( const Options* options) { const auto prestoOptions = toPrestoOptions(options); return std::make_unique( - type, - prestoOptions.encodings, - numRows, - streamArena, - prestoOptions.useLosslessTimestamp, - prestoOptions.compressionKind); + type, prestoOptions.encodings, numRows, streamArena, prestoOptions); } void PrestoVectorSerde::serializeEncoded( @@ -3300,6 +3499,31 @@ void PrestoVectorSerde::serializeEncoded( static_cast(serializer.get()) ->flushEncoded(vector, out); } +namespace { +void readTopColumns( + ByteInputStream& source, + const RowTypePtr& type, + velox::memory::MemoryPool* pool, + const RowVectorPtr& result, + int32_t resultOffset, + const SerdeOpts& opts) { + auto& children = result->children(); + const auto& childTypes = type->asRow().children(); + const auto numColumns = source.read(); + VELOX_USER_CHECK_EQ( + numColumns, + type->size(), + "Number of columns in serialized data doesn't match " + "number of columns requested for deserialization"); + + readColumns( + &source, childTypes, resultOffset, nullptr, 0, pool, opts, children); + if (!opts.nullsFirst) { + scatterStructNulls( + result->size(), 0, nullptr, nullptr, *result, resultOffset); + } +} +} // namespace void PrestoVectorSerde::deserialize( ByteInputStream* source, @@ -3351,18 +3575,8 @@ void PrestoVectorSerde::deserialize( common::compressionKindToString( common::codecTypeToCompressionKind(codec->type()))); - auto& children = (*result)->children(); - const auto& childTypes = 
type->asRow().children(); if (!needCompression(*codec)) { - const auto numColumns = source->read(); - // TODO Fix call sites and tighten the check to _EQ. - VELOX_USER_CHECK_GE( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); + readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto compressBuf = folly::IOBuf::create(compressedSize); source->readBytes(compressBuf->writableData(), compressedSize); @@ -3372,33 +3586,9 @@ void PrestoVectorSerde::deserialize( uncompress->writableData(), (int32_t)uncompress->length(), 0}; ByteInputStream uncompressedSource({byteRange}); - const auto numColumns = uncompressedSource.read(); - VELOX_USER_CHECK_EQ( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - &uncompressedSource, - pool, - childTypes, - children, - resultOffset, - useLosslessTimestamp); + readTopColumns( + uncompressedSource, type, pool, *result, resultOffset, prestoOptions); } - - scatterStructNulls( - (*result)->size(), 0, nullptr, nullptr, **result, resultOffset); -} - -void testingScatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset) { - scatterStructNulls(size, scatterSize, scatter, incomingNulls, row, rowOffset); } // static diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index d238f78117dd..9b297fe702b2 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -29,19 +29,29 @@ class PrestoVectorSerde : public VectorSerde { PrestoOptions( bool _useLosslessTimestamp, - common::CompressionKind _compressionKind) + common::CompressionKind 
_compressionKind, + bool _nullsFirst = false) : useLosslessTimestamp(_useLosslessTimestamp), - compressionKind(_compressionKind) {} + compressionKind(_compressionKind), + nullsFirst(_nullsFirst) {} /// Currently presto only supports millisecond precision and the serializer /// converts velox native timestamp to that resulting in loss of precision. /// This option allows it to serialize with nanosecond precision and is /// currently used for spilling. Is false by default. bool useLosslessTimestamp{false}; + common::CompressionKind compressionKind{ common::CompressionKind::CompressionKind_NONE}; /// Specifies the encoding for each of the top-level child vector. std::vector encodings; + + /// Serializes nulls of structs before the columns. Used to allow + /// single pass reading of in spilling. + /// + /// TODO: Make Presto also serialize nulls before columns of + /// structs. + bool nullsFirst{false}; }; /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to @@ -99,15 +109,6 @@ class PrestoVectorSerde : public VectorSerde { static void registerVectorSerde(); }; -// Testing function for nested encodings. See comments in scatterStructNulls(). -void testingScatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset); - class PrestoOutputStreamListener : public OutputStreamListener { public: void onWrite(const char* s, std::streamsize count) override { diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 3bcdf490cae2..9f99e6f4763f 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -72,8 +72,11 @@ class PrestoSerializerTest const bool useLosslessTimestamp = serdeOptions == nullptr ? 
false : serdeOptions->useLosslessTimestamp; common::CompressionKind kind = GetParam(); + const bool nullsFirst = + serdeOptions == nullptr ? false : serdeOptions->nullsFirst; serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions{ - useLosslessTimestamp, kind}; + useLosslessTimestamp, kind, nullsFirst}; + return paramOptions; } @@ -262,7 +265,12 @@ class PrestoSerializerTest auto paramOptions = getParamSerdeOptions(serdeOptions); for (const auto& child : rowVector->children()) { - paramOptions.encodings.push_back(child->encoding()); + auto encoding = child->encoding(); + if (encoding == VectorEncoding::Simple::DICTIONARY && child->rawNulls()) { + paramOptions.encodings.push_back(VectorEncoding::Simple::FLAT); + } else { + paramOptions.encodings.push_back(encoding); + } } serde_->serializeEncoded(rowVector, &arena, ¶mOptions, &out); @@ -305,6 +313,66 @@ class PrestoSerializerTest assertEqualVectors(expected, result); } + void makePermutations( + const std::vector& vectors, + int32_t size, + std::vector& items, + std::vector>& result) { + if (size == items.size()) { + result.push_back(items); + return; + } + for (const auto& vector : vectors) { + items.push_back(vector); + makePermutations(vectors, size, items, result); + items.pop_back(); + } + } + + // tests combining encodings in serialization and deserialization. Serializes + // each of 'vectors' with encoding, then reads them back into a single vector. 
+ void testEncodedConcatenation( + std::vector& vectors, + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = + nullptr) { + std::vector pieces; + auto rowType = ROW({{"f", vectors[0]->type()}}); + auto concatenation = BaseVector::create(rowType, 0, pool_.get()); + auto arena = std::make_unique(pool_.get()); + auto paramOptions = getParamSerdeOptions(serdeOptions); + auto serializer = + serde_->createSerializer(rowType, 10, arena.get(), ¶mOptions); + + for (const auto& vector : vectors) { + auto data = makeRowVector({"f"}, {vector}); + concatenation->append(data.get()); + std::ostringstream out; + serializeEncoded(data, &out, ¶mOptions); + pieces.push_back(out.str()); + serializer->append(data); + } + facebook::velox::serializer::presto::PrestoOutputStreamListener listener; + std::ostringstream allOut; + OStreamOutputStream allOutStream(&allOut, &listener); + serializer->flush(&allOutStream); + + auto allDeserialized = deserialize(rowType, allOut.str(), ¶mOptions); + assertEqualVectors(allDeserialized, concatenation); + RowVectorPtr deserialized = + BaseVector::create(rowType, 0, pool_.get()); + for (auto& piece : pieces) { + auto byteStream = toByteStream(piece); + serde_->deserialize( + &byteStream, + pool_.get(), + rowType, + &deserialized, + deserialized->size(), + ¶mOptions); + } + assertEqualVectors(concatenation, deserialized); + } + std::unique_ptr serde_; }; @@ -521,56 +589,6 @@ TEST_P(PrestoSerializerTest, encodings) { testEncodedRoundTrip(data); } -TEST_P(PrestoSerializerTest, scatterEncoded) { - // Makes a struct with nulls and constant/dictionary encoded children. The - // children need to get gaps where the parent struct has a null. 
- VectorFuzzer::Options opts; - opts.timestampPrecision = - VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; - opts.nullRatio = 0.1; - VectorFuzzer fuzzer(opts, pool_.get()); - - auto rowType = ROW( - {{"inner", - ROW( - {{"i1", BIGINT()}, - {"i2", VARCHAR()}, - {"i3", ARRAY(INTEGER())}, - {"i4", ROW({{"ii1", BIGINT()}})}})}}); - auto row = fuzzer.fuzzInputRow(rowType); - auto inner = - const_cast(row->childAt(0)->wrappedVector()->as()); - if (!inner->mayHaveNulls()) { - return; - } - auto numNulls = BaseVector::countNulls(inner->nulls(), 0, inner->size()); - auto numNonNull = inner->size() - numNulls; - auto indices = makeIndices(numNonNull, [](auto row) { return row; }); - - inner->children()[0] = BaseVector::createConstant( - BIGINT(), - variant::create(11L), - numNonNull, - pool_.get()); - inner->children()[1] = BaseVector::wrapInDictionary( - BufferPtr(nullptr), indices, numNonNull, inner->childAt(1)); - inner->children()[2] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(2)); - - // i4 is a struct that we wrap in constant. We make ths struct like it was - // read from seriailization, needing scatter for struct nulls. - auto i4 = const_cast( - inner->childAt(3)->wrappedVector()->as()); - auto i4NonNull = i4->mayHaveNulls() - ? 
i4->size() - BaseVector::countNulls(i4->nulls(), 0, i4->size()) - : i4->size(); - i4->childAt(0)->resize(i4NonNull); - inner->children()[3] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(3)); - serializer::presto::testingScatterStructNulls( - row->size(), row->size(), nullptr, nullptr, *row, 0); -} - TEST_P(PrestoSerializerTest, lazy) { constexpr int kSize = 1000; auto rowVector = makeTestVector(kSize); @@ -589,7 +607,7 @@ TEST_P(PrestoSerializerTest, ioBufRoundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); @@ -613,14 +631,80 @@ TEST_P(PrestoSerializerTest, roundTrip) { nonNullOpts.nullRatio = 0; VectorFuzzer nonNullFuzzer(nonNullOpts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); - auto inputRowVector = (i % 2 == 0) ? fuzzer.fuzzInputRow(rowType) : nonNullFuzzer.fuzzInputRow(rowType); - testRoundTrip(inputRowVector); + serializer::presto::PrestoVectorSerde::PrestoOptions prestoOpts; + // Test every 2/4 with struct nulls first. 
+ prestoOpts.nullsFirst = i % 4 < 2; + testRoundTrip(inputRowVector, &prestoOpts); + } +} + +TEST_P(PrestoSerializerTest, encodedRoundtrip) { + VectorFuzzer::Options opts; + opts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; + opts.nullRatio = 0.1; + opts.dictionaryHasNulls = false; + VectorFuzzer fuzzer(opts, pool_.get()); + + const size_t numRounds = 200; + + for (size_t i = 0; i < numRounds; ++i) { + auto rowType = fuzzer.randRowType(); + auto inputRowVector = fuzzer.fuzzInputRow(rowType); + serializer::presto::PrestoVectorSerde::PrestoOptions serdeOpts; + serdeOpts.nullsFirst = true; + testEncodedRoundTrip(inputRowVector, &serdeOpts); + } +} + +TEST_P(PrestoSerializerTest, encodedConcatenation) { + // Slow test, run only for no compression. + if (GetParam() != common::CompressionKind::CompressionKind_NONE) { + return; + } + + std::vector types = {ROW({{"s0", VARCHAR()}}), VARCHAR()}; + VectorFuzzer::Options nonNullOpts{.nullRatio = 0}; + VectorFuzzer nonNullFuzzer(nonNullOpts, pool_.get()); + VectorFuzzer::Options nullOpts{.nullRatio = 1}; + VectorFuzzer nullFuzzer(nullOpts, pool_.get()); + VectorFuzzer::Options mixedOpts{.nullRatio = 0.5}; + VectorFuzzer mixedFuzzer(mixedOpts, pool_.get()); + for (const auto& type : types) { + auto flatNull = nullFuzzer.fuzzFlat(type, 12); + auto flatNonNull = nonNullFuzzer.fuzzFlat(type, 13); + + std::vector vectors = { + nullFuzzer.fuzzConstant(type, 10), + nonNullFuzzer.fuzzConstant(type, 11), + flatNonNull, + flatNull, + nonNullFuzzer.fuzzDictionary(flatNonNull), + nonNullFuzzer.fuzzDictionary(flatNull), + nullFuzzer.fuzzDictionary(flatNull)}; + if (type->isRow()) { + auto& rowType = type->as(); + auto row = makeRowVector( + {rowType.nameOf(0)}, + {nonNullFuzzer.fuzzConstant(rowType.childAt(0), 14)}); + row->setNull(2, true); + row->setNull(10, true); + vectors.push_back(row); + } + std::vector> permutations; + std::vector temp; + makePermutations(vectors, 4, temp, permutations); + for 
(auto i = 0; i < permutations.size(); ++i) { + serializer::presto::PrestoVectorSerde::PrestoOptions opts; + opts.nullsFirst = true; + testEncodedConcatenation(permutations[i], &opts); + } } }