Skip to content

Commit

Permalink
speed up vectorized load
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Aug 27, 2022
1 parent ebc4349 commit ca4785b
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 60 deletions.
28 changes: 18 additions & 10 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
auto origin_column_num = _src_block.columns();

for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand All @@ -315,7 +316,11 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
auto column_ptr = _src_block.get_by_position(result_column_id).column;
bool is_origin_column = result_column_id < origin_column_num;
auto column_ptr = is_origin_column ?
_src_block.get_by_position(result_column_id).column->clone_resized(rows) :
_src_block.get_by_position(result_column_id).column;

DCHECK(column_ptr != nullptr);

// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
Expand Down Expand Up @@ -373,7 +378,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
}

// After the dest block insert operation, clear _src_block to drop its references to the origin columns.
_src_block.clear();
_src_block.clear_column_data(origin_column_num);

size_t dest_size = dest_block->columns();
// do filter
Expand All @@ -389,15 +394,18 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// TODO: optimize the reuse of src_block and dest_block columns. In some cases we have to
// shallow copy a column of src_block into the dest block.
Status BaseScanner::_init_src_block() {
DCHECK(_src_block.columns() == 0);
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
continue;
if (_src_block.is_empty_column()) {
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
auto data_type = slot_desc->get_data_type_ptr();
auto column_ptr = data_type->create_column();
column_ptr->reserve(4096);
_src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr), data_type,
slot_desc->col_name()));
}
auto data_type = slot_desc->get_data_type_ptr();
_src_block.insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

return Status::OK();
Expand Down
19 changes: 7 additions & 12 deletions be/src/exec/text_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,14 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu
inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr,
const char* data, size_t len) {
vectorized::IColumn* col_ptr = column_ptr->get();
// \N means it's NULL
if (LIKELY(slot_desc->is_nullable())) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
nullable_column->insert_data(nullptr, 0);
return;
} else {
nullable_column->get_null_map_data().push_back(0);
col_ptr = &nullable_column->get_nested_column();
}
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
if (len == 2 && data[0] == '\\' && data[1] == 'N') {
nullable_column->get_null_map_data().push_back(1);
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column()).insert_default();
} else {
nullable_column->get_null_map_data().push_back(0);
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column()).insert_data(data, len);
}
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len);
}

inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,11 +931,12 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor
}
}

MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs) {
/// Builds one mutable column (and records its data type) for every slot of every
/// tuple descriptor in `tuple_descs`, in iteration order.
///
/// @param tuple_descs  tuple descriptors whose slots define the columns.
/// @param reserve_size when positive, reserve() is called with this value on each
///                     newly created column; zero or negative values skip the call.
MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size) {
    // reserve_size is a signed int: a negative value would presumably be converted to a
    // huge unsigned count inside reserve() (TODO confirm its parameter type), so only
    // strictly positive values are forwarded.
    const bool should_reserve = reserve_size > 0;
    for (auto tuple_desc : tuple_descs) {
        for (auto slot_desc : tuple_desc->slots()) {
            _data_types.emplace_back(slot_desc->get_data_type_ptr());
            _columns.emplace_back(_data_types.back()->create_column());
            if (should_reserve) {
                _columns.back()->reserve(reserve_size);
            }
        }
    }
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class MutableBlock {
MutableBlock() = default;
~MutableBlock() = default;

MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs);
MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size = 0);

MutableBlock(Block* block)
: _columns(block->mutate_columns()), _data_types(block->get_data_types()) {}
Expand Down
14 changes: 2 additions & 12 deletions be/src/vec/exec/vbroker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ VBrokerScanner::~VBrokerScanner() = default;

Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
SCOPED_TIMER(_read_timer);

RETURN_IF_ERROR(_init_src_block());

const int batch_size = _state->batch_size();
Expand Down Expand Up @@ -88,7 +89,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
Status VBrokerScanner::_fill_dest_columns(const Slice& line,
std::vector<MutableColumnPtr>& columns) {
RETURN_IF_ERROR(_line_split_to_values(line));
if (!_success) {
if (UNLIKELY(!_success)) {
// The line could not be split successfully, which means we met an invalid row; return without filling any column.
return Status::OK();
}
Expand All @@ -98,19 +99,8 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
int dest_index = idx++;

auto src_slot_desc = _src_slot_descs[i];
if (!src_slot_desc->is_materialized()) {
continue;
}

const Slice& value = _split_values[i];
if (is_null(value)) {
// nullable
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
nullable_column->insert_default();
continue;
}

_text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data,
value.size);
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/volap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size;
auto pre_block_count =
std::min(_volap_scanners.size(),
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
block_per_scanner;
static_cast<size_t>(config::thrift_connect_timeout_seconds)) * block_per_scanner;

for (int i = 0; i < pre_block_count; ++i) {
auto block = new Block(_tuple_desc->slots(), _block_size);
Expand Down
51 changes: 32 additions & 19 deletions be/src/vec/functions/function_cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -1275,31 +1275,44 @@ class FunctionCast final : public IFunctionBase {
const auto& nested_type = nullable_type.get_nested_type();

Block tmp_block;
if (source_is_nullable)
tmp_block = create_block_with_nested_columns(block, arguments);
else
tmp_block = block;
if (source_is_nullable) {
tmp_block = create_block_with_nested_columns_only_args(block, arguments);
size_t tmp_res_index = tmp_block.columns();
tmp_block.insert({nullptr, nested_type, ""});

/// Perform the requested conversion.
RETURN_IF_ERROR(wrapper(context, tmp_block, {0}, tmp_res_index,
input_rows_count));

const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);

size_t tmp_res_index = block.columns();
tmp_block.insert({nullptr, nested_type, ""});
res.column = wrap_in_nullable(tmp_res.column,
Block({block.get_by_position(arguments[0]), tmp_res}),
{0}, 1, input_rows_count);
} else {
tmp_block = block;

/// Perform the requested conversion.
RETURN_IF_ERROR(
wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count));
size_t tmp_res_index = block.columns();
tmp_block.insert({nullptr, nested_type, ""});

const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);
/// Perform the requested conversion.
RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, tmp_res_index,
input_rows_count));

/// May happen in fuzzy tests. For debug purpose.
if (!tmp_res.column.get()) {
return Status::RuntimeError(
"Couldn't convert {} to {} in prepare_remove_nullable wrapper.",
block.get_by_position(arguments[0]).type->get_name(),
nested_type->get_name());
res.column = tmp_block.get_by_position(tmp_res_index).column;
}

res.column = wrap_in_nullable(tmp_res.column,
Block({block.get_by_position(arguments[0]), tmp_res}),
{0}, 1, input_rows_count);
// /// May happen in fuzzy tests. For debug purpose.
// if (!tmp_res.column.get()) {
// return Status::RuntimeError(
// "Couldn't convert {} to {} in prepare_remove_nullable wrapper.",
// block.get_by_position(arguments[0]).type->get_name(),
// nested_type->get_name());
// }

// res.column = wrap_in_nullable(tmp_res.column,
// Block({block.get_by_position(arguments[0]), tmp_res}),
// {0}, 1, input_rows_count);
return Status::OK();
};
} else if (source_is_nullable) {
Expand Down
36 changes: 36 additions & 0 deletions be/src/vec/functions/function_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,42 @@

namespace doris::vectorized {

/// Builds a new block that holds only the columns of `block` whose positions appear in
/// `args`. Columns are visited in ascending block position, so the result follows the
/// order of `block`, not the order of `args`.
///
/// For each selected column:
///  - non-nullable type: the column entry is inserted unchanged;
///  - nullable type with no column data: a null column with the nested type is inserted;
///  - ColumnNullable: its nested column is inserted with the nested type;
///  - ColumnConst over a ColumnNullable: a new ColumnConst over the nested column,
///    with the same size, is inserted with the nested type;
///  - anything else: logged via LOG(FATAL).
Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args) {
    const std::unordered_set<size_t> args_set(args.begin(), args.end());
    Block res;
    const size_t columns = block.columns();

    for (size_t i = 0; i < columns; ++i) {
        if (args_set.count(i) == 0) {
            continue;
        }

        const auto& col = block.get_by_position(i);
        if (!col.type->is_nullable()) {
            res.insert(col);
            continue;
        }

        const DataTypePtr& nested_type =
                static_cast<const DataTypeNullable&>(*col.type).get_nested_type();

        if (!col.column) {
            res.insert({nullptr, nested_type, col.name});
        } else if (auto* nullable = check_and_get_column<ColumnNullable>(*col.column)) {
            res.insert({nullable->get_nested_column_ptr(), nested_type, col.name});
        } else if (auto* const_column = check_and_get_column<ColumnConst>(*col.column)) {
            // check_and_get_column yields a null pointer when the column is not of the
            // requested kind; report that explicitly instead of dereferencing it.
            const auto* const_nullable =
                    check_and_get_column<ColumnNullable>(const_column->get_data_column());
            if (const_nullable == nullptr) {
                LOG(FATAL) << "Illegal const column for DataTypeNullable: data column is not "
                              "ColumnNullable";
            }
            res.insert({ColumnConst::create(const_nullable->get_nested_column_ptr(),
                                            col.column->size()),
                        nested_type, col.name});
        } else {
            LOG(FATAL) << "Illegal column for DataTypeNullable";
        }
    }

    return res;
}

static Block create_block_with_nested_columns_impl(const Block& block,
const std::unordered_set<size_t>& args) {
Block res;
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/functions/function_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ Block create_block_with_nested_columns(const Block& block, const ColumnNumbers&
Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& args,
size_t result);

/// Returns a new block that contains only the columns of `block` whose positions
/// are listed in `args`. Each such column with a nullable type is replaced with its
/// nested column and nested type; non-nullable columns are copied as they are.
/// Columns not listed in `args` are omitted, and the result follows the column
/// order of `block`, not the order of `args`.
Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args);

/// Checks argument type at specified index with predicate.
/// throws if there is no argument at specified index or if predicate returns false.
void validate_argument_type(const IFunction& func, const DataTypes& arguments,
Expand Down
21 changes: 19 additions & 2 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void VNodeChannel::clear_all_blocks() {
Status VNodeChannel::init(RuntimeState* state) {
RETURN_IF_ERROR(NodeChannel::init(state));

_cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
_cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}, _batch_size));

// Initialize _cur_add_block_request
_cur_add_block_request.set_allocated_id(&_parent->_load_id);
Expand Down Expand Up @@ -198,7 +198,18 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
_pending_batches_num++;
}

_cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
bool do_not_get_free_block = true;
{
std::lock_guard<std::mutex> l(_free_batches_lock);
do_not_get_free_block = _free_batches_vec.empty();
if (!do_not_get_free_block) {
_cur_mutable_block.reset(new vectorized::MutableBlock(std::move(_free_batches_vec.back())));
_free_batches_vec.pop_back();
}
}
if (do_not_get_free_block) {
_cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}, _batch_size));
}
_cur_add_block_request.clear_tablet_ids();
}

Expand Down Expand Up @@ -272,6 +283,12 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
}
}

block.clear_column_data();
{
std::lock_guard<std::mutex> l(_free_batches_lock);
_free_batches_vec.emplace_back(std::move(block));
}

int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/vtablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class VNodeChannel : public NodeChannel {
using AddBlockReq =
std::pair<std::unique_ptr<vectorized::MutableBlock>, PTabletWriterAddBlockRequest>;
std::queue<AddBlockReq> _pending_blocks;
std::mutex _free_batches_lock; // reuse for vectorized
std::vector<vectorized::Block> _free_batches_vec;
ReusableClosure<PTabletWriterAddBlockResult>* _add_block_closure = nullptr;
};

Expand Down
2 changes: 1 addition & 1 deletion tools/clickbench-tools/load-clickbench-data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ function load() {
cd -

echo "(2/2) load clickbench data file $DATA_DIR/hits_split[0-9] into Doris"
for i in $(seq 0 9); do
for i in $(seq 0 1); do
echo -e "
start loading hits_split${i}"
curl --location-trusted \
Expand Down

0 comments on commit ca4785b

Please sign in to comment.