diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 5808d35021cf94..def4bae3f21b73 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -304,6 +304,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { size_t rows = _src_block.rows(); auto filter_column = vectorized::ColumnUInt8::create(rows, 1); auto& filter_map = filter_column->get_data(); + auto origin_column_num = _src_block.columns(); for (auto slot_desc : _dest_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { @@ -315,7 +316,11 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { int result_column_id = -1; // PT1 => dest primitive type RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); - auto column_ptr = _src_block.get_by_position(result_column_id).column; + bool is_origin_column = result_column_id < origin_column_num; + auto column_ptr = is_origin_column ? + _src_block.get_by_position(result_column_id).column->clone_resized(rows) : + _src_block.get_by_position(result_column_id).column; + DCHECK(column_ptr != nullptr); // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr @@ -373,7 +378,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { } // after do the dest block insert operation, clear _src_block to remove the reference of origin column - _src_block.clear(); + _src_block.clear_column_data(origin_column_num); size_t dest_size = dest_block->columns(); // do filter @@ -389,15 +394,18 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { // TODO: opt the reuse of src_block or dest_block column. some case we have to // shallow copy the column of src_block to dest block Status BaseScanner::_init_src_block() { - DCHECK(_src_block.columns() == 0); - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; + if (_src_block.is_empty_column()) { + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto data_type = slot_desc->get_data_type_ptr(); + auto column_ptr = data_type->create_column(); + column_ptr->reserve(4096); + _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr), data_type, + slot_desc->col_name())); } - auto data_type = slot_desc->get_data_type_ptr(); - _src_block.insert(vectorized::ColumnWithTypeAndName( - data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } return Status::OK(); diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 8baa90b6d50b0b..88203c2437256a 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -169,19 +169,14 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len) { - vectorized::IColumn* col_ptr = column_ptr->get(); - // \N means it's NULL - if (LIKELY(slot_desc->is_nullable())) { - auto* nullable_column = reinterpret_cast(column_ptr->get()); - if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { - nullable_column->insert_data(nullptr, 0); - return; - } else { - nullable_column->get_null_map_data().push_back(0); - col_ptr = &nullable_column->get_nested_column(); - } + auto* nullable_column = reinterpret_cast(column_ptr->get()); + if (len == 2 && data[0] == '\\' && data[1] == 'N') { + nullable_column->get_null_map_data().push_back(1); + reinterpret_cast(nullable_column->get_nested_column()).insert_default(); + } else { + nullable_column->get_null_map_data().push_back(0); + reinterpret_cast(nullable_column->get_nested_column()).insert_data(data, len); } - reinterpret_cast(col_ptr)->insert_data(data, len); } inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 8728803a4d7619..e463892eb08111 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -931,11 +931,12 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor } } -MutableBlock::MutableBlock(const std::vector& tuple_descs) { +MutableBlock::MutableBlock(const std::vector& tuple_descs, int reserve_size) { for (auto tuple_desc : tuple_descs) { for (auto slot_desc : tuple_desc->slots()) { _data_types.emplace_back(slot_desc->get_data_type_ptr()); _columns.emplace_back(_data_types.back()->create_column()); + if (reserve_size != 0) { _columns.back()->reserve(reserve_size); } } } } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index b4fd18bb0c7cbb..65ca36cefcc198 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -372,7 +372,7 @@ class MutableBlock { MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(const std::vector& tuple_descs); + MutableBlock(const std::vector& tuple_descs, int reserve_size = 0); MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {} diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 41d0dcac1d7dec..87841a8083e2b4 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -48,6 +48,7 @@ VBrokerScanner::~VBrokerScanner() = default; Status VBrokerScanner::get_next(Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(_init_src_block()); const int batch_size = _state->batch_size(); @@ -88,7 +89,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) { Status VBrokerScanner::_fill_dest_columns(const Slice& line, std::vector& columns) { RETURN_IF_ERROR(_line_split_to_values(line)); - if (!_success) { + if (UNLIKELY(!_success)) { // If not success, which means we met an invalid row, return. return Status::OK(); } @@ -98,19 +99,8 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, int dest_index = idx++; auto src_slot_desc = _src_slot_descs[i]; - if (!src_slot_desc->is_materialized()) { - continue; - } const Slice& value = _split_values[i]; - if (is_null(value)) { - // nullable - auto* nullable_column = - reinterpret_cast(columns[dest_index].get()); - nullable_column->insert_default(); - continue; - } - _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data, value.size); } diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 53e8baee9283b5..9224054efc8962 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -310,8 +310,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size; auto pre_block_count = std::min(_volap_scanners.size(), - static_cast(config::doris_scanner_thread_pool_thread_num)) * - block_per_scanner; + static_cast(config::thrift_connect_timeout_seconds)) * block_per_scanner; for (int i = 0; i < pre_block_count; ++i) { auto block = new Block(_tuple_desc->slots(), _block_size); diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 11d8d7ed442b76..5e33ef5e0fb1f2 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -1275,31 +1275,44 @@ class FunctionCast final : public IFunctionBase { const auto& nested_type = nullable_type.get_nested_type(); Block tmp_block; - if (source_is_nullable) - tmp_block = create_block_with_nested_columns(block, arguments); - else - tmp_block = block; + if (source_is_nullable) { + tmp_block = create_block_with_nested_columns_only_args(block, arguments); + size_t tmp_res_index = tmp_block.columns(); + tmp_block.insert({nullptr, nested_type, ""}); + + /// Perform the requested conversion. + RETURN_IF_ERROR(wrapper(context, tmp_block, {0}, tmp_res_index, + input_rows_count)); + + const auto& tmp_res = tmp_block.get_by_position(tmp_res_index); - size_t tmp_res_index = block.columns(); - tmp_block.insert({nullptr, nested_type, ""}); + res.column = wrap_in_nullable(tmp_res.column, + Block({block.get_by_position(arguments[0]), tmp_res}), + {0}, 1, input_rows_count); + } else { + tmp_block = block; - /// Perform the requested conversion. - RETURN_IF_ERROR( - wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count)); + size_t tmp_res_index = block.columns(); + tmp_block.insert({nullptr, nested_type, ""}); - const auto& tmp_res = tmp_block.get_by_position(tmp_res_index); + /// Perform the requested conversion. + RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, tmp_res_index, + input_rows_count)); - /// May happen in fuzzy tests. For debug purpose. - if (!tmp_res.column.get()) { - return Status::RuntimeError( - "Couldn't convert {} to {} in prepare_remove_nullable wrapper.", - block.get_by_position(arguments[0]).type->get_name(), - nested_type->get_name()); + res.column = tmp_block.get_by_position(tmp_res_index).column; } - res.column = wrap_in_nullable(tmp_res.column, - Block({block.get_by_position(arguments[0]), tmp_res}), - {0}, 1, input_rows_count); +// /// May happen in fuzzy tests. For debug purpose. +// if (!tmp_res.column.get()) { +// return Status::RuntimeError( +// "Couldn't convert {} to {} in prepare_remove_nullable wrapper.", +// block.get_by_position(arguments[0]).type->get_name(), +// nested_type->get_name()); +// } + +// res.column = wrap_in_nullable(tmp_res.column, +// Block({block.get_by_position(arguments[0]), tmp_res}), +// {0}, 1, input_rows_count); return Status::OK(); }; } else if (source_is_nullable) { diff --git a/be/src/vec/functions/function_helpers.cpp b/be/src/vec/functions/function_helpers.cpp index 0e98d88ee027d8..f0bc6cd8646460 100644 --- a/be/src/vec/functions/function_helpers.cpp +++ b/be/src/vec/functions/function_helpers.cpp @@ -26,6 +26,42 @@ namespace doris::vectorized { +Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args) { + std::unordered_set args_set(args.begin(), args.end()); + Block res; + size_t columns = block.columns(); + + for (size_t i = 0; i < columns; ++i) { + const auto& col = block.get_by_position(i); + + if (args_set.count(i)) { + if (col.type->is_nullable()) { + const DataTypePtr& nested_type = + static_cast(*col.type).get_nested_type(); + + if (!col.column) { + res.insert({nullptr, nested_type, col.name}); + } else if (auto* nullable = check_and_get_column(*col.column)) { + const auto& nested_col = nullable->get_nested_column_ptr(); + res.insert({nested_col, nested_type, col.name}); + } else if (auto* const_column = check_and_get_column(*col.column)) { + const auto& nested_col = + check_and_get_column(const_column->get_data_column()) + ->get_nested_column_ptr(); + res.insert({ColumnConst::create(nested_col, col.column->size()), nested_type, + col.name}); + } else { + LOG(FATAL) << "Illegal column for DataTypeNullable"; + } + } else { + res.insert(col); + } + } + } + + return res; +} + static Block create_block_with_nested_columns_impl(const Block& block, const std::unordered_set& args) { Block res; diff --git a/be/src/vec/functions/function_helpers.h b/be/src/vec/functions/function_helpers.h index ac6601b06ee979..f8555c358bee40 100644 --- a/be/src/vec/functions/function_helpers.h +++ b/be/src/vec/functions/function_helpers.h @@ -95,6 +95,11 @@ Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& args, size_t result); +/// Returns the copy of a given block in which each column specified in +/// the "arguments" parameter is replaced with its respective nested +/// column if it is nullable. only contain args +Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args); + /// Checks argument type at specified index with predicate. /// throws if there is no argument at specified index or if predicate returns false. void validate_argument_type(const IFunction& func, const DataTypes& arguments, diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index a9d091bf754f60..6fcc065d65503d 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -56,7 +56,7 @@ void VNodeChannel::clear_all_blocks() { Status VNodeChannel::init(RuntimeState* state) { RETURN_IF_ERROR(NodeChannel::init(state)); - _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); + _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}, _batch_size)); // Initialize _cur_add_block_request _cur_add_block_request.set_allocated_id(&_parent->_load_id); @@ -198,7 +198,18 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) { _pending_batches_num++; } - _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); + bool do_not_get_free_block = true; + { + std::lock_guard l(_free_batches_lock); + do_not_get_free_block = _free_batches_vec.empty(); + if (!do_not_get_free_block) { + _cur_mutable_block.reset(new vectorized::MutableBlock(std::move(_free_batches_vec.back()))); + _free_batches_vec.pop_back(); + } + } + if (do_not_get_free_block) { + _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}, _batch_size)); + } _cur_add_block_request.clear_tablet_ids(); } @@ -272,6 +283,12 @@ void VNodeChannel::try_send_block(RuntimeState* state) { } } + block.clear_column_data(); + { + std::lock_guard l(_free_batches_lock); + _free_batches_vec.emplace_back(std::move(block)); + } + int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { if (remain_ms <= 0 && !request.eos()) { diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 36943473a886d9..c65ed13bbb8f60 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -62,6 +62,8 @@ class VNodeChannel : public NodeChannel { using AddBlockReq = std::pair, PTabletWriterAddBlockRequest>; std::queue _pending_blocks; + std::mutex _free_batches_lock; // reuse for vectorized + std::vector _free_batches_vec; ReusableClosure* _add_block_closure = nullptr; }; diff --git a/tools/clickbench-tools/load-clickbench-data.sh b/tools/clickbench-tools/load-clickbench-data.sh index 53d6d61b4b8499..c064353c206867 100755 --- a/tools/clickbench-tools/load-clickbench-data.sh +++ b/tools/clickbench-tools/load-clickbench-data.sh @@ -121,7 +121,7 @@ function load() { cd - echo "(2/2) load clickbench data file $DATA_DIR/hits_split[0-9] into Doris" - for i in $(seq 0 9); do + for i in $(seq 0 1); do echo -e " start loading hits_split${i}" curl --location-trusted \