From b473b98d58acdc7955dffd28f5f43d9615c5f06f Mon Sep 17 00:00:00 2001 From: Pxl Date: Mon, 19 Feb 2024 14:08:09 +0800 Subject: [PATCH 01/13] [Improvement](join) optimization for build_side_output_column (#30826) optimization for build_side_output_column --- .../rowset/segment_v2/segment_iterator.cpp | 20 +++--- be/src/vec/columns/column.h | 7 -- be/src/vec/columns/column_array.cpp | 31 -------- be/src/vec/columns/column_array.h | 2 +- be/src/vec/columns/column_complex.h | 14 ---- be/src/vec/columns/column_const.cpp | 6 -- be/src/vec/columns/column_const.h | 2 +- be/src/vec/columns/column_decimal.cpp | 12 ---- be/src/vec/columns/column_decimal.h | 2 - be/src/vec/columns/column_dictionary.h | 4 -- be/src/vec/columns/column_dummy.h | 4 -- .../vec/columns/column_fixed_length_object.h | 4 -- be/src/vec/columns/column_map.cpp | 20 ------ be/src/vec/columns/column_map.h | 1 - be/src/vec/columns/column_nullable.cpp | 15 ++-- be/src/vec/columns/column_nullable.h | 3 +- be/src/vec/columns/column_object.cpp | 12 ---- be/src/vec/columns/column_object.h | 2 - be/src/vec/columns/column_string.cpp | 28 -------- be/src/vec/columns/column_string.h | 2 - be/src/vec/columns/column_struct.cpp | 9 --- be/src/vec/columns/column_struct.h | 1 - be/src/vec/columns/column_vector.cpp | 18 +---- be/src/vec/columns/column_vector.h | 2 - be/src/vec/columns/predicate_column.h | 4 -- .../vec/exec/join/process_hash_table_probe.h | 3 + .../exec/join/process_hash_table_probe_impl.h | 70 ++++++++++++++----- .../array/function_array_with_constant.cpp | 4 +- be/test/vec/core/column_array_test.cpp | 48 ------------- 29 files changed, 82 insertions(+), 268 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 4c659e92df81a6..97fbaf5e0e10c8 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -1526,21 +1526,21 @@ Status SegmentIterator::_vec_init_lazy_materialization() { std::set short_cir_pred_col_id_set; // using set for distinct cid std::set vec_pred_col_id_set; - for (auto predicate : _col_predicates) { + for (auto* predicate : _col_predicates) { auto cid = predicate->column_id(); _is_pred_column[cid] = true; pred_column_ids.insert(cid); // check pred using short eval or vec eval if (_can_evaluated_by_vectorized(predicate)) { - vec_pred_col_id_set.insert(predicate->column_id()); + vec_pred_col_id_set.insert(cid); _pre_eval_block_predicate.push_back(predicate); } else { short_cir_pred_col_id_set.insert(cid); _short_cir_eval_predicate.push_back(predicate); - if (predicate->is_filter()) { - _filter_info_id.push_back(predicate); - } + } + if (predicate->is_filter()) { + _filter_info_id.push_back(predicate); } } @@ -1959,17 +1959,17 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ bool ret_flags[original_size]; DCHECK(!_pre_eval_block_predicate.empty()); bool is_first = true; - for (int i = 0; i < _pre_eval_block_predicate.size(); i++) { - if (_pre_eval_block_predicate[i]->always_true()) { + for (auto& pred : _pre_eval_block_predicate) { + if (pred->always_true()) { continue; } - auto column_id = _pre_eval_block_predicate[i]->column_id(); + auto column_id = pred->column_id(); auto& column = _current_return_columns[column_id]; if (is_first) { - _pre_eval_block_predicate[i]->evaluate_vec(*column, original_size, ret_flags); + pred->evaluate_vec(*column, original_size, ret_flags); is_first = false; } else { - 
_pre_eval_block_predicate[i]->evaluate_and_vec(*column, original_size, ret_flags); + pred->evaluate_and_vec(*column, original_size, ret_flags); } } diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 23f9073eff5e53..322456a8f77010 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -483,13 +483,6 @@ class IColumn : public COW { */ virtual Ptr replicate(const Offsets& offsets) const = 0; - /** Copies each element according offsets parameter. - * (i-th element should be copied counts[i] times.) - * If `begin` and `count_sz` specified, it means elements in range [`begin`, `begin` + `count_sz`) will be replicated. - * If `count_sz` is -1, `begin` must be 0. - */ - virtual void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const = 0; - /// Appends one field multiple times. Can be optimized in inherited classes. virtual void insert_many(const Field& field, size_t length) { for (size_t i = 0; i < length; ++i) { diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 8d1cbdd69acc1d..6d2914d8053166 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -856,37 +856,6 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons return replicate_generic(replicate_offsets); } -void ColumnArray::replicate(const uint32_t* indices, size_t target_size, IColumn& column) const { - if (target_size == 0) { - return; - } - - auto& dst_col = assert_cast(column); - auto& dst_data_col = dst_col.get_data(); - auto& dst_offsets = dst_col.get_offsets(); - dst_offsets.reserve(target_size); - - PODArray data_indices_to_replicate; - - for (size_t i = 0; i < target_size; ++i) { - const auto index = indices[i]; - const auto start = offset_at(index); - const auto length = size_at(index); - dst_offsets.push_back(dst_offsets.back() + length); - if (UNLIKELY(length == 0)) { - continue; - } - - data_indices_to_replicate.reserve(data_indices_to_replicate.size() + length); - for (size_t j = start; j != start + length; ++j) { - data_indices_to_replicate.push_back(j); - } - } - - get_data().replicate(data_indices_to_replicate.data(), data_indices_to_replicate.size(), - dst_data_col); -} - template ColumnPtr ColumnArray::replicate_number(const IColumn::Offsets& replicate_offsets) const { size_t col_size = size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index f2f187a7236933..046bc22ac48f76 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -174,7 +174,7 @@ class ColumnArray final : public COWHelper { size_t byte_size() const override; size_t allocated_bytes() const override; ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + ColumnPtr convert_to_full_column_if_const() const override; /** More efficient methods of manipulation */ diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index b004d946aae6fd..8e9686cb2a54a9 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -265,8 +265,6 @@ class ColumnComplexType final : public COWHelper> ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; - [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns, const 
IColumn::Selector& selector) const override { LOG(FATAL) << "scatter not implemented"; @@ -403,18 +401,6 @@ ColumnPtr ColumnComplexType::replicate(const IColumn::Offsets& offsets) const return res; } -template -void ColumnComplexType::replicate(const uint32_t* indexs, size_t target_size, - IColumn& column) const { - auto& res = reinterpret_cast&>(column); - typename Self::Container& res_data = res.get_data(); - res_data.resize(target_size); - - for (size_t i = 0; i < target_size; ++i) { - res_data[i] = data[indexs[i]]; - } -} - using ColumnBitmap = ColumnComplexType; using ColumnHLL = ColumnComplexType; using ColumnQuantileState = ColumnComplexType; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 3fb851b2a9c8dd..f7efec1f72a859 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -78,12 +78,6 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const { return ColumnConst::create(data, replicated_size); } -void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - if (s == 0) return; - auto& res = reinterpret_cast(column); - res.s = target_size; -} - ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const { if (limit == 0) { limit = s; diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index c832c02bcbdba7..5498fbf7c20b86 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -192,7 +192,7 @@ class ColumnConst final : public COWHelper { size_t filter(const Filter& filter) override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; + ColumnPtr permute(const Permutation& perm, size_t limit) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index dd42b3563a827d..95b247fc668681 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -437,18 +437,6 @@ ColumnPtr ColumnDecimal::replicate(const IColumn::Offsets& offsets) const { return res; } -template -void ColumnDecimal::replicate(const uint32_t* __restrict indexs, size_t target_size, - IColumn& column) const { - auto& res = reinterpret_cast&>(column); - typename Self::Container& res_data = res.get_data(); - res_data.resize(target_size); - - for (size_t i = 0; i < target_size; ++i) { - res_data[i] = data[indexs[i]]; - } -} - template void ColumnDecimal::sort_column(const ColumnSorter* sorter, EqualFlags& flags, IColumn::Permutation& perms, EqualRange& range, diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 3681f9190ae3ff..49b58ebaa4fc76 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -230,8 +230,6 @@ class ColumnDecimal final : public COWHelpertemplate scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index 6de729fd2de7e0..518d8faa008f69 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -303,10 +303,6 @@ class ColumnDictionary final : public COWHelper> { return _rowset_segment_id; } - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const 
override { - LOG(FATAL) << "not support"; - } - bool is_dict_sorted() const { return _dict_sorted; } bool is_dict_empty() const { return _dict.empty(); } diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index b51cd8faa659eb..b94464be5ba325 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -116,10 +116,6 @@ class IColumnDummy : public IColumn { return clone_dummy(offsets.back()); } - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override { - LOG(FATAL) << "Not implemented"; - } - MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { if (s != selector.size()) { LOG(FATAL) << "Size of selector doesn't match size of column."; diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h index 6eefc789aa0057..2e67b00d3126c3 100644 --- a/be/src/vec/columns/column_fixed_length_object.h +++ b/be/src/vec/columns/column_fixed_length_object.h @@ -200,10 +200,6 @@ class ColumnFixedLengthObject final : public COWHelper(column); - - auto keys_array = - ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable()); - - auto result_array = ColumnArray::create(res.keys_column->assume_mutable(), - res.offsets_column->assume_mutable()); - keys_array->replicate(indices, target_size, result_array->assume_mutable_ref()); - - result_array = ColumnArray::create(res.values_column->assume_mutable(), - res.offsets_column->clone_empty()); - - auto values_array = - ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable()); - - /// FIXME: To reuse the replicate of ColumnArray, the offsets column was replicated twice - values_array->replicate(indices, target_size, result_array->assume_mutable_ref()); -} - MutableColumnPtr ColumnMap::get_shrinked_column() { MutableColumns new_columns(2); diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index fe1ccfb6f82e9e..206660d6a0692f 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -116,7 +116,6 @@ class ColumnMap final : public COWHelper { size_t filter(const Filter& filter) override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 8b0008d6e2e4e8..0cd671eb11076e 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -313,6 +313,15 @@ void ColumnNullable::insert_indices_from(const IColumn& src, const uint32_t* ind _need_update_has_null = true; } +void ColumnNullable::insert_indices_from_not_has_null(const IColumn& src, + const uint32_t* indices_begin, + const uint32_t* indices_end) { + const auto& src_concrete = assert_cast(src); + get_nested_column().insert_indices_from(src_concrete.get_nested_column(), indices_begin, + indices_end); + _get_null_map_column().insert_many_defaults(indices_end - indices_begin); +} + void ColumnNullable::insert(const Field& x) { if (x.is_null()) { get_nested_column().insert_default(); @@ -508,12 +517,6 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const { return 
ColumnNullable::create(replicated_data, replicated_null_map); } -void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - auto& res = reinterpret_cast(column); - get_nested_column().replicate(counts, target_size, res.get_nested_column()); - get_null_map_column().replicate(counts, target_size, res.get_null_map_column()); -} - template void ColumnNullable::apply_null_map_impl(const ColumnUInt8& map) { NullMap& arr1 = get_null_map_data(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 91128fb69a8cec..eca4c57fceb05f 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -121,6 +121,8 @@ class ColumnNullable final : public COWHelper { void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; + void insert_indices_from_not_has_null(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end); void insert(const Field& x) override; void insert_from(const IColumn& src, size_t n) override; @@ -211,7 +213,6 @@ class ColumnNullable final : public COWHelper { size_t byte_size() const override; size_t allocated_bytes() const override; ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 552ad31809adaa..33ce8fad6aae03 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -1267,18 +1267,6 @@ void ColumnObject::strip_outer_array() { std::swap(subcolumns, new_subcolumns); } -void ColumnObject::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { - if (!is_finalized()) { - const_cast(this)->finalize(); - } - auto& var = assert_cast(column); - for (auto& entry : subcolumns) { - auto replica = entry->data.get_finalized_column().clone_empty(); - entry->data.get_finalized_column().replicate(indexs, target_size, *replica); - var.add_sub_column(entry->path, std::move(replica), entry->data.get_least_common_type()); - } -} - ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { if (!is_finalized()) { const_cast(this)->finalize(); diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 7e50e34bc9d481..c460bb813d70e5 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -473,8 +473,6 @@ class ColumnObject final : public COWHelper { LOG(FATAL) << "should not call the method in column object"; } - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; - template MutableColumnPtr apply_for_subcolumns(Func&& func) const; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 337f5e5663a242..5c9b1361ac7868 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -472,34 +472,6 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const { return res; } -void ColumnString::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { - 
auto& res = reinterpret_cast(column); - - Chars& res_chars = res.chars; - Offsets& res_offsets = res.offsets; - - size_t byte_size = 0; - res_offsets.resize(target_size); - for (size_t i = 0; i < target_size; ++i) { - long row_idx = indexs[i]; - auto str_size = offsets[row_idx] - offsets[row_idx - 1]; - res_offsets[i] = res_offsets[i - 1] + str_size; - byte_size += str_size; - } - - res_chars.resize(byte_size); - auto* __restrict dest = res.chars.data(); - auto* __restrict src = chars.data(); - for (size_t i = 0; i < target_size; ++i) { - long row_idx = indexs[i]; - auto str_size = offsets[row_idx] - offsets[row_idx - 1]; - memcpy_small_allow_read_write_overflow15(dest + res_offsets[i - 1], - src + offsets[row_idx - 1], str_size); - } - - check_chars_length(res_chars.size(), res_offsets.size()); -} - void ColumnString::reserve(size_t n) { offsets.reserve(n); chars.reserve(n); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index e6b27f20054f3e..5c50f5ed2f11a4 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -520,8 +520,6 @@ class ColumnString final : public COWHelper { ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; - MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); } diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 4e69db0a545461..f6ab9c9604f5ee 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -300,15 +300,6 @@ ColumnPtr ColumnStruct::replicate(const Offsets& offsets) const { return ColumnStruct::create(new_columns); } -void ColumnStruct::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { - auto& res = reinterpret_cast(column); - res.columns.resize(columns.size()); - - for (size_t i = 0; i != columns.size(); ++i) { - columns[i]->replicate(indexs, target_size, *res.columns[i]); - } -} - MutableColumnPtr ColumnStruct::get_shrinked_column() { const size_t tuple_size = columns.size(); MutableColumns new_columns(tuple_size); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 73f40713729618..1b1daee44526a8 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -154,7 +154,6 @@ class ColumnStruct final : public COWHelper { size_t filter(const Filter& filter) override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 05dd3d2ddeb158..71f55af0a79c1e 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -374,7 +374,7 @@ void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* in auto copy = [](const T* __restrict src, T* __restrict dest, const uint32_t* __restrict begin, const uint32_t* __restrict end) { - for (auto it = begin; it != end; ++it) { + for (const auto* it = begin; it != end; ++it) { *dest = src[*it]; ++dest; } @@ -541,22 +541,6 
@@ ColumnPtr ColumnVector::replicate(const IColumn::Offsets& offsets) const { return res; } -template -void ColumnVector::replicate(const uint32_t* __restrict indexs, size_t target_size, - IColumn& column) const { - auto& res = reinterpret_cast&>(column); - typename Self::Container& res_data = res.get_data(); - DCHECK(res_data.empty()); - res_data.resize(target_size); - auto* __restrict left = res_data.data(); - auto* __restrict right = data.data(); - auto* __restrict idxs = indexs; - - for (size_t i = 0; i < target_size; ++i) { - left[i] = right[idxs[i]]; - } -} - template ColumnPtr ColumnVector::index(const IColumn& indexes, size_t limit) const { return select_index_impl(*this, indexes, limit); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index d9aab68697de7c..dbc7524eaacd96 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -412,8 +412,6 @@ class ColumnVector final : public COWHelper> ColumnPtr replicate(const IColumn::Offsets& offsets) const override; - void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; - MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector& selector) const override { return this->template scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 95a8a84327b8f1..198c7ee9cd4c89 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -373,10 +373,6 @@ class PredicateColumnType final : public COWHelper* _left_output_slot_flags = nullptr; std::vector* _right_output_slot_flags = nullptr; + // nullable column but not has null except first row + std::vector _build_column_has_null; + bool _need_calculate_build_index_has_zero = true; bool* _has_null_in_build_side; RuntimeProfile::Counter* _rows_returned_counter = nullptr; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index d7b47bed9c0f79..b4212405aeda17 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -17,12 +17,15 @@ #pragma once +#include + #include "common/status.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "process_hash_table_probe.h" #include "runtime/thread_context.h" // IWYU pragma: keep #include "util/simd/bits.h" #include "vec/columns/column_filter_helper.h" +#include "vec/columns/column_nullable.h" #include "vec/exprs/vexpr_context.h" #include "vhash_join_node.h" @@ -73,20 +76,13 @@ void ProcessHashTableProbe::build_side_output_column( constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - if ((!is_semi_anti_join || have_other_join_conjunct || - (is_mark_join && !_parent->_mark_join_conjuncts.empty())) && - size) { - for (int i = 0; i < _right_col_len; i++) { - const auto& column = *_build_block->safe_get_by_position(i).column; - if (output_slot_flags[i]) { - mcol[i + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), - _build_indexs.data() + size); - } else { - mcol[i + _right_col_idx]->insert_many_defaults(size); - } - } - } - + // indicates whether build_indexs contain 0 + bool build_index_has_zero = + (JoinOpType != TJoinOp::INNER_JOIN && JoinOpType != TJoinOp::RIGHT_OUTER_JOIN) || + have_other_join_conjunct || is_mark_join; + bool need_output = (!is_semi_anti_join || have_other_join_conjunct || + (is_mark_join 
&& !_parent->_mark_join_conjuncts.empty())) && + size; // Dispose right tuple is null flags columns if (probe_all && !have_other_join_conjunct) { _tuple_is_null_right_flags->resize(size); @@ -94,6 +90,42 @@ void ProcessHashTableProbe::build_side_output_column( for (int i = 0; i < size; ++i) { null_data[i] = _build_indexs[i] == 0; } + if (need_output && _need_calculate_build_index_has_zero) { + build_index_has_zero = simd::contain_byte(null_data, size, 1); + } + } + + if (need_output) { + if (!build_index_has_zero && _build_column_has_null.empty()) { + _need_calculate_build_index_has_zero = false; + _build_column_has_null.resize(output_slot_flags.size()); + for (int i = 0; i < _right_col_len; i++) { + const auto& column = *_build_block->safe_get_by_position(i).column; + _build_column_has_null[i] = false; + if (output_slot_flags[i] && column.is_nullable()) { + const auto& nullable = assert_cast(column); + _build_column_has_null[i] = !simd::contain_byte( + nullable.get_null_map_data().data() + 1, nullable.size() - 1, 1); + _need_calculate_build_index_has_zero |= _build_column_has_null[i]; + } + } + } + + for (int i = 0; i < _right_col_len; i++) { + const auto& column = *_build_block->safe_get_by_position(i).column; + if (output_slot_flags[i]) { + if (!build_index_has_zero && _build_column_has_null[i]) { + assert_cast(mcol[i + _right_col_idx].get()) + ->insert_indices_from_not_has_null(column, _build_indexs.data(), + _build_indexs.data() + size); + } else { + mcol[i + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), + _build_indexs.data() + size); + } + } else { + mcol[i + _right_col_idx]->insert_many_defaults(size); + } + } } } @@ -109,7 +141,8 @@ void ProcessHashTableProbe::probe_side_output_column( if (all_match_one) { mcol[i]->insert_range_from(*column, last_probe_index, size); } else { - column->replicate(_probe_indexs.data(), size, *mcol[i]); + mcol[i]->insert_indices_from(*column, _probe_indexs.data(), + _probe_indexs.data() + size); } } else { mcol[i]->insert_many_defaults(size); @@ -147,6 +180,7 @@ typename HashTableType::State ProcessHashTableProbe::_init_p COUNTER_SET(_parent->_probe_arena_memory_usage, (int64_t)hash_table_ctx.serialized_keys_size(false)); } + return typename HashTableType::State(_parent->_probe_columns); } @@ -237,9 +271,11 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { auto check_all_match_one = [](const std::vector& vecs, uint32_t probe_idx, int size) { - if (size < 1 || vecs[0] != probe_idx) return false; + if (!size || vecs[0] != probe_idx || vecs[size - 1] != probe_idx + size - 1) { + return false; + } for (int i = 1; i < size; i++) { - if (vecs[i] - vecs[i - 1] != 1) { + if (vecs[i] == vecs[i - 1]) { return false; } } diff --git a/be/src/vec/functions/array/function_array_with_constant.cpp b/be/src/vec/functions/array/function_array_with_constant.cpp index f9a8981a3353a8..bb3cbb53e41231 100644 --- a/be/src/vec/functions/array/function_array_with_constant.cpp +++ b/be/src/vec/functions/array/function_array_with_constant.cpp @@ -101,8 +101,8 @@ class FunctionArrayWithConstant : public IFunction { } auto clone = value->clone_empty(); clone->reserve(input_rows_count); - RETURN_IF_CATCH_EXCEPTION( - value->replicate(array_sizes.data(), offset, *clone->assume_mutable().get())); + clone->assume_mutable()->insert_indices_from(*value, array_sizes.data(), + array_sizes.data() + offset); if (!clone->is_nullable()) { clone = ColumnNullable::create(std::move(clone), 
ColumnUInt8::create(clone->size(), 0)); } diff --git a/be/test/vec/core/column_array_test.cpp b/be/test/vec/core/column_array_test.cpp index fd2ed212730780..c371dda25e85d1 100644 --- a/be/test/vec/core/column_array_test.cpp +++ b/be/test/vec/core/column_array_test.cpp @@ -186,52 +186,4 @@ TEST(ColumnArrayTest, EmptyArrayPermuteTest) { check_array_data(*res2, {}); } -TEST(ColumnArrayTest, IntArrayReplicateTest) { - auto off_column = ColumnVector::create(); - auto data_column = ColumnVector::create(); - // init column array with [[1,2,3],[],[4],[5,6]] - std::vector offs = {0, 3, 3, 4, 6}; - std::vector vals = {1, 2, 3, 4, 5, 6}; - for (size_t i = 1; i < offs.size(); ++i) { - off_column->insert_data((const char*)(&offs[i]), 0); - } - for (auto& v : vals) { - data_column->insert_data((const char*)(&v), 0); - } - ColumnArray array_column(std::move(data_column), std::move(off_column)); - - uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal array_column.size() - size_t target_size = 6; // sum(counts) - - // return array column: [[1,2,3],[1,2,3],[],[5,6],[5,6],[5,6]]; - auto res1 = array_column.clone_empty(); - array_column.replicate(counts, target_size, *res1); - check_array_offsets(*res1, {3, 6, 6, 8, 10, 12}); - check_array_data(*res1, {1, 2, 3, 1, 2, 3, 5, 6, 5, 6, 5, 6}); -} - -TEST(ColumnArrayTest, StringArrayReplicateTest) { - auto off_column = ColumnVector::create(); - auto data_column = ColumnString::create(); - // init column array with [["abc","d"],["ef"],[], [""]]; - std::vector offs = {0, 2, 3, 3, 4}; - std::vector vals = {"abc", "d", "ef", ""}; - for (size_t i = 1; i < offs.size(); ++i) { - off_column->insert_data((const char*)(&offs[i]), 0); - } - for (auto& v : vals) { - data_column->insert_data(v.data(), v.size()); - } - ColumnArray array_column(std::move(data_column), std::move(off_column)); - - uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal array_column.size() - size_t target_size = 6; // sum(counts) - - // return array column: [["abc","d"],["abc","d"],["ef"],[""],[""],[""]]; - auto res1 = array_column.clone_empty(); - array_column.replicate(counts, target_size, *res1); - check_array_offsets(*res1, {2, 4, 5, 6, 7, 8}); - check_array_data(*res1, {"abc", "d", "abc", "d", "ef", "", "", ""}); -} - } // namespace doris::vectorized From ff9a962dfb84d46cdd1949139316384d0b09f7b7 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Mon, 19 Feb 2024 14:13:32 +0800 Subject: [PATCH 02/13] [opt](Nereids) refine group by elimination column prune (#30953) --- .../nereids/rules/rewrite/ColumnPruning.java | 54 +++++++++++++++---- .../rules/rewrite/EliminateGroupByKey.java | 5 +- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java index dcb330cd28e1a2..284ac52e14dcf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java @@ -18,7 +18,9 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.ColumnPruning.PruneContext; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import 
org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -38,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -173,16 +176,18 @@ public Plan visitLogicalRepeat(LogicalRepeat repeat, PruneContex private Plan pruneAggregate(Aggregate agg, PruneContext context) { // first try to prune group by and aggregate functions Aggregate prunedOutputAgg = pruneOutput(agg, agg.getOutputs(), agg::pruneOutputs, context); + Set enableNereidsRules = ConnectContext.get().getSessionVariable().getEnableNereidsRules(); + Aggregate fillUpAggr; - List groupByExpressions = prunedOutputAgg.getGroupByExpressions(); - List outputExpressions = prunedOutputAgg.getOutputExpressions(); - - // then fill up group by - Aggregate fillUpOutputRepeat = fillUpGroupByToOutput(groupByExpressions, outputExpressions) - .map(fullOutput -> prunedOutputAgg.withAggOutput(fullOutput)) - .orElse(prunedOutputAgg); + if (!enableNereidsRules.contains(RuleType.ELIMINATE_GROUP_BY_KEY.type())) { + fillUpAggr = fillUpGroupByToOutput(prunedOutputAgg) + .map(fullOutput -> prunedOutputAgg.withAggOutput(fullOutput)) + .orElse(prunedOutputAgg); + } else { + fillUpAggr = fillUpGroupByAndOutput(prunedOutputAgg); + } - return pruneChildren(fillUpOutputRepeat); + return pruneChildren(fillUpAggr); } private Plan skipPruneThisAndFirstLevelChildren(Plan plan) { @@ -193,8 +198,9 @@ private Plan skipPruneThisAndFirstLevelChildren(Plan plan) { return pruneChildren(plan, requireAllOutputOfChildren); } - private static Optional> fillUpGroupByToOutput( - List groupBy, List output) { + private static Optional> fillUpGroupByToOutput(Aggregate prunedOutputAgg) { + List groupBy = prunedOutputAgg.getGroupByExpressions(); + List output = prunedOutputAgg.getOutputExpressions(); if (output.containsAll(groupBy)) { return Optional.empty(); @@ -209,6 +215,34 @@ private static Optional> fillUpGroupByToOutput( .build()); } + private static Aggregate fillUpGroupByAndOutput(Aggregate prunedOutputAgg) { + List groupBy = prunedOutputAgg.getGroupByExpressions(); + List output = prunedOutputAgg.getOutputExpressions(); + + if (!(prunedOutputAgg instanceof LogicalAggregate)) { + return prunedOutputAgg; + } + // add back group by keys which eliminated by rule ELIMINATE_GROUP_BY_KEY + // if related output expressions are not in pruned output list. + List remainedOutputExprs = Lists.newArrayList(output); + remainedOutputExprs.removeAll(groupBy); + + List newOutputList = Lists.newArrayList(); + newOutputList.addAll((List) groupBy); + newOutputList.addAll(remainedOutputExprs); + + if (!(prunedOutputAgg instanceof LogicalAggregate)) { + return prunedOutputAgg.withAggOutput(newOutputList); + } else { + List newGroupByExprList = newOutputList.stream().filter(e -> + !(prunedOutputAgg.getAggregateFunctions().contains(e) + || e instanceof Alias && prunedOutputAgg.getAggregateFunctions() + .contains(((Alias) e).child())) + ).collect(Collectors.toList()); + return ((LogicalAggregate) prunedOutputAgg).withGroupByAndOutput(newGroupByExprList, newOutputList); + } + } + /** prune output */ public static
<P extends Plan>
P pruneOutput(P plan, List originOutput, Function, P> withPrunedOutput, PruneContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateGroupByKey.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateGroupByKey.java index d922252bebc8c8..69a34a680ec51c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateGroupByKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateGroupByKey.java @@ -50,10 +50,7 @@ public Rule build() { List uniqueFdItems = new ArrayList<>(); List nonUniqueFdItems = new ArrayList<>(); if (agg.getGroupByExpressions().isEmpty() - || agg.getGroupByExpressions().equals(agg.getOutputExpressions()) - || !agg.getGroupByExpressions().stream().allMatch(e -> e instanceof SlotReference) - || agg.getGroupByExpressions().stream().allMatch(e -> - ((SlotReference) e).getColumn().isPresent() && ((SlotReference) e).getTable().isPresent())) { + || !agg.getGroupByExpressions().stream().allMatch(e -> e instanceof SlotReference)) { return null; } ImmutableSet fdItems = childPlan.getLogicalProperties().getFunctionalDependencies().getFdItems(); From 31d8df8de28250058e9b9e4f9cafb5496994e259 Mon Sep 17 00:00:00 2001 From: AlexYue Date: Mon, 19 Feb 2024 15:05:05 +0800 Subject: [PATCH 03/13] (enhance)(S3) Change s3 metric from bvar adder to latency recorder (#28861) --- be/src/io/fs/err_utils.cpp | 15 ++++++++------- be/src/io/fs/s3_file_reader.cpp | 3 ++- be/src/io/fs/s3_file_system.cpp | 32 +++++++++++++++++++++----------- be/src/io/fs/s3_file_writer.cpp | 32 ++++++++++++++------------------ be/src/util/s3_util.cpp | 18 +++++++++--------- be/src/util/s3_util.h | 19 ++++++++++--------- 6 files changed, 64 insertions(+), 55 deletions(-) diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp index a9eeae7862f2f4..5688b888f4481a 100644 --- a/be/src/io/fs/err_utils.cpp +++ b/be/src/io/fs/err_utils.cpp @@ -115,15 +115,16 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) { using namespace Aws::Http; switch (err.GetResponseCode()) { case HttpResponseCode::NOT_FOUND: - return Status::Error("{}: {} {}", msg, err.GetExceptionName(), - err.GetMessage()); + return Status::Error("{}: {} {} type={}", msg, err.GetExceptionName(), + err.GetMessage(), err.GetErrorType()); case HttpResponseCode::FORBIDDEN: - return Status::Error("{}: {} {}", msg, err.GetExceptionName(), - err.GetMessage()); + return Status::Error("{}: {} {} type={}", msg, + err.GetExceptionName(), err.GetMessage(), + err.GetErrorType()); default: - return Status::Error("{}: {} {} code={}", msg, - err.GetExceptionName(), err.GetMessage(), - err.GetResponseCode()); + return Status::Error( + "{}: {} {} code={} type={}", msg, err.GetExceptionName(), err.GetMessage(), + err.GetResponseCode(), err.GetErrorType()); } } diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 61569375d7eec7..c78eabd09d78b8 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -33,6 +33,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "io/fs/err_utils.h" #include "io/fs/s3_common.h" +#include "util/bvar_helper.h" #include "util/doris_metrics.h" #include "util/s3_util.h" @@ -96,8 +97,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea if (!client) { return Status::InternalError("init s3 client error"); } + SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency); auto outcome = 
client->GetObject(request); - s3_bvar::s3_get_total << 1; if (!outcome.IsSuccess()) { return s3fs_error(outcome.GetError(), fmt::format("failed to read from {}", _path.native())); diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 77f303ffdc0d4a..207bd9dff49cdd 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -71,6 +71,7 @@ #include "io/fs/remote_file_system.h" #include "io/fs/s3_file_reader.h" #include "io/fs/s3_file_writer.h" +#include "util/bvar_helper.h" #include "util/s3_uri.h" #include "util/s3_util.h" @@ -166,8 +167,8 @@ Status S3FileSystem::delete_file_impl(const Path& file) { GET_KEY(key, file); request.WithBucket(_s3_conf.bucket).WithKey(key); + SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency); auto outcome = client->DeleteObject(request); - s3_bvar::s3_delete_total << 1; if (outcome.IsSuccess() || outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { return Status::OK(); @@ -190,8 +191,11 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) { delete_request.SetBucket(_s3_conf.bucket); bool is_trucated = false; do { - auto outcome = client->ListObjectsV2(request); - s3_bvar::s3_list_total << 1; + Aws::S3::Model::ListObjectsV2Outcome outcome; + { + SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency); + outcome = client->ListObjectsV2(request); + } if (!outcome.IsSuccess()) { return s3fs_error( outcome.GetError(), @@ -207,8 +211,8 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) { Aws::S3::Model::Delete del; del.WithObjects(std::move(objects)).SetQuiet(true); delete_request.SetDelete(std::move(del)); + SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency); auto delete_outcome = client->DeleteObjects(delete_request); - s3_bvar::s3_delete_total << 1; if (!delete_outcome.IsSuccess()) { return s3fs_error(delete_outcome.GetError(), fmt::format("failed to delete dir {}", full_path(prefix))); @@ -249,8 +253,8 @@ Status S3FileSystem::batch_delete_impl(const std::vector& remote_files) { } del.WithObjects(std::move(objects)).SetQuiet(true); delete_request.SetDelete(std::move(del)); + SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency); auto delete_outcome = client->DeleteObjects(delete_request); - s3_bvar::s3_delete_total << 1; if (UNLIKELY(!delete_outcome.IsSuccess())) { return s3fs_error( delete_outcome.GetError(), @@ -276,8 +280,8 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const { Aws::S3::Model::HeadObjectRequest request; request.WithBucket(_s3_conf.bucket).WithKey(key); + SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency); auto outcome = client->HeadObject(request); - s3_bvar::s3_head_total << 1; if (outcome.IsSuccess()) { *res = true; } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { @@ -297,8 +301,8 @@ Status S3FileSystem::file_size_impl(const Path& file, int64_t* file_size) const GET_KEY(key, file); request.WithBucket(_s3_conf.bucket).WithKey(key); + SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency); auto outcome = client->HeadObject(request); - s3_bvar::s3_head_total << 1; if (!outcome.IsSuccess()) { return s3fs_error(outcome.GetError(), fmt::format("failed to get file size {}", full_path(key))); @@ -324,8 +328,11 @@ Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vectorListObjectsV2(request); - s3_bvar::s3_list_total << 1; + Aws::S3::Model::ListObjectsV2Outcome outcome; + { + SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency); + outcome = client->ListObjectsV2(request); + } if (!outcome.IsSuccess()) { 
return s3fs_error(outcome.GetError(), fmt::format("failed to list {}", full_path(prefix))); @@ -425,8 +432,11 @@ Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_fi GET_KEY(key, remote_file); Aws::S3::Model::GetObjectRequest request; request.WithBucket(_s3_conf.bucket).WithKey(key); - Aws::S3::Model::GetObjectOutcome response = _client->GetObject(request); - s3_bvar::s3_get_total << 1; + Aws::S3::Model::GetObjectOutcome response; + { + SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency); + response = _client->GetObject(request); + } if (response.IsSuccess()) { Aws::OFStream local_file_s; local_file_s.open(local_file, std::ios::out | std::ios::binary); diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 8ec6bddafd116b..0ec3a46f808757 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -52,19 +52,16 @@ #include "io/fs/path.h" #include "io/fs/s3_file_bufferpool.h" #include "io/fs/s3_file_system.h" +#include "util/bvar_helper.h" #include "util/debug_points.h" #include "util/defer_op.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "util/s3_util.h" -namespace Aws { -namespace S3 { -namespace Model { +namespace Aws::S3::Model { class DeleteObjectRequest; -} // namespace Model -} // namespace S3 -} // namespace Aws +} // namespace Aws::S3::Model using Aws::S3::Model::AbortMultipartUploadRequest; using Aws::S3::Model::CompletedPart; @@ -74,8 +71,7 @@ using Aws::S3::Model::CreateMultipartUploadRequest; using Aws::S3::Model::UploadPartRequest; using Aws::S3::Model::UploadPartOutcome; -namespace doris { -namespace io { +namespace doris::io { using namespace Aws::S3::Model; using Aws::S3::S3Client; @@ -126,8 +122,8 @@ Status S3FileWriter::_create_multi_upload_request() { _bucket, _path.native(), _upload_id); }); + SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); auto outcome = _client->CreateMultipartUpload(create_request); - s3_bvar::s3_multi_part_upload_total << 1; if (outcome.IsSuccess()) { _upload_id = outcome.GetResult().GetUploadId(); @@ -175,8 +171,8 @@ Status S3FileWriter::_abort() { _wait_until_finish("Abort"); AbortMultipartUploadRequest request; request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id); + SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); auto outcome = _client->AbortMultipartUpload(request); - s3_bvar::s3_multi_part_upload_total << 1; if (outcome.IsSuccess() || outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD || outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { @@ -324,10 +320,11 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { upload_request.SetContentLength(buf.get_size()); upload_request.SetContentType("application/octet-stream"); - auto upload_part_callable = _client->UploadPartCallable(upload_request); - s3_bvar::s3_multi_part_upload_total << 1; - - UploadPartOutcome upload_part_outcome = upload_part_callable.get(); + UploadPartOutcome upload_part_outcome; + { + SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); + upload_part_outcome = _client->UploadPart(upload_request); + } DBUG_EXECUTE_IF("s3_file_writer::_upload_one_part", { if (part_num > 1) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); @@ -413,8 +410,8 @@ Status S3FileWriter::_complete() { LOG_WARNING(s.to_string()); return s; }); + SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); auto complete_outcome = 
_client->CompleteMultipartUpload(complete_request); - s3_bvar::s3_multi_part_upload_total << 1; if (!complete_outcome.IsSuccess()) { _st = s3fs_error(complete_outcome.GetError(), @@ -464,8 +461,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { LOG(WARNING) << _st; return; }); + SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency); auto response = _client->PutObject(request); - s3_bvar::s3_put_total << 1; if (!response.IsSuccess()) { _st = s3fs_error(response.GetError(), fmt::format("failed to put object {}, upload_id={}", _path.native(), _upload_id)); @@ -477,5 +474,4 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { s3_file_created_total << 1; } -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index 80bce1d4383542..82a1b4097c6de2 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -42,15 +42,15 @@ namespace doris { namespace s3_bvar { -bvar::Adder s3_get_total("s3_get", "total_num"); -bvar::Adder s3_put_total("s3_put", "total_num"); -bvar::Adder s3_delete_total("s3_delete", "total_num"); -bvar::Adder s3_head_total("s3_head", "total_num"); -bvar::Adder s3_multi_part_upload_total("s3_multi_part_upload", "total_num"); -bvar::Adder s3_list_total("s3_list", "total_num"); -bvar::Adder s3_list_object_versions_total("s3_list_object_versions", "total_num"); -bvar::Adder s3_get_bucket_version_total("s3_get_bucket_version", "total_num"); -bvar::Adder s3_copy_object_total("s3_copy_object", "total_num"); +bvar::LatencyRecorder s3_get_latency("s3_get"); +bvar::LatencyRecorder s3_put_latency("s3_put"); +bvar::LatencyRecorder s3_delete_latency("s3_delete"); +bvar::LatencyRecorder s3_head_latency("s3_head"); +bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload"); +bvar::LatencyRecorder s3_list_latency("s3_list"); +bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions"); +bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version"); +bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object"); }; // namespace s3_bvar class DorisAWSLogger final : public Aws::Utils::Logging::LogSystemInterface { diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index 873f6b06f973d1..0727128c06befd 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -45,15 +46,15 @@ class Adder; namespace doris { namespace s3_bvar { -extern bvar::Adder s3_get_total; -extern bvar::Adder s3_put_total; -extern bvar::Adder s3_delete_total; -extern bvar::Adder s3_head_total; -extern bvar::Adder s3_multi_part_upload_total; -extern bvar::Adder s3_list_total; -extern bvar::Adder s3_list_object_versions_total; -extern bvar::Adder s3_get_bucket_version_total; -extern bvar::Adder s3_copy_object_total; +extern bvar::LatencyRecorder s3_get_latency; +extern bvar::LatencyRecorder s3_put_latency; +extern bvar::LatencyRecorder s3_delete_latency; +extern bvar::LatencyRecorder s3_head_latency; +extern bvar::LatencyRecorder s3_multi_part_upload_latency; +extern bvar::LatencyRecorder s3_list_latency; +extern bvar::LatencyRecorder s3_list_object_versions_latency; +extern bvar::LatencyRecorder s3_get_bucket_version_latency; +extern bvar::LatencyRecorder s3_copy_object_latency; }; // namespace s3_bvar class S3URI; From 5c252e3c5261d2c34762382bce0625332156ed9d Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Mon, 19 Feb 2024 15:54:21 +0800 Subject: 
[PATCH 04/13] [fix](mysql-channel) initialize mysql serializer by default (#31083) --- .../main/java/org/apache/doris/mysql/DummyMysqlChannel.java | 3 --- .../src/main/java/org/apache/doris/mysql/MysqlChannel.java | 4 +--- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/DummyMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/DummyMysqlChannel.java index 05b72552f96ed1..4d738cb639d6a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/DummyMysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/DummyMysqlChannel.java @@ -27,9 +27,6 @@ * And don't need to allocate a real ByteBuffer. */ public class DummyMysqlChannel extends MysqlChannel { - public DummyMysqlChannel() { - this.serializer = MysqlSerializer.newInstance(); - } public void setSequenceId(int sequenceId) { this.sequenceId = sequenceId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index 5dfa7947abe45c..90354ae9cb7d54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -75,7 +75,7 @@ public class MysqlChannel { protected boolean isSslHandshaking; private SSLEngine sslEngine; - protected volatile MysqlSerializer serializer; + protected volatile MysqlSerializer serializer = MysqlSerializer.newInstance(); // mysql flag CLIENT_DEPRECATE_EOF private boolean clientDeprecatedEOF; @@ -111,8 +111,6 @@ public MysqlChannel(StreamConnection connection, ConnectContext context) { remoteHostPortString = connection.getPeerAddress().toString(); remoteIp = connection.getPeerAddress().toString(); } - // The serializer and buffers should only be created if this is a real MysqlChannel - this.serializer = MysqlSerializer.newInstance(); this.defaultBuffer = ByteBuffer.allocate(16 * 1024); this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN); this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024); From 99c3b5f041a2b6ce828a3e279c4798ae261a7f7e Mon Sep 17 00:00:00 2001 From: zxealous Date: Mon, 19 Feb 2024 16:55:54 +0800 Subject: [PATCH 05/13] [fix](outfile) Fix unable to export empty data (#30703) Issue Number: close #30600 Fix unable to export empty data to hdfs / S3, this behavior is inconsistent with version 1.2.7, version 1.2.7 can export empty data to hdfs/ S3, and there will be exported files on S3/HDFS. 
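A minimal sketch of the affected statement, adapted from the regression test added in this patch (the table name, bucket, endpoint, and credentials below are placeholders, not real values):

    -- source table holds zero rows; bucket/endpoint/keys are placeholders
    SELECT * FROM outfile_empty_data_test t ORDER BY user_id
    INTO OUTFILE "s3://my-bucket/outfile/csv/test-outfile-empty/exp_"
    FORMAT AS csv
    PROPERTIES (
        "s3.endpoint" = "<endpoint>",
        "s3.region" = "<region>",
        "s3.access_key" = "<ak>",
        "s3.secret_key" = "<sk>"
    );

Before this change the statement finished without creating any object on S3/HDFS when the result set was empty; with this change an empty output file is still written, matching the 1.2.7 behavior.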
--- be/src/io/fs/broker_file_writer.cpp | 8 + be/src/io/fs/broker_file_writer.h | 1 + be/src/io/fs/file_writer.h | 3 + be/src/io/fs/hdfs_file_writer.cpp | 8 + be/src/io/fs/hdfs_file_writer.h | 1 + be/src/io/fs/s3_file_writer.cpp | 39 +++- .../vec/sink/writer/vfile_result_writer.cpp | 1 + .../outfile/csv/test_outfile_empty_data.out | 9 + .../csv/test_outfile_empty_data.groovy | 166 ++++++++++++++++++ 9 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out create mode 100644 regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index d5b2baa7a66e0c..0d305bf269b53a 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -153,6 +153,14 @@ Status BrokerFileWriter::finalize() { return Status::OK(); } +Status BrokerFileWriter::open() { + if (!_opened) { + RETURN_IF_ERROR(_open()); + _opened = true; + } + return Status::OK(); +} + Status BrokerFileWriter::_open() { TBrokerOpenWriterRequest request; diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index cf5b8013acb146..f132545f0a8bc8 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -45,6 +45,7 @@ class BrokerFileWriter : public FileWriter { int64_t start_offset, FileSystemSPtr fs); virtual ~BrokerFileWriter(); + Status open() override; Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; Status finalize() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 58c9c9ff060fa9..256c67a9838ff7 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -46,6 +46,9 @@ class FileWriter { DISALLOW_COPY_AND_ASSIGN(FileWriter); + // Open the file for writing. + virtual Status open() { return Status::OK(); } + // Normal close. Wait for all data to persist before returning. 
virtual Status close() = 0; diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 00081db310fa0f..1f262e1abcd4f3 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -103,6 +103,14 @@ Status HdfsFileWriter::finalize() { return Status::OK(); } +Status HdfsFileWriter::open() { + if (!_opened) { + RETURN_IF_ERROR(_open()); + _opened = true; + } + return Status::OK(); +} + Status HdfsFileWriter::_open() { _path = convert_path(_path, _hdfs_fs->_fs_name); std::string hdfs_dir = _path.parent_path().string(); diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index c05f7625124020..bffd0efdca9c72 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -36,6 +36,7 @@ class HdfsFileWriter : public FileWriter { HdfsFileWriter(Path file, FileSystemSPtr fs); ~HdfsFileWriter(); + Status open() override; Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; Status finalize() override; diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 0ec3a46f808757..7711529b6f5989 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -198,17 +198,44 @@ Status S3FileWriter::close() { return _st; } VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native(); - // it might be one file less than 5MB, we do upload here - if (_pending_buf != nullptr) { - if (_upload_id.empty()) { + + if (_upload_id.empty()) { + if (_pending_buf != nullptr) { + // it might be one file less than 5MB, we do upload here auto* buf = dynamic_cast(_pending_buf.get()); DCHECK(buf != nullptr); buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); + } else { + // if there is no pending buffer, we need to create an empty file + auto builder = FileBufferBuilder(); + builder.set_type(BufferType::UPLOAD) + .set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); }) + .set_sync_after_complete_task([this](Status s) { + bool ret = false; + if (!s.ok()) [[unlikely]] { + VLOG_NOTICE << "failed at key: " << _key + << ", status: " << s.to_string(); + std::unique_lock _lck {_completed_lock}; + _failed = true; + ret = true; + this->_st = std::move(s); + } + // After the signal, there is a scenario where the previous invocation of _wait_until_finish + // returns to the caller, and subsequently, the S3 file writer is destructed. + // This means that accessing _failed afterwards would result in a heap use after free vulnerability. 
+ _countdown_event.signal(); + return ret; + }) + .set_is_cancelled([this]() { return _failed.load(); }); + RETURN_IF_ERROR(builder.build(&_pending_buf)); + auto* buf = dynamic_cast(_pending_buf.get()); + DCHECK(buf != nullptr); } - _countdown_event.add_count(); - RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); - _pending_buf = nullptr; } + _countdown_event.add_count(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); + _pending_buf = nullptr; + DBUG_EXECUTE_IF("s3_file_writer::close", { RETURN_IF_ERROR(_complete()); return Status::InternalError("failed to close s3 file writer"); diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index bf3d09cdda6999..d3228d3368098f 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { FileFactory::convert_storage_type(_storage_type), _state->exec_env(), _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0, _file_writer_impl)); + RETURN_IF_ERROR(_file_writer_impl->open()); switch (_file_opts->file_format) { case TFileFormatType::FORMAT_CSV_PLAIN: _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(), diff --git a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out new file mode 100644 index 00000000000000..260c177d310c7d --- /dev/null +++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_base1 -- + +-- !select_tvf1 -- + +-- !select_tvf2 -- + +-- !select_tvf3 -- + diff --git a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy new file mode 100644 index 00000000000000..1804fff2a11450 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths + +suite("test_outfile_empty_data", "external,hive,tvf,external_docker") { + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable Hive test.") + return; + } + + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + // use to outfile to hdfs + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + // It's okay to use random `hdfsUser`, but can not be empty. + def hdfsUserName = "doris" + def format = "csv" + def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}" + + // use to outfile to s3 + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + // broker + String broker_name = "hdfs" + + def export_table_name = "outfile_empty_data_test" + + def create_table = {table_name, column_define -> + sql """ DROP TABLE IF EXISTS ${table_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + ${column_define} + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + } + + def outfile_to_HDFS_directly = { + // select ... into outfile ... + def uuid = UUID.randomUUID().toString() + + hdfs_outfile_path = "/user/doris/tmp_data/${uuid}" + uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS ${format} + PROPERTIES ( + "fs.defaultFS"="${defaultFS}", + "hadoop.username" = "${hdfsUserName}" + ); + """ + logger.info("outfile to hdfs direct success path: " + res[0][3]); + return res[0][3] + } + + def outfile_to_HDFS_with_broker = { + // select ... into outfile ... + def uuid = UUID.randomUUID().toString() + + hdfs_outfile_path = "/user/doris/tmp_data/${uuid}" + uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS ${format} + PROPERTIES ( + "broker.fs.defaultFS"="${defaultFS}", + "broker.name"="hdfs", + "broker.username" = "${hdfsUserName}" + ); + """ + logger.info("outfile to hdfs with broker success path: " + res[0][3]); + return res[0][3] + } + + def outfile_to_S3_directly = { + // select ... into outfile ... 
+ s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/" + uri = "s3://${s3_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS csv + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + logger.info("outfile to s3 success path: " + res[0][3]); + return res[0][3] + } + + try { + def doris_column_define = """ + `user_id` INT NOT NULL COMMENT "用户id", + `name` STRING NULL, + `age` INT NULL""" + // create table + create_table(export_table_name, doris_column_define); + // test outfile empty data to hdfs directly + def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly() + // test outfile empty data to hdfs with broker + def outfile_to_hdfs_with_broker_url= outfile_to_HDFS_with_broker() + // test outfile empty data to s3 directly + def outfile_to_s3_directly_url = outfile_to_S3_directly() + qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY user_id; """ + + qt_select_tvf1 """ select * from HDFS( + "uri" = "${outfile_to_hdfs_directly_url}0.csv", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); + """ + + qt_select_tvf2 """ select * from HDFS( + "uri" = "${outfile_to_hdfs_with_broker_url}0.csv", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); + """ + + qt_select_tvf3 """ SELECT * FROM S3 ( + "uri" = "http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, outfile_to_s3_directly_url.length())}0.csv", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}", + "use_path_style" = "true" + ); + """ + + } finally { + } +} From 7811bae0f7fe7cca3389364d07d727769ff1eff3 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Mon, 19 Feb 2024 17:19:05 +0800 Subject: [PATCH 06/13] [fix](udf) fix java-udf coredump as get env return nullptr (#30986) --- be/src/runtime/fold_constant_executor.cpp | 3 ++- be/src/vec/functions/function_java_udf.cpp | 2 +- be/src/vec/functions/function_java_udf.h | 18 ++++++++---------- .../expression/rules/FoldConstantRuleOnBE.java | 2 ++ 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index de1fd04688207d..8a19ca1b35c81a 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -122,7 +122,8 @@ Status FoldConstantExecutor::fold_constant_vexpr(const TFoldConstantParams& para } expr_result_map->insert({m.first, pexpr_result_map}); } - + //TODO: will be delete the debug log after find problem of timeout. + LOG(INFO) << "finish fold_query_id: " << query_id_string(); return Status::OK(); } diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 0ae649b768d00a..dc367486dfcbfa 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -127,7 +127,7 @@ Status JavaFunctionCall::close(FunctionContext* context, // JNIContext own some resource and its release method depend on JavaFunctionCall // has to release the resource before JavaFunctionCall is deconstructed. 
if (jni_ctx) { - jni_ctx->close(); + RETURN_IF_ERROR(jni_ctx->close()); } return Status::OK(); } diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index f930346a72db02..e507392184f85d 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -126,29 +126,27 @@ class JavaFunctionCall : public IFunctionBase { JniContext() = default; - void close() { + Status close() { if (!open_successes) { LOG_WARNING("maybe open failed, need check the reason"); - return; //maybe open failed, so can't call some jni + return Status::OK(); //maybe open failed, so can't call some jni } if (is_closed) { - return; + return Status::OK(); } VLOG_DEBUG << "Free resources for JniContext"; - JNIEnv* env; + JNIEnv* env = nullptr; Status status = JniUtil::GetJNIEnv(&env); - if (!status.ok()) { + if (!status.ok() || env == nullptr) { LOG(WARNING) << "errors while get jni env " << status; - return; + return status; } env->CallNonvirtualVoidMethodA(executor, executor_cl, executor_close_id, nullptr); env->DeleteGlobalRef(executor); env->DeleteGlobalRef(executor_cl); - Status s = JniUtil::GetJniExceptionMsg(env); - if (!s.ok()) { - LOG(WARNING) << s; - } + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); is_closed = true; + return Status::OK(); } }; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnBE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnBE.java index 681fa4b56cd889..a254f9b4d43f91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnBE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnBE.java @@ -181,6 +181,8 @@ private Map evalOnBE(Map> paramMa tParams.setQueryOptions(tQueryOptions); tParams.setQueryId(context.queryId()); + // TODO: will be delete the debug log after find problem of timeout. 
+ LOG.info("fold query {} ", DebugUtil.printId(context.queryId())); Future future = BackendServiceProxy.getInstance().foldConstantExpr(brpcAddress, tParams); PConstantExprResult result = future.get(5, TimeUnit.SECONDS); From 1883acf2341037d6823a03abfac272ce0a480317 Mon Sep 17 00:00:00 2001 From: starocean999 <40539150+starocean999@users.noreply.github.com> Date: Mon, 19 Feb 2024 17:43:05 +0800 Subject: [PATCH 07/13] [fix](nereids) disable PushDownJoinOtherCondition rule for mark join (#31084) --- .../rewrite/PushDownJoinOtherCondition.java | 3 ++- .../data/nereids_p0/join/test_mark_join.out | 9 +++++++++ .../nereids_p0/join/test_mark_join.groovy | 19 ++++++++++++++++++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownJoinOtherCondition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownJoinOtherCondition.java index 8fff61988d4d61..cec7413cd61305 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownJoinOtherCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownJoinOtherCondition.java @@ -62,7 +62,8 @@ public Rule build() { return logicalJoin() // TODO: we may need another rule to handle on true or on false condition .when(join -> !join.getOtherJoinConjuncts().isEmpty() && !(join.getOtherJoinConjuncts().size() == 1 - && join.getOtherJoinConjuncts().get(0) instanceof BooleanLiteral)) + && join.getOtherJoinConjuncts().get(0) instanceof BooleanLiteral) + && !join.isMarkJoin()) .then(join -> { List otherJoinConjuncts = join.getOtherJoinConjuncts(); List remainingOther = Lists.newArrayList(); diff --git a/regression-test/data/nereids_p0/join/test_mark_join.out b/regression-test/data/nereids_p0/join/test_mark_join.out index 4098502b75df66..1ab6bcce40ee78 100644 --- a/regression-test/data/nereids_p0/join/test_mark_join.out +++ b/regression-test/data/nereids_p0/join/test_mark_join.out @@ -41,3 +41,12 @@ 3 \N true 4 \N false +-- !mark_join7 -- +1 p 0 +9 1 +\N \N 2 +\N \N 3 +3 4 +2 q 5 +0 6 + diff --git a/regression-test/suites/nereids_p0/join/test_mark_join.groovy b/regression-test/suites/nereids_p0/join/test_mark_join.groovy index 6008919d831a5a..e951ea155771c8 100644 --- a/regression-test/suites/nereids_p0/join/test_mark_join.groovy +++ b/regression-test/suites/nereids_p0/join/test_mark_join.groovy @@ -21,6 +21,7 @@ suite("test_mark_join", "nereids_p0") { sql "drop table if exists `test_mark_join_t1`;" sql "drop table if exists `test_mark_join_t2`;" + sql "drop table if exists table_7_undef_partitions2_keys3_properties4_distributed_by5;" sql """ CREATE TABLE IF NOT EXISTS `test_mark_join_t1` ( @@ -60,6 +61,16 @@ suite("test_mark_join", "nereids_p0") { ); """ + sql """ + create table table_7_undef_partitions2_keys3_properties4_distributed_by5 ( + col_int_undef_signed int/*agg_type_placeholder*/ , + col_varchar_10__undef_signed varchar(10)/*agg_type_placeholder*/ , + pk int/*agg_type_placeholder*/ + ) engine=olap + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + sql """ insert into `test_mark_join_t1` values (1, 1, 1, 'abc', 'efg', 'hjk'), @@ -80,6 +91,8 @@ suite("test_mark_join", "nereids_p0") { ); """ + sql """insert into table_7_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_varchar_10__undef_signed) values (0,1,'p'),(1,9,''),(2,null,null),(3,null,null),(4,3,''),(5,2,'q'),(6,0,'');""" + qt_mark_join1 """ select k1, k2 @@ -122,5 +135,9 @@ 
suite("test_mark_join", "nereids_p0") { from test_mark_join_t1 order by 1, 2, 3; """ - + qt_mark_join7 """ + SELECT * FROM table_7_undef_partitions2_keys3_properties4_distributed_by5 AS t1 + WHERE EXISTS ( SELECT MIN(`pk`) FROM table_7_undef_partitions2_keys3_properties4_distributed_by5 AS t2 WHERE t1.pk = 6 ) + OR EXISTS ( SELECT `pk` FROM table_7_undef_partitions2_keys3_properties4_distributed_by5 AS t2 WHERE t1.pk = 5 ) order by pk ; + """ } From 5e2ee19ff7124572a220895993bdb14a671e4b71 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 19 Feb 2024 17:47:35 +0800 Subject: [PATCH 08/13] [bugfix](performance) fix performance problem (#31093) --- be/src/pipeline/pipeline_fragment_context.h | 9 +++++++++ be/src/runtime/fragment_mgr.cpp | 4 +++- be/src/runtime/plan_fragment_executor.h | 8 ++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 34ca84b1481cc8..9ffcb40038cdb1 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -112,6 +112,11 @@ class PipelineFragmentContext : public TaskExecutionContext { void close_a_pipeline(); + void set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; + } + virtual void add_merge_controller_handler( std::shared_ptr& handler) {} @@ -188,6 +193,10 @@ class PipelineFragmentContext : public TaskExecutionContext { std::shared_ptr _query_ctx; + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr _merge_controller_handler; + MonotonicStopWatch _fragment_watcher; RuntimeProfile::Counter* _start_timer = nullptr; RuntimeProfile::Counter* _prepare_timer = nullptr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index ec8e36d34c6606..af7370a4c58a30 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -719,6 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); + fragment_executor->set_merge_controller_handler(handler); { std::lock_guard lock(_lock); _fragment_instance_map.insert( @@ -806,6 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); + context->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { std::lock_guard lock(_lock); @@ -885,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - + context->set_merge_controller_handler(handler); { std::lock_guard lock(_lock); _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 5529d1ba3b5314..41fa6c2f819537 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ 
b/be/src/runtime/plan_fragment_executor.h @@ -134,6 +134,11 @@ class PlanFragmentExecutor : public TaskExecutionContext { void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } + void set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; + } + std::shared_ptr get_query_ctx() { return _query_ctx; } TUniqueId fragment_instance_id() const { return _fragment_instance_id; } @@ -214,6 +219,9 @@ class PlanFragmentExecutor : public TaskExecutionContext { RuntimeProfile::Counter* _blocks_produced_counter = nullptr; RuntimeProfile::Counter* _fragment_cpu_timer = nullptr; + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr _merge_controller_handler; // If set the true, this plan fragment will be executed only after FE send execution start rpc. bool _need_wait_execution_trigger = false; From a94c1009287eadbe5e950c3382784a34f43f6078 Mon Sep 17 00:00:00 2001 From: Hyman-zhao Date: Mon, 19 Feb 2024 18:00:34 +0800 Subject: [PATCH 09/13] [fix](docs) fix kafka-load error word #31116 --- docs/en/docs/data-operate/import/import-scenes/kafka-load.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/docs/data-operate/import/import-scenes/kafka-load.md b/docs/en/docs/data-operate/import/import-scenes/kafka-load.md index 2d2539855e67c7..999d120862faef 100644 --- a/docs/en/docs/data-operate/import/import-scenes/kafka-load.md +++ b/docs/en/docs/data-operate/import/import-scenes/kafka-load.md @@ -28,7 +28,7 @@ under the License. Users can directly subscribe to message data in Kafka by submitting routine import jobs to synchronize data in near real-time. -Doris itself can ensure that messages in Kafka are subscribed without loss or weight, that is, `Exactly-Once` consumption semantics. +Doris itself can ensure that messages in Kafka are subscribed without loss or duplicated, that is, `Exactly-Once` consumption semantics. ## Subscribe to Kafka messages From 4f57f69bf35c6648250f9d4b60ffa944cae2249d Mon Sep 17 00:00:00 2001 From: zclllyybb Date: Mon, 19 Feb 2024 18:32:53 +0800 Subject: [PATCH 10/13] [fix](auto-partition) Fix concurrent load same value of auto partition #31107 --- .../main/java/org/apache/doris/service/FrontendServiceImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 78eb936e4d2e5b..fc275b2606d3bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3275,7 +3275,6 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t // check partition's number limit. int partitionNum = olapTable.getPartitionNum() + addPartitionClauseMap.size(); if (partitionNum > Config.max_auto_partition_num) { - olapTable.writeUnlock(); String errorMessage = String.format( "create partition failed. 
partition numbers %d will exceed limit variable " + "max_auto_partition_num %d", From c9dcd5a2292ffd90b17781706c1ef08add0fe9c3 Mon Sep 17 00:00:00 2001 From: zclllyybb Date: Mon, 19 Feb 2024 18:33:35 +0800 Subject: [PATCH 11/13] [docs](functions) fix and complete agg functions docs #31114 --- .../{bitmap_agg.md => bitmap-agg.md} | 0 .../aggregate-functions/group-bit-xor.md | 2 +- .../aggregate-functions/group-bitmap-xor.md | 2 +- docs/sidebars.json | 49 ++++++++++--------- .../{bitmap_agg.md => bitmap-agg.md} | 0 .../aggregate-functions/group-bit-xor.md | 4 +- .../aggregate-functions/group-bitmap-xor.md | 4 +- 7 files changed, 33 insertions(+), 28 deletions(-) rename docs/en/docs/sql-manual/sql-functions/aggregate-functions/{bitmap_agg.md => bitmap-agg.md} (100%) rename docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/{bitmap_agg.md => bitmap-agg.md} (100%) diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/bitmap_agg.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/bitmap-agg.md similarity index 100% rename from docs/en/docs/sql-manual/sql-functions/aggregate-functions/bitmap_agg.md rename to docs/en/docs/sql-manual/sql-functions/aggregate-functions/bitmap-agg.md diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md index 65b5ca2682a631..719c6a9e078ef6 100644 --- a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md +++ b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. --> -## group_bit_xor +## GROUP_BIT_XOR ### description #### Syntax diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md index 66f73e7332c0a6..b4eed3a00b9649 100644 --- a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md +++ b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. 
--> -## group_bitmap_xor +## GROUP_BITMAP_XOR ### description #### Syntax diff --git a/docs/sidebars.json b/docs/sidebars.json index fd429fbfa3c945..f02c6782d0dae2 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -562,43 +562,48 @@ "type": "category", "label": "Aggregate Functions", "items": [ - "sql-manual/sql-functions/aggregate-functions/collect-set", "sql-manual/sql-functions/aggregate-functions/min", - "sql-manual/sql-functions/aggregate-functions/stddev-samp", + "sql-manual/sql-functions/aggregate-functions/min-by", + "sql-manual/sql-functions/aggregate-functions/max", + "sql-manual/sql-functions/aggregate-functions/max-by", "sql-manual/sql-functions/aggregate-functions/avg", "sql-manual/sql-functions/aggregate-functions/avg-weighted", - "sql-manual/sql-functions/aggregate-functions/percentile", - "sql-manual/sql-functions/aggregate-functions/percentile-array", - "sql-manual/sql-functions/aggregate-functions/hll-union-agg", + "sql-manual/sql-functions/aggregate-functions/sum", + "sql-manual/sql-functions/aggregate-functions/stddev", + "sql-manual/sql-functions/aggregate-functions/stddev-samp", + "sql-manual/sql-functions/aggregate-functions/variance", + "sql-manual/sql-functions/aggregate-functions/var-samp", + "sql-manual/sql-functions/aggregate-functions/covar", + "sql-manual/sql-functions/aggregate-functions/covar-samp", + "sql-manual/sql-functions/aggregate-functions/corr", "sql-manual/sql-functions/aggregate-functions/topn", "sql-manual/sql-functions/aggregate-functions/topn-array", "sql-manual/sql-functions/aggregate-functions/topn-weighted", "sql-manual/sql-functions/aggregate-functions/count", - "sql-manual/sql-functions/aggregate-functions/sum", - "sql-manual/sql-functions/aggregate-functions/max-by", - "sql-manual/sql-functions/aggregate-functions/bitmap-union", + "sql-manual/sql-functions/aggregate-functions/count-by-enum", + "sql-manual/sql-functions/aggregate-functions/approx-count-distinct", + "sql-manual/sql-functions/aggregate-functions/percentile", + "sql-manual/sql-functions/aggregate-functions/percentile-array", + "sql-manual/sql-functions/aggregate-functions/percentile-approx", + "sql-manual/sql-functions/aggregate-functions/histogram", "sql-manual/sql-functions/aggregate-functions/group-bitmap-xor", "sql-manual/sql-functions/aggregate-functions/group-bit-and", "sql-manual/sql-functions/aggregate-functions/group-bit-or", "sql-manual/sql-functions/aggregate-functions/group-bit-xor", - "sql-manual/sql-functions/aggregate-functions/percentile-approx", - "sql-manual/sql-functions/aggregate-functions/stddev", "sql-manual/sql-functions/aggregate-functions/group-concat", - "sql-manual/sql-functions/aggregate-functions/collect-list", - "sql-manual/sql-functions/aggregate-functions/min-by", - "sql-manual/sql-functions/aggregate-functions/max", + "sql-manual/sql-functions/aggregate-functions/bitmap-union", + "sql-manual/sql-functions/aggregate-functions/hll-union-agg", + "sql-manual/sql-functions/aggregate-functions/grouping", + "sql-manual/sql-functions/aggregate-functions/grouping-id", "sql-manual/sql-functions/aggregate-functions/any-value", - "sql-manual/sql-functions/aggregate-functions/var-samp", - "sql-manual/sql-functions/aggregate-functions/approx-count-distinct", - "sql-manual/sql-functions/aggregate-functions/variance", + "sql-manual/sql-functions/aggregate-functions/array-agg", + "sql-manual/sql-functions/aggregate-functions/map-agg", + "sql-manual/sql-functions/aggregate-functions/bitmap-agg", + 
"sql-manual/sql-functions/aggregate-functions/collect-set", + "sql-manual/sql-functions/aggregate-functions/collect-list", "sql-manual/sql-functions/aggregate-functions/retention", "sql-manual/sql-functions/aggregate-functions/sequence-match", - "sql-manual/sql-functions/aggregate-functions/sequence-count", - "sql-manual/sql-functions/aggregate-functions/grouping", - "sql-manual/sql-functions/aggregate-functions/grouping-id", - "sql-manual/sql-functions/aggregate-functions/count-by-enum", - "sql-manual/sql-functions/aggregate-functions/histogram", - "sql-manual/sql-functions/aggregate-functions/map-agg" + "sql-manual/sql-functions/aggregate-functions/sequence-count" ] }, { diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/bitmap_agg.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/bitmap-agg.md similarity index 100% rename from docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/bitmap_agg.md rename to docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/bitmap-agg.md diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md index d5ca78c87ead46..812ad4372c455c 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bit-xor.md @@ -1,6 +1,6 @@ --- { - "title": "group_bit_xor", + "title": "GROUP_BIT_XOR", "language": "zh-CN" } --- @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. --> -## group_bit_xor +## GROUP_BIT_XOR ### description #### Syntax diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md index 653f6b5ef3607a..0456e94af483f5 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/group-bitmap-xor.md @@ -1,6 +1,6 @@ --- { - "title": "group_bitmap_xor", + "title": "GROUP_BITMAP_XOR", "language": "zh-CN" } --- @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. --> -## group_bitmap_xor +## GROUP_BITMAP_XOR ### description #### Syntax From b027c7226582fd89671c8d16b4bcd364c8d1cf3f Mon Sep 17 00:00:00 2001 From: Hyman-zhao Date: Mon, 19 Feb 2024 18:33:55 +0800 Subject: [PATCH 12/13] Doris doc (#31099) --- .../docs/data-operate/import/import-way/s3-load-manual.md | 2 +- docs/en/docs/data-table/best-practice.md | 4 ++-- docs/en/docs/data-table/index/inverted-index.md | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en/docs/data-operate/import/import-way/s3-load-manual.md b/docs/en/docs/data-operate/import/import-way/s3-load-manual.md index 3cf5dd0052a225..84b73f7f3d869a 100644 --- a/docs/en/docs/data-operate/import/import-way/s3-load-manual.md +++ b/docs/en/docs/data-operate/import/import-way/s3-load-manual.md @@ -95,7 +95,7 @@ example: -2. Support using temporary security credentials to access object stores that support the S3 protocol: +2. 
Support using temporary security credentials to access object storage that support the S3 protocol: ``` WITH S3 diff --git a/docs/en/docs/data-table/best-practice.md b/docs/en/docs/data-table/best-practice.md index 5c4333bd7b8cad..8a4361b1635b74 100644 --- a/docs/en/docs/data-table/best-practice.md +++ b/docs/en/docs/data-table/best-practice.md @@ -113,7 +113,7 @@ In business, most users will choose to partition on time, which has the followin 1.3.1.2. List Partitioning -In business,, users can select cities or other enumeration values for partition. +In business, users can select cities or other enumeration values for partition. 1.3.2. Hash Bucketing @@ -179,4 +179,4 @@ Users can modify the Schema of an existing table through the Schema Change opera - Reorder columns - Adding or removing index -For details, please refer to [Schema Change](../advanced/alter-table/schema-change.md) \ No newline at end of file +For details, please refer to [Schema Change](../advanced/alter-table/schema-change.md) diff --git a/docs/en/docs/data-table/index/inverted-index.md b/docs/en/docs/data-table/index/inverted-index.md index 20ea94b026b4c8..a8dfccaf5cb881 100644 --- a/docs/en/docs/data-table/index/inverted-index.md +++ b/docs/en/docs/data-table/index/inverted-index.md @@ -35,7 +35,7 @@ From version 2.0.0, Doris implemented inverted index to support fulltext search ## Glossary -- [inverted index](https://en.wikipedia.org/wiki/Inverted_index) is a index techlogy used in information retirval commonly. It split text into word terms and construct a term to doc index. This index is called inverted index and can be used to find the docs where a specific term appears. +- [inverted index](https://en.wikipedia.org/wiki/Inverted_index) is a index techlogy used in information retrieval commonly. It split text into word terms and construct a term to doc index. This index is called inverted index and can be used to find the docs where a specific term appears. ## Basic Principles @@ -353,7 +353,7 @@ mysql> SELECT count() FROM hackernews_1m WHERE comment MATCH_ANY 'OLAP'; 1 row in set (0.02 sec) ``` -- Semilarly, count on 'OLTP' shows 0.07s vs 0.01s. Due to the cache in Doris, both LIKE and MATCH_ANY is faster, but there is still 7x speedup. +- Similarly, count on 'OLTP' shows 0.07s vs 0.01s. Due to the cache in Doris, both LIKE and MATCH_ANY is faster, but there is still 7x speedup. 
```sql mysql> SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLTP%'; +---------+ @@ -394,7 +394,7 @@ mysql> SELECT count() FROM hackernews_1m WHERE comment MATCH_ALL 'OLAP OLTP'; ``` - search for at least one of 'OLAP' or 'OLTP', 0.12s vs 0.01s,12x speedup - - using MATCH_ALL if you only need at least one of the keywords appears + - using MATCH_ANY if you only need at least one of the keywords appears ```sql mysql> SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%' OR comment LIKE '%OLTP%'; +---------+ From 6e4a2f54136c839c6605cd7c02a05ea0d83b974f Mon Sep 17 00:00:00 2001 From: slothever <18522955+wsjz@users.noreply.github.com> Date: Mon, 19 Feb 2024 20:19:36 +0800 Subject: [PATCH 13/13] [fix](multi-catalog)fix getting ugi methods and unify them (#30844) put all ugi login methods to HadoopUGI --- .../org/apache/doris/hudi/HudiJniScanner.java | 4 +- .../java/org/apache/doris/hudi/Utils.java | 34 +------ fe/fe-common/pom.xml | 15 +++ .../security/authentication}/AuthType.java | 2 +- .../authentication/AuthenticationConfig.java | 69 +++++++++++++ .../security/authentication/HadoopUGI.java | 99 +++++++++++++++++++ .../KerberosAuthenticationConfig.java | 34 +++++++ .../SimpleAuthenticationConfig.java | 31 ++++++ .../apache/doris/catalog/HdfsResource.java | 13 +-- .../catalog/HiveMetaStoreClientHelper.java | 40 +++----- .../org/apache/doris/catalog/HiveTable.java | 33 ++++--- .../doris/datasource/HMSExternalCatalog.java | 32 ++---- .../datasource/hive/HiveMetaStoreCache.java | 4 +- .../doris/fs/remote/dfs/DFSFileSystem.java | 58 +---------- 14 files changed, 305 insertions(+), 163 deletions(-) rename fe/{fe-core/src/main/java/org/apache/doris/catalog => fe-common/src/main/java/org/apache/doris/common/security/authentication}/AuthType.java (96%) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 17ca650675de3c..f22a69255a9d27 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -21,6 +21,8 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopUGI; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.avro.generic.GenericDatumReader; @@ -138,7 +140,7 @@ public HudiJniScanner(int fetchSize, Map params) { predicates = new ScanPredicate[0]; } } - ugi = Utils.getUserGroupInformation(split.hadoopConf()); + ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(split.hadoopConf())); } catch (Exception e) { LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e); throw e; diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java index 4e8d670dac293a..be5628d2ce4d25 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java @@ -17,6 +17,9 @@ package org.apache.doris.hudi; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopUGI; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -36,35 +39,6 @@ import java.util.List; public class Utils { - public static class Constants { - public static String HADOOP_USER_NAME = "hadoop.username"; - public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; - public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; - public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; - } - - public static UserGroupInformation getUserGroupInformation(Configuration conf) { - String authentication = conf.get(Constants.HADOOP_SECURITY_AUTHENTICATION, null); - if ("kerberos".equals(authentication)) { - conf.set("hadoop.security.authorization", "true"); - UserGroupInformation.setConfiguration(conf); - String principal = conf.get(Constants.HADOOP_KERBEROS_PRINCIPAL); - String keytab = conf.get(Constants.HADOOP_KERBEROS_KEYTAB); - try { - UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); - UserGroupInformation.setLoginUser(ugi); - return ugi; - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - String hadoopUserName = conf.get(Constants.HADOOP_USER_NAME); - if (hadoopUserName != null) { - return UserGroupInformation.createRemoteUser(hadoopUserName); - } - } - return null; - } public static long getCurrentProcId() { try { @@ -114,7 +88,7 @@ public static void killProcess(long pid) { } public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) { - UserGroupInformation ugi = getUserGroupInformation(conf); + UserGroupInformation ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(conf)); HoodieTableMetaClient metaClient; if (ugi != null) { try { diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 6192ffd202af5e..c4b1b29b352319 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -93,6 +93,21 @@ under the License. org.projectlombok lombok + + org.apache.hadoop + hadoop-common + + + commons-collections + commons-collections + + + org.apache.commons + commons-compress + + + provided + doris-fe-common diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthType.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java rename to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthType.java index 25097bb24e5372..6cf3358fe7f32d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthType.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.catalog; +package org.apache.doris.common.security.authentication; /** * Define different auth type for external table such as hive/iceberg diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java new file mode 100644 index 00000000000000..b3cb69f7004498 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import lombok.Data; +import org.apache.hadoop.conf.Configuration; + +@Data +public abstract class AuthenticationConfig { + + public static String HADOOP_USER_NAME = "hadoop.username"; + public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; + public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; + public static String HADOOP_KERBEROS_AUTHORIZATION = "hadoop.security.authorization"; + public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; + public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; + public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file"; + + private boolean isValid; + + /** + * get kerberos config from hadoop conf + * @param conf config + * @return ugi + */ + public static AuthenticationConfig getKerberosConfig(Configuration conf) { + return AuthenticationConfig.getKerberosConfig(conf, HADOOP_KERBEROS_PRINCIPAL, HADOOP_KERBEROS_KEYTAB); + } + + /** + * get kerberos config from hadoop conf + * @param conf config + * @param krbPrincipalKey principal key + * @param krbKeytabKey keytab key + * @return ugi + */ + public static AuthenticationConfig getKerberosConfig(Configuration conf, + String krbPrincipalKey, + String krbKeytabKey) { + String authentication = conf.get(HADOOP_SECURITY_AUTHENTICATION, null); + if (AuthType.KERBEROS.getDesc().equals(authentication)) { + KerberosAuthenticationConfig krbConfig = new KerberosAuthenticationConfig(); + krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey)); + krbConfig.setKerberosKeytab(conf.get(krbKeytabKey)); + krbConfig.setConf(conf); + return krbConfig; + } else { + // AuthType.SIMPLE + SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig(); + simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME)); + return simpleAuthenticationConfig; + } + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java new file mode 100644 index 00000000000000..5fb8f4fdab1cbc --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class HadoopUGI { + private static final Logger LOG = LogManager.getLogger(HadoopUGI.class); + + /** + * login and return hadoop ugi + * @param config auth config + * @return ugi + */ + public static UserGroupInformation loginWithUGI(AuthenticationConfig config) { + UserGroupInformation ugi; + if (config instanceof KerberosAuthenticationConfig) { + KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; + Configuration hadoopConf = krbConfig.getConf(); + hadoopConf.set(AuthenticationConfig.HADOOP_KERBEROS_AUTHORIZATION, "true"); + UserGroupInformation.setConfiguration(hadoopConf); + String principal = krbConfig.getKerberosPrincipal(); + try { + // login hadoop with keytab and try checking TGT + ugi = UserGroupInformation.getLoginUser(); + LOG.debug("Current login user: {}", ugi.getUserName()); + if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) { + // if the current user is logged by kerberos and is the same user + // just use checkTGTAndReloginFromKeytab because this method will only relogin + // when the TGT is expired or is close to expiry + ugi.checkTGTAndReloginFromKeytab(); + return ugi; + } + } catch (IOException e) { + LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e); + } + try { + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab()); + UserGroupInformation.setLoginUser(ugi); + LOG.debug("Login by kerberos authentication with principal: {}", principal); + return ugi; + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername(); + if (hadoopUserName == null) { + hadoopUserName = "hadoop"; + LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop"); + } + ugi = UserGroupInformation.createRemoteUser(hadoopUserName); + UserGroupInformation.setLoginUser(ugi); + LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName); + return ugi; + } + } + + /** + * use for HMSExternalCatalog to login + * @param config auth config + 
*/ + public static void tryKrbLogin(String catalogName, AuthenticationConfig config) { + if (config instanceof KerberosAuthenticationConfig) { + KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; + try { + /** + * Because metastore client is created by using + * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy} + * it will relogin when TGT is expired, so we don't need to relogin manually. + */ + UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(), + krbConfig.getKerberosKeytab()); + } catch (IOException e) { + throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e); + } + } + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java new file mode 100644 index 00000000000000..722cd0352b7d7d --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +@Data +public class KerberosAuthenticationConfig extends AuthenticationConfig { + private String kerberosPrincipal; + private String kerberosKeytab; + private Configuration conf; + + @Override + public boolean isValid() { + return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab); + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java new file mode 100644 index 00000000000000..57bb7887372609 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +@Data +public class SimpleAuthenticationConfig extends AuthenticationConfig { + private String username; + + @Override + public boolean isValid() { + return StringUtils.isNotEmpty(username); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java index d2a03aaf90c084..8ebd66c70937b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java @@ -19,6 +19,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.thrift.THdfsConf; import org.apache.doris.thrift.THdfsParams; @@ -44,12 +45,6 @@ public class HdfsResource extends Resource { public static final String HADOOP_FS_PREFIX = "dfs."; public static String HADOOP_FS_NAME = "fs.defaultFS"; - // simple or kerberos - public static String HADOOP_USER_NAME = "hadoop.username"; - public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; - public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; - public static String HADOOP_KERBEROS_AUTHORIZATION = "hadoop.security.authorization"; - public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit"; public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path"; public static String DSF_NAMESERVICES = "dfs.nameservices"; @@ -107,11 +102,11 @@ public static THdfsParams generateHdfsParam(Map properties) { for (Map.Entry property : properties.entrySet()) { if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) { tHdfsParams.setFsName(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) { + } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) { tHdfsParams.setUser(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) { + } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL)) { tHdfsParams.setHdfsKerberosPrincipal(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) { + } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)) { tHdfsParams.setHdfsKerberosKeytab(property.getValue()); } else { THdfsConf hdfsConf = new THdfsConf(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 6273b78b66b8f2..f8ddcb7f0a5f47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -33,6 +33,8 @@ import 
org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopUGI; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.thrift.TExprOpcode; @@ -775,36 +777,22 @@ public static Schema getHudiTableSchema(HMSExternalTable table) { return hudiSchema; } - public static UserGroupInformation getUserGroupInformation(Configuration conf) { - UserGroupInformation ugi = null; - String authentication = conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null); - if (AuthType.KERBEROS.getDesc().equals(authentication)) { - conf.set("hadoop.security.authorization", "true"); - UserGroupInformation.setConfiguration(conf); - String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL); - String keytab = conf.get(HdfsResource.HADOOP_KERBEROS_KEYTAB); - try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); - UserGroupInformation.setLoginUser(ugi); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - String hadoopUserName = conf.get(HdfsResource.HADOOP_USER_NAME); - if (hadoopUserName != null) { - ugi = UserGroupInformation.createRemoteUser(hadoopUserName); - } - } - return ugi; - } - public static T ugiDoAs(long catalogId, PrivilegedExceptionAction action) { return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(), action); } public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction action) { - UserGroupInformation ugi = getUserGroupInformation(conf); + AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf, + AuthenticationConfig.HIVE_KERBEROS_PRINCIPAL, + AuthenticationConfig.HIVE_KERBEROS_KEYTAB); + if (!krbConfig.isValid()) { + // if hive config is not ready, then use hadoop kerberos to login + krbConfig = AuthenticationConfig.getKerberosConfig(conf, + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); + } + UserGroupInformation ugi = HadoopUGI.loginWithUGI(krbConfig); try { if (ugi != null) { ugi.checkTGTAndReloginFromKeytab(); @@ -813,7 +801,7 @@ public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction act return action.run(); } } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e.getCause()); + throw new RuntimeException(e.getMessage(), e); } } @@ -821,7 +809,7 @@ public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { String hudiBasePath = table.getRemoteTable().getSd().getLocation(); Configuration conf = getConfiguration(table); - UserGroupInformation ugi = getUserGroupInformation(conf); + UserGroupInformation ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(conf)); HoodieTableMetaClient metaClient; if (ugi != null) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index 550be96f7441e4..385b79d493eb98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -19,6 +19,8 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.security.authentication.AuthType; +import 
org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.thrift.THiveTable; @@ -114,40 +116,41 @@ private void validate(Map properties) throws DdlException { } // check auth type - String authType = copiedProps.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION); + String authType = copiedProps.get(AuthenticationConfig.HADOOP_SECURITY_AUTHENTICATION); if (Strings.isNullOrEmpty(authType)) { authType = AuthType.SIMPLE.getDesc(); } if (!AuthType.isSupportedAuthType(authType)) { throw new DdlException(String.format(PROPERTY_ERROR_MSG, - HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authType)); + AuthenticationConfig.HADOOP_SECURITY_AUTHENTICATION, authType)); } - copiedProps.remove(HdfsResource.HADOOP_SECURITY_AUTHENTICATION); - hiveProperties.put(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authType); + copiedProps.remove(AuthenticationConfig.HADOOP_SECURITY_AUTHENTICATION); + hiveProperties.put(AuthenticationConfig.HADOOP_SECURITY_AUTHENTICATION, authType); if (AuthType.KERBEROS.getDesc().equals(authType)) { // check principal - String principal = copiedProps.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL); + String principal = copiedProps.get(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL); if (Strings.isNullOrEmpty(principal)) { throw new DdlException(String.format(PROPERTY_MISSING_MSG, - HdfsResource.HADOOP_KERBEROS_PRINCIPAL, HdfsResource.HADOOP_KERBEROS_PRINCIPAL)); + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL)); } - hiveProperties.put(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, principal); - copiedProps.remove(HdfsResource.HADOOP_KERBEROS_PRINCIPAL); + hiveProperties.put(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, principal); + copiedProps.remove(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL); // check keytab - String keytabPath = copiedProps.get(HdfsResource.HADOOP_KERBEROS_KEYTAB); + String keytabPath = copiedProps.get(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); if (Strings.isNullOrEmpty(keytabPath)) { throw new DdlException(String.format(PROPERTY_MISSING_MSG, - HdfsResource.HADOOP_KERBEROS_KEYTAB, HdfsResource.HADOOP_KERBEROS_KEYTAB)); + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)); } else { - hiveProperties.put(HdfsResource.HADOOP_KERBEROS_KEYTAB, keytabPath); - copiedProps.remove(HdfsResource.HADOOP_KERBEROS_KEYTAB); + hiveProperties.put(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, keytabPath); + copiedProps.remove(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); } } - String hdfsUserName = copiedProps.get(HdfsResource.HADOOP_USER_NAME); + String hdfsUserName = copiedProps.get(AuthenticationConfig.HADOOP_USER_NAME); if (!Strings.isNullOrEmpty(hdfsUserName)) { - hiveProperties.put(HdfsResource.HADOOP_USER_NAME, hdfsUserName); - copiedProps.remove(HdfsResource.HADOOP_USER_NAME); + hiveProperties.put(AuthenticationConfig.HADOOP_USER_NAME, hdfsUserName); + copiedProps.remove(AuthenticationConfig.HADOOP_USER_NAME); } if (!copiedProps.isEmpty()) { Iterator> iter = copiedProps.entrySet().iterator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 4f385fad1ea985..92a15badefe2d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 4f385fad1ea985..92a15badefe2d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.AuthType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.external.ExternalDatabase;
@@ -26,6 +25,8 @@
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.hive.HMSCachedClient;
 import org.apache.doris.datasource.hive.HMSCachedClientFactory;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -36,11 +37,9 @@
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -143,27 +142,13 @@ protected void initLocalObjectsImpl() {
         }
         hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
                 String.valueOf(Config.hive_metastore_client_timeout_second));
-        String authentication = catalogProperty.getOrDefault(
-                HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
-        if (AuthType.KERBEROS.getDesc().equals(authentication)) {
-            hiveConf.set(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authentication);
-            UserGroupInformation.setConfiguration(hiveConf);
-            try {
-                /**
-                 * Because metastore client is created by using
-                 * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
-                 * it will relogin when TGT is expired, so we don't need to relogin manually.
-                 */
-                UserGroupInformation.loginUserFromKeytab(
-                        catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, ""),
-                        catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_KEYTAB, ""));
-            } catch (IOException e) {
-                throw new HMSClientException("login with kerberos auth failed for catalog %s", e, this.getName());
-            }
-        }
+        HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf,
+                AuthenticationConfig.HIVE_KERBEROS_PRINCIPAL,
+                AuthenticationConfig.HIVE_KERBEROS_KEYTAB));
+        client = HMSCachedClientFactory.createCachedClient(hiveConf,
+                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
+                jdbcClientConfig);
+    }
-        client = HMSCachedClientFactory.createCachedClient(hiveConf,
-                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
-    }
 
     @Override
@@ -248,5 +233,4 @@ public String getHiveVersion() {
     protected List<String> listDatabaseNames() {
         return client.getAllDatabases();
     }
-
 }
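
HadoopUGI.tryKrbLogin() replaces the explicit loginUserFromKeytab block above but is not defined in this patch. A rough sketch of the behaviour the call site appears to rely on; getPrincipal() and getKeytab() are hypothetical accessors used here only for illustration and may not match the real AuthenticationConfig API:

import java.io.IOException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.hadoop.security.UserGroupInformation;

public class TryKrbLoginSketch {
    // Illustrative stand-in for HadoopUGI.tryKrbLogin(catalogName, config); the real helper
    // lives in org.apache.doris.common.security.authentication and may differ in detail.
    static void tryKrbLogin(String catalogName, AuthenticationConfig config) {
        if (!config.isValid()) {
            return; // nothing to do for simple authentication
        }
        try {
            // getPrincipal()/getKeytab() are hypothetical accessors, for illustration only.
            UserGroupInformation.loginUserFromKeytab(config.getPrincipal(), config.getKeytab());
        } catch (IOException e) {
            throw new RuntimeException("login with kerberos auth failed for catalog " + catalogName, e);
        }
    }
}
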
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b65594060a6d56..c7bf28cda90e7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -22,7 +22,6 @@
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
@@ -33,6 +32,7 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.CacheException;
@@ -754,7 +754,7 @@ public AtomicReference> getFileCacheR
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
             boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
-        String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+        String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
         try {
             for (HivePartition partition : partitions) {
                 FileCacheValue fileCacheValue = new FileCacheValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 79c83b80891d20..4d21ebbdf9d421 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -19,9 +19,9 @@
 
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
-import org.apache.doris.catalog.AuthType;
-import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
@@ -82,7 +82,7 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
             conf.set(propEntry.getKey(), propEntry.getValue());
         }
 
-        UserGroupInformation ugi = login(conf);
+        UserGroupInformation ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(conf));
         try {
             dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
                 try {
@@ -100,58 +100,6 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
         return dfsFileSystem;
     }
 
-    private UserGroupInformation login(Configuration conf) throws UserException {
-        if (AuthType.KERBEROS.getDesc().equals(
-                conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
-            try {
-                UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-                String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
-                LOG.debug("Current login user: {}", ugi.getUserName());
-                if (ugi.hasKerberosCredentials() && ugi.getUserName().equals(principal)) {
-                    // if the current user is logged by kerberos and is the same user
-                    // just use checkTGTAndReloginFromKeytab because this method will only relogin
-                    // when the TGT is expired or is close to expiry
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return ugi;
-                }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
-                return doLogin(conf);
-            }
-        }
-
-        return doLogin(conf);
-    }
-
-    private UserGroupInformation doLogin(Configuration conf) throws UserException {
-        if (AuthType.KERBEROS.getDesc().equals(
-                conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
-            conf.set(HdfsResource.HADOOP_KERBEROS_AUTHORIZATION, "true");
-            String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
-            String keytab = conf.get(HdfsResource.HADOOP_KERBEROS_KEYTAB);
-
-            UserGroupInformation.setConfiguration(conf);
-            try {
-                UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
-                UserGroupInformation.setLoginUser(ugi);
-                LOG.info("Login by kerberos authentication with principal: {}", principal);
-                return ugi;
-            } catch (IOException e) {
-                throw new UserException(e);
-            }
-        } else {
-            String hadoopUserName = conf.get(HdfsResource.HADOOP_USER_NAME);
-            if (hadoopUserName == null) {
-                hadoopUserName = "hadoop";
-                LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop");
-            }
-            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
-            UserGroupInformation.setLoginUser(ugi);
-            LOG.info("Login by proxy user, hadoop.username: {}", hadoopUserName);
-            return ugi;
-        }
-    }
-
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
         LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);