From 895619f0297db61bf9ef31d0201016d8afa80b4c Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 11 Aug 2023 15:35:15 +0800 Subject: [PATCH] save code --- be/src/exec/data_sink.h | 5 + be/src/pipeline/exec/operator.h | 4 +- be/src/pipeline/exec/table_sink_operator.h | 2 +- be/src/runtime/fragment_mgr.h | 2 + be/src/vec/sink/async_result_writer.cpp | 92 +++++++++++++++++++ be/src/vec/sink/async_result_writer.h | 63 +++++++++++++ be/src/vec/sink/vmysql_table_sink.cpp | 20 ++-- be/src/vec/sink/vmysql_table_sink.h | 7 +- be/src/vec/sink/vmysql_table_writer.cpp | 40 +++++--- be/src/vec/sink/vmysql_table_writer.h | 20 ++-- be/src/vec/sink/vtable_sink.h | 2 + .../java/org/apache/doris/qe/Coordinator.java | 2 +- 12 files changed, 215 insertions(+), 44 deletions(-) create mode 100644 be/src/vec/sink/async_result_writer.cpp create mode 100644 be/src/vec/sink/async_result_writer.h diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index fd59cd1d27c0811..9648a7378007a5f 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -67,6 +67,11 @@ class DataSink { return Status::NotSupported("Not support send block"); } + // Send a Block into this sink, only use in pipeline exec engine + virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) { + return send(state, block, eos); + } + [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) { return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index acf55cb7bc465ae..8a7dc565a62fe73 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -273,8 +273,8 @@ class DataSinkOperator : public OperatorBase { Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { - if (in_block->rows() > 0) { - auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED); + if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { + auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED); // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished if (st.template is()) { return Status::OK(); diff --git a/be/src/pipeline/exec/table_sink_operator.h b/be/src/pipeline/exec/table_sink_operator.h index cbad6d64728e313..054a511139ac8f9 100644 --- a/be/src/pipeline/exec/table_sink_operator.h +++ b/be/src/pipeline/exec/table_sink_operator.h @@ -38,7 +38,7 @@ class TableSinkOperator final : public DataSinkOperatorcan_write(); } }; OperatorPtr TableSinkOperatorBuilder::build_operator() { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 8ca58ccffa663ff..9820cf90456517b 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -139,6 +139,8 @@ class FragmentMgr : public RestMonitorIface { void coordinator_callback(const ReportStatusRequest& req); + ThreadPool* get_thread_pool() { return _thread_pool.get(); } + private: void _exec_actual(std::shared_ptr exec_state, const FinishCallback& cb); diff --git a/be/src/vec/sink/async_result_writer.cpp b/be/src/vec/sink/async_result_writer.cpp new file mode 100644 index 000000000000000..5225bcf0e2530c0 --- /dev/null +++ b/be/src/vec/sink/async_result_writer.cpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "async_result_writer.h" + +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "vec/core/block.h" + +namespace doris { +class ObjectPool; +class RowDescriptor; +class TExpr; + +namespace vectorized { + +Status AsyncResultWriter::sink(Block* block, bool eos) { + auto rows = block->rows(); + std::unique_ptr add_block; + if (rows) { + add_block = block->create_same_struct_block(0); + RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block)); + } + + std::lock_guard l(_m); + _eos = eos; + if (rows) { + _data_queue.emplace_back(std::move(add_block)); + } + _cv.notify_one(); + return _writer_status; +} + +std::unique_ptr AsyncResultWriter::get_block_from_queue() { + std::lock_guard l(_m); + DCHECK(!_data_queue.empty()); + auto block = std::move(_data_queue.front()); + _data_queue.pop_front(); + return block; +} + +void AsyncResultWriter::start_writer() { + ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( + [this]() { this->process_block(); }); +} + +void AsyncResultWriter::process_block() { + if (!_is_open) { + _writer_status = open(); + _is_open = true; + } + + if (_writer_status.ok()) { + while (true) { + { + std::unique_lock l(_m); + while (!_eos && _data_queue.empty()) { + _cv.wait(l); + } + } + + if (_eos && _data_queue.empty()) { + break; + } + + auto status = write(get_block_from_queue()); + std::unique_lock l(_m); + _writer_status = status; + if (!status.ok()) { + break; + } + } + } + _writer_thread_closed = true; +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/async_result_writer.h b/be/src/vec/sink/async_result_writer.h new file mode 100644 index 000000000000000..6a606cb6a2a3e11 --- /dev/null +++ b/be/src/vec/sink/async_result_writer.h @@ -0,0 +1,63 @@ +// +// Created by baizhenbo on 8/3/23. +// + +#ifndef DORIS_ASYNC_RESULT_WRITER_H +#define DORIS_ASYNC_RESULT_WRITER_H + +#include +#include + +#include "runtime/result_writer.h" + +namespace doris { +class ObjectPool; +class RowDescriptor; +class RuntimeState; +class TDataSink; +class TExpr; + +namespace vectorized { +class Block; + +// This class is a sinker, which put input data to jdbc table +class AsyncResultWriter : public ResultWriter { +public: + virtual Status open() { return Status::OK(); } + Status write(std::unique_ptr block) { + return append_block(*block); + } + + bool can_write() { + std::lock_guard l(_m); + return _data_queue.size() < 3 || !_writer_status.ok() || _eos; + } + + [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; } + + void process_block(); + + Status close() override { return Status::OK(); } + + Status init(RuntimeState* state) override { return Status::OK(); } + + Status sink(Block* block, bool eos); + + std::unique_ptr get_block_from_queue(); + + void start_writer(); + +private: + bool _is_open = false; + std::mutex _m; + std::condition_variable _cv; + std::deque> _data_queue; + Status _writer_status = Status::OK(); + bool _eos = false; + bool _writer_thread_closed = false; +}; + +} // namespace vectorized +} // namespace doris + +#endif //DORIS_ASYNC_RESULT_WRITER_H diff --git a/be/src/vec/sink/vmysql_table_sink.cpp b/be/src/vec/sink/vmysql_table_sink.cpp index ee1c015c54d4a61..0199299295ee6c6 100644 --- a/be/src/vec/sink/vmysql_table_sink.cpp +++ b/be/src/vec/sink/vmysql_table_sink.cpp @@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc Status VMysqlTableSink::init(const TDataSink& t_sink) { RETURN_IF_ERROR(VTableSink::init(t_sink)); - const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink; - _conn_info.host = t_mysql_sink.host; - _conn_info.port = t_mysql_sink.port; - _conn_info.user = t_mysql_sink.user; - _conn_info.passwd = t_mysql_sink.passwd; - _conn_info.db = t_mysql_sink.db; - _table_name = t_mysql_sink.table; - _conn_info.charset = t_mysql_sink.charset; + // create writer + _writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs)); return Status::OK(); } Status VMysqlTableSink::open(RuntimeState* state) { // Prepare the exprs to run. RETURN_IF_ERROR(VTableSink::open(state)); - // create writer - _writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs)); - RETURN_IF_ERROR(_writer->open(_conn_info, _table_name)); + if (state->enable_pipeline_exec()) { + _writer->start_writer(); + } else { + RETURN_IF_ERROR(_writer->open()); + } return Status::OK(); } Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) { - return _writer->append(block); + return _writer->append_block(*block); } Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) { diff --git a/be/src/vec/sink/vmysql_table_sink.h b/be/src/vec/sink/vmysql_table_sink.h index 00ce4346fecd824..93dbf2d71f14935 100644 --- a/be/src/vec/sink/vmysql_table_sink.h +++ b/be/src/vec/sink/vmysql_table_sink.h @@ -44,10 +44,15 @@ class VMysqlTableSink : public VTableSink { Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override { + return _writer->sink(block, eos); + } + Status close(RuntimeState* state, Status exec_status) override; + bool is_close_done() override { return !_writer->is_pending_finish(); } + private: - MysqlConnInfo _conn_info; std::unique_ptr _writer; }; } // namespace vectorized diff --git a/be/src/vec/sink/vmysql_table_writer.cpp b/be/src/vec/sink/vmysql_table_writer.cpp index af1f920e4ab052a..98682e9e0ccd848 100644 --- a/be/src/vec/sink/vmysql_table_writer.cpp +++ b/be/src/vec/sink/vmysql_table_writer.cpp @@ -17,6 +17,7 @@ #include "vec/sink/vmysql_table_writer.h" +#include #include #include #include @@ -55,12 +56,22 @@ std::string MysqlConnInfo::debug_string() const { std::stringstream ss; ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db - << ",passwd=" << passwd << ",charset=" << charset << ")"; + << ",table=" << table_name << ",passwd=" << passwd << ",charset=" << charset << ")"; return ss.str(); } -VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs) - : _vec_output_expr_ctxs(output_expr_ctxs) {} +VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : _vec_output_expr_ctxs(output_expr_ctxs) { + const auto& t_mysql_sink = t_sink.mysql_table_sink; + _conn_info.host = t_mysql_sink.host; + _conn_info.port = t_mysql_sink.port; + _conn_info.user = t_mysql_sink.user; + _conn_info.passwd = t_mysql_sink.passwd; + _conn_info.db = t_mysql_sink.db; + _conn_info.table_name = t_mysql_sink.table; + _conn_info.charset = t_mysql_sink.charset; +} VMysqlTableWriter::~VMysqlTableWriter() { if (_mysql_conn) { @@ -68,16 +79,17 @@ VMysqlTableWriter::~VMysqlTableWriter() { } } -Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string& tbl) { +Status VMysqlTableWriter::open() { _mysql_conn = mysql_init(nullptr); if (_mysql_conn == nullptr) { return Status::InternalError("Call mysql_init failed."); } - MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(), conn_info.user.c_str(), - conn_info.passwd.c_str(), conn_info.db.c_str(), conn_info.port, - nullptr, // unix socket - 0); // flags + MYSQL* res = + mysql_real_connect(_mysql_conn, _conn_info.host.c_str(), _conn_info.user.c_str(), + _conn_info.passwd.c_str(), _conn_info.db.c_str(), _conn_info.port, + nullptr, // unix socket + 0); // flags if (res == nullptr) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", mysql_error(_mysql_conn)); @@ -85,26 +97,24 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string } // set character - if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) { + if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.", mysql_error(_mysql_conn)); return Status::InternalError(fmt::to_string(err_ss.data())); } - _mysql_tbl = tbl; - return Status::OK(); } -Status VMysqlTableWriter::append(vectorized::Block* block) { +Status VMysqlTableWriter::append_block(vectorized::Block& block) { Status status = Status::OK(); - if (block == nullptr || block->rows() == 0) { + if (block.rows() == 0) { return status; } Block output_block; RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( - _vec_output_expr_ctxs, *block, &output_block)); + _vec_output_expr_ctxs, block, &output_block)); auto num_rows = output_block.rows(); materialize_block_inplace(output_block); for (int i = 0; i < num_rows; ++i) { @@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) { Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { _insert_stmt_buffer.clear(); - fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name); int num_columns = _vec_output_expr_ctxs.size(); for (int i = 0; i < num_columns; ++i) { diff --git a/be/src/vec/sink/vmysql_table_writer.h b/be/src/vec/sink/vmysql_table_writer.h index 51f62a4db50e4e9..62de5e8dbf93ec9 100644 --- a/be/src/vec/sink/vmysql_table_writer.h +++ b/be/src/vec/sink/vmysql_table_writer.h @@ -24,6 +24,7 @@ #include #include +#include "async_result_writer.h" #include "common/status.h" #include "vec/exprs/vexpr_fwd.h" @@ -35,6 +36,7 @@ struct MysqlConnInfo { std::string user; std::string passwd; std::string db; + std::string table_name; int port; std::string charset; @@ -43,27 +45,21 @@ struct MysqlConnInfo { class Block; -class VMysqlTableWriter { +class VMysqlTableWriter final : public AsyncResultWriter { public: - VMysqlTableWriter(const VExprContextSPtrs& output_exprs); - ~VMysqlTableWriter(); + VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + ~VMysqlTableWriter() override; // connect to mysql server - Status open(const MysqlConnInfo& conn_info, const std::string& tbl); + Status open() override; - Status begin_trans() { return Status::OK(); } - - Status append(vectorized::Block* block); - - Status abort_tarns() { return Status::OK(); } - - Status finish_tarns() { return Status::OK(); } + Status append_block(vectorized::Block& block) override; private: Status insert_row(vectorized::Block& block, size_t row); + MysqlConnInfo _conn_info; const VExprContextSPtrs& _vec_output_expr_ctxs; fmt::memory_buffer _insert_stmt_buffer; - std::string _mysql_tbl; MYSQL* _mysql_conn; }; } // namespace vectorized diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/vtable_sink.h index 0c45d567f381b0f..247d8072b71be59 100644 --- a/be/src/vec/sink/vtable_sink.h +++ b/be/src/vec/sink/vtable_sink.h @@ -54,6 +54,8 @@ class VTableSink : public DataSink { const RowDescriptor& row_desc() { return _row_desc; } + virtual bool can_write() { return true; } + protected: // owned by RuntimeState ObjectPool* _pool; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 6edc8d6744a934e..32829d2db53a5e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -322,7 +322,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin; // Only enable pipeline query engine in query, not load this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine() - && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink); + && fragments.size() > 0; initQueryOptions(context); setFromUserProperty(context);