Skip to content

Commit

Permalink
[opt](join) Opt the performance of join probe
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Jul 17, 2023
1 parent 014b34b commit b352983
Show file tree
Hide file tree
Showing 19 changed files with 108 additions and 134 deletions.
3 changes: 1 addition & 2 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ class IColumn : public COW<IColumn> {
* If `begin` and `count_sz` specified, it means elements in range [`begin`, `begin` + `count_sz`) will be replicated.
* If `count_sz` is -1, `begin` must be 0.
*/
virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin = 0, int count_sz = -1) const {
virtual void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
LOG(FATAL) << "not support";
}

Expand Down
33 changes: 15 additions & 18 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,32 +864,29 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons
return replicate_generic(replicate_offsets);
}

void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (col_size == 0) {
void ColumnArray::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
if (target_size == 0) {
return;
}
auto total_size = get_offsets().size();
// |---------------------|-------------------------|-------------------------|
// [0, begin) [begin, begin + count_sz) [begin + count_sz, size())
// do not need to copy copy counts[n] times do not need to copy
IColumn::Offsets replicate_offsets(get_offsets().size(), 0);
size_t cur_offset = 0;
size_t end = begin + col_size;
IColumn::Offsets replicate_offsets(total_size, 0);
// copy original data at offset n counts[n] times
for (size_t i = begin; i < end; ++i) {
cur_offset += counts[i];
replicate_offsets[i] = cur_offset;
}
// ignored
for (size_t i = end; i < size(); ++i) {
replicate_offsets[i] = replicate_offsets[i - 1];
auto begin = 0, end = 0;
while (begin < target_size) {
while (end < target_size && indexs[begin] == indexs[end]) {
end++;
}
long index = indexs[begin];
replicate_offsets[index] = end - begin;
begin = end;
}

if (cur_offset != target_size) {
LOG(WARNING) << "ColumnArray replicate input target_size:" << target_size
<< " not equal SUM(counts):" << cur_offset;
return;
// ignored
for (size_t i = 1; i < total_size; ++i) {
replicate_offsets[i] += replicate_offsets[i - 1];
}

auto rep_res = replicate(replicate_offsets);
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
ColumnPtr convert_to_full_column_if_const() const override;
void get_extremes(Field& min, Field& max) const override {
LOG(FATAL) << "get_extremes not implemented";
Expand Down
20 changes: 6 additions & 14 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,7 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>

ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;

void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override;

[[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector& selector) const override {
Expand Down Expand Up @@ -410,21 +409,14 @@ ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const
}

template <typename T>
void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (0 == size) return;

void ColumnComplexType<T>::replicate(const uint32_t* indexs, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnComplexType<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
res_data.resize(target_size);

size_t end = size + begin;
for (size_t i = begin; i < end; ++i) {
size_t size_to_replicate = counts[i];
for (size_t j = 0; j < size_to_replicate; ++j) {
res_data.push_back(data[i]);
}
for (size_t i = 0; i < target_size; ++i) {
res_data[i] = data[indexs[i]];
}
}

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/columns/column_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const {
return ColumnConst::create(data, replicated_size);
}

void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
if (s == 0) return;
auto& res = reinterpret_cast<ColumnConst&>(column);
res.s = s;
res.s = target_size;
}

ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const {
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
size_t filter(const Filter& filter) override;

ColumnPtr replicate(const Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
Expand Down
14 changes: 5 additions & 9 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,18 +440,14 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets& offsets) const {
}

template <typename T>
void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (0 == size) return;

void ColumnDecimal<T>::replicate(const uint32_t* __restrict indexs, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnDecimal<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
res_data.resize(target_size);

size_t end = size + begin;
for (size_t i = begin; i < end; ++i) {
res_data.add_num_element_without_reserve(data[i], counts[i]);
for (size_t i = 0; i < target_size; ++i) {
res_data[i] = data[indexs[i]];
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T

ColumnPtr replicate(const IColumn::Offsets& offsets) const override;

void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override;

TypeIndex get_data_type() const override { return TypeId<T>::value; }

Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,10 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const {
return ColumnNullable::create(replicated_data, replicated_null_map);
}

void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
auto& res = reinterpret_cast<ColumnNullable&>(column);
get_nested_column().replicate(counts, target_size, res.get_nested_column(), begin, count_sz);
get_null_map_column().replicate(counts, target_size, res.get_null_map_column(), begin,
count_sz);
get_nested_column().replicate(counts, target_size, res.get_nested_column());
get_null_map_column().replicate(counts, target_size, res.get_null_map_column());
}

template <bool negative>
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint64_t& hash,
Expand Down
46 changes: 18 additions & 28 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,39 +446,29 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const {
return res;
}

void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (0 == col_size) {
return;
}

void ColumnString::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
auto& res = reinterpret_cast<ColumnString&>(column);

Chars& res_chars = res.chars;
Offsets& res_offsets = res.offsets;
res_chars.reserve(chars.size() / col_size * target_size);
res_offsets.reserve(target_size);

size_t base = begin > 0 ? offsets[begin - 1] : 0;
Offset prev_string_offset = 0 + base;
Offset current_new_offset = 0;

size_t end = begin + col_size;
for (size_t i = begin; i < end; ++i) {
size_t size_to_replicate = counts[i];
size_t string_size = offsets[i] - prev_string_offset;

for (size_t j = 0; j < size_to_replicate; ++j) {
current_new_offset += string_size;
res_offsets.push_back(current_new_offset);

res_chars.resize(res_chars.size() + string_size);
memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() - string_size],
&chars[prev_string_offset], string_size);
}

prev_string_offset = offsets[i];
size_t byte_size = 0;
res_offsets.resize(target_size);
for (size_t i = 0; i < target_size; ++i) {
long row_idx = indexs[i];
auto str_size = offsets[row_idx] - offsets[row_idx - 1];
res_offsets[i] = res_offsets[i - 1] + str_size;
byte_size += str_size;
}

res_chars.resize(byte_size);
auto* __restrict dest = res.chars.data();
auto* __restrict src = chars.data();
for (size_t i = 0; i < target_size; ++i) {
long row_idx = indexs[i];
auto str_size = offsets[row_idx] - offsets[row_idx - 1];
memcpy_small_allow_read_write_overflow15(dest + res_offsets[i - 1],
src + offsets[row_idx - 1], str_size);
}

check_chars_length(res_chars.size(), res_offsets.size());
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> {

ColumnPtr replicate(const Offsets& replicate_offsets) const override;

void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override;

MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override {
return scatter_impl<ColumnString>(num_columns, selector);
Expand Down
10 changes: 2 additions & 8 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,18 +294,12 @@ ColumnPtr ColumnStruct::replicate(const Offsets& offsets) const {
return ColumnStruct::create(new_columns);
}

void ColumnStruct::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (0 == col_size) {
return;
}

void ColumnStruct::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
auto& res = reinterpret_cast<ColumnStruct&>(column);
res.columns.resize(columns.size());

for (size_t i = 0; i != columns.size(); ++i) {
columns[i]->replicate(counts, target_size, *res.columns[i], begin, count_sz);
columns[i]->replicate(indexs, target_size, *res.columns[i]);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
ColumnPtr replicate(const Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override;

// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
Expand Down
20 changes: 10 additions & 10 deletions be/src/vec/columns/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,18 +545,18 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets& offsets) const {
}

template <typename T>
void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (size == 0) return;

void ColumnVector<T>::replicate(const uint32_t* __restrict indexs, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnVector<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);

size_t end = begin + size;
for (size_t i = begin; i < end; ++i) {
res_data.add_num_element_without_reserve(data[i], counts[i]);
DCHECK(res_data.empty());
res_data.resize(target_size);
auto* __restrict left = res_data.data();
auto* __restrict right = data.data();
auto* __restrict idxs = indexs;

for (size_t i = 0; i < target_size; ++i) {
left[i] = right[idxs[i]];
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,7 @@ class ColumnVector final : public COWHelper<ColumnVectorHelper, ColumnVector<T>>

ColumnPtr replicate(const IColumn::Offsets& offsets) const override;

void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override;

void get_extremes(Field& min, Field& max) const override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct ProcessHashTableProbe {
std::unique_ptr<Arena> _arena;
std::vector<StringRef> _probe_keys;

std::vector<uint32_t> _items_counts;
std::vector<uint32_t> _probe_indexs;
std::vector<int8_t> _build_block_offsets;
std::vector<int> _build_block_rows;
std::vector<std::pair<int8_t, int>> _build_blocks_locs;
Expand Down
Loading

0 comments on commit b352983

Please sign in to comment.