Skip to content

Commit

Permalink
Encode the minimum necessary values from a dictionary in PrestoSerializer (facebookincubator#8639)
Browse files Browse the repository at this point in the history

Summary:
Pull Request resolved: facebookincubator#8639

There are cases where the set of values in a DictionaryVector may be much larger than what's actually needed. E.g. some UDFs
use DictionaryVectors to filter values, like array subscript. When we use PrestoBatchVectorSerializer in PartitionedOutput with a
single destination, I suspect we'll run into similar issues: if we have a large RowVector that we break into smaller pieces, we
don't necessarily want to send the whole alphabet (the dictionary's full set of values) for each piece.

This change adds logic to serializeDictionaryVector to determine the minimum subset of the alphabet to include in the
serialized output.  It also updates the indices to point to the correct values in this minimal dictionary.

Reviewed By: bikramSingh91

Differential Revision: D53327612

fbshipit-source-id: 1651554806d2124d982cca122fd707fcc84cab95
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Feb 21, 2024
1 parent 07ece26 commit 032358e
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 39 deletions.
139 changes: 100 additions & 39 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,8 @@ void serializeFlatVector<TypeKind::BOOLEAN>(
void serializeColumn(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream);
VectorStream* stream,
Scratch& scratch);

void serializeColumn(
const BaseVector* vector,
Expand All @@ -1959,7 +1960,8 @@ void serializeColumn(
void serializeWrapped(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
std::vector<IndexRange> newRanges;
const bool mayHaveNulls = vector->mayHaveNulls();
const BaseVector* wrapped = vector->wrappedVector();
Expand All @@ -1969,7 +1971,7 @@ void serializeWrapped(
if (mayHaveNulls && vector->isNullAt(offset)) {
// The wrapper added a null.
if (!newRanges.empty()) {
serializeColumn(wrapped, newRanges, stream);
serializeColumn(wrapped, newRanges, stream, scratch);
newRanges.clear();
}
stream->appendNull();
Expand All @@ -1980,7 +1982,7 @@ void serializeWrapped(
}
}
if (!newRanges.empty()) {
serializeColumn(wrapped, newRanges, stream);
serializeColumn(wrapped, newRanges, stream, scratch);
}
}

Expand All @@ -2006,7 +2008,8 @@ void serializeTimestampWithTimeZone(
void serializeRowVector(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
auto rowVector = dynamic_cast<const RowVector*>(vector);

if (isTimestampWithTimeZoneType(vector->type())) {
Expand All @@ -2030,14 +2033,15 @@ void serializeRowVector(
}
for (int32_t i = 0; i < rowVector->childrenSize(); ++i) {
serializeColumn(
rowVector->childAt(i).get(), childRanges, stream->childAt(i));
rowVector->childAt(i).get(), childRanges, stream->childAt(i), scratch);
}
}

void serializeArrayVector(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
auto arrayVector = dynamic_cast<const ArrayVector*>(vector);
auto rawSizes = arrayVector->rawSizes();
auto rawOffsets = arrayVector->rawOffsets();
Expand All @@ -2060,13 +2064,14 @@ void serializeArrayVector(
}
}
serializeColumn(
arrayVector->elements().get(), childRanges, stream->childAt(0));
arrayVector->elements().get(), childRanges, stream->childAt(0), scratch);
}

void serializeMapVector(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
auto mapVector = dynamic_cast<const MapVector*>(vector);
auto rawSizes = mapVector->rawSizes();
auto rawOffsets = mapVector->rawOffsets();
Expand All @@ -2088,9 +2093,10 @@ void serializeMapVector(
}
}
}
serializeColumn(mapVector->mapKeys().get(), childRanges, stream->childAt(0));
serializeColumn(
mapVector->mapValues().get(), childRanges, stream->childAt(1));
mapVector->mapKeys().get(), childRanges, stream->childAt(0), scratch);
serializeColumn(
mapVector->mapValues().get(), childRanges, stream->childAt(1), scratch);
}

static inline int32_t rangesTotalSize(
Expand All @@ -2106,40 +2112,81 @@ template <TypeKind Kind>
void serializeDictionaryVector(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
// Cannot serialize dictionary as PrestoPage dictionary if it has nulls.
// Also check if the stream was set up for dictionary (we had to know the
// encoding type when creating VectorStream for that).
if (vector->nulls() || !stream->isDictionaryStream()) {
serializeWrapped(vector, ranges, stream);
serializeWrapped(vector, ranges, stream, scratch);
return;
}

using T = typename KindToFlatVector<Kind>::WrapperType;
auto dictionaryVector = vector->as<DictionaryVector<T>>();

std::vector<IndexRange> childRanges;
childRanges.push_back({0, dictionaryVector->valueVector()->size()});
// Create a bit set to track which values in the Dictionary are used.
ScratchPtr<uint64_t, 64> usedIndicesHolder(scratch);
auto usedIndicesCapacity =
bits::nwords(dictionaryVector->valueVector()->size());
auto* usedIndices = usedIndicesHolder.get(usedIndicesCapacity);
simd::memset(usedIndices, 0, usedIndicesCapacity * sizeof(uint64_t));

auto* indices = dictionaryVector->indices()->template as<vector_size_t>();
vector_size_t numRows = 0;
for (const auto& range : ranges) {
numRows += range.size;
for (auto i = 0; i < range.size; ++i) {
bits::setBit(usedIndices, indices[range.begin + i]);
}
}

// Convert the bitset to a list of the used indices.
ScratchPtr<vector_size_t, 64> selectedIndicesHolder(scratch);
auto* mutableSelectedIndices =
selectedIndicesHolder.get(dictionaryVector->valueVector()->size());
auto numUsed = simd::indicesOfSetBits(
usedIndices,
0,
dictionaryVector->valueVector()->size(),
mutableSelectedIndices);

// Serialize the used elements from the Dictionary.
serializeColumn(
dictionaryVector->valueVector().get(), childRanges, stream->childAt(0));
dictionaryVector->valueVector().get(),
folly::Range<const vector_size_t*>(mutableSelectedIndices, numUsed),
stream->childAt(0),
scratch);

// Create a mapping from the original indices to the indices in the shrunk
// Dictionary of just used values.
ScratchPtr<vector_size_t, 64> updatedIndicesHolder(scratch);
auto* updatedIndices =
updatedIndicesHolder.get(dictionaryVector->valueVector()->size());
vector_size_t curIndex = 0;
for (vector_size_t i = 0; i < numUsed; ++i) {
updatedIndices[mutableSelectedIndices[i]] = curIndex++;
}

const BufferPtr& indices = dictionaryVector->indices();
auto* rawIndices = indices->as<vector_size_t>();
// Write out the indices, translating them using the above mapping.
stream->appendNonNull(numRows);
for (const auto& range : ranges) {
stream->appendNonNull(range.size);
stream->append<int32_t>(folly::Range(&rawIndices[range.begin], range.size));
for (auto i = 0; i < range.size; ++i) {
stream->appendOne(updatedIndices[indices[range.begin + i]]);
}
}
}

template <TypeKind kind>
void serializeConstantVectorImpl(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
using T = typename KindToFlatVector<kind>::WrapperType;
auto constVector = dynamic_cast<const ConstantVector<T>*>(vector);
if (constVector->valueVector() != nullptr) {
serializeWrapped(constVector, ranges, stream);
serializeWrapped(constVector, ranges, stream, scratch);
return;
}

Expand All @@ -2162,17 +2209,19 @@ template <TypeKind Kind>
void serializeConstantVector(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
if (stream->isConstantStream()) {
for (const auto& range : ranges) {
stream->appendNonNull(range.size);
}

std::vector<IndexRange> newRanges;
newRanges.push_back({0, 1});
serializeConstantVectorImpl<Kind>(vector, newRanges, stream->childAt(0));
serializeConstantVectorImpl<Kind>(
vector, newRanges, stream->childAt(0), scratch);
} else {
serializeConstantVectorImpl<Kind>(vector, ranges, stream);
serializeConstantVectorImpl<Kind>(vector, ranges, stream, scratch);
}
}

Expand Down Expand Up @@ -2208,23 +2257,30 @@ void serializeBiasVector(
void serializeColumn(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
VectorStream* stream,
Scratch& scratch) {
switch (vector->encoding()) {
case VectorEncoding::Simple::FLAT:
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
serializeFlatVector, vector->typeKind(), vector, ranges, stream);
break;
case VectorEncoding::Simple::CONSTANT:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
serializeConstantVector, vector->typeKind(), vector, ranges, stream);
serializeConstantVector,
vector->typeKind(),
vector,
ranges,
stream,
scratch);
break;
case VectorEncoding::Simple::DICTIONARY:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
serializeDictionaryVector,
vector->typeKind(),
vector,
ranges,
stream);
stream,
scratch);
break;
case VectorEncoding::Simple::BIASED:
switch (vector->typeKind()) {
Expand All @@ -2244,19 +2300,19 @@ void serializeColumn(
}
break;
case VectorEncoding::Simple::ROW:
serializeRowVector(vector, ranges, stream);
serializeRowVector(vector, ranges, stream, scratch);
break;
case VectorEncoding::Simple::ARRAY:
serializeArrayVector(vector, ranges, stream);
serializeArrayVector(vector, ranges, stream, scratch);
break;
case VectorEncoding::Simple::MAP:
serializeMapVector(vector, ranges, stream);
serializeMapVector(vector, ranges, stream, scratch);
break;
case VectorEncoding::Simple::LAZY:
serializeColumn(vector->loadedVector(), ranges, stream);
serializeColumn(vector->loadedVector(), ranges, stream, scratch);
break;
default:
serializeWrapped(vector, ranges, stream);
serializeWrapped(vector, ranges, stream, scratch);
}
}

Expand Down Expand Up @@ -2752,7 +2808,8 @@ void serializeArrayVector(
serializeColumn(
arrayVector->elements().get(),
folly::Range<const IndexRange*>(rangesHolder.get(), numRanges),
stream->childAt(0));
stream->childAt(0),
scratch);
}

void serializeMapVector(
Expand All @@ -2779,11 +2836,13 @@ void serializeMapVector(
serializeColumn(
mapVector->mapKeys().get(),
folly::Range<const IndexRange*>(rangesHolder.get(), numRanges),
stream->childAt(0));
stream->childAt(0),
scratch);
serializeColumn(
mapVector->mapValues().get(),
folly::Range<const IndexRange*>(rangesHolder.get(), numRanges),
stream->childAt(1));
stream->childAt(1),
scratch);
}

template <TypeKind kind>
Expand Down Expand Up @@ -3562,7 +3621,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
void serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& /* scratch */,
Scratch& scratch,
OutputStream* stream) override {
const auto numRows = rangesTotalSize(ranges);
const auto rowType = vector->type();
Expand All @@ -3579,7 +3638,8 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
numRows,
opts_);

serializeColumn(vector->childAt(i).get(), ranges, streams[i].get());
serializeColumn(
vector->childAt(i).get(), ranges, streams[i].get(), scratch);
}

flushStreams(streams, numRows, arena, *codec_, stream);
Expand Down Expand Up @@ -3620,7 +3680,8 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}
numRows_ += numNewRows;
for (int32_t i = 0; i < vector->childrenSize(); ++i) {
serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get());
serializeColumn(
vector->childAt(i).get(), ranges, streams_[i].get(), scratch);
}
}

Expand Down
Loading

0 comments on commit 032358e

Please sign in to comment.