Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/job-task-storage' into job-task-…
Browse files Browse the repository at this point in the history
…storage

# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
  • Loading branch information
CalvinKirs committed Oct 7, 2023
2 parents 8b237ff + fdda730 commit b4d6991
Show file tree
Hide file tree
Showing 429 changed files with 11,668 additions and 5,273 deletions.
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,12 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");

DEFINE_Int16(bitmap_serialize_version, "1");

// group commit insert config
DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_sync_wal_batch, "10");

// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");

Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,12 @@ DECLARE_Int32(grace_shutdown_wait_seconds);
// BitmapValue serialize version.
DECLARE_Int16(bitmap_serialize_version);

// group commit insert config
DECLARE_String(group_commit_replay_wal_dir);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_Int32(group_commit_sync_wal_batch);

// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);

Expand Down
21 changes: 19 additions & 2 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/group_commit_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
Expand Down Expand Up @@ -139,6 +140,11 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
return Status::OK();
}

int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
Expand All @@ -158,6 +164,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
if (ctx->label.empty()) {
ctx->label = generate_uuid_string();
}
ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");

LOG(INFO) << "new income streaming load request." << ctx->brief()
<< " sql : " << req->header(HTTP_SQL);
Expand Down Expand Up @@ -223,12 +230,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
if (ctx == nullptr || !ctx->status.ok()) {
return;
}

if (!req->header(HTTP_WAL_ID_KY).empty()) {
ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
}
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

int64_t start_read_data_time = MonotonicNanos();

while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
Expand Down Expand Up @@ -283,6 +291,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
request.__set_load_sql(http_req->header(HTTP_SQL));
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
request.__set_group_commit(ctx->group_commit);
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
Expand All @@ -306,6 +315,14 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->db = ctx->put_result.params.db_name;
ctx->table = ctx->put_result.params.table_name;
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->put_result.params.__set_wal_id(ctx->wal_id);

if (ctx->group_commit) {
ctx->db_id = ctx->put_result.db_id;
ctx->table_id = ctx->put_result.table_id;
ctx->schema_version = ctx->put_result.base_schema_version;
return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
}
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

Expand Down
38 changes: 32 additions & 6 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "olap/storage_engine.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/group_commit_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/message_body_sink.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
Expand Down Expand Up @@ -153,6 +154,11 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
return Status::OK();
}

if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
Expand All @@ -178,16 +184,26 @@ int StreamLoadAction::on_header(HttpRequest* req) {
url_decode(req->param(HTTP_DB_KEY), &ctx->db);
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
if (ctx->label.empty()) {
ctx->label = generate_uuid_string();
Status st = Status::OK();
if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) {
if (!ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
} else {
if (ctx->label.empty()) {
ctx->label = generate_uuid_string();
}
}

ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;

LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
<< ", tbl=" << ctx->table;

auto st = _on_header(req, ctx);
if (st.ok()) {
st = _on_header(req, ctx);
}
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
Expand Down Expand Up @@ -287,9 +303,11 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
ctx->load_comment = http_req->header(HTTP_COMMENT);
}
// begin transaction
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
if (!ctx->group_commit) {
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
}

// process put file
return _process_put(http_req, ctx);
Expand Down Expand Up @@ -555,6 +573,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
request.__set_group_commit(ctx->group_commit);

#ifndef BE_TEST
// plan this load
Expand Down Expand Up @@ -582,6 +601,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::OK();
}

if (ctx->group_commit) {
ctx->db_id = ctx->put_result.db_id;
ctx->table_id = ctx->put_result.table_id;
ctx->schema_version = ctx->put_result.base_schema_version;
return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,8 @@ static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
static const std::string HTTP_WAL_ID_KY = "wal_id";
static const std::string HTTP_AUTH_CODE = "auth_code";
static const std::string HTTP_GROUP_COMMIT = "group_commit";

} // namespace doris
10 changes: 7 additions & 3 deletions be/src/http/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "common/status.h"
#include "common/utils.h"
#include "http/http_channel.h"
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/http_method.h"
#include "http/http_request.h"
Expand Down Expand Up @@ -72,7 +73,12 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa

bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
auto& token = req.header("token");
if (token.empty()) {
auto& auth_code = req.header(HTTP_AUTH_CODE);
if (!token.empty()) {
auth->token = token;
} else if (!auth_code.empty()) {
auth->auth_code = std::stoll(auth_code);
} else {
std::string full_user;
if (!parse_basic_auth(req, &full_user, &auth->passwd)) {
return false;
Expand All @@ -84,8 +90,6 @@ bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
} else {
auth->user = full_user;
}
} else {
auth->token = token;
}

// set user ip
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/delete_bitmap_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ bool MergeIndexDeleteBitmapCalculatorContext::Comparator::operator()(

bool MergeIndexDeleteBitmapCalculatorContext::Comparator::is_key_same(Slice const& lhs,
Slice const& rhs) const {
DCHECK(lhs.get_size() >= _sequence_length);
DCHECK(rhs.get_size() >= _sequence_length);
auto lhs_without_seq = Slice(lhs.get_data(), lhs.get_size() - _sequence_length);
auto rhs_without_seq = Slice(rhs.get_data(), rhs.get_size() - _sequence_length);
return lhs_without_seq.compare(rhs_without_seq) == 0;
Expand Down Expand Up @@ -154,7 +156,7 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
_heap->pop();
Slice cur_key;
RETURN_IF_ERROR(cur_ctx->get_current_key(cur_key));
if (_comparator.is_key_same(cur_key, _last_key)) {
if (!_last_key.empty() && _comparator.is_key_same(cur_key, _last_key)) {
loc.segment_id = cur_ctx->segment_id();
loc.row_id = cur_ctx->row_id();
auto st = cur_ctx->advance();
Expand Down
29 changes: 29 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/push_handler.h"

#include <fmt/core.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/MasterService_types.h>
Expand All @@ -25,6 +26,7 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>

#include <algorithm>
#include <iostream>
Expand Down Expand Up @@ -138,6 +140,33 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
DeletePredicatePB del_pred;
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet->tablet_schema());
for (const auto& delete_cond : request.delete_conditions) {
if (!delete_cond.__isset.column_unique_id) {
LOG(WARNING) << "column=" << delete_cond.column_name
<< " in predicate does not have uid, table id="
<< tablet_schema.table_id();
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
continue;
}
if (tablet_schema.field_index(delete_cond.column_unique_id) == -1) {
const auto& err_msg =
fmt::format("column id={} does not exists, table id={}",
delete_cond.column_unique_id, tablet_schema.table_id());
LOG(WARNING) << err_msg;
DCHECK(false);
return Status::Aborted(err_msg);
}
if (tablet_schema.column_by_uid(delete_cond.column_unique_id).name() !=
delete_cond.column_name) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which column name={}",
delete_cond.column_name, delete_cond.column_unique_id,
tablet_schema.column_by_uid(delete_cond.column_unique_id).name());
LOG(WARNING) << err_msg;
DCHECK(false);
return Status::Aborted(err_msg);
}
}
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
Expand Down
22 changes: 20 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,17 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
// build default value columns
auto default_value_block = old_value_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (has_default_or_nullable) {

const vectorized::Int8* delete_sign_column_data = nullptr;
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
old_value_block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
delete_sign_column_data = delete_sign_col.get_data().data();
}

if (has_default_or_nullable || delete_sign_column_data != nullptr) {
for (auto i = 0; i < cids_missing.size(); ++i) {
const auto& column = _tablet_schema->column(cids_missing[i]);
if (column.has_default_value()) {
Expand All @@ -600,7 +610,15 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f

// fill all missing value from mutable_old_columns, need to consider default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
if (use_default_or_null_flag[idx]) {
// `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row
// for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column
// marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will
// be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not
// read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column
// to check if a row REALLY exists in the table.
if (use_default_or_null_flag[idx] ||
(delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) {
for (auto i = 0; i < cids_missing.size(); ++i) {
// if the column has default value, fiil it with default value
// otherwise, if the column is nullable, fill it with null value
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
#include "util/time.h"
#include "util/trace.h"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
#include "vec/common/string_ref.h"
Expand Down Expand Up @@ -174,7 +175,11 @@ struct WriteCooldownMetaExecutors {
};
// Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
// FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
std::vector<std::unique_ptr<ThreadPool>> _executors;
// We use PriorityThreadPool since it would call status inside it's `shutdown` function.
// Consider one situation where the StackTraceCache's singleton is detructed before
// this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
// StackTraceCache which would then result in heap use after free like #23834
std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
size_t _executor_nums;
Expand All @@ -183,7 +188,7 @@ struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
: _executor_nums(executor_nums) {
for (size_t i = 0; i < _executor_nums; i++) {
std::unique_ptr<ThreadPool> pool;
std::unique_ptr<PriorityThreadPool> pool;
ThreadPoolBuilder("WriteCooldownMetaExecutor")
.set_min_threads(1)
.set_max_threads(1)
Expand Down Expand Up @@ -230,7 +235,7 @@ void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletShared
VLOG_DEBUG << "tablet " << t->tablet_id() << " is not cooldown replica";
};

_executors[_get_executor_pos(tablet_id)]->submit_func(
_executors[_get_executor_pos(tablet_id)]->offer(
[task = std::move(async_write_task)]() { task(); });
}

Expand Down
Loading

0 comments on commit b4d6991

Please sign in to comment.