Skip to content

Commit

Permalink
save code
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Aug 11, 2023
1 parent f88f021 commit 895619f
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 44 deletions.
5 changes: 5 additions & 0 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class DataSink {
return Status::NotSupported("Not support send block");
}

// Sends a Block into this sink; only used by the pipeline execution engine.
// The default implementation forwards all arguments to send() unchanged, so a
// sink only needs to override this when the pipeline path must differ from send().
virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) {
return send(state, block, eos);
}

// Default implementation: ignores both arguments, takes no action and
// returns Status::OK().
[[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ class DataSinkOperator : public OperatorBase {

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
if (in_block->rows() > 0) {
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TableSinkOperator final : public DataSinkOperator<TableSinkOperatorBuilder
TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {}

bool can_write() override { return true; }
bool can_write() override { return _sink->can_write(); }
};

OperatorPtr TableSinkOperatorBuilder::build_operator() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class FragmentMgr : public RestMonitorIface {

void coordinator_callback(const ReportStatusRequest& req);

// Exposes the pool behind _thread_pool as a raw pointer obtained via .get();
// the caller does not take ownership (presumably _thread_pool is a smart
// pointer owned by this FragmentMgr -- confirm against the member declaration).
ThreadPool* get_thread_pool() { return _thread_pool.get(); }

private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);

Expand Down
92 changes: 92 additions & 0 deletions be/src/vec/sink/async_result_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "async_result_writer.h"

#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "vec/core/block.h"

namespace doris {
class ObjectPool;
class RowDescriptor;
class TExpr;

namespace vectorized {

// Producer-side entry point: enqueues a private copy of `block` for the
// background writer and records the end-of-stream flag.
//
// An empty block is not queued, but `eos` is still recorded and the writer is
// still woken. The returned status is the writer's most recent status, so a
// failure on the writer side becomes visible to the producer on a later call.
Status AsyncResultWriter::sink(Block* block, bool eos) {
    const auto row_count = block->rows();

    // Copy outside the lock: build an empty block with the same structure and
    // merge the incoming rows into it.
    std::unique_ptr<Block> pending;
    if (row_count) {
        pending = block->create_same_struct_block(0);
        RETURN_IF_ERROR(MutableBlock::build_mutable_block(pending.get()).merge(*block));
    }

    std::lock_guard<std::mutex> guard(_m);
    _eos = eos;
    if (row_count) {
        _data_queue.emplace_back(std::move(pending));
    }
    _cv.notify_one();
    return _writer_status;
}

// Removes and returns the oldest queued block while holding the queue mutex.
// The queue must not be empty (checked with DCHECK).
std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
    std::lock_guard<std::mutex> guard(_m);
    DCHECK(!_data_queue.empty());
    std::unique_ptr<Block> head = std::move(_data_queue.front());
    _data_queue.pop_front();
    return head;
}

void AsyncResultWriter::start_writer() {
ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this]() { this->process_block(); });
}

void AsyncResultWriter::process_block() {
if (!_is_open) {
_writer_status = open();
_is_open = true;
}

if (_writer_status.ok()) {
while (true) {
{
std::unique_lock l(_m);
while (!_eos && _data_queue.empty()) {
_cv.wait(l);
}
}

if (_eos && _data_queue.empty()) {
break;
}

auto status = write(get_block_from_queue());
std::unique_lock l(_m);
_writer_status = status;
if (!status.ok()) {
break;
}
}
}
_writer_thread_closed = true;
}

} // namespace vectorized
} // namespace doris
63 changes: 63 additions & 0 deletions be/src/vec/sink/async_result_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Created by baizhenbo on 8/3/23.
//

#ifndef DORIS_ASYNC_RESULT_WRITER_H
#define DORIS_ASYNC_RESULT_WRITER_H

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <queue>

#include "runtime/result_writer.h"

namespace doris {
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class TDataSink;
class TExpr;

namespace vectorized {
class Block;

// Base class for result writers that consume blocks asynchronously.
//
// Producers hand blocks over through sink(), which copies each non-empty block
// into an internal queue guarded by `_m`. start_writer() schedules
// process_block() on a thread pool; that routine calls open() once and then
// drains the queue through write() until end-of-stream has been signalled and
// the queue is empty, or until open()/write() fails.
class AsyncResultWriter : public ResultWriter {
public:
    // Hook invoked by process_block() before the first block is written.
    // The default does nothing and reports success.
    virtual Status open() { return Status::OK(); }

    // Writes one dequeued block by forwarding it to append_block()
    // (not declared in this class; presumably provided by ResultWriter).
    Status write(std::unique_ptr<Block> block) { return append_block(*block); }

    // Back-pressure check for the producer. Returns true when fewer than
    // QUEUE_SIZE blocks are queued. It also returns true once the writer has
    // failed or end-of-stream was signalled, so the producer is never held
    // back in those states and can pick up the writer status from sink().
    bool can_write() {
        std::lock_guard l(_m);
        return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
    }

    // True until the background writer task has finished.
    [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }

    // Body of the background writer task; see start_writer().
    void process_block();

    Status close() override { return Status::OK(); }

    Status init(RuntimeState* state) override { return Status::OK(); }

    // Queues a copy of `block` (if it has rows), records `eos`, wakes the
    // writer and returns the writer's current status.
    Status sink(Block* block, bool eos);

    // Pops the oldest queued block; the queue must not be empty.
    std::unique_ptr<Block> get_block_from_queue();

    // Submits process_block() to the fragment manager's thread pool.
    void start_writer();

private:
    // Number of queued blocks at which can_write() starts reporting false.
    static constexpr size_t QUEUE_SIZE = 3;

    bool _is_open = false;
    // Guards _data_queue, _writer_status and _eos.
    std::mutex _m;
    std::condition_variable _cv;
    std::deque<std::unique_ptr<Block>> _data_queue;
    Status _writer_status = Status::OK();
    bool _eos = false;
    // Written by the writer task and read by is_pending_finish() from another
    // thread without holding _m, hence atomic.
    std::atomic<bool> _writer_thread_closed {false};
};

} // namespace vectorized
} // namespace doris

#endif //DORIS_ASYNC_RESULT_WRITER_H
20 changes: 8 additions & 12 deletions be/src/vec/sink/vmysql_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc

Status VMysqlTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(VTableSink::init(t_sink));
const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink;
_conn_info.host = t_mysql_sink.host;
_conn_info.port = t_mysql_sink.port;
_conn_info.user = t_mysql_sink.user;
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_table_name = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;
// create writer
_writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs));
return Status::OK();
}

Status VMysqlTableSink::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(VTableSink::open(state));
// create writer
_writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs));
RETURN_IF_ERROR(_writer->open(_conn_info, _table_name));
if (state->enable_pipeline_exec()) {
_writer->start_writer();
} else {
RETURN_IF_ERROR(_writer->open());
}
return Status::OK();
}

Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) {
return _writer->append(block);
return _writer->append_block(*block);
}

Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/sink/vmysql_table_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ class VMysqlTableSink : public VTableSink {

Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;

// Override of the sink() entry point: hands the block and the eos flag to the
// writer's sink(), which returns the writer's status. `state` is not used here.
Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override {
return _writer->sink(block, eos);
}

Status close(RuntimeState* state, Status exec_status) override;

// Close counts as done once the writer no longer reports a pending finish.
bool is_close_done() override { return !_writer->is_pending_finish(); }

private:
MysqlConnInfo _conn_info;
std::unique_ptr<VMysqlTableWriter> _writer;
};
} // namespace vectorized
Expand Down
40 changes: 25 additions & 15 deletions be/src/vec/sink/vmysql_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "vec/sink/vmysql_table_writer.h"

#include <gen_cpp/DataSinks_types.h>
#include <glog/logging.h>
#include <mysql/mysql.h>
#include <stdint.h>
Expand Down Expand Up @@ -55,56 +56,65 @@ std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;

ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db
<< ",passwd=" << passwd << ",charset=" << charset << ")";
<< ",table=" << table_name << ",passwd=" << passwd << ",charset=" << charset << ")";
return ss.str();
}

VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs)
: _vec_output_expr_ctxs(output_expr_ctxs) {}
// Builds a writer from the thrift sink description: copies the MySQL connection
// parameters and target table name out of `t_sink.mysql_table_sink` and keeps a
// reference to the output expression contexts. No connection is made here; that
// happens in open().
//
// _mysql_conn is explicitly set to nullptr because the destructor tests it and
// open() is the only place that assigns it. Without this, destroying a writer
// whose open() never ran would read an indeterminate pointer.
VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
                                     const VExprContextSPtrs& output_expr_ctxs)
        : _vec_output_expr_ctxs(output_expr_ctxs), _mysql_conn(nullptr) {
    const auto& t_mysql_sink = t_sink.mysql_table_sink;
    _conn_info.host = t_mysql_sink.host;
    _conn_info.port = t_mysql_sink.port;
    _conn_info.user = t_mysql_sink.user;
    _conn_info.passwd = t_mysql_sink.passwd;
    _conn_info.db = t_mysql_sink.db;
    _conn_info.table_name = t_mysql_sink.table;
    _conn_info.charset = t_mysql_sink.charset;
}

// Releases the MySQL connection handle, if one is held.
VMysqlTableWriter::~VMysqlTableWriter() {
    if (!_mysql_conn) {
        return;
    }
    mysql_close(_mysql_conn);
}

Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string& tbl) {
Status VMysqlTableWriter::open() {
_mysql_conn = mysql_init(nullptr);
if (_mysql_conn == nullptr) {
return Status::InternalError("Call mysql_init failed.");
}

MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(), conn_info.user.c_str(),
conn_info.passwd.c_str(), conn_info.db.c_str(), conn_info.port,
nullptr, // unix socket
0); // flags
MYSQL* res =
mysql_real_connect(_mysql_conn, _conn_info.host.c_str(), _conn_info.user.c_str(),
_conn_info.passwd.c_str(), _conn_info.db.c_str(), _conn_info.port,
nullptr, // unix socket
0); // flags
if (res == nullptr) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", mysql_error(_mysql_conn));
return Status::InternalError(fmt::to_string(err_ss.data()));
}

// set character
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
mysql_error(_mysql_conn));
return Status::InternalError(fmt::to_string(err_ss.data()));
}

_mysql_tbl = tbl;

return Status::OK();
}

Status VMysqlTableWriter::append(vectorized::Block* block) {
Status VMysqlTableWriter::append_block(vectorized::Block& block) {
Status status = Status::OK();
if (block == nullptr || block->rows() == 0) {
if (block.rows() == 0) {
return status;
}
Block output_block;
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_vec_output_expr_ctxs, *block, &output_block));
_vec_output_expr_ctxs, block, &output_block));
auto num_rows = output_block.rows();
materialize_block_inplace(output_block);
for (int i = 0; i < num_rows; ++i) {
Expand All @@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) {

Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl);
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name);
int num_columns = _vec_output_expr_ctxs.size();

for (int i = 0; i < num_columns; ++i) {
Expand Down
20 changes: 8 additions & 12 deletions be/src/vec/sink/vmysql_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "async_result_writer.h"
#include "common/status.h"
#include "vec/exprs/vexpr_fwd.h"

Expand All @@ -35,6 +36,7 @@ struct MysqlConnInfo {
std::string user;
std::string passwd;
std::string db;
std::string table_name;
int port;
std::string charset;

Expand All @@ -43,27 +45,21 @@ struct MysqlConnInfo {

class Block;

class VMysqlTableWriter {
class VMysqlTableWriter final : public AsyncResultWriter {
public:
VMysqlTableWriter(const VExprContextSPtrs& output_exprs);
~VMysqlTableWriter();
VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
~VMysqlTableWriter() override;

// connect to mysql server
Status open(const MysqlConnInfo& conn_info, const std::string& tbl);
Status open() override;

Status begin_trans() { return Status::OK(); }

Status append(vectorized::Block* block);

Status abort_tarns() { return Status::OK(); }

Status finish_tarns() { return Status::OK(); }
Status append_block(vectorized::Block& block) override;

private:
Status insert_row(vectorized::Block& block, size_t row);
MysqlConnInfo _conn_info;
const VExprContextSPtrs& _vec_output_expr_ctxs;
fmt::memory_buffer _insert_stmt_buffer;
std::string _mysql_tbl;
MYSQL* _mysql_conn;
};
} // namespace vectorized
Expand Down
Loading

0 comments on commit 895619f

Please sign in to comment.