Skip to content

Commit

Permalink
[Opt](exec) opt the outer join performance in TPCDS Q95 (apache#21806)
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored Jul 14, 2023
1 parent 83ce437 commit 7f50c07
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
1 change: 1 addition & 0 deletions be/src/vec/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct ProcessHashTableProbe {
std::vector<uint32_t> _items_counts;
std::vector<int8_t> _build_block_offsets;
std::vector<int> _build_block_rows;
std::vector<std::pair<int8_t, int>> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container* _tuple_is_null_left_flags;
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
Expand Down
65 changes: 45 additions & 20 deletions be/src/vec/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1085,31 +1085,25 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto block_size = 0;
auto& visited_iter =
std::get<ForwardIterator<Mapped>>(_join_node->_outer_join_pull_visited_iter);

auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
block_size++;
for (size_t j = 0; j < right_col_len; ++j) {
auto& column = *_build_blocks[offset].get_by_position(j).column;
mcol[j + right_col_idx]->insert_from(column, row_num);
}
_build_blocks_locs.resize(_batch_size);
auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
_build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, row_nums);
};

if (visited_iter.ok()) {
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
insert_from_hash_table(visited_iter->block_offset, visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
} else {
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if (visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
} else {
if (!visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
}
}
Expand All @@ -1126,8 +1120,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
visited_iter = mapped.begin();
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
if (visited_iter.ok()) {
// block_size >= _batch_size, quit for loop
Expand All @@ -1138,8 +1131,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
visited_iter = mapped.begin();
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
if (visited_iter.ok()) {
// block_size >= _batch_size, quit for loop
Expand All @@ -1152,13 +1144,11 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if (visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
} else {
if (!visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
register_build_loc(visited_iter->block_offset, visited_iter->row_num);
}
}
}
Expand All @@ -1168,6 +1158,41 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
}
_build_blocks_locs.resize(block_size);

auto insert_build_rows = [&](int8_t offset) {
for (size_t j = 0; j < right_col_len; ++j) {
auto& column = *_build_blocks[offset].get_by_position(j).column;
mcol[j + right_col_idx]->insert_indices_from(
column, _build_block_rows.data(),
_build_block_rows.data() + _build_block_rows.size());
}
};
if (_build_blocks.size() > 1) {
std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
[](const auto a, const auto b) { return a.first > b.first; });
auto start = 0, end = 0;
while (start < _build_blocks_locs.size()) {
while (end < _build_blocks_locs.size() &&
_build_blocks_locs[start].first == _build_blocks_locs[end].first) {
end++;
}
auto offset = _build_blocks_locs[start].first;
_build_block_rows.resize(end - start);
for (int i = 0; start + i < end; i++) {
_build_block_rows[i] = _build_blocks_locs[start + i].second;
}
start = end;
insert_build_rows(offset);
}
} else if (_build_blocks.size() == 1) {
const auto size = _build_blocks_locs.size();
_build_block_rows.resize(_build_blocks_locs.size());
for (int i = 0; i < size; i++) {
_build_block_rows[i] = _build_blocks_locs[i].second;
}
insert_build_rows(0);
}

// just resize the left table column in case with other conjunct to make block size is not zero
if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) {
Expand Down

0 comments on commit 7f50c07

Please sign in to comment.