Skip to content

Commit

Permalink
Optimize count(distinct <complex type>) (#8560)
Browse files Browse the repository at this point in the history
Summary:
Aggregations over distinct inputs use `SetAccumulator<ComplexType>`, which in
turn uses AddressableNonNullValueList to store unique complex type values in a
single non-contiguous allocation within HashStringAllocator.

Storing thousands or millions of values requires allocation with hundrends or
thousands of contiguous pieces. Calling HashStringAllocator::prepareRead on
such allocation ends up populating an `std::vector<ByteRange>` with thousands
of entries. Calling this repeatedly for each value as part of
SetAccumulator::extractValues() becomes very slow.

SetAccumulator::extractValues calls AddressableNonNullValueList::read for each
unique value (there can be millions of these):

    for (const auto& position : base.uniqueValues) {
      AddressableNonNullValueList::read(
          position.first.position, values, offset + position.second);
    }

AddressableNonNullValueList::read calls HashStringAllocator::prepareRead, which
collects thousands of byte ranges into a vector passed to ByteInputStream
constructor:

    auto stream = HashStringAllocator::prepareRead(header);

As a result, queries like count(distrinct <complex type>) are very slow.

A fix is to modify HashStringAllocator::prepareRead to accept an optional limit
on how many bytes to prepare for read, then use this in
AddressableNonNullValueList::read to prepare only as many bytes as needed to
extract a single value.

In addition, do not store hashes of the values in HSA to avoid calling
HashStringAllocator::prepareRead altogether just to fetch the hash.

Added benchmark for SetAccumulator to add 10M mostly unique values and read them
back. Without the optimizations, the benchmark couldn't finish within a
reasonable time (a few mininutes). Changing benchmark to process 100K values
allowed it to compelete.

Before (100K values):

```
============================================================================
[...]enchmarks/SetAccumulatorBenchmark.cpp     relative  time/iter   iters/s
============================================================================
bigint                                                      2.87ms    348.07
varchar                                                    22.22ms     45.01
twoBigints                                                988.08ms      1.01
```

After (100K values):

```
============================================================================
[...]enchmarks/SetAccumulatorBenchmark.cpp     relative  time/iter   iters/s
============================================================================
bigint                                                      2.80ms    356.87
varchar                                                    21.19ms     47.19
twoBigints                                                 38.83ms     25.76
```

After the optimizations, the original benchmark processing 10M values is
finishing within a few seconds.

After (10M values):

```
============================================================================
[...]enchmarks/SetAccumulatorBenchmark.cpp     relative  time/iter   iters/s
============================================================================
bigint                                                       1.23s   814.20m
varchar                                                      2.96s   338.39m
twoBigints                                                   6.30s   158.70m
```

Pull Request resolved: #8560

Reviewed By: Yuhta

Differential Revision: D53130262

Pulled By: mbasmanova

fbshipit-source-id: 9401d56fc8f9d4eecdaa4bd2ef53ae6e5f6f4f07
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Jan 30, 2024
1 parent a113acf commit 5b8b4bd
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 77 deletions.
13 changes: 12 additions & 1 deletion velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,26 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) {
}

// static
ByteInputStream HashStringAllocator::prepareRead(const Header* begin) {
ByteInputStream HashStringAllocator::prepareRead(
const Header* begin,
size_t maxBytes) {
std::vector<ByteRange> ranges;
auto header = const_cast<Header*>(begin);

size_t totalBytes = 0;

for (;;) {
ranges.push_back(ByteRange{
reinterpret_cast<uint8_t*>(header->begin()), header->usableSize(), 0});
totalBytes += ranges.back().size;
if (!header->isContinued()) {
break;
}

if (totalBytes >= maxBytes) {
break;
}

header = header->nextContinued();
}
return ByteInputStream(std::move(ranges));
Expand Down
6 changes: 5 additions & 1 deletion velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ class HashStringAllocator : public StreamArena {

// Returns ByteInputStream over the data in the range of 'header' and
// possible continuation ranges.
static ByteInputStream prepareRead(const Header* header);
// @param maxBytes If provided, the returned stream will cover at most that
// many bytes.
static ByteInputStream prepareRead(
const Header* header,
size_t maxBytes = std::numeric_limits<size_t>::max());

// Returns the number of payload bytes between 'header->begin()' and
// 'position'.
Expand Down
49 changes: 23 additions & 26 deletions velox/exec/AddressableNonNullValueList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace facebook::velox::aggregate::prestosql {

HashStringAllocator::Position AddressableNonNullValueList::append(
AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator) {
Expand All @@ -37,8 +37,10 @@ HashStringAllocator::Position AddressableNonNullValueList::append(
allocator->extendWrite(currentPosition_, stream);
}

// Write hash.
stream.appendOne(decoded.base()->hashValueAt(decoded.index(index)));
const auto hash = decoded.base()->hashValueAt(decoded.index(index));

const auto originalSize = stream.size();

// Write value.
exec::ContainerRowSerde::serialize(
*decoded.base(), decoded.index(index), stream);
Expand All @@ -47,53 +49,48 @@ HashStringAllocator::Position AddressableNonNullValueList::append(

auto startAndFinish = allocator->finishWrite(stream, 1024);
currentPosition_ = startAndFinish.second;
return startAndFinish.first;

const auto writtenSize = stream.size() - originalSize;

return {startAndFinish.first, writtenSize, hash};
}

namespace {

ByteInputStream prepareRead(
HashStringAllocator::Position position,
bool skipHash) {
auto header = position.header;
auto seek = static_cast<int32_t>(position.position - header->begin());
ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) {
auto header = entry.offset.header;
auto seek = entry.offset.position - header->begin();

auto stream = HashStringAllocator::prepareRead(header);
auto stream = HashStringAllocator::prepareRead(header, entry.size + seek);
stream.seekp(seek);
if (skipHash) {
stream.skip(sizeof(uint64_t));
}
return stream;
}
} // namespace

// static
bool AddressableNonNullValueList::equalTo(
HashStringAllocator::Position left,
HashStringAllocator::Position right,
const Entry& left,
const Entry& right,
const TypePtr& type) {
auto leftStream = prepareRead(left, true /*skipHash*/);
auto rightStream = prepareRead(right, true /*skipHash*/);
if (left.hash != right.hash) {
return false;
}

auto leftStream = prepareRead(left);
auto rightStream = prepareRead(right);

CompareFlags compareFlags =
CompareFlags::equality(CompareFlags::NullHandlingMode::kNullAsValue);
return exec::ContainerRowSerde::compare(
leftStream, rightStream, type.get(), compareFlags) == 0;
}

// static
uint64_t AddressableNonNullValueList::readHash(
HashStringAllocator::Position position) {
auto stream = prepareRead(position, false /*skipHash*/);
return stream.read<uint64_t>();
}

// static
void AddressableNonNullValueList::read(
HashStringAllocator::Position position,
const Entry& position,
BaseVector& result,
vector_size_t index) {
auto stream = prepareRead(position, true /*skipHash*/);
auto stream = prepareRead(position);
exec::ContainerRowSerde::deserialize(stream, index, &result);
}

Expand Down
35 changes: 16 additions & 19 deletions velox/exec/AddressableNonNullValueList.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,37 @@ namespace facebook::velox::aggregate::prestosql {
/// set_union.
class AddressableNonNullValueList {
public:
struct Entry {
HashStringAllocator::Position offset;
size_t size;
uint64_t hash;
};

struct Hash {
size_t operator()(HashStringAllocator::Position position) const {
return AddressableNonNullValueList::readHash(position);
size_t operator()(const Entry& key) const {
return key.hash;
}
};

struct EqualTo {
const TypePtr& type;

bool operator()(
HashStringAllocator::Position left,
HashStringAllocator::Position right) const {
bool operator()(const Entry& left, const Entry& right) const {
return AddressableNonNullValueList::equalTo(left, right, type);
}
};

/// Append a non-null value to the end of the list. Returns 'index' that can
/// be used to access the value later.
HashStringAllocator::Position append(
Entry append(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator);

/// Removes last element. 'position' must be a value returned from the latest
/// call to 'append'.
void removeLast(HashStringAllocator::Position position) {
currentPosition_ = position;
void removeLast(const Entry& entry) {
currentPosition_ = entry.offset;
--size_;
}

Expand All @@ -66,19 +70,12 @@ class AddressableNonNullValueList {
}

/// Returns true if elements at 'left' and 'right' are equal.
static bool equalTo(
HashStringAllocator::Position left,
HashStringAllocator::Position right,
const TypePtr& type);

/// Returns the hash of the specified element.
static uint64_t readHash(HashStringAllocator::Position position);
static bool
equalTo(const Entry& left, const Entry& right, const TypePtr& type);

/// Copies the specified element to 'result[index]'.
static void read(
HashStringAllocator::Position position,
BaseVector& result,
vector_size_t index);
static void
read(const Entry& position, BaseVector& result, vector_size_t index);

void free(HashStringAllocator& allocator) {
if (size_ > 0) {
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/SetAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ struct StringViewSetAccumulator {
struct ComplexTypeSetAccumulator {
/// A set of pointers to values stored in AddressableNonNullValueList.
SetAccumulator<
HashStringAllocator::Position,
AddressableNonNullValueList::Entry,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo>
base;
Expand All @@ -203,12 +203,12 @@ struct ComplexTypeSetAccumulator {
base.nullIndex = cnt;
}
} else {
auto position = values.append(decoded, index, allocator);
auto entry = values.append(decoded, index, allocator);

if (!base.uniqueValues
.insert({position, base.nullIndex.has_value() ? cnt + 1 : cnt})
.insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt})
.second) {
values.removeLast(position);
values.removeLast(entry);
}
}
}
Expand Down
124 changes: 124 additions & 0 deletions velox/exec/benchmarks/SetAccumulatorBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Benchmark.h>
#include <folly/init/Init.h>

#include "velox/common/memory/Memory.h"
#include "velox/exec/SetAccumulator.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace {

// Adds 10M mostly unique values to a single SetAccumulator, then extracts
// unique values from it.
class SetAccumulatorBenchmark : public facebook::velox::test::VectorTestBase {
public:
void setup() {
VectorFuzzer::Options opts;
opts.vectorSize = 1'000'000;
VectorFuzzer fuzzer(opts, pool());

auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), VARCHAR()});
for (auto i = 0; i < 10; ++i) {
rowVectors_.emplace_back(fuzzer.fuzzInputRow(rowType));
}
}

void runBigint() {
runPrimitive<int64_t>("a");
}

void runVarchar() {
runPrimitive<StringView>("c");
}

void runTwoBigints() {
HashStringAllocator allocator(pool());
const TypePtr type = ROW({BIGINT(), BIGINT()});
aggregate::prestosql::SetAccumulator<ComplexType> accumulator(
type, &allocator);

for (const auto& rowVector : rowVectors_) {
auto vector =
makeRowVector({rowVector->childAt("a"), rowVector->childAt("b")});
DecodedVector decoded(*vector);
for (auto i = 0; i < rowVector->size(); ++i) {
accumulator.addValue(decoded, i, &allocator);
}
}

auto result = BaseVector::create(type, accumulator.size(), pool());
accumulator.extractValues(*result, 0);
folly::doNotOptimizeAway(result);
}

private:
template <typename T>
void runPrimitive(const std::string& name) {
const auto& type = rowVectors_[0]->childAt(name)->type();

HashStringAllocator allocator(pool());
aggregate::prestosql::SetAccumulator<T> accumulator(type, &allocator);

for (const auto& rowVector : rowVectors_) {
DecodedVector decoded(*rowVector->childAt(name));
for (auto i = 0; i < rowVector->size(); ++i) {
accumulator.addValue(decoded, i, &allocator);
}
}

auto result =
BaseVector::create<FlatVector<T>>(type, accumulator.size(), pool());
accumulator.extractValues(*result, 0);
folly::doNotOptimizeAway(result);
}

std::vector<RowVectorPtr> rowVectors_;
};

std::unique_ptr<SetAccumulatorBenchmark> bm;

BENCHMARK(bigint) {
bm->runBigint();
}

BENCHMARK(varchar) {
bm->runVarchar();
}

BENCHMARK(twoBigints) {
bm->runTwoBigints();
}

} // namespace

int main(int argc, char** argv) {
folly::init(&argc, &argv);
memory::MemoryManager::initialize({});

bm = std::make_unique<SetAccumulatorBenchmark>();
bm->setup();

folly::runBenchmarks();

bm.reset();

return 0;
}
Loading

0 comments on commit 5b8b4bd

Please sign in to comment.