diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 7cf4c48eb13..23b2375ba69 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -1705,26 +1705,8 @@ Block Join::joinBlockNullAwareSemiImpl(const ProbeProcessInfo & probe_process_in Block Join::joinBlockSemi(ProbeProcessInfo & probe_process_info) const { - JoinBuildInfo join_build_info{ - enable_fine_grained_shuffle, - fine_grained_shuffle_count, - isEnableSpill(), - hash_join_spill_context->isSpilled(), - build_concurrency, - restore_config.restore_round}; - - probe_process_info.prepareForHashProbe( - key_names_left, - non_equal_conditions.left_filter_column, - kind, - strictness, - join_build_info.needVirtualDispatchForProbeBlock(), - collators, - restore_config.restore_round); - Block block{}; -#define CALL(KIND, STRICTNESS, MAP) \ - block = joinBlockSemiImpl(join_build_info, probe_process_info); +#define CALL(KIND, STRICTNESS, MAP) block = joinBlockSemiImpl(probe_process_info); using enum ASTTableJoin::Strictness; using enum ASTTableJoin::Kind; @@ -1750,166 +1732,93 @@ Block Join::joinBlockSemi(ProbeProcessInfo & probe_process_info) const FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); + return removeUselessColumn(block); +} + +namespace +{ +template +Block genSemiJoinResult( + ProbeProcessInfo & probe_process_info, + SemiJoinHelper * helper, + const NameSet & output_column_names_set) +{ + auto ret = helper->genJoinResult(output_column_names_set); /// (left outer) (anti) semi join never expand the left block, just handle the whole block at one time is enough probe_process_info.all_rows_joined_finish = true; - - return removeUselessColumn(block); + probe_process_info.semi_join_family_helper.reset(); + return ret; } +} // namespace template -Block Join::joinBlockSemiImpl(const JoinBuildInfo & join_build_info, const ProbeProcessInfo & probe_process_info) const +Block Join::joinBlockSemiImpl(ProbeProcessInfo & probe_process_info) const { size_t rows = probe_process_info.block.rows(); - - auto [res, res_list] = JoinPartition::probeBlockSemi( - partitions, - rows, - key_sizes, - collators, - join_build_info, - probe_process_info); - - RUNTIME_ASSERT(res.size() == rows, "SemiJoinResult size {} must be equal to block size {}", res.size(), rows); - if (is_cancelled()) - return {}; - - const NameSet & probe_output_name_set = has_other_condition - ? output_columns_names_set_for_other_condition_after_finalize - : output_column_names_set_after_finalize; - Block block{}; - for (size_t i = 0; i < probe_process_info.block.columns(); ++i) - { - const auto & column = probe_process_info.block.getByPosition(i); - if (probe_output_name_set.contains(column.name)) - block.insert(column); - } - - size_t left_columns = block.columns(); - /// Add new columns to the block. - std::vector right_column_indices_to_add; - - for (size_t i = 0; i < right_sample_block.columns(); ++i) + JoinBuildInfo join_build_info{ + enable_fine_grained_shuffle, + fine_grained_shuffle_count, + isEnableSpill(), + hash_join_spill_context->isSpilled(), + build_concurrency, + restore_config.restore_round}; + if (probe_process_info.semi_join_family_helper != nullptr) { - const auto & column = right_sample_block.getByPosition(i); - if (probe_output_name_set.contains(column.name)) - { - RUNTIME_CHECK_MSG( - !block.has(column.name), - "block from probe side has a column with the same name: {} as a column in right_sample_block", - column.name); - block.insert(column); - right_column_indices_to_add.push_back(i); - } + // current block still not done + RUNTIME_CHECK_MSG( + !probe_process_info.all_rows_joined_finish, + "semi_family_helper should be reset to nullptr after all rows are joined for current block"); } - - if constexpr (STRICTNESS == ASTTableJoin::Strictness::All) + else { - if (!res_list.empty()) - { - SemiJoinHelper helper( - block, - left_columns, - right_column_indices_to_add, - max_block_size, - non_equal_conditions, - is_cancelled); - - helper.joinResult(res_list); + // probe a new block + probe_process_info.prepareForHashProbe( + key_names_left, + non_equal_conditions.left_filter_column, + kind, + strictness, + join_build_info.needVirtualDispatchForProbeBlock(), + collators, + restore_config.restore_round); - RUNTIME_CHECK_MSG(res_list.empty(), "SemiJoinResult list must be empty after calculating join result"); - } + probe_process_info.semi_join_family_helper = decltype(probe_process_info.semi_join_family_helper)( + new SemiJoinHelper(rows, max_block_size, non_equal_conditions, is_cancelled), + [](void * ptr) { delete reinterpret_cast *>(ptr); }); } - if (is_cancelled()) - return {}; + auto * helper + = reinterpret_cast *>(probe_process_info.semi_join_family_helper.get()); - /// Now all results are known. - - std::unique_ptr filter; - if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) - filter = std::make_unique(rows); - - MutableColumnPtr left_semi_column_ptr = nullptr; - ColumnInt8::Container * left_semi_column_data = nullptr; - ColumnUInt8::Container * left_semi_null_map = nullptr; - - if constexpr (KIND == ASTTableJoin::Kind::LeftOuterSemi || KIND == ASTTableJoin::Kind::LeftOuterAnti) + if (!helper->isProbeHashTableDone()) { - left_semi_column_ptr = block.getByPosition(block.columns() - 1).column->cloneEmpty(); - auto * left_semi_column = typeid_cast(left_semi_column_ptr.get()); - left_semi_column_data = &typeid_cast &>(left_semi_column->getNestedColumn()).getData(); - left_semi_column_data->reserve(rows); - left_semi_null_map = &left_semi_column->getNullMapColumn().getData(); - if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) - { - left_semi_null_map->resize_fill(rows, 0); - } - else - { - left_semi_null_map->reserve(rows); - } - } + const NameSet & probe_output_name_set = has_other_condition + ? output_columns_names_set_for_other_condition_after_finalize + : output_column_names_set_after_finalize; - size_t rows_for_semi_anti = 0; - for (size_t i = 0; i < rows; ++i) - { - auto result = res[i].getResult(); - if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) - { - if (isTrueSemiJoinResult(result)) - { - // If the result is true, this row should be kept. - (*filter)[i] = 1; - ++rows_for_semi_anti; - } - else - { - // If the result is null or false, this row should be filtered. - (*filter)[i] = 0; - } - } - else - { - if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) - { - left_semi_column_data->push_back(result); - } - else - { - switch (result) - { - case SemiJoinResultType::FALSE_VALUE: - left_semi_column_data->push_back(0); - left_semi_null_map->push_back(0); - break; - case SemiJoinResultType::TRUE_VALUE: - left_semi_column_data->push_back(1); - left_semi_null_map->push_back(0); - break; - case SemiJoinResultType::NULL_VALUE: - left_semi_column_data->push_back(0); - left_semi_null_map->push_back(1); - break; - } - } - } + helper->probeHashTable( + partitions, + key_sizes, + collators, + join_build_info, + probe_process_info, + probe_output_name_set, + right_sample_block); } - if constexpr (KIND == ASTTableJoin::Kind::LeftOuterSemi || KIND == ASTTableJoin::Kind::LeftOuterAnti) + while (!helper->isJoinDone()) { - block.getByPosition(block.columns() - 1).column = std::move(left_semi_column_ptr); + if (is_cancelled()) + return {}; + helper->doJoin(); } - if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) - { - for (size_t i = 0; i < left_columns; ++i) - { - auto & column = block.getByPosition(i); - if (output_column_names_set_after_finalize.contains(column.name)) - column.column = column.column->filter(*filter, rows_for_semi_anti); - } - } - return block; + if (is_cancelled()) + return {}; + + return genSemiJoinResult( + probe_process_info, + helper, + output_column_names_set_after_finalize); } void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) const diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 929f8ce7d6a..ba556b0e574 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -504,7 +504,7 @@ class Join Block joinBlockNullAwareSemiImpl(const ProbeProcessInfo & probe_process_info) const; template - Block joinBlockSemiImpl(const JoinBuildInfo & join_build_info, const ProbeProcessInfo & probe_process_info) const; + Block joinBlockSemiImpl(ProbeProcessInfo & probe_process_info) const; IColumn::Selector hashToSelector(const WeakHash32 & hash) const; IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block); diff --git a/dbms/src/Interpreters/ProbeProcessInfo.h b/dbms/src/Interpreters/ProbeProcessInfo.h index a45d9a4b3aa..f23496257df 100644 --- a/dbms/src/Interpreters/ProbeProcessInfo.h +++ b/dbms/src/Interpreters/ProbeProcessInfo.h @@ -120,6 +120,9 @@ struct ProbeProcessInfo /// for null-aware join std::unique_ptr null_aware_join_data; + // for semi join family + std::unique_ptr> semi_join_family_helper = nullptr; /// type erasure + ProbeProcessInfo(UInt64 max_block_size_, UInt64 cache_columns_threshold_) : partition_index(0) , max_block_size(max_block_size_) diff --git a/dbms/src/Interpreters/SemiJoinHelper.cpp b/dbms/src/Interpreters/SemiJoinHelper.cpp index 6951cdd2450..dc968f68c5d 100644 --- a/dbms/src/Interpreters/SemiJoinHelper.cpp +++ b/dbms/src/Interpreters/SemiJoinHelper.cpp @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include namespace DB @@ -141,186 +143,241 @@ bool SemiJoinResult::checkExprResult( return false; } -template -SemiJoinHelper::SemiJoinHelper( - Block & block_, - size_t left_columns_, - const std::vector & right_column_indices_to_added_, +template +SemiJoinHelper::SemiJoinHelper( + size_t input_rows_, size_t max_block_size_, const JoinNonEqualConditions & non_equal_conditions_, CancellationHook is_cancelled_) - : block(block_) - , left_columns(left_columns_) - , right_column_indices_to_add(right_column_indices_to_added_) + : input_rows(input_rows_) , max_block_size(max_block_size_) , is_cancelled(is_cancelled_) , non_equal_conditions(non_equal_conditions_) { static_assert(KIND == Semi || KIND == Anti || KIND == LeftOuterAnti || KIND == LeftOuterSemi); - - right_columns = right_column_indices_to_add.size(); - RUNTIME_CHECK(block.columns() == left_columns + right_columns); - - if constexpr (KIND == LeftOuterAnti || KIND == LeftOuterSemi) - { - /// The last column is `match_helper`. - right_columns -= 1; - } - - RUNTIME_CHECK(right_columns > 0); - RUNTIME_CHECK(non_equal_conditions.other_cond_expr != nullptr); } -template -void SemiJoinHelper::joinResult(std::list & res_list) +template +void SemiJoinHelper::doJoin() { - std::vector offsets; - size_t block_columns = block.columns(); - Block exec_block = block.cloneEmpty(); - - while (!res_list.empty()) + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) { - if (is_cancelled()) - return; - MutableColumns columns(block_columns); - for (size_t i = 0; i < block_columns; ++i) - { - /// Reuse the column to avoid memory allocation. - /// Correctness depends on the fact that equal and other condition expressions do not - /// removed any column, namely, the columns will not out of order. - /// TODO: Maybe we can reuse the memory of new columns added by expressions? - columns[i] = exec_block.safeGetByPosition(i).column->assumeMutable(); - columns[i]->popBack(columns[i]->size()); - } - - size_t current_offset = 0; - offsets.clear(); + return; + } + else + { + assert(!probe_res_list.empty()); + std::vector offsets; + size_t block_columns = result_block.columns(); + Block exec_block = result_block.cloneEmpty(); - for (auto & res : res_list) + while (!probe_res_list.empty()) { - size_t prev_offset = current_offset; - res->template fillRightColumns( - columns, - left_columns, - right_columns, - right_column_indices_to_add, - current_offset, - max_block_size - current_offset); - - /// Note that current_offset - prev_offset may be zero. - if (current_offset > prev_offset) + MutableColumns columns(block_columns); + for (size_t i = 0; i < block_columns; ++i) { - for (size_t i = 0; i < left_columns; ++i) - columns[i]->insertManyFrom( - *block.getByPosition(i).column.get(), - res->getRowNum(), - current_offset - prev_offset); + /// Reuse the column to avoid memory allocation. + /// Correctness depends on the fact that equal and other condition expressions do not + /// removed any column, namely, the columns will not out of order. + /// TODO: Maybe we can reuse the memory of new columns added by expressions? + columns[i] = exec_block.safeGetByPosition(i).column->assumeMutable(); + columns[i]->popBack(columns[i]->size()); } - offsets.emplace_back(current_offset); - if (current_offset >= max_block_size) - break; - } + size_t current_offset = 0; + offsets.clear(); - /// Move the columns to exec_block. - /// Note that this can remove the new columns that are added in the last loop - /// from equal and other condition expressions. - exec_block = block.cloneWithColumns(std::move(columns)); + for (auto & res : probe_res_list) + { + size_t prev_offset = current_offset; + res->template fillRightColumns( + columns, + left_columns, + right_columns, + right_column_indices_to_add, + current_offset, + max_block_size - current_offset); + + /// Note that current_offset - prev_offset may be zero. + if (current_offset > prev_offset) + { + for (size_t i = 0; i < left_columns; ++i) + columns[i]->insertManyFrom( + *result_block.getByPosition(i).column.get(), + res->getRowNum(), + current_offset - prev_offset); + } - non_equal_conditions.other_cond_expr->execute(exec_block); + offsets.emplace_back(current_offset); + if (current_offset >= max_block_size) + break; + } - const ColumnUInt8::Container *other_eq_from_in_column_data = nullptr, *other_column_data = nullptr; - ConstNullMapPtr other_eq_from_in_null_map = nullptr, other_null_map = nullptr; - ColumnPtr other_eq_from_in_column, other_column; + /// Move the columns to exec_block. + /// Note that this can remove the new columns that are added in the last loop + /// from equal and other condition expressions. + exec_block = result_block.cloneWithColumns(std::move(columns)); - bool has_other_eq_cond_from_in = !non_equal_conditions.other_eq_cond_from_in_name.empty(); - if (has_other_eq_cond_from_in) - { - other_eq_from_in_column = exec_block.getByName(non_equal_conditions.other_eq_cond_from_in_name).column; - auto is_nullable_col = [&]() { - if (other_eq_from_in_column->isColumnNullable()) - return true; - if (other_eq_from_in_column->isColumnConst()) - { - const auto & const_col = typeid_cast(*other_eq_from_in_column); - return const_col.getDataColumn().isColumnNullable(); - } - return false; - }; - // nullable, const(nullable) - RUNTIME_CHECK_MSG( - is_nullable_col(), - "The equal condition from in column should be nullable, otherwise it should be used as join key"); + non_equal_conditions.other_cond_expr->execute(exec_block); - std::tie(other_eq_from_in_column_data, other_eq_from_in_null_map) - = getDataAndNullMapVectorFromFilterColumn(other_eq_from_in_column); - } + const ColumnUInt8::Container *other_eq_from_in_column_data = nullptr, *other_column_data = nullptr; + ConstNullMapPtr other_eq_from_in_null_map = nullptr, other_null_map = nullptr; + ColumnPtr other_eq_from_in_column, other_column; - bool has_other_cond = !non_equal_conditions.other_cond_name.empty(); - bool has_other_cond_null_map = false; - if (has_other_cond) - { - other_column = exec_block.getByName(non_equal_conditions.other_cond_name).column; - std::tie(other_column_data, other_null_map) = getDataAndNullMapVectorFromFilterColumn(other_column); - has_other_cond_null_map = other_null_map != nullptr; - } + bool has_other_eq_cond_from_in = !non_equal_conditions.other_eq_cond_from_in_name.empty(); + if (has_other_eq_cond_from_in) + { + other_eq_from_in_column = exec_block.getByName(non_equal_conditions.other_eq_cond_from_in_name).column; + auto is_nullable_col = [&]() { + if (other_eq_from_in_column->isColumnNullable()) + return true; + if (other_eq_from_in_column->isColumnConst()) + { + const auto & const_col = typeid_cast(*other_eq_from_in_column); + return const_col.getDataColumn().isColumnNullable(); + } + return false; + }; + // nullable, const(nullable) + RUNTIME_CHECK_MSG( + is_nullable_col(), + "The equal condition from in column should be nullable, otherwise it should be used as join key"); + + std::tie(other_eq_from_in_column_data, other_eq_from_in_null_map) + = getDataAndNullMapVectorFromFilterColumn(other_eq_from_in_column); + } + + bool has_other_cond = !non_equal_conditions.other_cond_name.empty(); + bool has_other_cond_null_map = false; + if (has_other_cond) + { + other_column = exec_block.getByName(non_equal_conditions.other_cond_name).column; + std::tie(other_column_data, other_null_map) = getDataAndNullMapVectorFromFilterColumn(other_column); + has_other_cond_null_map = other_null_map != nullptr; + } #define CALL(has_other_eq_cond_from_in, has_other_cond, has_other_cond_null_map) \ checkAllExprResult( \ offsets, \ - res_list, \ other_eq_from_in_column_data, \ other_eq_from_in_null_map, \ other_column_data, \ other_null_map); - if (has_other_eq_cond_from_in) - { - if (has_other_cond) + if (has_other_eq_cond_from_in) { - if (has_other_cond_null_map) + if (has_other_cond) { - CALL(true, true, true); + if (has_other_cond_null_map) + { + CALL(true, true, true); + } + else + { + CALL(true, true, false); + } } else { - CALL(true, true, false); + CALL(true, false, false); } } else { - CALL(true, false, false); + RUNTIME_CHECK(has_other_cond); + if (has_other_cond_null_map) + { + CALL(false, true, true); + } + else + { + CALL(false, true, false); + } } +#undef CALL } - else + } +} + +template +void SemiJoinHelper::probeHashTable( + const JoinPartitions & join_partitions, + const Sizes & key_sizes, + const TiDB::TiDBCollators & collators, + const JoinBuildInfo & join_build_info, + const ProbeProcessInfo & probe_process_info, + const NameSet & probe_output_name_set, + const Block & right_sample_block) +{ + if (is_probe_hash_table_done) + return; + std::tie(probe_res, probe_res_list) = JoinPartition::probeBlockSemi( + join_partitions, + input_rows, + key_sizes, + collators, + join_build_info, + probe_process_info); + + RUNTIME_ASSERT( + probe_res.size() == input_rows, + "SemiJoinResult size {} must be equal to block size {}", + probe_res.size(), + input_rows); + if (is_cancelled()) + return; + for (size_t i = 0; i < probe_process_info.block.columns(); ++i) + { + const auto & column = probe_process_info.block.getByPosition(i); + if (probe_output_name_set.contains(column.name)) + result_block.insert(column); + } + + left_columns = result_block.columns(); + for (size_t i = 0; i < right_sample_block.columns(); ++i) + { + const auto & column = right_sample_block.getByPosition(i); + if (probe_output_name_set.contains(column.name)) { - RUNTIME_CHECK(has_other_cond); - if (has_other_cond_null_map) - { - CALL(false, true, true); - } - else - { - CALL(false, true, false); - } + RUNTIME_CHECK_MSG( + !result_block.has(column.name), + "block from probe side has a column with the same name: {} as a column in right_sample_block", + column.name); + result_block.insert(column); + right_column_indices_to_add.push_back(i); } -#undef CALL } + right_columns = right_column_indices_to_add.size(); + RUNTIME_CHECK(result_block.columns() == left_columns + right_columns); + + if constexpr (KIND == LeftOuterAnti || KIND == LeftOuterSemi) + { + /// The last column is `match_helper`. + right_columns -= 1; + } + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) + { + RUNTIME_CHECK_MSG(probe_res_list.empty(), "SemiJoinResult list must be empty for any semi join"); + } + else + { + RUNTIME_CHECK(right_columns > 0); + } + is_probe_hash_table_done = true; } -template +template template -void SemiJoinHelper::checkAllExprResult( +void SemiJoinHelper::checkAllExprResult( const std::vector & offsets, - std::list & res_list, const ColumnUInt8::Container * other_eq_column, ConstNullMapPtr other_eq_null_map, const ColumnUInt8::Container * other_column, ConstNullMapPtr other_null_map) { size_t prev_offset = 0; - auto it = res_list.begin(); - for (size_t i = 0, size = offsets.size(); i < size && it != res_list.end(); ++i) + auto it = probe_res_list.begin(); + for (size_t i = 0, size = offsets.size(); i < size && it != probe_res_list.end(); ++i) { if ((*it)->template checkExprResult( other_eq_column, @@ -330,7 +387,7 @@ void SemiJoinHelper::checkAllExprResult( prev_offset, offsets[i])) { - it = res_list.erase(it); + it = probe_res_list.erase(it); } else { @@ -341,13 +398,102 @@ void SemiJoinHelper::checkAllExprResult( } } +template +Block SemiJoinHelper::genJoinResult(const NameSet & output_column_names_set) +{ + RUNTIME_CHECK_MSG(probe_res_list.empty(), "SemiJoinResult list must be empty when generating join result"); + std::unique_ptr filter; + if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) + filter = std::make_unique(input_rows); + + MutableColumnPtr left_semi_column_ptr = nullptr; + ColumnInt8::Container * left_semi_column_data = nullptr; + ColumnUInt8::Container * left_semi_null_map = nullptr; + + if constexpr (KIND == ASTTableJoin::Kind::LeftOuterSemi || KIND == ASTTableJoin::Kind::LeftOuterAnti) + { + left_semi_column_ptr = result_block.getByPosition(result_block.columns() - 1).column->cloneEmpty(); + auto * left_semi_column = typeid_cast(left_semi_column_ptr.get()); + left_semi_column_data = &typeid_cast &>(left_semi_column->getNestedColumn()).getData(); + left_semi_column_data->reserve(input_rows); + left_semi_null_map = &left_semi_column->getNullMapColumn().getData(); + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) + { + left_semi_null_map->resize_fill(input_rows, 0); + } + else + { + left_semi_null_map->reserve(input_rows); + } + } + + size_t rows_for_semi_anti = 0; + for (size_t i = 0; i < input_rows; ++i) + { + auto result = probe_res[i].getResult(); + if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) + { + if (isTrueSemiJoinResult(result)) + { + // If the result is true, this row should be kept. + (*filter)[i] = 1; + ++rows_for_semi_anti; + } + else + { + // If the result is null or false, this row should be filtered. + (*filter)[i] = 0; + } + } + else + { + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) + { + left_semi_column_data->push_back(result); + } + else + { + switch (result) + { + case SemiJoinResultType::FALSE_VALUE: + left_semi_column_data->push_back(0); + left_semi_null_map->push_back(0); + break; + case SemiJoinResultType::TRUE_VALUE: + left_semi_column_data->push_back(1); + left_semi_null_map->push_back(0); + break; + case SemiJoinResultType::NULL_VALUE: + left_semi_column_data->push_back(0); + left_semi_null_map->push_back(1); + break; + } + } + } + } + + if constexpr (KIND == ASTTableJoin::Kind::LeftOuterSemi || KIND == ASTTableJoin::Kind::LeftOuterAnti) + { + result_block.getByPosition(result_block.columns() - 1).column = std::move(left_semi_column_ptr); + } + + if constexpr (KIND == ASTTableJoin::Kind::Semi || KIND == ASTTableJoin::Kind::Anti) + { + for (size_t i = 0; i < left_columns; ++i) + { + auto & column = result_block.getByPosition(i); + if (output_column_names_set.contains(column.name)) + column.column = column.column->filter(*filter, rows_for_semi_anti); + } + } + return std::move(result_block); +} + #define M(KIND, STRICTNESS, MAPTYPE) template class SemiJoinResult; APPLY_FOR_SEMI_JOIN(M) #undef M -template class SemiJoinHelper; -template class SemiJoinHelper; -template class SemiJoinHelper; -template class SemiJoinHelper; - +#define M(KIND, STRICTNESS, MAPTYPE) template class SemiJoinHelper; +APPLY_FOR_SEMI_JOIN(M) +#undef M } // namespace DB diff --git a/dbms/src/Interpreters/SemiJoinHelper.h b/dbms/src/Interpreters/SemiJoinHelper.h index 90dc39c81ce..303956b18b2 100644 --- a/dbms/src/Interpreters/SemiJoinHelper.h +++ b/dbms/src/Interpreters/SemiJoinHelper.h @@ -127,38 +127,54 @@ class SemiJoinResult const void * map_it; }; -template +struct ProbeProcessInfo; +class JoinPartition; +using JoinPartitions = std::vector>; + +template class SemiJoinHelper { public: - using Result = SemiJoinResult; - SemiJoinHelper( - Block & block, - size_t left_columns, - const std::vector & right_column_indices_to_add, + size_t input_rows, size_t max_block_size, const JoinNonEqualConditions & non_equal_conditions, CancellationHook is_cancelled_); - void joinResult(std::list & res_list); + void probeHashTable( + const JoinPartitions & join_partitions, + const Sizes & key_sizes, + const TiDB::TiDBCollators & collators, + const JoinBuildInfo & join_build_info, + const ProbeProcessInfo & probe_process_info, + const NameSet & probe_output_name_set, + const Block & right_sample_block); + void doJoin(); + + bool isJoinDone() const { return is_probe_hash_table_done && probe_res_list.empty(); } + bool isProbeHashTableDone() const { return is_probe_hash_table_done; } + + Block genJoinResult(const NameSet & output_column_names_set); private: template void checkAllExprResult( const std::vector & offsets, - std::list & res_list, const ColumnUInt8::Container * other_eq_column, ConstNullMapPtr other_eq_null_map, const ColumnUInt8::Container * other_column, ConstNullMapPtr other_null_map); - Block & block; - size_t left_columns; - size_t right_columns; + PaddedPODArray> probe_res; + std::list *> probe_res_list; + Block result_block; + size_t left_columns = 0; + size_t right_columns = 0; + size_t input_rows; std::vector right_column_indices_to_add; size_t max_block_size; CancellationHook is_cancelled; + bool is_probe_hash_table_done = false; const JoinNonEqualConditions & non_equal_conditions; };