From 50d4f032187e65bf21416c0ca81a2d988450426f Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Fri, 31 Oct 2025 21:30:39 +0800 Subject: [PATCH] [function](agg) support agg function of group_array_union (#57166) support agg function of group_array_union, doc: https://github.com/apache/doris-website/pull/2996 ``` mysql> select * from array_test; +------+-----------------+ | id | c_array | +------+-----------------+ | 1 | [1, 2, 3, 4, 5] | | 2 | [6, 7] | +------+-----------------+ mysql> select group_array_union(c_array) from array_test; +----------------------------+ | group_array_union(c_array) | +----------------------------+ | [2, 3, 4, 5, 1, 6, 7] | +----------------------------+ ``` --- be/src/exprs/hybrid_set.h | 53 +- ...gregate_function_group_array_intersect.cpp | 66 --- ...aggregate_function_group_array_intersect.h | 507 ------------------ .../aggregate_function_group_array_set_op.cpp | 158 ++++++ .../aggregate_function_group_array_set_op.h | 480 +++++++++++++++++ .../aggregate_function_simple_factory.cpp | 4 +- .../agg_group_array_intersect_test.cpp | 38 +- .../catalog/BuiltinAggregateFunctions.java | 2 + .../functions/agg/GroupArrayUnion.java | 87 +++ .../visitor/AggregateFunctionVisitor.java | 5 + .../aggregate/group_array_intersect.out | 95 ++++ .../aggregate/group_array_intersect.groovy | 32 ++ 12 files changed, 930 insertions(+), 597 deletions(-) delete mode 100644 be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.cpp delete mode 100644 be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h create mode 100644 be/src/vec/aggregate_functions/aggregate_function_group_array_set_op.cpp create mode 100644 be/src/vec/aggregate_functions/aggregate_function_group_array_set_op.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayUnion.java diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 4fa96511421ec5..f4445276937b93 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -221,6 +221,9 @@ class HybridSetBase : public FilterBase { // use in vectorize execute engine virtual void insert(void* data, size_t) = 0; + virtual void insert_range_from(const vectorized::ColumnPtr& column, size_t start, + size_t end) = 0; + virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) = 0; virtual void insert(HybridSetBase* set) { @@ -291,8 +294,16 @@ class HybridSet : public HybridSetBase { void insert(void* data, size_t /*unused*/) override { insert(data); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { - const auto size = column->size(); + insert_range_from(column, start, column->size()); + } + void insert_range_from(const vectorized::ColumnPtr& column, size_t start, size_t end) override { + if (end > column->size()) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Parameters start = {}, end = {}, are out of bound in " + "HybridSet::insert_range_from method (data.size() = {}).", + start, end, column->size()); + } if (column->is_nullable()) { const auto* nullable = assert_cast(column.get()); const auto& col = nullable->get_nested_column(); @@ -301,7 +312,7 @@ class HybridSet : public HybridSetBase { .get_data(); const ElementType* data = (ElementType*)col.get_raw_data().data; - for (size_t i = start; i < size; i++) { + for (size_t i = start; i < end; i++) { if (!nullmap[i]) { _set.insert(*(data + i)); } else { @@ -310,7 +321,7 @@ class HybridSet : public HybridSetBase { } } else { const ElementType* data = (ElementType*)column->get_raw_data().data; - for (size_t i = start; i < size; i++) { + for (size_t i = start; i < end; i++) { _set.insert(*(data + i)); } } @@ -448,6 +459,16 @@ class StringSet : public HybridSetBase { } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + insert_range_from(column, start, column->size()); + } + + void insert_range_from(const vectorized::ColumnPtr& column, size_t start, size_t end) override { + if (end > column->size()) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Parameters start = {}, end = {}, are out of bound in " + "StringSet::insert_range_from method (data.size() = {}).", + start, end, column->size()); + } if (column->is_nullable()) { const auto* nullable = assert_cast(column.get()); const auto& nullmap = @@ -456,19 +477,19 @@ class StringSet : public HybridSetBase { if (nullable->get_nested_column().is_column_string64()) { _insert_fixed_len_string(assert_cast( nullable->get_nested_column()), - nullmap.data(), start, nullmap.size()); + nullmap.data(), start, end); } else { _insert_fixed_len_string( assert_cast(nullable->get_nested_column()), - nullmap.data(), start, nullmap.size()); + nullmap.data(), start, end); } } else { if (column->is_column_string64()) { _insert_fixed_len_string(assert_cast(*column), - nullptr, start, column->size()); + nullptr, start, end); } else { _insert_fixed_len_string(assert_cast(*column), - nullptr, start, column->size()); + nullptr, start, end); } } } @@ -618,6 +639,16 @@ class StringValueSet : public HybridSetBase { } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + insert_range_from(column, start, column->size()); + } + + void insert_range_from(const vectorized::ColumnPtr& column, size_t start, size_t end) override { + if (end > column->size()) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Parameters start = {}, end = {}, are out of bound in " + "StringSet::insert_range_from method (data.size() = {}).", + start, end, column->size()); + } if (column->is_nullable()) { const auto* nullable = assert_cast(column.get()); const auto& nullmap = @@ -626,19 +657,19 @@ class StringValueSet : public HybridSetBase { if (nullable->get_nested_column().is_column_string64()) { _insert_fixed_len_string(assert_cast( nullable->get_nested_column()), - nullmap.data(), start, nullmap.size()); + nullmap.data(), start, end); } else { _insert_fixed_len_string( assert_cast(nullable->get_nested_column()), - nullmap.data(), start, nullmap.size()); + nullmap.data(), start, end); } } else { if (column->is_column_string64()) { _insert_fixed_len_string(assert_cast(*column), - nullptr, start, column->size()); + nullptr, start, end); } else { _insert_fixed_len_string(assert_cast(*column), - nullptr, start, column->size()); + nullptr, start, end); } } } diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.cpp b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.cpp deleted file mode 100644 index 697e5776824512..00000000000000 --- a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp -// and modified by Doris - -#include "vec/aggregate_functions/aggregate_function_group_array_intersect.h" - -#include "vec/aggregate_functions/factory_helpers.h" -#include "vec/aggregate_functions/helpers.h" - -namespace doris::vectorized { -#include "common/compile_check_begin.h" - -inline AggregateFunctionPtr create_aggregate_function_group_array_intersect_impl( - const std::string& name, const DataTypes& argument_types, const bool result_is_nullable, - const AggregateFunctionAttr& attr) { - const auto& nested_type = remove_nullable( - dynamic_cast(*(argument_types[0])).get_nested_type()); - AggregateFunctionPtr res = creator_with_type_list< - TYPE_TINYINT, TYPE_SMALLINT, TYPE_INT, TYPE_BIGINT, TYPE_LARGEINT, TYPE_DATEV2, - TYPE_DATETIMEV2>::create(argument_types, - result_is_nullable, - attr); - - if (!res) { - res = AggregateFunctionPtr(new AggregateFunctionGroupArrayIntersectGeneric(argument_types)); - } - return res; -} - -AggregateFunctionPtr create_aggregate_function_group_array_intersect( - const std::string& name, const DataTypes& argument_types, const bool result_is_nullable, - const AggregateFunctionAttr& attr) { - assert_arity_range(name, argument_types, 1, 1); - const DataTypePtr& argument_type = remove_nullable(argument_types[0]); - - if (argument_type->get_primitive_type() != TYPE_ARRAY) { - throw Exception(ErrorCode::INVALID_ARGUMENT, - "Aggregate function groupArrayIntersect accepts only array type argument. " - "Provided argument type: " + - argument_type->get_name()); - } - return create_aggregate_function_group_array_intersect_impl(name, {argument_type}, - result_is_nullable, attr); -} - -void register_aggregate_function_group_array_intersect(AggregateFunctionSimpleFactory& factory) { - factory.register_function_both("group_array_intersect", - create_aggregate_function_group_array_intersect); -} -} // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h deleted file mode 100644 index dd0982902ea7b3..00000000000000 --- a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h +++ /dev/null @@ -1,507 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp -// and modified by Doris - -#include - -#include "exprs/hybrid_set.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/aggregate_functions/aggregate_function_simple_factory.h" -#include "vec/columns/column_array.h" -#include "vec/common/assert_cast.h" -#include "vec/core/field.h" -#include "vec/data_types/data_type_array.h" -#include "vec/data_types/data_type_date_or_datetime_v2.h" -#include "vec/data_types/data_type_string.h" - -namespace doris::vectorized { -#include "common/compile_check_begin.h" -class Arena; -class BufferReadable; -class BufferWritable; -} // namespace doris::vectorized - -namespace doris::vectorized { - -template -class NullableNumericOrDateSet - : public HybridSet::CppType>> { -public: - NullableNumericOrDateSet() - : HybridSet < T - == TYPE_BOOLEAN - ? TYPE_TINYINT - : T, - DynamicContainer < typename PrimitiveTypeTraits < T == TYPE_BOOLEAN ? TYPE_TINYINT - : T > ::CppType >> (true) {} - - void change_contain_null_value(bool target_value) { this->_contain_null = target_value; } -}; - -template -struct AggregateFunctionGroupArrayIntersectData { - using ColVecType = typename PrimitiveTypeTraits::ColumnType; - using NullableNumericOrDateSetType = NullableNumericOrDateSet; - using Set = std::unique_ptr; - - AggregateFunctionGroupArrayIntersectData() - : value(std::make_unique()) {} - - Set value; - bool init = false; - - void reset() { - init = false; - value = std::make_unique(); - } - - void process_col_data(auto& column_data, size_t offset, size_t arr_size, Set& set) { - const bool is_column_data_nullable = column_data.is_nullable(); - - const ColumnNullable* col_null = nullptr; - const ColVecType* nested_column_data = nullptr; - - if (is_column_data_nullable) { - const auto* const_col_data = &column_data; - col_null = static_cast(const_col_data); - nested_column_data = &assert_cast( - col_null->get_nested_column()); - } else { - nested_column_data = &static_cast(column_data); - } - - if (!init) { - for (size_t i = 0; i < arr_size; ++i) { - const bool is_null_element = - is_column_data_nullable && col_null->is_null_at(offset + i); - const typename PrimitiveTypeTraits::ColumnItemType* src_data = - is_null_element ? nullptr : &(nested_column_data->get_element(offset + i)); - - set->insert(src_data); - } - init = true; - } else if (!set->empty()) { - Set new_set = std::make_unique(); - - for (size_t i = 0; i < arr_size; ++i) { - const bool is_null_element = - is_column_data_nullable && col_null->is_null_at(offset + i); - const typename PrimitiveTypeTraits::ColumnItemType* src_data = - is_null_element ? nullptr : &(nested_column_data->get_element(offset + i)); - - if ((!is_null_element && set->find(src_data)) || - (set->contain_null() && is_null_element)) { - new_set->insert(src_data); - } - } - set = std::move(new_set); - } - } -}; - -/// Puts all values to the hybrid set. Returns an array of unique values. Implemented for numeric/date types. -template -class AggregateFunctionGroupArrayIntersect - : public IAggregateFunctionDataHelper, - AggregateFunctionGroupArrayIntersect>, - UnaryExpression, - NotNullableAggregateFunction { -private: - using State = AggregateFunctionGroupArrayIntersectData; - DataTypePtr argument_type; - -public: - AggregateFunctionGroupArrayIntersect(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper, - AggregateFunctionGroupArrayIntersect>( - argument_types_), - argument_type(argument_types_[0]) {} - - AggregateFunctionGroupArrayIntersect(const DataTypes& argument_types_, - const bool result_is_nullable) - : IAggregateFunctionDataHelper, - AggregateFunctionGroupArrayIntersect>( - argument_types_), - argument_type(argument_types_[0]) {} - - String get_name() const override { return "group_array_intersect"; } - - DataTypePtr get_return_type() const override { return argument_type; } - - void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena&) const override { - auto& data = this->data(place); - auto& set = data.value; - - const bool col_is_nullable = (*columns[0]).is_nullable(); - const ColumnArray& column = - col_is_nullable - ? assert_cast( - assert_cast( - *columns[0]) - .get_nested_column()) - : assert_cast(*columns[0]); - - const auto& offsets = column.get_offsets(); - const auto offset = offsets[row_num - 1]; - const auto arr_size = offsets[row_num] - offset; - const auto& column_data = column.get_data(); - - data.process_col_data(column_data, offset, arr_size, set); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, - Arena&) const override { - auto& data = this->data(place); - auto& set = data.value; - auto& rhs_set = this->data(rhs).value; - - if (!this->data(rhs).init) { - return; - } - - auto& init = data.init; - if (!init) { - set->change_contain_null_value(rhs_set->contain_null()); - HybridSetBase::IteratorBase* it = rhs_set->begin(); - while (it->has_next()) { - const void* value = it->get_value(); - set->insert(value); - it->next(); - } - init = true; - } else if (!set->empty()) { - auto create_new_set = [](auto& lhs_val, auto& rhs_val) { - typename State::Set new_set = - std::make_unique(); - HybridSetBase::IteratorBase* it = lhs_val->begin(); - while (it->has_next()) { - const void* value = it->get_value(); - if ((rhs_val->find(value))) { - new_set->insert(value); - } - it->next(); - } - new_set->change_contain_null_value(lhs_val->contain_null() && - rhs_val->contain_null()); - return new_set; - }; - auto new_set = rhs_set->size() < set->size() ? create_new_set(rhs_set, set) - : create_new_set(set, rhs_set); - set = std::move(new_set); - } - } - - void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - auto& data = this->data(place); - auto& set = data.value; - auto& init = data.init; - const bool is_set_contain_null = set->contain_null(); - - buf.write_binary(is_set_contain_null); - buf.write_binary(init); - buf.write_var_uint(set->size()); - HybridSetBase::IteratorBase* it = set->begin(); - - while (it->has_next()) { - const typename PrimitiveTypeTraits::CppType* value_ptr = - static_cast::CppType*>(it->get_value()); - buf.write_binary((*value_ptr)); - it->next(); - } - } - - void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena&) const override { - auto& data = this->data(place); - bool is_set_contain_null; - - buf.read_binary(is_set_contain_null); - data.value->change_contain_null_value(is_set_contain_null); - buf.read_binary(data.init); - UInt64 size; - buf.read_var_uint(size); - - typename PrimitiveTypeTraits::CppType element; - for (UInt64 i = 0; i < size; ++i) { - buf.read_binary(element); - data.value->insert(static_cast(&element)); - } - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - ColumnArray& arr_to = assert_cast(to); - ColumnArray::Offsets64& offsets_to = arr_to.get_offsets(); - auto& to_nested_col = arr_to.get_data(); - const bool is_nullable = to_nested_col.is_nullable(); - - auto insert_values = [](typename State::ColVecType& nested_col, auto& set, - bool is_nullable = false, ColumnNullable* col_null = nullptr) { - size_t old_size = nested_col.get_data().size(); - size_t res_size = set->size(); - size_t i = 0; - - if (is_nullable && set->contain_null()) { - col_null->insert_data(nullptr, 0); - res_size += 1; - i = 1; - } - - nested_col.get_data().resize(old_size + res_size); - - HybridSetBase::IteratorBase* it = set->begin(); - while (it->has_next()) { - const auto value = - *reinterpret_cast::ColumnItemType*>( - it->get_value()); - nested_col.get_data()[old_size + i] = value; - if (is_nullable) { - col_null->get_null_map_data().push_back(0); - } - it->next(); - ++i; - } - }; - - const auto& set = this->data(place).value; - if (is_nullable) { - auto col_null = reinterpret_cast(&to_nested_col); - auto& nested_col = - assert_cast(col_null->get_nested_column()); - offsets_to.push_back(offsets_to.back() + set->size() + (set->contain_null() ? 1 : 0)); - insert_values(nested_col, set, true, col_null); - } else { - auto& nested_col = static_cast(to_nested_col); - offsets_to.push_back(offsets_to.back() + set->size()); - insert_values(nested_col, set); - } - } -}; - -/// Generic implementation, it uses serialized representation as object descriptor. -class NullableStringSet : public StringValueSet> { -public: - NullableStringSet() : StringValueSet>(true) {} - - void change_contain_null_value(bool target_value) { this->_contain_null = target_value; } -}; - -struct AggregateFunctionGroupArrayIntersectGenericData { - using Set = std::unique_ptr; - - AggregateFunctionGroupArrayIntersectGenericData() - : value(std::make_unique()) {} - Set value; - bool init = false; - - void reset() { - init = false; - value = std::make_unique(); - } -}; - -/** Template parameter with true value should be used for columns that store their elements in memory continuously. - * For such columns group_array_intersect() can be implemented more efficiently (especially for small numeric arrays). - */ -class AggregateFunctionGroupArrayIntersectGeneric - : public IAggregateFunctionDataHelper { -private: - using State = AggregateFunctionGroupArrayIntersectGenericData; - DataTypePtr input_data_type; - -public: - AggregateFunctionGroupArrayIntersectGeneric(const DataTypes& input_data_type_) - : IAggregateFunctionDataHelper( - input_data_type_), - input_data_type(input_data_type_[0]) {} - - String get_name() const override { return "group_array_intersect"; } - - DataTypePtr get_return_type() const override { return input_data_type; } - - void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena& arena) const override { - auto& data = this->data(place); - auto& init = data.init; - auto& set = data.value; - - const bool col_is_nullable = (*columns[0]).is_nullable(); - const ColumnArray& column = - col_is_nullable - ? assert_cast( - assert_cast( - *columns[0]) - .get_nested_column()) - : assert_cast(*columns[0]); - - const auto nested_column_data = column.get_data_ptr(); - const auto& offsets = column.get_offsets(); - const auto offset = offsets[row_num - 1]; - const auto arr_size = offsets[row_num] - offset; - const auto& column_data = column.get_data(); - const bool is_column_data_nullable = column_data.is_nullable(); - const ColumnNullable* col_null = nullptr; - - if (is_column_data_nullable) { - col_null = static_cast(&column_data); - } - - auto process_element = [&](size_t i) { - const bool is_null_element = - is_column_data_nullable && col_null->is_null_at(offset + i); - - StringRef src = nested_column_data->get_data_at(offset + i); - - src.data = is_null_element ? nullptr : arena.insert(src.data, src.size); - return src; - }; - - if (!init) { - for (size_t i = 0; i < arr_size; ++i) { - StringRef src = process_element(i); - set->insert((void*)src.data, src.size); - } - init = true; - } else if (!set->empty()) { - typename State::Set new_set = std::make_unique(); - - for (size_t i = 0; i < arr_size; ++i) { - StringRef src = process_element(i); - if ((set->find(src.data, src.size) && src.data != nullptr) || - (set->contain_null() && src.data == nullptr)) { - new_set->insert((void*)src.data, src.size); - } - } - set = std::move(new_set); - } - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, - Arena&) const override { - auto& data = this->data(place); - auto& set = data.value; - auto& rhs_set = this->data(rhs).value; - - if (!this->data(rhs).init) { - return; - } - - auto& init = data.init; - if (!init) { - set->change_contain_null_value(rhs_set->contain_null()); - HybridSetBase::IteratorBase* it = rhs_set->begin(); - while (it->has_next()) { - const auto* value = reinterpret_cast(it->get_value()); - set->insert((void*)(value->data), value->size); - it->next(); - } - init = true; - } else if (!set->empty()) { - auto create_new_set = [](auto& lhs_val, auto& rhs_val) { - typename State::Set new_set = std::make_unique(); - HybridSetBase::IteratorBase* it = lhs_val->begin(); - while (it->has_next()) { - const auto* value = reinterpret_cast(it->get_value()); - if (rhs_val->find(value)) { - new_set->insert((void*)value->data, value->size); - } - it->next(); - } - new_set->change_contain_null_value(lhs_val->contain_null() && - rhs_val->contain_null()); - return new_set; - }; - auto new_set = rhs_set->size() < set->size() ? create_new_set(rhs_set, set) - : create_new_set(set, rhs_set); - set = std::move(new_set); - } - } - - void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - auto& data = this->data(place); - auto& set = data.value; - auto& init = data.init; - const bool is_set_contain_null = set->contain_null(); - - buf.write_binary(is_set_contain_null); - buf.write_binary(init); - buf.write_var_uint(set->size()); - - HybridSetBase::IteratorBase* it = set->begin(); - while (it->has_next()) { - const auto* value = reinterpret_cast(it->get_value()); - buf.write_binary(*value); - it->next(); - } - } - - void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena& arena) const override { - auto& data = this->data(place); - bool is_set_contain_null; - - buf.read_binary(is_set_contain_null); - data.value->change_contain_null_value(is_set_contain_null); - buf.read_binary(data.init); - UInt64 size; - buf.read_var_uint(size); - - StringRef element; - for (UInt64 i = 0; i < size; ++i) { - element = buf.read_binary_into(arena); - data.value->insert((void*)element.data, element.size); - } - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& arr_to = assert_cast(to); - ColumnArray::Offsets64& offsets_to = arr_to.get_offsets(); - auto& data_to = arr_to.get_data(); - auto col_null = reinterpret_cast(&data_to); - - const auto& set = this->data(place).value; - auto res_size = set->size(); - - if (set->contain_null()) { - col_null->insert_data(nullptr, 0); - res_size += 1; - } - - offsets_to.push_back(offsets_to.back() + res_size); - - HybridSetBase::IteratorBase* it = set->begin(); - while (it->has_next()) { - const auto* value = reinterpret_cast(it->get_value()); - data_to.insert_data(value->data, value->size); - it->next(); - } - } -}; - -} // namespace doris::vectorized - -#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_array_set_op.cpp b/be/src/vec/aggregate_functions/aggregate_function_group_array_set_op.cpp new file mode 100644 index 00000000000000..b0eaf53179f222 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_group_array_set_op.cpp @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp +// and modified by Doris + +#include "vec/aggregate_functions/aggregate_function_group_array_set_op.h" + +#include + +#include "runtime/define_primitive_type.h" +#include "runtime/primitive_type.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/factory_helpers.h" +#include "vec/aggregate_functions/helpers.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +template