Skip to content

Commit

Permalink
[enhancement](multi-table) enable mullti table routine load on pipeli…
Browse files Browse the repository at this point in the history
…ne engine (apache#21729)
  • Loading branch information
TangSiyang2001 authored Jul 14, 2023
1 parent 2c897b8 commit b013f80
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 40 deletions.
27 changes: 19 additions & 8 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,34 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr

// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
const TUniqueId& fragment_instance_id) {
RuntimeState* runtime_state) {
auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}

*file_reader = stream_load_ctx->pipe;

if (file_reader->get() != nullptr) {
auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
if (multi_table_pipe != nullptr) {
*file_reader = multi_table_pipe->getPipe(fragment_instance_id);
LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id
<< " pipe: " << (*file_reader).get();
}
if (file_reader->get() == nullptr) {
return Status::OK();
}

auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
if (multi_table_pipe == nullptr || runtime_state == nullptr) {
return Status::OK();
}

TUniqueId pipe_id;
if (runtime_state->enable_pipeline_exec()) {
pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
runtime_state->fragment_id());
} else {
pipe_id = runtime_state->fragment_instance_id();
}
*file_reader = multi_table_pipe->getPipe(pipe_id);
LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
<< " pipe: " << (*file_reader).get();

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class FileFactory {

// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
const TUniqueId& fragment_instance_id);
RuntimeState* runtime_state);

static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
Expand Down
85 changes: 65 additions & 20 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <type_traits>

#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
Expand Down Expand Up @@ -130,7 +134,9 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size

#ifndef BE_TEST
Status MultiTablePipe::request_and_exec_plans() {
if (_unplanned_pipes.empty()) return Status::OK();
if (_unplanned_pipes.empty()) {
return Status::OK();
}

// get list of table names in unplanned pipes
std::vector<std::string> tables;
Expand Down Expand Up @@ -175,24 +181,52 @@ Status MultiTablePipe::request_and_exec_plans() {
return plan_status;
}

Status st;
if (_ctx->multi_table_put_result.__isset.params &&
!_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
} else if (!_ctx->multi_table_put_result.__isset.params &&
_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
} else {
return Status::Aborted("too many or too few params are set in multi_table_put_result.");
}

return st;
}

template <typename ExecParam>
Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
// put unplanned pipes into planned pipes and clear unplanned pipes
for (auto& pipe : _unplanned_pipes) {
_ctx->table_list.push_back(pipe.first);
_planned_pipes.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
_unplanned_pipes.size(), _planned_pipes.size(),
_ctx->multi_table_put_result.params.size());
_unplanned_pipes.size(), _planned_pipes.size(), params.size());
_unplanned_pipes.clear();

_inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
for (auto& plan : _ctx->multi_table_put_result.params) {
// TODO: use pipeline in the future (currently is buggy for load)
DCHECK_EQ(plan.__isset.table_name, true);
DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
<< " table=" << plan.table_name;
_inflight_plan_cnt += params.size();
for (auto& plan : params) {
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
return Status::Aborted("Missing vital param: table_name");
}

if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
<< " table=" << plan.table_name;
} else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
putPipe(pipe_id, _planned_pipes[plan.table_name]);
LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name;
} else {
LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
"`TPipelineFragmentParams`, will crash";
CHECK(false);
}

exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
Status* status) {
{
Expand Down Expand Up @@ -243,6 +277,7 @@ Status MultiTablePipe::request_and_exec_plans() {

return Status::OK();
}

#else
Status MultiTablePipe::request_and_exec_plans() {
// put unplanned pipes into planned pipes
Expand All @@ -254,36 +289,46 @@ Status MultiTablePipe::request_and_exec_plans() {
_unplanned_pipes.clear();
return Status::OK();
}

template <typename ExecParam>
Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
return Status::OK();
}

#endif

Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
return Status::InternalError("id already exist");
}
_pipe_map.emplace(fragment_instance_id, pipe);
_pipe_map.emplace(pipe_id, pipe);
return Status::OK();
}

std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) {
std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
return std::shared_ptr<io::StreamLoadPipe>(nullptr);
}
return it->second;
}

void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) {
void MultiTablePipe::removePipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(fragment_instance_id);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
_pipe_map.erase(it);
VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id;
VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
}
}

template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<TExecPlanFragmentParams> params);
template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<TPipelineFragmentParams> params);

} // namespace io
} // namespace doris
3 changes: 3 additions & 0 deletions be/src/io/fs/multi_table_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class MultiTablePipe : public KafkaConsumerPipe {
// [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);

template <typename ExecParam>
Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);

private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
Expand Down
7 changes: 7 additions & 0 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,5 +250,12 @@ void StreamLoadPipe::cancel(const std::string& reason) {
_put_cond.notify_all();
}

TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) {
TUniqueId pipe_id;
pipe_id.lo = query_id.lo + fragment_id;
pipe_id.hi = query_id.hi;
return pipe_id;
}

} // namespace io
} // namespace doris
3 changes: 3 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

size_t get_queue_size() { return _buf_queue.size(); }

// used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
.tag("pthread_id", (uintptr_t)pthread_self());

// 1. init _runtime_state
_runtime_state =
RuntimeState::create_unique(local_params, request.query_id, request.query_options,
_query_ctx->query_globals, _exec_env);
_runtime_state = RuntimeState::create_unique(local_params, request.query_id,
request.fragment_id, request.query_options,
_query_ctx->query_globals, _exec_env);
_runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,17 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
}

RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
const TUniqueId& query_id, const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env)
const TUniqueId& query_id, int32_t fragment_id,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
ExecEnv* exec_env)
: _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
_runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
_is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
Expand Down
9 changes: 7 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/integral_types.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"

Expand Down Expand Up @@ -66,8 +67,8 @@ class RuntimeState {
ExecEnv* exec_env);

RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
ExecEnv* exec_env);
int32 fragment_id, const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);

// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);
Expand Down Expand Up @@ -115,6 +116,8 @@ class RuntimeState {
const std::string& user() const { return _user; }
const TUniqueId& query_id() const { return _query_id; }
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
// should only be called in pipeline engine
int32_t fragment_id() const { return _fragment_id; }
ExecEnv* exec_env() { return _exec_env; }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const;

Expand Down Expand Up @@ -460,6 +463,8 @@ class RuntimeState {
cctz::time_zone _timezone_obj;

TUniqueId _query_id;
// fragment id for each TPipelineFragmentParams
int32_t _fragment_id;
TUniqueId _fragment_instance_id;
TQueryOptions _query_options;
ExecEnv* _exec_env = nullptr;
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ Status CsvReader::init_reader(bool is_load) {
_file_description.start_offset = start_offset;

if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
_state->fragment_instance_id()));
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,7 @@ Status NewJsonReader::_open_file_reader() {
_file_description.start_offset = start_offset;

if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
_state->fragment_instance_id()));
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
Expand Down

0 comments on commit b013f80

Please sign in to comment.