From 8a3382a053670889f5d33235114ee2a2dbc07134 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Sun, 10 Dec 2023 06:48:33 -0800 Subject: [PATCH] Scatter struct nulls when deserializing Presto wire format --- velox/exec/SpillFile.cpp | 4 +- velox/serializers/PrestoSerializer.cpp | 1012 ++++++++++------- velox/serializers/PrestoSerializer.h | 12 +- .../tests/PrestoSerializerTest.cpp | 81 +- 4 files changed, 611 insertions(+), 498 deletions(-) diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 38ac8c2f2bd9e..127d70471297b 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -174,7 +174,7 @@ uint64_t SpillWriter::write( MicrosecondTimer timer(&timeUs); if (batch_ == nullptr) { serializer::presto::PrestoVectorSerde::PrestoOptions options = { - kDefaultUseLosslessTimestamp, compressionKind_}; + kDefaultUseLosslessTimestamp, compressionKind_, true}; batch_ = std::make_unique(pool_); batch_->createStreamTree( std::static_pointer_cast(rows->type()), @@ -290,7 +290,7 @@ SpillReadFile::SpillReadFile( numSortKeys_(numSortKeys), sortCompareFlags_(sortCompareFlags), compressionKind_(compressionKind), - readOptions_{kDefaultUseLosslessTimestamp, compressionKind_}, + readOptions_{kDefaultUseLosslessTimestamp, compressionKind_, true}, pool_(pool) { constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize; // 1MB - padding. diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 53747c6f4d4a5..4b403c74a6e58 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -25,6 +25,9 @@ #include "velox/vector/VectorTypeUtils.h" namespace facebook::velox::serializer::presto { + +using SerdeOpts = PrestoVectorSerde::PrestoOptions; + namespace { constexpr int8_t kCompressedBitMask = 1; constexpr int8_t kEncryptedBitMask = 2; @@ -148,6 +151,23 @@ FOLLY_ALWAYS_INLINE bool needCompression(const folly::io::Codec& codec) { return codec.type() != folly::io::CodecType::NO_COMPRESSION; } +using StructNullsMap = + folly::F14FastMap, int32_t>>; + +auto& structNullsMap() { + thread_local std::unique_ptr map; + return map; +} + +std::pair getStructNulls(int64_t position) { + auto& map = structNullsMap(); + auto it = map->find(position); + if (it == map->end()) { + return {nullptr, 0}; + } + return {it->second.first.data(), it->second.second}; +} + template void readValues( ByteInputStream* source, @@ -304,21 +324,40 @@ void readDecimalValues( } } +vector_size_t sizeWithIncomingNulls( + vector_size_t size, + int32_t numIncomingNulls) { + return numIncomingNulls == 0 ? size : numIncomingNulls; +} + vector_size_t readNulls( ByteInputStream* source, vector_size_t size, BaseVector& result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { + VELOX_DCHECK_LE( + result.size(), resultOffset + (incomingNulls ? numIncomingNulls : size)); if (source->readByte() == 0) { - result.clearNulls(resultOffset, resultOffset + size); - return 0; + if (incomingNulls) { + auto rawNulls = result.mutableRawNulls(); + bits::copyBits( + incomingNulls, 0, rawNulls, resultOffset, numIncomingNulls); + } else { + result.clearNulls(resultOffset, resultOffset + size); + } + return incomingNulls + ? numIncomingNulls - bits::countBits(incomingNulls, 0, numIncomingNulls) + : 0; } const bool noPriorNulls = (result.rawNulls() == nullptr); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); // Allocate one extra byte in case we cannot use bits from the current last // partial byte. - BufferPtr& nulls = result.mutableNulls(resultOffset + size + 8); + BufferPtr& nulls = result.mutableNulls(resultOffset + numNewValues + 8); if (noPriorNulls) { bits::fillBits( nulls->asMutable(), 0, resultOffset, bits::kNotNull); @@ -330,6 +369,15 @@ vector_size_t readNulls( source->readBytes(rawNulls, numBytes); bits::reverseBits(rawNulls, numBytes); bits::negate(reinterpret_cast(rawNulls), numBytes * 8); + // Add incoming nulls if any. + if (incomingNulls) { + bits::scatterBits( + size, + numIncomingNulls, + reinterpret_cast(rawNulls), + incomingNulls, + reinterpret_cast(rawNulls)); + } // Shift bits if needed. if (bits::nbytes(resultOffset) * 8 > resultOffset) { @@ -338,10 +386,11 @@ vector_size_t readNulls( bits::nbytes(resultOffset) * 8, nulls->asMutable(), resultOffset, - size); + numNewValues); } - return BaseVector::countNulls(nulls, resultOffset, resultOffset + size); + return BaseVector::countNulls( + nulls, resultOffset, resultOffset + numNewValues); } template @@ -351,28 +400,47 @@ void read( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const int32_t size = source->read(); - result->resize(resultOffset + size); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + result->resize(resultOffset + numNewValues); auto flatResult = result->asFlatVector(); - auto nullCount = readNulls(source, size, *flatResult, resultOffset); + auto nullCount = readNulls( + source, size, *flatResult, resultOffset, incomingNulls, numIncomingNulls); - BufferPtr values = flatResult->mutableValues(resultOffset + size); + BufferPtr values = flatResult->mutableValues(resultOffset + numNewValues); if constexpr (std::is_same_v) { - if (useLosslessTimestamp) { + if (opts.useLosslessTimestamp) { readLosslessTimestampValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } } if (type->isLongDecimal()) { readDecimalValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } readValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); } template <> @@ -382,20 +450,29 @@ void read( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const int32_t size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); auto flatResult = result->as>(); BufferPtr values = flatResult->mutableValues(resultOffset + size); auto rawValues = values->asMutable(); - for (int32_t i = 0; i < size; ++i) { + int32_t lastOffset = 0; + for (int32_t i = 0; i < numNewValues; ++i) { // Set the first int32_t of each StringView to be the offset. - *reinterpret_cast(&rawValues[resultOffset + i]) = + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + *reinterpret_cast(&rawValues[resultOffset + i]) = lastOffset; + continue; + } + lastOffset = *reinterpret_cast(&rawValues[resultOffset + i]) = source->read(); } - readNulls(source, size, *flatResult, resultOffset); + readNulls( + source, size, *flatResult, resultOffset, incomingNulls, numIncomingNulls); const int32_t dataSize = source->read(); if (dataSize == 0) { @@ -408,7 +485,7 @@ void read( source->readBytes(rawStrings, dataSize); int32_t previousOffset = 0; auto rawChars = reinterpret_cast(rawStrings); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { int32_t offset = rawValues[resultOffset + i].size(); rawValues[resultOffset + i] = StringView(rawChars + previousOffset, offset - previousOffset); @@ -422,7 +499,9 @@ void readColumns( const std::vector& types, std::vector& result, vector_size_t resultOffset, - bool useLosslessTimestamp); + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls); void readConstantVector( ByteInputStream* source, @@ -430,25 +509,38 @@ void readConstantVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns(source, pool, childTypes, children, 0, opts, nullptr, 0); VELOX_CHECK_EQ(1, children[0]->size()); - auto constantVector = BaseVector::wrapInConstant(size, 0, children[0]); - if (resultOffset == 0) { + auto constantVector = + BaseVector::wrapInConstant(numNewValues, 0, children[0]); + if (resultOffset == 0 && !incomingNulls) { result = std::move(constantVector); } else { - result->resize(resultOffset + size); + if (!incomingNulls && constantVector->equalValueAt(result.get(), 0, 0)) { + result->resize(resultOffset + numNewValues); + return; + } + result->resize(resultOffset + numNewValues); SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(constantVector.get(), resultOffset, 0, size); + result->copy(constantVector.get(), resultOffset, 0, numNewValues); + if (incomingNulls) { + bits::forEachUnsetBit(incomingNulls, 0, numNewValues, [&](auto row) { + result->setNull(resultOffset + row, true); + }); + } } } @@ -458,34 +550,57 @@ void readDictionaryVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns(source, pool, childTypes, children, 0, opts, nullptr, 0); // Read indices. - BufferPtr indices = allocateIndices(size, pool); - source->readBytes(indices->asMutable(), size * sizeof(int32_t)); + BufferPtr indices = allocateIndices(numNewValues, pool); + if (incomingNulls) { + auto rawIndices = indices->asMutable(); + for (auto i = 0; i < numNewValues; ++i) { + if (bits::isBitNull(incomingNulls, i)) { + rawIndices[i] = 0; + } else { + rawIndices[i] = source->read(); + } + } + } else { + source->readBytes( + indices->asMutable(), numNewValues * sizeof(int32_t)); + } // Skip 3 * 8 bytes of 'instance id'. Velox doesn't use 'instance id' for // dictionary vectors. source->skip(24); - auto dictionaryVector = - BaseVector::wrapInDictionary(nullptr, indices, size, children[0]); + BufferPtr incomingNullsBuffer = nullptr; + if (incomingNulls) { + incomingNullsBuffer = AlignedBuffer::allocate(numIncomingNulls, pool); + memcpy( + incomingNullsBuffer->asMutable(), + incomingNulls, + bits::nbytes(numIncomingNulls)); + } + auto dictionaryVector = BaseVector::wrapInDictionary( + incomingNullsBuffer, indices, size, children[0]); if (resultOffset == 0) { result = std::move(dictionaryVector); } else { - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); - SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + SelectivityVector rows(resultOffset + numNewValues, false); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(dictionaryVector.get(), resultOffset, 0, size); + result->copy(dictionaryVector.get(), resultOffset, 0, numNewValues); } } @@ -495,7 +610,9 @@ void readArrayVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { ArrayVector* arrayVector = result->as(); const auto resultElementsOffset = arrayVector->elements()->size(); @@ -508,25 +625,39 @@ void readArrayVector( childTypes, children, resultElementsOffset, - useLosslessTimestamp); + opts, + nullptr, + 0); - vector_size_t size = source->read(); - arrayVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + arrayVector->resize(resultOffset + numNewValues); arrayVector->setElements(children[0]); - BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = arrayVector->mutableSizes(resultOffset + size); + BufferPtr sizes = arrayVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *arrayVector, resultOffset); + readNulls( + source, + size, + *arrayVector, + resultOffset, + incomingNulls, + numIncomingNulls); } void readMapVector( @@ -535,7 +666,9 @@ void readMapVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { MapVector* mapVector = result->as(); const auto resultElementsOffset = mapVector->mapKeys()->size(); std::vector childTypes = {type->childAt(0), type->childAt(1)}; @@ -546,7 +679,9 @@ void readMapVector( childTypes, children, resultElementsOffset, - useLosslessTimestamp); + opts, + nullptr, + 0); int32_t hashTableSize = source->read(); if (hashTableSize != -1) { @@ -554,23 +689,31 @@ void readMapVector( source->skip(hashTableSize * sizeof(int32_t)); } - vector_size_t size = source->read(); - mapVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const vector_size_t numNewValues = + sizeWithIncomingNulls(size, numIncomingNulls); + mapVector->resize(resultOffset + numNewValues); mapVector->setKeysAndValues(children[0], children[1]); - BufferPtr offsets = mapVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = mapVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = mapVector->mutableSizes(resultOffset + size); + BufferPtr sizes = mapVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *mapVector, resultOffset); + readNulls( + source, size, *mapVector, resultOffset, incomingNulls, numIncomingNulls); } int64_t packTimestampWithTimeZone(int64_t timestamp, int16_t timezone) { @@ -589,9 +732,21 @@ void readTimestampWithTimeZone( ByteInputStream* source, velox::memory::MemoryPool* pool, RowVector* result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { + SerdeOpts opts; + opts.useLosslessTimestamp = false; auto& timestamps = result->childAt(0); - read(source, BIGINT(), pool, timestamps, resultOffset, false); + read( + source, + BIGINT(), + pool, + timestamps, + resultOffset, + opts, + incomingNulls, + numIncomingNulls); auto rawTimestamps = timestamps->asFlatVector()->mutableRawValues(); @@ -614,338 +769,85 @@ void readTimestampWithTimeZone( } } -template -void scatterValues( - int32_t numValues, - const vector_size_t* indices, - T* data, - vector_size_t offset) { - for (auto index = numValues - 1; index >= 0; --index) { - const auto destination = indices[index]; - if (destination == offset + index) { - break; - } - data[destination] = data[offset + index]; - } -} - -template -void scatterFlatValues( - vector_size_t scatterSize, - const vector_size_t* scatter, - BaseVector& vector, - vector_size_t offset) { - using T = typename TypeTraits::NativeType; - auto* values = vector.asUnchecked>()->mutableRawValues(); - scatterValues(scatterSize, scatter, values, offset); -} - -template <> -void scatterFlatValues( - vector_size_t scatterSize, - const vector_size_t* scatter, - BaseVector& vector, - vector_size_t offset) { - auto* values = const_cast(vector.values()->as()); - - for (auto index = scatterSize - 1; index >= 0; --index) { - const auto destination = scatter[index]; - if (destination == offset + index) { - break; - } - bits::setBit(values, destination, bits::isBitSet(values, offset + index)); - } -} - -void scatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset); - -// Scatters existing nulls and adds 'incomingNulls' to the gaps. 'oldSize' is -// the number of valid bits in the nulls of 'vector'. 'vector' must have been -// resized to the new size before calling this. -void scatterNulls( - vector_size_t oldSize, - const uint64_t* incomingNulls, - BaseVector& vector) { - const bool hasNulls = vector.mayHaveNulls(); - const auto size = vector.size(); - - if (hasNulls) { - auto bits = reinterpret_cast(vector.mutableRawNulls()); - bits::scatterBits(oldSize, size, bits, incomingNulls, bits); - } else { - memcpy(vector.mutableRawNulls(), incomingNulls, bits::nbytes(size)); - } -} - -// Scatters all rows in 'vector' starting from 'offset'. All these rows are -// expected to be non-null before this call. vector[offset + i] is copied into -// vector[offset + scatter[i]] for all i in [0, scatterSize). The gaps are -// filled with nulls. scatter[i] >= i for all i. scatter[i] >= scatter[j] for -// all i and j. -// -// @param size Size of the 'vector' after scatter. Includes trailing nulls. -// @param scatterSize Number of rows to scatter, starting from 'offset'. -// @param scatter Destination row numbers. A total of 'scatterSize' entries. -// @param incomingNulls A bitmap representation of the 'scatter' covering 'size' -// rows. First 'offset' bits should be kNotNull. -// @param vector Vector to modify. -// @param offset First row to scatter. -void scatterVector( - int32_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - VectorPtr& vector, - vector_size_t offset) { - const auto oldSize = vector->size(); - if (scatter != nullptr && scatterSize > 0) { - // 'scatter' should have an entry for every row in 'vector' starting from - // 'offset'. - VELOX_CHECK_EQ(vector->size(), scatterSize + offset); - - // The new vector size should cover all 'scatter' destinations and - // trailing nulls. - VELOX_CHECK_GE(size, scatter[scatterSize - 1] + 1); - } - - if (vector->encoding() == VectorEncoding::Simple::ROW) { - vector->asUnchecked()->unsafeResize(size); - } else { - vector->resize(size); - } - - switch (vector->encoding()) { - case VectorEncoding::Simple::DICTIONARY: { - VELOX_CHECK_EQ(0, offset, "Result offset is not supported yet") - if (incomingNulls) { - auto dictIndices = - const_cast(vector->wrapInfo()->as()); - scatterValues(scatterSize, scatter, dictIndices, 0); - scatterNulls(oldSize, incomingNulls, *vector); - } - auto values = vector->valueVector(); - scatterVector(values->size(), 0, nullptr, nullptr, values, 0); - break; - } - case VectorEncoding::Simple::CONSTANT: { - VELOX_CHECK_EQ(0, offset, "Result offset is not supported yet") - auto values = vector->valueVector(); - if (values) { - scatterVector(values->size(), 0, nullptr, nullptr, values, 0); - } - - if (incomingNulls) { - BaseVector::ensureWritable( - SelectivityVector::empty(), vector->type(), vector->pool(), vector); - scatterNulls(oldSize, incomingNulls, *vector); - } - break; - } - case VectorEncoding::Simple::ARRAY: { - auto* array = vector->asUnchecked(); - if (incomingNulls) { - auto offsets = const_cast(array->rawOffsets()); - auto sizes = const_cast(array->rawSizes()); - scatterValues(scatterSize, scatter, offsets, offset); - scatterValues(scatterSize, scatter, sizes, offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - // Find the offset from which 'new' array elements start assuming that - // array elements are written in order. - vector_size_t elementOffset = 0; - for (auto i = offset - 1; i >= 0; --i) { - if (!array->isNullAt(i)) { - elementOffset = array->offsetAt(i) + array->sizeAt(i); - break; - } - } - auto elements = array->elements(); - scatterVector( - elements->size(), 0, nullptr, nullptr, elements, elementOffset); - break; - } - case VectorEncoding::Simple::MAP: { - auto* map = vector->asUnchecked(); - if (incomingNulls) { - auto offsets = const_cast(map->rawOffsets()); - auto sizes = const_cast(map->rawSizes()); - scatterValues(scatterSize, scatter, offsets, offset); - scatterValues(scatterSize, scatter, sizes, offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - // Find out the offset from which 'new' map keys and values start assuming - // that map elements are written in order. - vector_size_t elementOffset = 0; - for (auto i = offset - 1; i >= 0; --i) { - if (!map->isNullAt(i)) { - elementOffset = map->offsetAt(i) + map->sizeAt(i); - break; - } - } - auto keys = map->mapKeys(); - scatterVector(keys->size(), 0, nullptr, nullptr, keys, elementOffset); - auto values = map->mapValues(); - scatterVector(values->size(), 0, nullptr, nullptr, values, elementOffset); - break; - } - case VectorEncoding::Simple::ROW: { - auto* row = vector->asUnchecked(); - scatterStructNulls(row->size(), 0, nullptr, nullptr, *row, offset); - break; - } - case VectorEncoding::Simple::FLAT: { - if (incomingNulls) { - VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( - scatterFlatValues, - vector->typeKind(), - scatterSize, - scatter, - *vector, - offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - break; - } - default: - VELOX_FAIL("Unsupported encoding in scatter: {}", vector->encoding()); - } -} - -// A RowVector with nulls is serialized with children having a value -// only for rows where the struct is non-null. After deserializing, -// we do an extra traversal to add gaps into struct members where -// the containing struct is null. As we go down the struct tree, we -// merge child struct nulls into the nulls from enclosing structs so -// that the leaves only get scattered once, considering all nulls -// from all enclosing structs. -void scatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset) { - const auto oldSize = row.size(); - if (isTimestampWithTimeZoneType(row.type())) { - // The timestamp with tz case is special. The child vectors are aligned with - // the struct even if the struct has nulls. - if (incomingNulls) { - scatterVector( - size, scatterSize, scatter, incomingNulls, row.childAt(0), rowOffset); - scatterVector( - size, scatterSize, scatter, incomingNulls, row.childAt(1), rowOffset); - row.unsafeResize(size); - scatterNulls(oldSize, incomingNulls, row); - } - return; - } - - const uint64_t* childIncomingNulls = incomingNulls; - const vector_size_t* childScatter = scatter; - auto childScatterSize = scatterSize; - raw_vector innerScatter; - raw_vector childIncomingNullsVector; - if (auto* rawNulls = row.rawNulls()) { - innerScatter.resize(size); - if (!incomingNulls) { - childIncomingNulls = rawNulls; - if (rowOffset > 0) { - childIncomingNullsVector.resize(bits::nwords(size)); - auto newNulls = childIncomingNullsVector.data(); - memcpy(newNulls, rawNulls, bits::nbytes(size)); - - // Fill in first 'rowOffset' bits with kNotNull to avoid scattering - // nulls for pre-existing rows. - bits::fillBits(newNulls, 0, rowOffset, bits::kNotNull); - childIncomingNulls = newNulls; - } - childScatterSize = simd::indicesOfSetBits( - rawNulls, rowOffset, size, innerScatter.data()); - } else { - childIncomingNullsVector.resize(bits::nwords(size)); - auto newNulls = childIncomingNullsVector.data(); - bits::scatterBits( - row.size(), - size, - reinterpret_cast(rawNulls), - incomingNulls, - reinterpret_cast(newNulls)); - - // Fill in first 'rowOffset' bits with kNotNull to avoid scattering - // nulls for pre-existing rows. - bits::fillBits(newNulls, 0, rowOffset, bits::kNotNull); - childIncomingNulls = newNulls; - childScatterSize = simd::indicesOfSetBits( - childIncomingNulls, rowOffset, size, innerScatter.data()); - } - childScatter = innerScatter.data(); - } - for (auto i = 0; i < row.childrenSize(); ++i) { - auto& child = row.childAt(i); - if (child->encoding() == VectorEncoding::Simple::ROW) { - scatterStructNulls( - size, - childScatterSize, - childScatter, - childIncomingNulls, - *child->asUnchecked(), - rowOffset); - } else { - scatterVector( - size, - childScatterSize, - childScatter, - childIncomingNulls, - row.childAt(i), - rowOffset); - } - } - if (incomingNulls) { - row.unsafeResize(size); - scatterNulls(oldSize, incomingNulls, row); - } - // On return of scatter we check that child sizes match the struct size. This - // is safe also if no scatter. - for (auto i = 0; i < row.childrenSize(); ++i) { - VELOX_CHECK_EQ(row.childAt(i)->size(), row.size()); - } -} - void readRowVector( ByteInputStream* source, const TypePtr& type, velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { auto* row = result->as(); if (isTimestampWithTimeZoneType(type)) { - readTimestampWithTimeZone(source, pool, row, resultOffset); + readTimestampWithTimeZone( + source, pool, row, resultOffset, incomingNulls, numIncomingNulls); return; } - + BufferPtr combinedNulls; + const uint64_t* childNulls = incomingNulls; + int32_t numChildNulls = numIncomingNulls; + if (opts.nullsFirst) { + const auto size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + row->resize(resultOffset + numNewValues); + readNulls( + source, size, *result, resultOffset, incomingNulls, numIncomingNulls); + if (row->rawNulls()) { + combinedNulls = AlignedBuffer::allocate(numNewValues, pool); + bits::copyBits( + row->rawNulls(), + resultOffset, + combinedNulls->asMutable(), + 0, + numNewValues); + childNulls = combinedNulls->as(); + numChildNulls = numNewValues; + } + } else { + auto [structNulls, numStructNulls] = getStructNulls(source->tellp()); + // childNulls is the nulls added to the children, i.e. the nulls of this + // struct combined with nulls of enclosing structs. + if (structNulls) { + if (incomingNulls) { + combinedNulls = AlignedBuffer::allocate(numIncomingNulls, pool); + bits::scatterBits( + numStructNulls, + numIncomingNulls, + reinterpret_cast(structNulls), + incomingNulls, + combinedNulls->asMutable()); + childNulls = combinedNulls->as(); + numChildNulls = numIncomingNulls; + } else { + childNulls = structNulls; + numChildNulls = numStructNulls; + } + } + } const int32_t numChildren = source->read(); auto& children = row->children(); const auto& childTypes = type->asRow().children(); readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); - - auto size = source->read(); - // Set the size of the row but do not alter the size of the - // children. The children get adjusted in a separate pass over the - // data. The parent and child size MUST be separate until the second pass. - row->BaseVector::resize(resultOffset + size); - for (int32_t i = 0; i <= size; ++i) { - source->read(); + source, + pool, + childTypes, + children, + resultOffset, + opts, + childNulls, + numChildNulls); + if (!opts.nullsFirst) { + const auto size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + row->resize(resultOffset + numNewValues); + // Read and discard the offsets. The number of offsets is not affected by + // incomingNulls. + source->skip((size + 1) * sizeof(int32_t)); + readNulls( + source, size, *result, resultOffset, incomingNulls, numIncomingNulls); } - readNulls(source, size, *result, resultOffset); } std::string readLengthPrefixedString(ByteInputStream* source) { @@ -972,7 +874,9 @@ void readColumns( const std::vector& types, std::vector& results, vector_size_t resultOffset, - bool useLosslessTimestamp) { + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { static const std::unordered_map< TypeKind, std::function> + const SerdeOpts& opts, + const uint64_t* incomingNulls, + int32_t numIncomingNulls)>> readers = { {TypeKind::BOOLEAN, &read}, {TypeKind::TINYINT, &read}, @@ -1013,7 +919,9 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + opts, + incomingNulls, + numIncomingNulls); } else if (encoding == kDictionary) { readDictionaryVector( source, @@ -1021,7 +929,9 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + opts, + incomingNulls, + numIncomingNulls); } else { checkTypeEncoding(encoding, columnType); const auto it = readers.find(columnType->kind()); @@ -1036,7 +946,214 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + opts, + incomingNulls, + numIncomingNulls); + } + } +} + +// Reads nulls into 'scratch' and returns count of non-nulls. If 'copy' is +// given, returns the null bits in 'copy'. +vector_size_t valueCount( + ByteInputStream* source, + vector_size_t size, + Scratch& scratch, + std::vector* copy = nullptr) { + if (source->readByte() == 0) { + return size; + } + ScratchPtr nullsHolder(scratch); + auto rawNulls = nullsHolder.get(bits::nwords(size)); + auto numBytes = bits::nbytes(size); + source->readBytes(rawNulls, numBytes); + bits::reverseBits(reinterpret_cast(rawNulls), numBytes); + bits::negate(reinterpret_cast(rawNulls), numBytes * 8); + if (copy) { + copy->resize(bits::nwords(size)); + memcpy(copy->data(), rawNulls, numBytes); + } + return bits::countBits(rawNulls, 0, size); +} + +template +void readStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const int32_t size = source->read(); + auto numValues = valueCount(source, size, scratch); + + if constexpr (std::is_same_v) { + source->skip( + numValues * + (useLosslessTimestamp ? sizeof(Timestamp) : sizeof(uint64_t))); + return; + } + source->skip(numValues * sizeof(T)); +} + +template <> +void readStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool /*useLosslessTimestamp*/, + Scratch& scratch) { + const int32_t size = source->read(); + source->skip(size * sizeof(int32_t)); + valueCount(source, size, scratch); + const int32_t dataSize = source->read(); + source->skip(dataSize); +} + +void readStructNullsColumns( + ByteInputStream* source, + const std::vector& types, + bool useLoasslessTimestamp, + Scratch& scratch); + +void readConstantVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const auto size = source->read(); + std::vector childTypes = {type}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); +} + +void readDictionaryVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const auto size = source->read(); + std::vector childTypes = {type}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + // Skip indices. + source->skip(sizeof(int32_t) * size); + + // Skip 3 * 8 bytes of 'instance id'. Velox doesn't use 'instance id' for + // dictionary vectors. + source->skip(24); +} + +void readArrayVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + std::vector childTypes = {type->childAt(0)}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + const vector_size_t size = source->read(); + + source->skip((size + 1) * sizeof(int32_t)); + valueCount(source, size, scratch); +} + +void readMapVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + std::vector childTypes = {type->childAt(0), type->childAt(1)}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + int32_t hashTableSize = source->read(); + if (hashTableSize != -1) { + // Skip over serialized hash table from Presto wire format. + source->skip(hashTableSize * sizeof(int32_t)); + } + + const vector_size_t size = source->read(); + + source->skip((1 + size) * sizeof(int32_t)); + valueCount(source, size, scratch); +} + +void readTimestampWithTimeZoneStructNulls( + ByteInputStream* source, + Scratch& scratch) { + readStructNulls(source, BIGINT(), false, scratch); +} + +void readRowVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + if (isTimestampWithTimeZoneType(type)) { + readTimestampWithTimeZoneStructNulls(source, scratch); + return; + } + auto streamPos = source->tellp(); + const int32_t numChildren = source->read(); + const auto& childTypes = type->asRow().children(); + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + const auto size = source->read(); + // Read and discard the offsets. The number of offsets is not affected by + // nulls. + source->skip((size + 1) * sizeof(int32_t)); + std::vector nullsCopy; + auto numNonNull = valueCount(source, size, scratch, &nullsCopy); + if (size != numNonNull) { + (*structNullsMap())[streamPos] = + std::pair, int32_t>(std::move(nullsCopy), size); + } +} + +void readStructNullsColumns( + ByteInputStream* source, + const std::vector& types, + bool useLosslessTimestamp, + Scratch& scratch) { + static const std::unordered_map< + TypeKind, + std::function> + readers = { + {TypeKind::BOOLEAN, &readStructNulls}, + {TypeKind::TINYINT, &readStructNulls}, + {TypeKind::SMALLINT, &readStructNulls}, + {TypeKind::INTEGER, &readStructNulls}, + {TypeKind::BIGINT, &readStructNulls}, + {TypeKind::HUGEINT, &readStructNulls}, + {TypeKind::REAL, &readStructNulls}, + {TypeKind::DOUBLE, &readStructNulls}, + {TypeKind::TIMESTAMP, &readStructNulls}, + {TypeKind::VARCHAR, &readStructNulls}, + {TypeKind::VARBINARY, &readStructNulls}, + {TypeKind::ARRAY, &readArrayVectorStructNulls}, + {TypeKind::MAP, &readMapVectorStructNulls}, + {TypeKind::ROW, &readRowVectorStructNulls}, + {TypeKind::UNKNOWN, &readStructNulls}}; + + for (int32_t i = 0; i < types.size(); ++i) { + const auto& columnType = types[i]; + + const auto encoding = readLengthPrefixedString(source); + if (encoding == kRLE) { + readConstantVectorStructNulls( + source, columnType, useLosslessTimestamp, scratch); + } else if (encoding == kDictionary) { + readDictionaryVectorStructNulls( + source, columnType, useLosslessTimestamp, scratch); + } else { + checkTypeEncoding(encoding, columnType); + const auto it = readers.find(columnType->kind()); + VELOX_CHECK( + it != readers.end(), + "Column reader for type {} is missing", + columnType->kindName()); + + it->second(source, columnType, useLosslessTimestamp, scratch); } } } @@ -1088,10 +1205,11 @@ class VectorStream { std::optional encoding, StreamArena* streamArena, int32_t initialNumRows, - bool useLosslessTimestamp) + const SerdeOpts& opts) : type_(type), encoding_{encoding}, - useLosslessTimestamp_(useLosslessTimestamp), + useLosslessTimestamp_(opts.useLosslessTimestamp), + nullsFirst_(opts.nullsFirst), nulls_(streamArena, true, true), lengths_(streamArena), values_(streamArena) { @@ -1105,22 +1223,14 @@ class VectorStream { case VectorEncoding::Simple::CONSTANT: { initializeHeader(kRLE, *streamArena); children_.emplace_back(std::make_unique( - type_, - std::nullopt, - streamArena, - initialNumRows, - useLosslessTimestamp)); + type_, std::nullopt, streamArena, initialNumRows, opts)); return; } case VectorEncoding::Simple::DICTIONARY: { initializeHeader(kDictionary, *streamArena); values_.startWrite(initialNumRows * 4); children_.emplace_back(std::make_unique( - type_, - std::nullopt, - streamArena, - initialNumRows, - useLosslessTimestamp)); + type_, std::nullopt, streamArena, initialNumRows, opts)); return; } default:; @@ -1149,7 +1259,7 @@ class VectorStream { std::nullopt, streamArena, initialNumRows, - useLosslessTimestamp); + opts); } // The first element in the offsets in the wire format is always 0 for // nested types. @@ -1255,13 +1365,19 @@ class VectorStream { return; } + if (nullsFirst_) { + writeInt32(out, nullCount_ + nonNullCount_); + flushNulls(out); + } writeInt32(out, children_.size()); for (auto& child : children_) { child->flush(out); } - writeInt32(out, nullCount_ + nonNullCount_); - lengths_.flush(out); - flushNulls(out); + if (!nullsFirst_) { + writeInt32(out, nullCount_ + nonNullCount_); + lengths_.flush(out); + flushNulls(out); + } return; case TypeKind::ARRAY: @@ -1317,6 +1433,7 @@ class VectorStream { /// If false, they are serialized with millisecond precision which is /// compatible with presto. const bool useLosslessTimestamp_; + const bool nullsFirst_; int32_t nonNullCount_{0}; int32_t nullCount_{0}; int32_t totalLength_{0}; @@ -2042,10 +2159,9 @@ class PrestoVectorSerializer : public VectorSerializer { std::vector encodings, int32_t numRows, StreamArena* streamArena, - bool useLosslessTimestamp, - common::CompressionKind compressionKind) + const SerdeOpts& opts) : streamArena_(streamArena), - codec_(common::compressionKindToCodec(compressionKind)) { + codec_(common::compressionKindToCodec(opts.compressionKind)) { auto types = rowType->children(); auto numTypes = types.size(); streams_.resize(numTypes); @@ -2055,7 +2171,7 @@ class PrestoVectorSerializer : public VectorSerializer { encoding = encodings[i]; } streams_[i] = std::make_unique( - types[i], encoding, streamArena, numRows, useLosslessTimestamp); + types[i], encoding, streamArena, numRows, opts); } } @@ -2265,12 +2381,7 @@ std::unique_ptr PrestoVectorSerde::createSerializer( const Options* options) { auto prestoOptions = toPrestoOptions(options); return std::make_unique( - type, - prestoOptions.encodings, - numRows, - streamArena, - prestoOptions.useLosslessTimestamp, - prestoOptions.compressionKind); + type, prestoOptions.encodings, numRows, streamArena, prestoOptions); } void PrestoVectorSerde::serializeEncoded( @@ -2285,6 +2396,61 @@ void PrestoVectorSerde::serializeEncoded( ->flushEncoded(vector, out); } +namespace { +bool hasNestedStructs(const TypePtr& type) { + if (type->isRow()) { + return true; + } + if (type->isArray()) { + return hasNestedStructs(type->childAt(0)); + } + if (type->isMap()) { + return hasNestedStructs(type->childAt(0)) || + hasNestedStructs(type->childAt(1)); + } + return false; +} + +bool hasNestedStructs(const std::vector& types) { + for (auto& child : types) { + if (hasNestedStructs(child)) { + return true; + } + } + return false; +} + +void readTopColumns( + ByteInputStream& source, + const RowTypePtr& type, + velox::memory::MemoryPool* pool, + const RowVectorPtr& result, + int32_t resultOffset, + const SerdeOpts& opts) { + auto& children = result->children(); + const auto& childTypes = type->asRow().children(); + const auto numColumns = source.read(); + VELOX_USER_CHECK_EQ( + numColumns, + type->size(), + "Number of columns in serialized data doesn't match " + "number of columns requested for deserialization"); + + auto guard = folly::makeGuard([&]() { structNullsMap().reset(); }); + + if (!opts.nullsFirst && hasNestedStructs(childTypes)) { + structNullsMap() = std::make_unique(); + Scratch scratch; + auto position = source.tellp(); + readStructNullsColumns( + &source, childTypes, opts.useLosslessTimestamp, scratch); + source.seekp(position); + } + readColumns( + &source, pool, childTypes, children, resultOffset, opts, nullptr, 0); +} +} // namespace + void PrestoVectorSerde::deserialize( ByteInputStream* source, velox::memory::MemoryPool* pool, @@ -2335,18 +2501,8 @@ void PrestoVectorSerde::deserialize( common::compressionKindToString( common::codecTypeToCompressionKind(codec->type()))); - auto& children = (*result)->children(); - const auto& childTypes = type->asRow().children(); if (!needCompression(*codec)) { - const auto numColumns = source->read(); - // TODO Fix call sites and tighten the check to _EQ. - VELOX_USER_CHECK_GE( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); + readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto compressBuf = folly::IOBuf::create(compressedSize); source->readBytes(compressBuf->writableData(), compressedSize); @@ -2356,33 +2512,9 @@ void PrestoVectorSerde::deserialize( uncompress->writableData(), (int32_t)uncompress->length(), 0}; ByteInputStream uncompressedSource({byteRange}); - const auto numColumns = uncompressedSource.read(); - VELOX_USER_CHECK_EQ( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - &uncompressedSource, - pool, - childTypes, - children, - resultOffset, - useLosslessTimestamp); + readTopColumns( + uncompressedSource, type, pool, *result, resultOffset, prestoOptions); } - - scatterStructNulls( - (*result)->size(), 0, nullptr, nullptr, **result, resultOffset); -} - -void testingScatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset) { - scatterStructNulls(size, scatterSize, scatter, incomingNulls, row, rowOffset); } // static diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index da20b21cc008b..ab594d19393c5 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -29,18 +29,26 @@ class PrestoVectorSerde : public VectorSerde { PrestoOptions( bool _useLosslessTimestamp, - common::CompressionKind _compressionKind) + common::CompressionKind _compressionKind, + bool nullsFirst = false) : useLosslessTimestamp(_useLosslessTimestamp), - compressionKind(_compressionKind) {} + compressionKind(_compressionKind), + nullsFirst(nullsFirst) {} // Currently presto only supports millisecond precision and the serializer // converts velox native timestamp to that resulting in loss of precision. // This option allows it to serialize with nanosecond precision and is // currently used for spilling. Is false by default. bool useLosslessTimestamp{false}; + common::CompressionKind compressionKind{ common::CompressionKind::CompressionKind_NONE}; std::vector encodings; + + /// Serializes nulls of structs before the columns. Used to allow + /// single pass reading of in spilling. TODO: Make Presto also + /// serialize nulls before columns of structs. + bool nullsFirst{false}; }; void estimateSerializedSize( diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 0165c273706c9..c829478d0854a 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -60,8 +60,11 @@ class PrestoSerializerTest const bool useLosslessTimestamp = serdeOptions == nullptr ? false : serdeOptions->useLosslessTimestamp; common::CompressionKind kind = GetParam(); + const bool nullsFirst = + serdeOptions == nullptr ? false : serdeOptions->nullsFirst; serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions{ - useLosslessTimestamp, kind}; + useLosslessTimestamp, kind, nullsFirst}; + return paramOptions; } @@ -436,56 +439,6 @@ TEST_P(PrestoSerializerTest, encodings) { testEncodedRoundTrip(data); } -TEST_P(PrestoSerializerTest, scatterEncoded) { - // Makes a struct with nulls and constant/dictionary encoded children. The - // children need to get gaps where the parent struct has a null. - VectorFuzzer::Options opts; - opts.timestampPrecision = - VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; - opts.nullRatio = 0.1; - VectorFuzzer fuzzer(opts, pool_.get()); - - auto rowType = ROW( - {{"inner", - ROW( - {{"i1", BIGINT()}, - {"i2", VARCHAR()}, - {"i3", ARRAY(INTEGER())}, - {"i4", ROW({{"ii1", BIGINT()}})}})}}); - auto row = fuzzer.fuzzInputRow(rowType); - auto inner = - const_cast(row->childAt(0)->wrappedVector()->as()); - if (!inner->mayHaveNulls()) { - return; - } - auto numNulls = BaseVector::countNulls(inner->nulls(), 0, inner->size()); - auto numNonNull = inner->size() - numNulls; - auto indices = makeIndices(numNonNull, [](auto row) { return row; }); - - inner->children()[0] = BaseVector::createConstant( - BIGINT(), - variant::create(11L), - numNonNull, - pool_.get()); - inner->children()[1] = BaseVector::wrapInDictionary( - BufferPtr(nullptr), indices, numNonNull, inner->childAt(1)); - inner->children()[2] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(2)); - - // i4 is a struct that we wrap in constant. We make ths struct like it was - // read from seriailization, needing scatter for struct nulls. - auto i4 = const_cast( - inner->childAt(3)->wrappedVector()->as()); - auto i4NonNull = i4->mayHaveNulls() - ? i4->size() - BaseVector::countNulls(i4->nulls(), 0, i4->size()) - : i4->size(); - i4->childAt(0)->resize(i4NonNull); - inner->children()[3] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(3)); - serializer::presto::testingScatterStructNulls( - row->size(), row->size(), nullptr, nullptr, *row, 0); -} - TEST_P(PrestoSerializerTest, lazy) { constexpr int kSize = 1000; auto rowVector = makeTestVector(kSize); @@ -504,7 +457,7 @@ TEST_P(PrestoSerializerTest, ioBufRoundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); @@ -523,12 +476,32 @@ TEST_P(PrestoSerializerTest, roundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; + + for (size_t i = 0; i < numRounds; ++i) { + auto rowType = fuzzer.randRowType(); + auto inputRowVector = fuzzer.fuzzInputRow(rowType); + serializer::presto::PrestoVectorSerde::PrestoOptions prestoOpts; + // Test every second with struct nulls first. + prestoOpts.nullsFirst = i % 2 == 1; + testRoundTrip(inputRowVector, &prestoOpts); + } +} + +TEST_P(PrestoSerializerTest, encodedRoundtrip) { + VectorFuzzer::Options opts; + opts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; + opts.nullRatio = 0.1; + opts.dictionaryHasNulls = false; + VectorFuzzer fuzzer(opts, pool_.get()); + + const size_t numRounds = 200; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); auto inputRowVector = fuzzer.fuzzInputRow(rowType); - testRoundTrip(inputRowVector); + testEncodedRoundTrip(inputRowVector); } }