From 5a1ee8605b2daee782d1fdf0ccd2a6899451eaff Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Thu, 7 Dec 2023 15:03:59 -0800 Subject: [PATCH] Scatter struct nulls when deserializing Presto wire format --- velox/serializers/PrestoSerializer.cpp | 943 ++++++++++-------- .../tests/PrestoSerializerTest.cpp | 71 +- 2 files changed, 555 insertions(+), 459 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 53747c6f4d4a5..a2a3671f77015 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -148,6 +148,23 @@ FOLLY_ALWAYS_INLINE bool needCompression(const folly::io::Codec& codec) { return codec.type() != folly::io::CodecType::NO_COMPRESSION; } +using StructNullsMap = + folly::F14FastMap, int32_t>>; + +auto& structNullsMap() { + thread_local std::unique_ptr map; + return map; +} + +std::pair getStructNulls(int64_t position) { + auto& map = structNullsMap(); + auto it = map->find(position); + if (it == map->end()) { + return {nullptr, 0}; + } + return {it->second.first.data(), it->second.second}; +} + template void readValues( ByteInputStream* source, @@ -304,21 +321,40 @@ void readDecimalValues( } } +vector_size_t sizeWithIncomingNulls( + vector_size_t size, + int32_t numIncomingNulls) { + return numIncomingNulls == 0 ? size : numIncomingNulls; +} + vector_size_t readNulls( ByteInputStream* source, vector_size_t size, BaseVector& result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { + VELOX_DCHECK_LE( + result.size(), resultOffset + (incomingNulls ? numIncomingNulls : size)); if (source->readByte() == 0) { - result.clearNulls(resultOffset, resultOffset + size); - return 0; + if (incomingNulls) { + auto rawNulls = result.mutableRawNulls(); + bits::copyBits( + incomingNulls, 0, rawNulls, resultOffset, numIncomingNulls); + } else { + result.clearNulls(resultOffset, resultOffset + size); + } + return incomingNulls + ? numIncomingNulls - bits::countBits(incomingNulls, 0, numIncomingNulls) + : 0; } const bool noPriorNulls = (result.rawNulls() == nullptr); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); // Allocate one extra byte in case we cannot use bits from the current last // partial byte. - BufferPtr& nulls = result.mutableNulls(resultOffset + size + 8); + BufferPtr& nulls = result.mutableNulls(resultOffset + numNewValues + 8); if (noPriorNulls) { bits::fillBits( nulls->asMutable(), 0, resultOffset, bits::kNotNull); @@ -330,6 +366,15 @@ vector_size_t readNulls( source->readBytes(rawNulls, numBytes); bits::reverseBits(rawNulls, numBytes); bits::negate(reinterpret_cast(rawNulls), numBytes * 8); + // Add incoming nulls if any. + if (incomingNulls) { + bits::scatterBits( + size, + numIncomingNulls, + reinterpret_cast(rawNulls), + incomingNulls, + reinterpret_cast(rawNulls)); + } // Shift bits if needed. if (bits::nbytes(resultOffset) * 8 > resultOffset) { @@ -338,10 +383,11 @@ vector_size_t readNulls( bits::nbytes(resultOffset) * 8, nulls->asMutable(), resultOffset, - size); + numNewValues); } - return BaseVector::countNulls(nulls, resultOffset, resultOffset + size); + return BaseVector::countNulls( + nulls, resultOffset, resultOffset + numNewValues); } template @@ -351,28 +397,47 @@ void read( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const int32_t size = source->read(); - result->resize(resultOffset + size); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + result->resize(resultOffset + numNewValues); auto flatResult = result->asFlatVector(); - auto nullCount = readNulls(source, size, *flatResult, resultOffset); + auto nullCount = readNulls( + source, size, *flatResult, resultOffset, incomingNulls, numIncomingNulls); - BufferPtr values = flatResult->mutableValues(resultOffset + size); + BufferPtr values = flatResult->mutableValues(resultOffset + numNewValues); if constexpr (std::is_same_v) { if (useLosslessTimestamp) { readLosslessTimestampValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } } if (type->isLongDecimal()) { readDecimalValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); return; } readValues( - source, size, resultOffset, flatResult->nulls(), nullCount, values); + source, + numNewValues, + resultOffset, + flatResult->nulls(), + nullCount, + values); } template <> @@ -382,20 +447,29 @@ void read( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const int32_t size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); auto flatResult = result->as>(); BufferPtr values = flatResult->mutableValues(resultOffset + size); auto rawValues = values->asMutable(); - for (int32_t i = 0; i < size; ++i) { + int32_t lastOffset = 0; + for (int32_t i = 0; i < numNewValues; ++i) { // Set the first int32_t of each StringView to be the offset. - *reinterpret_cast(&rawValues[resultOffset + i]) = + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + *reinterpret_cast(&rawValues[resultOffset + i]) = lastOffset; + continue; + } + lastOffset = *reinterpret_cast(&rawValues[resultOffset + i]) = source->read(); } - readNulls(source, size, *flatResult, resultOffset); + readNulls( + source, size, *flatResult, resultOffset, incomingNulls, numIncomingNulls); const int32_t dataSize = source->read(); if (dataSize == 0) { @@ -408,7 +482,7 @@ void read( source->readBytes(rawStrings, dataSize); int32_t previousOffset = 0; auto rawChars = reinterpret_cast(rawStrings); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { int32_t offset = rawValues[resultOffset + i].size(); rawValues[resultOffset + i] = StringView(rawChars + previousOffset, offset - previousOffset); @@ -422,7 +496,9 @@ void readColumns( const std::vector& types, std::vector& result, vector_size_t resultOffset, - bool useLosslessTimestamp); + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls); void readConstantVector( ByteInputStream* source, @@ -430,25 +506,39 @@ void readConstantVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns( + source, pool, childTypes, children, 0, useLosslessTimestamp, nullptr, 0); VELOX_CHECK_EQ(1, children[0]->size()); - auto constantVector = BaseVector::wrapInConstant(size, 0, children[0]); - if (resultOffset == 0) { + auto constantVector = + BaseVector::wrapInConstant(numNewValues, 0, children[0]); + if (resultOffset == 0 && !incomingNulls) { result = std::move(constantVector); } else { - result->resize(resultOffset + size); + if (!incomingNulls && constantVector->equalValueAt(result.get(), 0, 0)) { + result->resize(resultOffset + numNewValues); + return; + } + result->resize(resultOffset + numNewValues); SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(constantVector.get(), resultOffset, 0, size); + result->copy(constantVector.get(), resultOffset, 0, numNewValues); + if (incomingNulls) { + bits::forEachUnsetBit(incomingNulls, 0, numNewValues, [&](auto row) { + result->setNull(resultOffset + row, true); + }); + } } } @@ -458,34 +548,58 @@ void readDictionaryVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { const auto size = source->read(); + const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); std::vector childTypes = {type}; std::vector children{BaseVector::create(type, 0, pool)}; - readColumns(source, pool, childTypes, children, 0, useLosslessTimestamp); + readColumns( + source, pool, childTypes, children, 0, useLosslessTimestamp, nullptr, 0); // Read indices. - BufferPtr indices = allocateIndices(size, pool); - source->readBytes(indices->asMutable(), size * sizeof(int32_t)); + BufferPtr indices = allocateIndices(numNewValues, pool); + if (incomingNulls) { + auto rawIndices = indices->asMutable(); + for (auto i = 0; i < numNewValues; ++i) { + if (bits::isBitNull(incomingNulls, i)) { + rawIndices[i] = 0; + } else { + rawIndices[i] = source->read(); + } + } + } else { + source->readBytes( + indices->asMutable(), numNewValues * sizeof(int32_t)); + } // Skip 3 * 8 bytes of 'instance id'. Velox doesn't use 'instance id' for // dictionary vectors. source->skip(24); - auto dictionaryVector = - BaseVector::wrapInDictionary(nullptr, indices, size, children[0]); + BufferPtr incomingNullsBuffer = nullptr; + if (incomingNulls) { + incomingNullsBuffer = AlignedBuffer::allocate(numIncomingNulls, pool); + memcpy( + incomingNullsBuffer->asMutable(), + incomingNulls, + bits::nbytes(numIncomingNulls)); + } + auto dictionaryVector = BaseVector::wrapInDictionary( + incomingNullsBuffer, indices, size, children[0]); if (resultOffset == 0) { result = std::move(dictionaryVector); } else { - result->resize(resultOffset + size); + result->resize(resultOffset + numNewValues); - SelectivityVector rows(resultOffset + size, false); - rows.setValidRange(resultOffset, resultOffset + size, true); + SelectivityVector rows(resultOffset + numNewValues, false); + rows.setValidRange(resultOffset, resultOffset + numNewValues, true); rows.updateBounds(); BaseVector::ensureWritable(rows, type, pool, result); - result->copy(dictionaryVector.get(), resultOffset, 0, size); + result->copy(dictionaryVector.get(), resultOffset, 0, numNewValues); } } @@ -495,7 +609,9 @@ void readArrayVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { ArrayVector* arrayVector = result->as(); const auto resultElementsOffset = arrayVector->elements()->size(); @@ -508,25 +624,39 @@ void readArrayVector( childTypes, children, resultElementsOffset, - useLosslessTimestamp); + useLosslessTimestamp, + nullptr, + 0); - vector_size_t size = source->read(); - arrayVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + arrayVector->resize(resultOffset + numNewValues); arrayVector->setElements(children[0]); - BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = arrayVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = arrayVector->mutableSizes(resultOffset + size); + BufferPtr sizes = arrayVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *arrayVector, resultOffset); + readNulls( + source, + size, + *arrayVector, + resultOffset, + incomingNulls, + numIncomingNulls); } void readMapVector( @@ -535,7 +665,9 @@ void readMapVector( velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { MapVector* mapVector = result->as(); const auto resultElementsOffset = mapVector->mapKeys()->size(); std::vector childTypes = {type->childAt(0), type->childAt(1)}; @@ -546,7 +678,9 @@ void readMapVector( childTypes, children, resultElementsOffset, - useLosslessTimestamp); + useLosslessTimestamp, + nullptr, + 0); int32_t hashTableSize = source->read(); if (hashTableSize != -1) { @@ -554,23 +688,31 @@ void readMapVector( source->skip(hashTableSize * sizeof(int32_t)); } - vector_size_t size = source->read(); - mapVector->resize(resultOffset + size); + const vector_size_t size = source->read(); + const vector_size_t numNewValues = + sizeWithIncomingNulls(size, numIncomingNulls); + mapVector->resize(resultOffset + numNewValues); mapVector->setKeysAndValues(children[0], children[1]); - BufferPtr offsets = mapVector->mutableOffsets(resultOffset + size); + BufferPtr offsets = mapVector->mutableOffsets(resultOffset + numNewValues); auto rawOffsets = offsets->asMutable(); - BufferPtr sizes = mapVector->mutableSizes(resultOffset + size); + BufferPtr sizes = mapVector->mutableSizes(resultOffset + numNewValues); auto rawSizes = sizes->asMutable(); int32_t base = source->read(); - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < numNewValues; ++i) { + if (incomingNulls && bits::isBitNull(incomingNulls, i)) { + rawOffsets[resultOffset + i] = 0; + rawSizes[resultOffset + i] = 0; + continue; + } int32_t offset = source->read(); rawOffsets[resultOffset + i] = resultElementsOffset + base; rawSizes[resultOffset + i] = offset - base; base = offset; } - readNulls(source, size, *mapVector, resultOffset); + readNulls( + source, size, *mapVector, resultOffset, incomingNulls, numIncomingNulls); } int64_t packTimestampWithTimeZone(int64_t timestamp, int16_t timezone) { @@ -589,9 +731,19 @@ void readTimestampWithTimeZone( ByteInputStream* source, velox::memory::MemoryPool* pool, RowVector* result, - vector_size_t resultOffset) { + vector_size_t resultOffset, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { auto& timestamps = result->childAt(0); - read(source, BIGINT(), pool, timestamps, resultOffset, false); + read( + source, + BIGINT(), + pool, + timestamps, + resultOffset, + false, + incomingNulls, + numIncomingNulls); auto rawTimestamps = timestamps->asFlatVector()->mutableRawValues(); @@ -614,338 +766,67 @@ void readTimestampWithTimeZone( } } -template -void scatterValues( - int32_t numValues, - const vector_size_t* indices, - T* data, - vector_size_t offset) { - for (auto index = numValues - 1; index >= 0; --index) { - const auto destination = indices[index]; - if (destination == offset + index) { - break; - } - data[destination] = data[offset + index]; - } -} - -template -void scatterFlatValues( - vector_size_t scatterSize, - const vector_size_t* scatter, - BaseVector& vector, - vector_size_t offset) { - using T = typename TypeTraits::NativeType; - auto* values = vector.asUnchecked>()->mutableRawValues(); - scatterValues(scatterSize, scatter, values, offset); -} - -template <> -void scatterFlatValues( - vector_size_t scatterSize, - const vector_size_t* scatter, - BaseVector& vector, - vector_size_t offset) { - auto* values = const_cast(vector.values()->as()); - - for (auto index = scatterSize - 1; index >= 0; --index) { - const auto destination = scatter[index]; - if (destination == offset + index) { - break; - } - bits::setBit(values, destination, bits::isBitSet(values, offset + index)); - } -} - -void scatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset); - -// Scatters existing nulls and adds 'incomingNulls' to the gaps. 'oldSize' is -// the number of valid bits in the nulls of 'vector'. 'vector' must have been -// resized to the new size before calling this. -void scatterNulls( - vector_size_t oldSize, - const uint64_t* incomingNulls, - BaseVector& vector) { - const bool hasNulls = vector.mayHaveNulls(); - const auto size = vector.size(); - - if (hasNulls) { - auto bits = reinterpret_cast(vector.mutableRawNulls()); - bits::scatterBits(oldSize, size, bits, incomingNulls, bits); - } else { - memcpy(vector.mutableRawNulls(), incomingNulls, bits::nbytes(size)); - } -} - -// Scatters all rows in 'vector' starting from 'offset'. All these rows are -// expected to be non-null before this call. vector[offset + i] is copied into -// vector[offset + scatter[i]] for all i in [0, scatterSize). The gaps are -// filled with nulls. scatter[i] >= i for all i. scatter[i] >= scatter[j] for -// all i and j. -// -// @param size Size of the 'vector' after scatter. Includes trailing nulls. -// @param scatterSize Number of rows to scatter, starting from 'offset'. -// @param scatter Destination row numbers. A total of 'scatterSize' entries. -// @param incomingNulls A bitmap representation of the 'scatter' covering 'size' -// rows. First 'offset' bits should be kNotNull. -// @param vector Vector to modify. -// @param offset First row to scatter. -void scatterVector( - int32_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - VectorPtr& vector, - vector_size_t offset) { - const auto oldSize = vector->size(); - if (scatter != nullptr && scatterSize > 0) { - // 'scatter' should have an entry for every row in 'vector' starting from - // 'offset'. - VELOX_CHECK_EQ(vector->size(), scatterSize + offset); - - // The new vector size should cover all 'scatter' destinations and - // trailing nulls. - VELOX_CHECK_GE(size, scatter[scatterSize - 1] + 1); - } - - if (vector->encoding() == VectorEncoding::Simple::ROW) { - vector->asUnchecked()->unsafeResize(size); - } else { - vector->resize(size); - } - - switch (vector->encoding()) { - case VectorEncoding::Simple::DICTIONARY: { - VELOX_CHECK_EQ(0, offset, "Result offset is not supported yet") - if (incomingNulls) { - auto dictIndices = - const_cast(vector->wrapInfo()->as()); - scatterValues(scatterSize, scatter, dictIndices, 0); - scatterNulls(oldSize, incomingNulls, *vector); - } - auto values = vector->valueVector(); - scatterVector(values->size(), 0, nullptr, nullptr, values, 0); - break; - } - case VectorEncoding::Simple::CONSTANT: { - VELOX_CHECK_EQ(0, offset, "Result offset is not supported yet") - auto values = vector->valueVector(); - if (values) { - scatterVector(values->size(), 0, nullptr, nullptr, values, 0); - } - - if (incomingNulls) { - BaseVector::ensureWritable( - SelectivityVector::empty(), vector->type(), vector->pool(), vector); - scatterNulls(oldSize, incomingNulls, *vector); - } - break; - } - case VectorEncoding::Simple::ARRAY: { - auto* array = vector->asUnchecked(); - if (incomingNulls) { - auto offsets = const_cast(array->rawOffsets()); - auto sizes = const_cast(array->rawSizes()); - scatterValues(scatterSize, scatter, offsets, offset); - scatterValues(scatterSize, scatter, sizes, offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - // Find the offset from which 'new' array elements start assuming that - // array elements are written in order. - vector_size_t elementOffset = 0; - for (auto i = offset - 1; i >= 0; --i) { - if (!array->isNullAt(i)) { - elementOffset = array->offsetAt(i) + array->sizeAt(i); - break; - } - } - auto elements = array->elements(); - scatterVector( - elements->size(), 0, nullptr, nullptr, elements, elementOffset); - break; - } - case VectorEncoding::Simple::MAP: { - auto* map = vector->asUnchecked(); - if (incomingNulls) { - auto offsets = const_cast(map->rawOffsets()); - auto sizes = const_cast(map->rawSizes()); - scatterValues(scatterSize, scatter, offsets, offset); - scatterValues(scatterSize, scatter, sizes, offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - // Find out the offset from which 'new' map keys and values start assuming - // that map elements are written in order. - vector_size_t elementOffset = 0; - for (auto i = offset - 1; i >= 0; --i) { - if (!map->isNullAt(i)) { - elementOffset = map->offsetAt(i) + map->sizeAt(i); - break; - } - } - auto keys = map->mapKeys(); - scatterVector(keys->size(), 0, nullptr, nullptr, keys, elementOffset); - auto values = map->mapValues(); - scatterVector(values->size(), 0, nullptr, nullptr, values, elementOffset); - break; - } - case VectorEncoding::Simple::ROW: { - auto* row = vector->asUnchecked(); - scatterStructNulls(row->size(), 0, nullptr, nullptr, *row, offset); - break; - } - case VectorEncoding::Simple::FLAT: { - if (incomingNulls) { - VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( - scatterFlatValues, - vector->typeKind(), - scatterSize, - scatter, - *vector, - offset); - scatterNulls(oldSize, incomingNulls, *vector); - } - break; - } - default: - VELOX_FAIL("Unsupported encoding in scatter: {}", vector->encoding()); - } -} - -// A RowVector with nulls is serialized with children having a value -// only for rows where the struct is non-null. After deserializing, -// we do an extra traversal to add gaps into struct members where -// the containing struct is null. As we go down the struct tree, we -// merge child struct nulls into the nulls from enclosing structs so -// that the leaves only get scattered once, considering all nulls -// from all enclosing structs. -void scatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset) { - const auto oldSize = row.size(); - if (isTimestampWithTimeZoneType(row.type())) { - // The timestamp with tz case is special. The child vectors are aligned with - // the struct even if the struct has nulls. - if (incomingNulls) { - scatterVector( - size, scatterSize, scatter, incomingNulls, row.childAt(0), rowOffset); - scatterVector( - size, scatterSize, scatter, incomingNulls, row.childAt(1), rowOffset); - row.unsafeResize(size); - scatterNulls(oldSize, incomingNulls, row); - } - return; - } - - const uint64_t* childIncomingNulls = incomingNulls; - const vector_size_t* childScatter = scatter; - auto childScatterSize = scatterSize; - raw_vector innerScatter; - raw_vector childIncomingNullsVector; - if (auto* rawNulls = row.rawNulls()) { - innerScatter.resize(size); - if (!incomingNulls) { - childIncomingNulls = rawNulls; - if (rowOffset > 0) { - childIncomingNullsVector.resize(bits::nwords(size)); - auto newNulls = childIncomingNullsVector.data(); - memcpy(newNulls, rawNulls, bits::nbytes(size)); - - // Fill in first 'rowOffset' bits with kNotNull to avoid scattering - // nulls for pre-existing rows. - bits::fillBits(newNulls, 0, rowOffset, bits::kNotNull); - childIncomingNulls = newNulls; - } - childScatterSize = simd::indicesOfSetBits( - rawNulls, rowOffset, size, innerScatter.data()); - } else { - childIncomingNullsVector.resize(bits::nwords(size)); - auto newNulls = childIncomingNullsVector.data(); - bits::scatterBits( - row.size(), - size, - reinterpret_cast(rawNulls), - incomingNulls, - reinterpret_cast(newNulls)); - - // Fill in first 'rowOffset' bits with kNotNull to avoid scattering - // nulls for pre-existing rows. - bits::fillBits(newNulls, 0, rowOffset, bits::kNotNull); - childIncomingNulls = newNulls; - childScatterSize = simd::indicesOfSetBits( - childIncomingNulls, rowOffset, size, innerScatter.data()); - } - childScatter = innerScatter.data(); - } - for (auto i = 0; i < row.childrenSize(); ++i) { - auto& child = row.childAt(i); - if (child->encoding() == VectorEncoding::Simple::ROW) { - scatterStructNulls( - size, - childScatterSize, - childScatter, - childIncomingNulls, - *child->asUnchecked(), - rowOffset); - } else { - scatterVector( - size, - childScatterSize, - childScatter, - childIncomingNulls, - row.childAt(i), - rowOffset); - } - } - if (incomingNulls) { - row.unsafeResize(size); - scatterNulls(oldSize, incomingNulls, row); - } - // On return of scatter we check that child sizes match the struct size. This - // is safe also if no scatter. - for (auto i = 0; i < row.childrenSize(); ++i) { - VELOX_CHECK_EQ(row.childAt(i)->size(), row.size()); - } -} - void readRowVector( ByteInputStream* source, const TypePtr& type, velox::memory::MemoryPool* pool, VectorPtr& result, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { auto* row = result->as(); if (isTimestampWithTimeZoneType(type)) { - readTimestampWithTimeZone(source, pool, row, resultOffset); + readTimestampWithTimeZone( + source, pool, row, resultOffset, incomingNulls, numIncomingNulls); return; } - + auto [structNulls, numStructNulls] = getStructNulls(source->tellp()); + BufferPtr combinedNulls; + // childNulls is the nulls added to the children, i.e. the nulls of this + // struct combined with nulls of enclosing structs. + const uint64_t* childNulls = incomingNulls; + int32_t numChildNulls = numIncomingNulls; + if (structNulls) { + if (incomingNulls) { + combinedNulls = AlignedBuffer::allocate(numIncomingNulls, pool); + bits::scatterBits( + numStructNulls, + numIncomingNulls, + reinterpret_cast(structNulls), + incomingNulls, + combinedNulls->asMutable()); + childNulls = combinedNulls->as(); + numChildNulls = numIncomingNulls; + } else { + childNulls = structNulls; + numChildNulls = numStructNulls; + } + } const int32_t numChildren = source->read(); auto& children = row->children(); const auto& childTypes = type->asRow().children(); readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); + source, + pool, + childTypes, + children, + resultOffset, + useLosslessTimestamp, + childNulls, + numChildNulls); - auto size = source->read(); - // Set the size of the row but do not alter the size of the - // children. The children get adjusted in a separate pass over the - // data. The parent and child size MUST be separate until the second pass. - row->BaseVector::resize(resultOffset + size); + const auto size = source->read(); + const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls); + row->resize(resultOffset + numNewValues); + // Read and discard the offsets. The number of offsets is not affected by + // incomingNulls. for (int32_t i = 0; i <= size; ++i) { source->read(); } - readNulls(source, size, *result, resultOffset); + readNulls( + source, size, *result, resultOffset, incomingNulls, numIncomingNulls); } std::string readLengthPrefixedString(ByteInputStream* source) { @@ -972,7 +853,9 @@ void readColumns( const std::vector& types, std::vector& results, vector_size_t resultOffset, - bool useLosslessTimestamp) { + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls) { static const std::unordered_map< TypeKind, std::function> + bool useLosslessTimestamp, + const uint64_t* incomingNulls, + int32_t numIncomingNulls)>> readers = { {TypeKind::BOOLEAN, &read}, {TypeKind::TINYINT, &read}, @@ -1013,7 +898,9 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + useLosslessTimestamp, + incomingNulls, + numIncomingNulls); } else if (encoding == kDictionary) { readDictionaryVector( source, @@ -1021,7 +908,9 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + useLosslessTimestamp, + incomingNulls, + numIncomingNulls); } else { checkTypeEncoding(encoding, columnType); const auto it = readers.find(columnType->kind()); @@ -1036,7 +925,214 @@ void readColumns( pool, columnResult, resultOffset, - useLosslessTimestamp); + useLosslessTimestamp, + incomingNulls, + numIncomingNulls); + } + } +} + +// Reads nulls into 'scratch' and returns count of non-nulls. If 'copy' is +// given, returns the null bits in 'copy'. +vector_size_t valueCount( + ByteInputStream* source, + vector_size_t size, + Scratch& scratch, + std::vector* copy = nullptr) { + if (source->readByte() == 0) { + return size; + } + ScratchPtr nullsHolder(scratch); + auto rawNulls = nullsHolder.get(bits::nwords(size)); + auto numBytes = bits::nbytes(size); + source->readBytes(rawNulls, numBytes); + bits::reverseBits(reinterpret_cast(rawNulls), numBytes); + bits::negate(reinterpret_cast(rawNulls), numBytes * 8); + if (copy) { + copy->resize(bits::nwords(size)); + memcpy(copy->data(), rawNulls, numBytes); + } + return bits::countBits(rawNulls, 0, size); +} + +template +void readStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const int32_t size = source->read(); + auto numValues = valueCount(source, size, scratch); + + if constexpr (std::is_same_v) { + source->skip( + numValues * + (useLosslessTimestamp ? sizeof(Timestamp) : sizeof(uint64_t))); + return; + } + source->skip(numValues * sizeof(T)); +} + +template <> +void readStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool /*useLosslessTimestamp*/, + Scratch& scratch) { + const int32_t size = source->read(); + source->skip(size * sizeof(int32_t)); + valueCount(source, size, scratch); + const int32_t dataSize = source->read(); + source->skip(dataSize); +} + +void readStructNullsColumns( + ByteInputStream* source, + const std::vector& types, + bool useLoasslessTimestamp, + Scratch& scratch); + +void readConstantVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const auto size = source->read(); + std::vector childTypes = {type}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); +} + +void readDictionaryVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + const auto size = source->read(); + std::vector childTypes = {type}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + // Skip indices. + source->skip(sizeof(int32_t) * size); + + // Skip 3 * 8 bytes of 'instance id'. Velox doesn't use 'instance id' for + // dictionary vectors. + source->skip(24); +} + +void readArrayVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + std::vector childTypes = {type->childAt(0)}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + const vector_size_t size = source->read(); + + source->skip((size + 1) * sizeof(int32_t)); + valueCount(source, size, scratch); +} + +void readMapVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + std::vector childTypes = {type->childAt(0), type->childAt(1)}; + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + int32_t hashTableSize = source->read(); + if (hashTableSize != -1) { + // Skip over serialized hash table from Presto wire format. + source->skip(hashTableSize * sizeof(int32_t)); + } + + const vector_size_t size = source->read(); + + source->skip((1 + size) * sizeof(int32_t)); + valueCount(source, size, scratch); +} + +void readTimestampWithTimeZoneStructNulls( + ByteInputStream* source, + Scratch& scratch) { + readStructNulls(source, BIGINT(), false, scratch); +} + +void readRowVectorStructNulls( + ByteInputStream* source, + const TypePtr& type, + bool useLosslessTimestamp, + Scratch& scratch) { + if (isTimestampWithTimeZoneType(type)) { + readTimestampWithTimeZoneStructNulls(source, scratch); + return; + } + auto streamPos = source->tellp(); + const int32_t numChildren = source->read(); + const auto& childTypes = type->asRow().children(); + readStructNullsColumns(source, childTypes, useLosslessTimestamp, scratch); + + const auto size = source->read(); + // Read and discard the offsets. The number of offsets is not affected by + // nulls. + source->skip((size + 1) * sizeof(int32_t)); + std::vector nullsCopy; + auto numNonNull = valueCount(source, size, scratch, &nullsCopy); + if (size != numNonNull) { + (*structNullsMap())[streamPos] = + std::pair, int32_t>(std::move(nullsCopy), size); + } +} + +void readStructNullsColumns( + ByteInputStream* source, + const std::vector& types, + bool useLosslessTimestamp, + Scratch& scratch) { + static const std::unordered_map< + TypeKind, + std::function> + readers = { + {TypeKind::BOOLEAN, &readStructNulls}, + {TypeKind::TINYINT, &readStructNulls}, + {TypeKind::SMALLINT, &readStructNulls}, + {TypeKind::INTEGER, &readStructNulls}, + {TypeKind::BIGINT, &readStructNulls}, + {TypeKind::HUGEINT, &readStructNulls}, + {TypeKind::REAL, &readStructNulls}, + {TypeKind::DOUBLE, &readStructNulls}, + {TypeKind::TIMESTAMP, &readStructNulls}, + {TypeKind::VARCHAR, &readStructNulls}, + {TypeKind::VARBINARY, &readStructNulls}, + {TypeKind::ARRAY, &readArrayVectorStructNulls}, + {TypeKind::MAP, &readMapVectorStructNulls}, + {TypeKind::ROW, &readRowVectorStructNulls}, + {TypeKind::UNKNOWN, &readStructNulls}}; + + for (int32_t i = 0; i < types.size(); ++i) { + const auto& columnType = types[i]; + + const auto encoding = readLengthPrefixedString(source); + if (encoding == kRLE) { + readConstantVectorStructNulls( + source, columnType, useLosslessTimestamp, scratch); + } else if (encoding == kDictionary) { + readDictionaryVectorStructNulls( + source, columnType, useLosslessTimestamp, scratch); + } else { + checkTypeEncoding(encoding, columnType); + const auto it = readers.find(columnType->kind()); + VELOX_CHECK( + it != readers.end(), + "Column reader for type {} is missing", + columnType->kindName()); + + it->second(source, columnType, useLosslessTimestamp, scratch); } } } @@ -2285,6 +2381,67 @@ void PrestoVectorSerde::serializeEncoded( ->flushEncoded(vector, out); } +namespace { +bool hasNestedStructs(const TypePtr& type) { + if (type->isRow()) { + return true; + } + if (type->isArray()) { + return hasNestedStructs(type->childAt(0)); + } + if (type->isMap()) { + return hasNestedStructs(type->childAt(0)) || + hasNestedStructs(type->childAt(1)); + } + return false; +} + +bool hasNestedStructs(const std::vector& types) { + for (auto& child : types) { + if (hasNestedStructs(child)) { + return true; + } + } + return false; +} + +void readTopColumns( + ByteInputStream& source, + const RowTypePtr& type, + velox::memory::MemoryPool* pool, + const RowVectorPtr& result, + int32_t resultOffset, + bool useLosslessTimestamp) { + auto& children = result->children(); + const auto& childTypes = type->asRow().children(); + const auto numColumns = source.read(); + VELOX_USER_CHECK_EQ( + numColumns, + type->size(), + "Number of columns in serialized data doesn't match " + "number of columns requested for deserialization"); + + auto guard = folly::makeGuard([&]() { structNullsMap().reset(); }); + + if (hasNestedStructs(childTypes)) { + structNullsMap() = std::make_unique(); + Scratch scratch; + auto position = source.tellp(); + readStructNullsColumns(&source, childTypes, useLosslessTimestamp, scratch); + source.seekp(position); + } + readColumns( + &source, + pool, + childTypes, + children, + resultOffset, + useLosslessTimestamp, + nullptr, + 0); +} +} // namespace + void PrestoVectorSerde::deserialize( ByteInputStream* source, velox::memory::MemoryPool* pool, @@ -2335,18 +2492,9 @@ void PrestoVectorSerde::deserialize( common::compressionKindToString( common::codecTypeToCompressionKind(codec->type()))); - auto& children = (*result)->children(); - const auto& childTypes = type->asRow().children(); if (!needCompression(*codec)) { - const auto numColumns = source->read(); - // TODO Fix call sites and tighten the check to _EQ. - VELOX_USER_CHECK_GE( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - source, pool, childTypes, children, resultOffset, useLosslessTimestamp); + readTopColumns( + *source, type, pool, *result, resultOffset, useLosslessTimestamp); } else { auto compressBuf = folly::IOBuf::create(compressedSize); source->readBytes(compressBuf->writableData(), compressedSize); @@ -2356,33 +2504,14 @@ void PrestoVectorSerde::deserialize( uncompress->writableData(), (int32_t)uncompress->length(), 0}; ByteInputStream uncompressedSource({byteRange}); - const auto numColumns = uncompressedSource.read(); - VELOX_USER_CHECK_EQ( - numColumns, - type->size(), - "Number of columns in serialized data doesn't match " - "number of columns requested for deserialization"); - readColumns( - &uncompressedSource, + readTopColumns( + uncompressedSource, + type, pool, - childTypes, - children, + *result, resultOffset, useLosslessTimestamp); } - - scatterStructNulls( - (*result)->size(), 0, nullptr, nullptr, **result, resultOffset); -} - -void testingScatterStructNulls( - vector_size_t size, - vector_size_t scatterSize, - const vector_size_t* scatter, - const uint64_t* incomingNulls, - RowVector& row, - vector_size_t rowOffset) { - scatterStructNulls(size, scatterSize, scatter, incomingNulls, row, rowOffset); } // static diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 0165c273706c9..fa97b61759401 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -436,56 +436,6 @@ TEST_P(PrestoSerializerTest, encodings) { testEncodedRoundTrip(data); } -TEST_P(PrestoSerializerTest, scatterEncoded) { - // Makes a struct with nulls and constant/dictionary encoded children. The - // children need to get gaps where the parent struct has a null. - VectorFuzzer::Options opts; - opts.timestampPrecision = - VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; - opts.nullRatio = 0.1; - VectorFuzzer fuzzer(opts, pool_.get()); - - auto rowType = ROW( - {{"inner", - ROW( - {{"i1", BIGINT()}, - {"i2", VARCHAR()}, - {"i3", ARRAY(INTEGER())}, - {"i4", ROW({{"ii1", BIGINT()}})}})}}); - auto row = fuzzer.fuzzInputRow(rowType); - auto inner = - const_cast(row->childAt(0)->wrappedVector()->as()); - if (!inner->mayHaveNulls()) { - return; - } - auto numNulls = BaseVector::countNulls(inner->nulls(), 0, inner->size()); - auto numNonNull = inner->size() - numNulls; - auto indices = makeIndices(numNonNull, [](auto row) { return row; }); - - inner->children()[0] = BaseVector::createConstant( - BIGINT(), - variant::create(11L), - numNonNull, - pool_.get()); - inner->children()[1] = BaseVector::wrapInDictionary( - BufferPtr(nullptr), indices, numNonNull, inner->childAt(1)); - inner->children()[2] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(2)); - - // i4 is a struct that we wrap in constant. We make ths struct like it was - // read from seriailization, needing scatter for struct nulls. - auto i4 = const_cast( - inner->childAt(3)->wrappedVector()->as()); - auto i4NonNull = i4->mayHaveNulls() - ? i4->size() - BaseVector::countNulls(i4->nulls(), 0, i4->size()) - : i4->size(); - i4->childAt(0)->resize(i4NonNull); - inner->children()[3] = - BaseVector::wrapInConstant(numNonNull, 3, inner->childAt(3)); - serializer::presto::testingScatterStructNulls( - row->size(), row->size(), nullptr, nullptr, *row, 0); -} - TEST_P(PrestoSerializerTest, lazy) { constexpr int kSize = 1000; auto rowVector = makeTestVector(kSize); @@ -504,7 +454,7 @@ TEST_P(PrestoSerializerTest, ioBufRoundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); @@ -523,7 +473,7 @@ TEST_P(PrestoSerializerTest, roundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 20; + const size_t numRounds = 100; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); @@ -532,6 +482,23 @@ TEST_P(PrestoSerializerTest, roundTrip) { } } +TEST_P(PrestoSerializerTest, encodedRoundtrip) { + VectorFuzzer::Options opts; + opts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; + opts.nullRatio = 0.1; + opts.dictionaryHasNulls = false; + VectorFuzzer fuzzer(opts, pool_.get()); + + const size_t numRounds = 200; + + for (size_t i = 0; i < numRounds; ++i) { + auto rowType = fuzzer.randRowType(); + auto inputRowVector = fuzzer.fuzzInputRow(rowType); + testEncodedRoundTrip(inputRowVector); + } +} + TEST_P(PrestoSerializerTest, emptyArrayOfRowVector) { // The value of nullCount_ + nonNullCount_ of the inner RowVector is 0. auto arrayOfRow = makeArrayOfRowVector(ROW({UNKNOWN()}), {{}});