Skip to content

Commit

Permalink
test code
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Jul 15, 2023
1 parent 21e8ab7 commit e739b11
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
14 changes: 7 additions & 7 deletions be/src/vec/columns/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,16 +547,16 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets& offsets) const {
template <typename T>
void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (size == 0) return;

auto& res = reinterpret_cast<ColumnVector<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
res_data.resize(target_size);

size_t end = begin + size;
for (size_t i = begin; i < end; ++i) {
res_data.add_num_element_without_reserve(data[i], counts[i]);
auto* __restrict left = res_data.data();
auto* __restrict right = data.data();
auto* __restrict index = counts;

for (size_t i = 0; i < target_size; ++i) {
left[i] = right[index[i]];
}
}

Expand Down
40 changes: 40 additions & 0 deletions be/src/vec/common/hash_table/ph_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,50 @@ class PHHashMap : private boost::noncopyable {

size_t hash(const Key& x) const { return _hash_map.hash(x); }

template <bool READ = true>
void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); }

template <bool READ = true>
void ALWAYS_INLINE prefetch_by_key(Key key) { _hash_map.prefetch(key); }

template <bool READ, typename KeyHolder>
void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
const auto& key = key_holder_get_key(key_holder);
prefetch_by_key<READ>(key);
}

size_t get_size() {
return _hash_map.size();
}

std::vector<size_t> sizes() const {
std::vector<size_t> sizes {_hash_map.size()};
return sizes;
}

int64_t get_resize_timer_value() const {
return 0;
}

int64_t get_convert_timer_value() const {
return 0;
}

int64_t get_collisions() const {
return 0;
}

void reset_resize_timer() {
}

void expanse_for_add_elem(size_t num_elem) {
_hash_map.reserve(num_elem);
}

std::vector<size_t> get_buffer_sizes_in_cells() const {
return sizes();
}

/// Call func(const Key &, Mapped &) for each hash map element.
template <typename Func>
void for_each_value(Func&& func) {
Expand Down
26 changes: 17 additions & 9 deletions be/src/vec/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,12 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
bool is_mark_join) {
auto& probe_index = _join_node->_probe_index;
auto& probe_raw_ptrs = _join_node->_probe_columns;
if (probe_index == 0 && _items_counts.size() < probe_rows) {
_items_counts.resize(probe_rows);
}
// if (probe_index == 0 && _items_counts.size() < probe_rows) {
// _items_counts.resize(probe_rows);
// }

if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
_items_counts.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
_build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
_build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
}
Expand Down Expand Up @@ -263,13 +264,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = probe_row_match_iter->block_offset;
_build_block_rows[current_offset] = probe_row_match_iter->row_num;
_items_counts[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
_items_counts.template emplace_back(probe_index);
}
++current_offset;
}
_items_counts[probe_index] = current_offset;
all_match_one &= (current_offset == 1);
if (!probe_row_match_iter.ok()) {
++probe_index;
Expand All @@ -283,19 +285,19 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
if constexpr (probe_all) {
_items_counts[probe_index++] = (uint32_t)1;
// only full outer / left outer need insert the data of right table
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
_items_counts[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
_items_counts.template emplace_back(probe_index);
}
++current_offset;
} else {
_items_counts[probe_index++] = (uint32_t)0;
}
probe_index++;
all_match_one = false;
if constexpr (probe_all) {
if (current_offset >= _batch_size) {
Expand All @@ -320,7 +322,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
probe_index + PREFETCH_STEP, *_arena);
}

auto current_probe_index = probe_index;
// auto current_probe_index = probe_index;
if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (is_mark_join) {
Expand Down Expand Up @@ -361,9 +363,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = mapped.block_offset;
_build_block_rows[current_offset] = mapped.row_num;
_items_counts[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(mapped.block_offset);
_build_block_rows.emplace_back(mapped.row_num);
_items_counts.template emplace_back(probe_index);
}
++current_offset;
}
Expand All @@ -375,9 +379,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = it->block_offset;
_build_block_rows[current_offset] = it->row_num;
_items_counts[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(it->block_offset);
_build_block_rows.emplace_back(it->row_num);
_items_counts.template emplace_back(probe_index);
}
++current_offset;
}
Expand All @@ -403,9 +409,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
_items_counts[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
_items_counts.template emplace_back(probe_index);
}
++current_offset;
}
Expand All @@ -414,7 +422,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
}

uint32_t count = (uint32_t)(current_offset - last_offset);
_items_counts[current_probe_index] = count;
// _items_counts[current_probe_index] = count;
all_match_one &= (count == 1);
if (current_offset >= _batch_size) {
break;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ struct SerializedHashTableContext {
template <class T, typename RowRefListType>
struct PrimaryTypeHashTableContext {
using Mapped = RowRefListType;
using HashTable = PartitionedHashMap<T, Mapped, HashCRC32<T>>;
// using HashTable = PHPartitionedHashMap<T, Mapped, HashCRC32<T>>;
using HashTable = PHHashMap<T, Mapped, HashCRC32<T>>;
using State =
ColumnsHashing::HashMethodOneNumber<typename HashTable::value_type, Mapped, T, false>;
using Iter = typename HashTable::iterator;
Expand Down

0 comments on commit e739b11

Please sign in to comment.