Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator support prefetch #9679

Open
wants to merge 53 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b869b2f
new hash
guo-shaoge Nov 25, 2024
e8a2df8
prefetch done
guo-shaoge Nov 26, 2024
b314166
executeImplBatchStringHashMap done
guo-shaoge Nov 27, 2024
ec6e892
handle resize exception done
guo-shaoge Nov 27, 2024
9dc702d
tmp save
guo-shaoge Nov 27, 2024
ce1f767
revert Serialized Key changes
guo-shaoge Nov 28, 2024
8ac8beb
refine
guo-shaoge Nov 28, 2024
0053ce8
refine
guo-shaoge Nov 28, 2024
fcf8ed2
fix unit test
guo-shaoge Nov 29, 2024
ae7b969
refine
guo-shaoge Dec 2, 2024
3a86617
unit test
guo-shaoge Dec 2, 2024
623fef5
prefetch
guo-shaoge Dec 2, 2024
19f320d
fix
guo-shaoge Dec 2, 2024
3a226df
refine
guo-shaoge Dec 3, 2024
c44ace7
refine
guo-shaoge Dec 3, 2024
3e30f95
revert new hasher
guo-shaoge Dec 3, 2024
ea85d19
debug low distinct value
guo-shaoge Dec 3, 2024
16937ff
Revert "revert new hasher"
guo-shaoge Dec 3, 2024
d2fba57
refine original code path
guo-shaoge Dec 3, 2024
71b6ecd
Reapply "revert new hasher"
guo-shaoge Dec 3, 2024
40ceb08
one level old hash; two level new hash
guo-shaoge Dec 3, 2024
4cb24c2
Revert "one level old hash; two level new hash"
guo-shaoge Dec 4, 2024
c02cf71
revert new hasher; refine original code path
guo-shaoge Dec 4, 2024
352b710
fix case
guo-shaoge Dec 5, 2024
e5ab87c
refine
guo-shaoge Dec 13, 2024
30d99be
is_serialized_key
guo-shaoge Dec 15, 2024
01cae80
fix
guo-shaoge Dec 15, 2024
95c597d
fix start_row
guo-shaoge Dec 15, 2024
83fb879
Merge branch 'master' into hashagg_prefetch
guo-shaoge Dec 16, 2024
e0dea79
revert prefetch for StringHashTable
guo-shaoge Dec 17, 2024
143fee3
comment
guo-shaoge Dec 17, 2024
6cea464
fmt
guo-shaoge Dec 18, 2024
6750737
comment
guo-shaoge Dec 19, 2024
2e94829
prefetch agg data
guo-shaoge Dec 20, 2024
dc0973a
try fix ci
guo-shaoge Dec 22, 2024
9623368
prefetch threshold(2MB); mini batch; prefetch exists key; prefetch in…
guo-shaoge Dec 24, 2024
0ef6ba6
clean code
guo-shaoge Dec 24, 2024
49d4188
fix prefetch agg func is 0
guo-shaoge Dec 24, 2024
b4a1bde
fix case
guo-shaoge Dec 24, 2024
738ab2a
refine agg func is zero
guo-shaoge Dec 25, 2024
361f988
refine
guo-shaoge Dec 25, 2024
5158885
refine comment
guo-shaoge Dec 25, 2024
80b7e22
getHash full
guo-shaoge Dec 26, 2024
c41dba4
minor refine
guo-shaoge Dec 26, 2024
ab31beb
hashvals sizeof 16
guo-shaoge Dec 26, 2024
00cc20d
fix case
guo-shaoge Dec 26, 2024
4aec90b
remove macro
guo-shaoge Dec 27, 2024
606e013
remove macro 2
guo-shaoge Dec 27, 2024
a7361e1
fix
guo-shaoge Dec 27, 2024
bdb21d3
refine comment
guo-shaoge Dec 27, 2024
4153592
refine
guo-shaoge Dec 27, 2024
7660edd
getHash full
guo-shaoge Dec 29, 2024
3301941
refine code
guo-shaoge Dec 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,18 @@ class AggregateFunctionGroupUniqArrayGeneric
{
// We have to copy the keys to our arena.
assert(arena != nullptr);
cur_set.emplace(ArenaKeyHolder{rhs_elem.getValue(), *arena}, it, inserted);
cur_set.emplace(ArenaKeyHolder{rhs_elem.getValue(), arena}, it, inserted);
}
}

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
auto & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();

auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
offsets_to.push_back((offsets_to.empty() ? 0 : offsets_to.back()) + set.size());

for (auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), data_to);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/AggregateFunctions/KeyHolderHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inline auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena)
{
if constexpr (is_plain_column)
{
return ArenaKeyHolder{column.getDataAt(row_num), arena};
return ArenaKeyHolder{column.getDataAt(row_num), &arena};
}
else
{
Expand Down
25 changes: 22 additions & 3 deletions dbms/src/Common/ColumnsHashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ struct HashMethodOneNumber
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

const FieldType * vec;
const size_t total_rows;

/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(key_columns[0])->getData()[0];
}

explicit HashMethodOneNumber(const IColumn * column)
: total_rows(column->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(column)->getData()[0];
}
Expand All @@ -82,6 +85,8 @@ struct HashMethodOneNumber
}

const FieldType * getKeyData() const { return vec; }

size_t getTotalRows() const { return total_rows; }
};


Expand All @@ -97,11 +102,13 @@ struct HashMethodString
const IColumn::Offset * offsets;
const UInt8 * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;

HashMethodString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
Expand All @@ -128,7 +135,7 @@ struct HashMethodString
{
if (likely(collator))
key = collator->sortKey(key.data, key.size, sort_key_containers[0]);
return ArenaKeyHolder{key, *pool};
return ArenaKeyHolder{key, pool};
}
else
{
Expand All @@ -149,8 +156,10 @@ struct HashMethodStringBin

const IColumn::Offset * offsets;
const UInt8 * chars;
const size_t total_rows;

HashMethodStringBin(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
Expand All @@ -163,7 +172,7 @@ struct HashMethodStringBin
auto last_offset = row == 0 ? 0 : offsets[row - 1];
StringRef key(chars + last_offset, offsets[row] - last_offset - 1);
key = BinCollatorSortKey<padding>(key.data, key.size);
return ArenaKeyHolder{key, *pool};
return ArenaKeyHolder{key, pool};
}

protected:
Expand Down Expand Up @@ -346,10 +355,12 @@ struct HashMethodFastPathTwoKeysSerialized

Key1Desc key_1_desc;
Key2Desc key_2_desc;
const size_t total_rows;

HashMethodFastPathTwoKeysSerialized(const ColumnRawPtrs & key_columns, const Sizes &, const TiDB::TiDBCollators &)
: key_1_desc(key_columns[0])
, key_2_desc(key_columns[1])
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline auto getKeyHolder(ssize_t row, Arena * pool, std::vector<String> &) const
Expand Down Expand Up @@ -384,11 +395,13 @@ struct HashMethodFixedString
size_t n;
const ColumnFixedString::Chars_t * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved

HashMethodFixedString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnFixedString &>(column);
Expand All @@ -412,7 +425,7 @@ struct HashMethodFixedString

if constexpr (place_string_to_arena)
{
return ArenaKeyHolder{key, *pool};
return ArenaKeyHolder{key, pool};
}
else
{
Expand Down Expand Up @@ -442,6 +455,7 @@ struct HashMethodKeysFixed

Sizes key_sizes;
size_t keys_size;
const size_t total_rows;

/// SSSE3 shuffle method can be used. Shuffle masks will be calculated and stored here.
#if defined(__SSSE3__) && !defined(MEMORY_SANITIZER)
Expand All @@ -467,6 +481,7 @@ struct HashMethodKeysFixed
: Base(key_columns)
, key_sizes(std::move(key_sizes_))
, keys_size(key_columns.size())
, total_rows(key_columns[0]->size())
{
if (usePreparedKeys(key_sizes))
{
Expand Down Expand Up @@ -596,6 +611,7 @@ struct HashMethodSerialized
ColumnRawPtrs key_columns;
size_t keys_size;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodSerialized(
const ColumnRawPtrs & key_columns_,
Expand All @@ -604,6 +620,7 @@ struct HashMethodSerialized
: key_columns(key_columns_)
, keys_size(key_columns_.size())
, collators(collators_)
, total_rows(key_columns_[0]->size())
{}

ALWAYS_INLINE inline SerializedKeyHolder getKeyHolder(
Expand Down Expand Up @@ -631,10 +648,12 @@ struct HashMethodHashed

ColumnRawPtrs key_columns;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const TiDB::TiDBCollators & collators_)
: key_columns(std::move(key_columns_))
, collators(collators_)
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline Key getKeyHolder(size_t row, Arena *, std::vector<String> & sort_key_containers) const
Expand Down
104 changes: 90 additions & 14 deletions dbms/src/Common/ColumnsHashingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Columns/IColumn.h>
#include <Common/HashTable/HashTable.h>
#include <Common/HashTable/HashTableKeyHolder.h>
#include <Common/HashTable/StringHashTable.h>
#include <Common/assert_cast.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/AggregationCommon.h>
Expand Down Expand Up @@ -127,37 +128,104 @@ class HashMethodBase
using FindResult = FindResultImpl<Mapped>;
static constexpr bool has_mapped = !std::is_same<Mapped, VoidMapped>::value;
using Cache = LastElementCache<Value, consecutive_keys_optimization>;
static constexpr size_t prefetch_step = 16;

template <typename Data>
template <bool enable_prefetch = false, typename Data>
ALWAYS_INLINE inline EmplaceResult emplaceKey(
Data & data,
size_t row,
Arena & pool,
std::vector<String> & sort_key_containers)
std::vector<String> & sort_key_containers,
const std::vector<size_t> & hashvals = {})
{
auto key_holder = static_cast<Derived &>(*this).getKeyHolder(row, &pool, sort_key_containers);
return emplaceImpl(key_holder, data);
if constexpr (enable_prefetch)
{
const auto idx = row + prefetch_step;
if (idx < hashvals.size())
data.prefetch(hashvals[idx]);

return emplaceImpl<true>(key_holder, data, hashvals[row]);
}
else
{
return emplaceImpl<false>(key_holder, data, 0);
}
}

template <typename Data>
template <bool enable_prefetch = false, typename Data>
ALWAYS_INLINE inline FindResult findKey(
Data & data,
size_t row,
Arena & pool,
std::vector<String> & sort_key_containers)
std::vector<String> & sort_key_containers,
const std::vector<size_t> & hashvals = {})
{
auto key_holder = static_cast<Derived &>(*this).getKeyHolder(row, &pool, sort_key_containers);
return findKeyImpl(keyHolderGetKey(key_holder), data);
if constexpr (enable_prefetch)
{
const auto idx = row + prefetch_step;
if (idx < hashvals.size())
data.prefetch(hashvals[idx]);

return findKeyImpl<true>(keyHolderGetKey(key_holder), data, hashvals[row]);
}
else
{
return findKeyImpl<false>(keyHolderGetKey(key_holder), data, 0);
}
}

// TODO emplaceStringKey merge with emplaceKey?
template <size_t SubMapIndex, bool enable_prefetch = false, typename Data, typename StringKeyType>
ALWAYS_INLINE inline EmplaceResult emplaceStringKey(
Data & data,
size_t idx,
std::vector<StringKeyType> & datas, // TODO const
const std::vector<size_t> & hashvals)
{
auto & submap = StringHashTableSubMapSelector<SubMapIndex, Data::is_two_level, std::decay_t<Data>>::getSubMap(
hashvals[idx],
data);
if constexpr (enable_prefetch)
{
const auto prefetch_idx = idx + prefetch_step;
if (prefetch_idx < hashvals.size())
submap.prefetch(hashvals[prefetch_idx]);
}

return emplaceImpl<true>(datas[idx], submap, hashvals[idx]);
}

// TODO Macro with emplaceStringKey
template <size_t SubMapIndex, bool enable_prefetch = false, typename Data, typename StringKeyType>
ALWAYS_INLINE inline FindResult findStringKey(
Data & data,
size_t idx,
std::vector<StringKeyType> & datas, // TODO const
const std::vector<size_t> & hashvals)
{
auto & submap = StringHashTableSubMapSelector<SubMapIndex, Data::is_two_level, std::decay_t<Data>>::getSubMap(
hashvals[idx],
data);
if constexpr (enable_prefetch)
{
const auto prefetch_idx = idx + prefetch_step;
if (prefetch_idx < hashvals.size())
submap.prefetch(hashvals[prefetch_idx]);
}

return findKeyImpl<true>(keyHolderGetKey(datas[idx]), submap, hashvals[idx]);
}

template <typename Data>
ALWAYS_INLINE inline size_t getHash(
const Data & data,
size_t row,
Arena & pool,
std::vector<String> & sort_key_containers)
std::vector<String> & sort_key_containers) const
{
auto key_holder = static_cast<Derived &>(*this).getKeyHolder(row, &pool, sort_key_containers);
auto key_holder = static_cast<const Derived &>(*this).getKeyHolder(row, &pool, sort_key_containers);
return data.hash(keyHolderGetKey(key_holder));
}

Expand All @@ -179,8 +247,8 @@ class HashMethodBase
}
}

template <typename Data, typename KeyHolder>
ALWAYS_INLINE inline EmplaceResult emplaceImpl(KeyHolder & key_holder, Data & data)
template <bool use_hashval, typename Data, typename KeyHolder>
ALWAYS_INLINE inline EmplaceResult emplaceImpl(KeyHolder & key_holder, Data & data, size_t hashval)
{
if constexpr (Cache::consecutive_keys_optimization)
{
Expand All @@ -195,7 +263,11 @@ class HashMethodBase

typename Data::LookupResult it;
bool inserted = false;
data.emplace(key_holder, it, inserted);

if constexpr (use_hashval)
data.emplace(key_holder, it, inserted, hashval);
else
data.emplace(key_holder, it, inserted);

[[maybe_unused]] Mapped * cached = nullptr;
if constexpr (has_mapped)
Expand Down Expand Up @@ -232,8 +304,8 @@ class HashMethodBase
return EmplaceResult(inserted);
}

template <typename Data, typename Key>
ALWAYS_INLINE inline FindResult findKeyImpl(Key key, Data & data)
template <bool use_hashval, typename Data, typename Key>
ALWAYS_INLINE inline FindResult findKeyImpl(Key & key, Data & data, size_t hashval)
{
if constexpr (Cache::consecutive_keys_optimization)
{
Expand All @@ -246,7 +318,11 @@ class HashMethodBase
}
}

auto it = data.find(key);
typename Data::LookupResult it;
if constexpr (use_hashval)
it = data.find(key, hashval);
else
it = data.find(key);

if constexpr (consecutive_keys_optimization)
{
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Common/HashTable/FixedHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ class FixedHashTable
typename cell_type::CellExt cell;
};


public:
using key_type = Key;
using mapped_type = typename Cell::mapped_type;
Expand All @@ -222,6 +221,8 @@ class FixedHashTable
using LookupResult = Cell *;
using ConstLookupResult = const Cell *;

static constexpr bool is_string_hash_map = false;
static constexpr bool is_two_level = false;

size_t hash(const Key & x) const { return x; }

Expand Down Expand Up @@ -352,6 +353,8 @@ class FixedHashTable

iterator end() { return iterator(this, buf ? buf + NUM_CELLS : buf); }

inline void prefetch(size_t) {}

/// The last parameter is unused but exists for compatibility with HashTable interface.
void ALWAYS_INLINE emplace(const Key & x, LookupResult & it, bool & inserted, size_t /* hash */ = 0)
{
Expand Down
Loading