Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into master-cancel-job…
Browse files Browse the repository at this point in the history
…-task

# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
  • Loading branch information
CalvinKirs committed Dec 5, 2023
2 parents 301604f + 6074cdd commit 8077cb5
Show file tree
Hide file tree
Showing 818 changed files with 43,331 additions and 22,464 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ github:
- gitccl
- shuke987
- wm1581066
- KassieZ

notifications:
pullrequests_status: [email protected]
Expand Down
152 changes: 84 additions & 68 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
#include "agent/topic_subscriber.h"
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"

using std::string;
Expand All @@ -46,7 +48,7 @@ namespace doris {

AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
for (auto& path : exec_env->store_paths()) {
for (const auto& path : exec_env->store_paths()) {
try {
string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
std::filesystem::path dpp_download_path(dpp_download_path_str);
Expand All @@ -60,73 +62,9 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)

MasterServerClient::create(master_info);

// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.

#ifndef BE_TEST
#define CREATE_AND_START_POOL(type, pool_name) \
pool_name.reset(new TaskWorkerPool(TaskWorkerPool::TaskWorkerType::type, exec_env, \
master_info, TaskWorkerPool::ThreadModel::MULTI_THREADS)); \
pool_name->start();

#define CREATE_AND_START_THREAD(type, pool_name) \
pool_name.reset(new TaskWorkerPool(TaskWorkerPool::TaskWorkerType::type, exec_env, \
master_info, TaskWorkerPool::ThreadModel::SINGLE_THREAD)); \
pool_name->start();
#else
#define CREATE_AND_START_POOL(type, pool_name)
#define CREATE_AND_START_THREAD(type, pool_name)
#endif // BE_TEST

#ifndef BE_TEST
_create_tablet_workers.reset(
new CreateTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_create_tablet_workers->start();
_drop_tablet_workers.reset(
new DropTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_drop_tablet_workers->start();

// Both PUSH and REALTIME_PUSH type use _push_load_workers
_push_load_workers.reset(new PushTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS,
PushTaskPool::PushWokerType::LOAD_V2));
_push_load_workers->start();
_publish_version_workers.reset(
new PublishVersionTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_publish_version_workers->start();
_clear_transaction_task_workers.reset(
new ClearTransactionTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_clear_transaction_task_workers->start();
_push_delete_workers.reset(new PushTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS,
PushTaskPool::PushWokerType::DELETE));
_push_delete_workers->start();
_alter_tablet_workers.reset(
new AlterTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_alter_tablet_workers->start();
_clone_workers.reset(new CloneTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_clone_workers->start();
_storage_medium_migrate_workers.reset(
new StorageMediumMigrateTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS));
_storage_medium_migrate_workers->start();
start_workers(exec_env);
#endif
CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers);
CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
CREATE_AND_START_POOL(UPLOAD, _upload_workers);
CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
CREATE_AND_START_POOL(MOVE, _move_dir_workers);
CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
CREATE_AND_START_THREAD(PUSH_COOLDOWN_CONF, _push_cooldown_conf_workers);

CREATE_AND_START_THREAD(REPORT_TASK, _report_task_workers);
CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
CREATE_AND_START_POOL(PUSH_STORAGE_POLICY, _push_storage_policy_workers);
CREATE_AND_START_THREAD(GC_BINLOG, _gc_binlog_workers);
#undef CREATE_AND_START_POOL
#undef CREATE_AND_START_THREAD

#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
Expand All @@ -137,7 +75,85 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
#endif
}

AgentServer::~AgentServer() {}
AgentServer::~AgentServer() = default;

// Instantiates every worker pool and report worker owned by this AgentServer.
//
// Each task pool is given a name, a sizing argument (a config value or the
// literal 1) and a callback that forwards the incoming task to the matching
// free function. Report workers additionally receive `_master_info` and a
// reporting interval taken from config.
//
// NOTE(review): no explicit start() call is made here, so the pool
// constructors presumably launch their own threads — confirm against
// task_worker_pool.h.
//
// @param exec_env  execution environment handed to the upload, download and
//                  move-dir callbacks.
void AgentServer::start_workers(ExecEnv* exec_env) {
    // TODO(plat1ko): CloudStorageEngine
    auto& engine = *StorageEngine::instance();
    // clang-format off

    // ---- index / consistency / snapshot related pools ----
    _alter_inverted_index_workers = std::make_unique<TaskWorkerPool>(
            "ALTER_INVERTED_INDEX", config::alter_index_worker_count,
            [&engine](auto&& req) { return alter_inverted_index_callback(engine, req); });

    _check_consistency_workers = std::make_unique<TaskWorkerPool>(
            "CHECK_CONSISTENCY", config::check_consistency_worker_count,
            [&engine](auto&& req) { return check_consistency_callback(engine, req); });

    _upload_workers = std::make_unique<TaskWorkerPool>(
            "UPLOAD", config::upload_worker_count,
            [&engine, exec_env](auto&& req) { return upload_callback(engine, exec_env, req); });

    _download_workers = std::make_unique<TaskWorkerPool>(
            "DOWNLOAD", config::download_worker_count,
            [&engine, exec_env](auto&& req) { return download_callback(engine, exec_env, req); });

    _make_snapshot_workers = std::make_unique<TaskWorkerPool>(
            "MAKE_SNAPSHOT", config::make_snapshot_worker_count,
            [&engine](auto&& req) { return make_snapshot_callback(engine, req); });

    _release_snapshot_workers = std::make_unique<TaskWorkerPool>(
            "RELEASE_SNAPSHOT", config::release_snapshot_worker_count,
            [&engine](auto&& req) { return release_snapshot_callback(engine, req); });

    // ---- pools created with a fixed sizing argument of 1 ----
    _move_dir_workers = std::make_unique<TaskWorkerPool>(
            "MOVE", 1,
            [&engine, exec_env](auto&& req) { return move_dir_callback(engine, exec_env, req); });

    _submit_table_compaction_workers = std::make_unique<TaskWorkerPool>(
            "SUBMIT_TABLE_COMPACTION", 1,
            [&engine](auto&& req) { return submit_table_compaction_callback(engine, req); });

    _push_storage_policy_workers = std::make_unique<TaskWorkerPool>(
            "PUSH_STORAGE_POLICY", 1,
            [&engine](auto&& req) { return push_storage_policy_callback(engine, req); });

    _push_cooldown_conf_workers = std::make_unique<TaskWorkerPool>(
            "PUSH_COOLDOWN_CONF", 1,
            [&engine](auto&& req) { return push_cooldown_conf_callback(engine, req); });

    // ---- tablet lifecycle, publish and push pools ----
    _create_tablet_workers = std::make_unique<TaskWorkerPool>(
            "CREATE_TABLE", config::create_tablet_worker_count,
            [&engine](auto&& req) { return create_tablet_callback(engine, req); });

    _drop_tablet_workers = std::make_unique<TaskWorkerPool>(
            "DROP_TABLE", config::drop_tablet_worker_count,
            [&engine](auto&& req) { return drop_tablet_callback(engine, req); });

    // Publish-version has its own pool type that only needs the engine.
    _publish_version_workers = std::make_unique<PublishVersionWorkerPool>(engine);

    _clear_transaction_task_workers = std::make_unique<TaskWorkerPool>(
            "CLEAR_TRANSACTION_TASK", config::clear_transaction_task_worker_count,
            [&engine](auto&& req) { return clear_transaction_task_callback(engine, req); });

    // DELETE tasks are served by push_callback as well.
    _push_delete_workers = std::make_unique<TaskWorkerPool>(
            "DELETE", config::delete_worker_count, push_callback);

    // Both PUSH and REALTIME_PUSH type use push_callback
    _push_load_workers = std::make_unique<PriorTaskWorkerPool>(
            "PUSH",
            config::push_worker_count_normal_priority,
            config::push_worker_count_high_priority,
            push_callback);

    _update_tablet_meta_info_workers = std::make_unique<TaskWorkerPool>(
            "UPDATE_TABLET_META_INFO", 1,
            [&engine](auto&& req) { return update_tablet_meta_callback(engine, req); });

    _alter_tablet_workers = std::make_unique<TaskWorkerPool>(
            "ALTER_TABLE", config::alter_tablet_worker_count,
            [&engine](auto&& req) { return alter_tablet_callback(engine, req); });

    // The clone callback also needs the master info; it is captured by
    // reference to the member, not copied.
    _clone_workers = std::make_unique<TaskWorkerPool>(
            "CLONE", config::clone_worker_count,
            [&engine, &master_info = _master_info](auto&& req) {
                return clone_callback(engine, master_info, req);
            });

    _storage_medium_migrate_workers = std::make_unique<TaskWorkerPool>(
            "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count,
            [&engine](auto&& req) { return storage_medium_migrate_callback(engine, req); });

    _gc_binlog_workers = std::make_unique<TaskWorkerPool>(
            "GC_BINLOG", 1,
            [&engine](auto&& req) { return gc_binlog_callback(engine, req); });

    // ---- report workers: take no task argument, driven by an interval ----
    _report_task_workers = std::make_unique<ReportWorker>(
            "REPORT_TASK", _master_info, config::report_task_interval_seconds,
            [&master_info = _master_info] { report_task_callback(master_info); });

    _report_disk_state_workers = std::make_unique<ReportWorker>(
            "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
            [&engine, &master_info = _master_info] {
                report_disk_callback(engine, master_info);
            });

    _report_tablet_workers = std::make_unique<ReportWorker>(
            "REPORT_OLAP_TABLE", _master_info, config::report_tablet_interval_seconds,
            [&engine, &master_info = _master_info] {
                report_tablet_callback(engine, master_info);
            });
    // clang-format on
}

// TODO(lingbin): each task in the batch may have its own status, or FE must check and
// resend the request when something is wrong (BE may need some logic to guarantee idempotence).
Expand All @@ -152,7 +168,7 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
return;
}

for (auto task : tasks) {
for (auto&& task : tasks) {
VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;
Expand Down
15 changes: 9 additions & 6 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
#include <string>
#include <vector>

#include "agent/topic_subscriber.h"

namespace doris {

class TaskWorkerPool;
class PriorTaskWorkerPool;
class ReportWorker;
class TopicSubscriber;
class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
Expand Down Expand Up @@ -56,14 +57,16 @@ class AgentServer {
TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }

private:
void start_workers(ExecEnv* exec_env);

DISALLOW_COPY_AND_ASSIGN(AgentServer);

// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;

std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
std::unique_ptr<TaskWorkerPool> _push_load_workers;
std::unique_ptr<PriorTaskWorkerPool> _push_load_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
std::unique_ptr<TaskWorkerPool> _push_delete_workers;
Expand All @@ -76,9 +79,9 @@ class AgentServer {

// These 3 worker pools do not accept tasks from FE.
// They are self-triggered periodically and report to the FE master.
std::unique_ptr<TaskWorkerPool> _report_task_workers;
std::unique_ptr<TaskWorkerPool> _report_disk_state_workers;
std::unique_ptr<TaskWorkerPool> _report_tablet_workers;
std::unique_ptr<ReportWorker> _report_task_workers;
std::unique_ptr<ReportWorker> _report_disk_state_workers;
std::unique_ptr<ReportWorker> _report_tablet_workers;

std::unique_ptr<TaskWorkerPool> _upload_workers;
std::unique_ptr<TaskWorkerPool> _download_workers;
Expand Down
81 changes: 78 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "agent/cgroup_cpu_ctl.h"

#include <fmt/format.h>
#include <sys/stat.h>

#include <filesystem>

namespace doris {

Expand All @@ -41,7 +44,7 @@ Status CgroupCpuCtl::init() {
return Status::OK();
}

void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) {
void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit) {
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
*cpu_shares = this->_cpu_shares;
*cpu_hard_limit = this->_cpu_hard_limit;
Expand Down Expand Up @@ -100,11 +103,34 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
return Status::InternalError<false>("cgroup v1 mkdir query failed, path=",
return Status::InternalError<false>("cgroup v1 mkdir query failed, path={}",
_cgroup_v1_cpu_query_path);
}
}

// check whether current user specified path is a valid cgroup path
std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks";
std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares";
std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
if (access(query_path_tasks.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find task file");
}
if (access(query_path_cpu_shares.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu share file");
}
if (access(query_path_quota.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
}

if (_tg_id == -1) {
// means current cgroup cpu ctl is just used to clear dir,
// it does not contains task group.
// todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
_init_succ = true;
LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path;
return Status::OK();
}

// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
Expand Down Expand Up @@ -137,7 +163,8 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
}

Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100)
: CPU_HARD_LIMIT_DEFAULT_VALUE;
std::string msg = "modify cpu quota value to " + std::to_string(val);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false);
}
Expand All @@ -156,4 +183,52 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
#endif
}

// Removes the per-workload-group cgroup directories under
// `_cgroup_v1_cpu_query_path` whose id is not in `used_wg_ids`.
//
// A sub-directory is considered a workload-group directory only when its name
// consists solely of decimal digits and fits into a uint64_t. Anything else is
// left untouched.
//
// @param used_wg_ids  ids of the workload groups that are still in use; their
//                     directories are kept.
// @return Status::OK() when every unused directory was removed (or there was
//         nothing to remove); InternalError when the controller was not
//         initialised, the query path cannot be listed, or at least one
//         rmdir() call failed.
Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) {
    if (!_init_succ) {
        return Status::InternalError<false>(
                "cgroup cpu ctl init failed, delete can not be executed");
    }

    // Parses a decimal directory name into an id. Returns false when the name
    // contains a non-digit or does not fit into uint64_t. This replaces
    // ::isdigit (undefined for negative char values) plus std::stoll (which
    // throws for values above LLONG_MAX even though ids are unsigned 64 bit).
    const auto parse_wg_id = [](const std::string& name, uint64_t* id) {
        uint64_t value = 0;
        for (const char c : name) {
            if (c < '0' || c > '9') {
                return false;
            }
            const uint64_t digit = static_cast<uint64_t>(c - '0');
            if (value > (UINT64_MAX - digit) / 10) {
                return false; // would overflow uint64_t
            }
            value = value * 10 + digit;
        }
        *id = value;
        return true;
    };

    // 1 get unused wg path
    // Use the error_code overloads so a vanished or unreadable directory is
    // reported through Status instead of an escaping filesystem_error.
    std::error_code ec;
    std::filesystem::directory_iterator dir_it(_cgroup_v1_cpu_query_path, ec);
    if (ec) {
        return Status::InternalError<false>("list cgroup query path failed, path={}, err={}",
                                            _cgroup_v1_cpu_query_path, ec.message());
    }

    std::set<std::string> unused_wg_paths;
    const std::filesystem::directory_iterator end_it;
    while (dir_it != end_it) {
        const std::filesystem::directory_entry& entry = *dir_it;

        // is_directory() tests the file type properly; the previous
        // `st_mode & S_IFDIR` bit test also matched block devices and sockets.
        // On a status error it returns false, same as a failed stat() before.
        std::error_code type_ec;
        if (entry.is_directory(type_ec)) {
            const std::string dir_name = entry.path().string();
            const std::string wg_dir_name = entry.path().filename().string();
            if (wg_dir_name.empty()) {
                return Status::InternalError<false>("find an empty workload group path, path={}",
                                                    dir_name);
            }
            uint64_t id_in_path = 0;
            if (parse_wg_id(wg_dir_name, &id_in_path) &&
                used_wg_ids.find(id_in_path) == used_wg_ids.end()) {
                unused_wg_paths.insert(dir_name);
            }
        }

        dir_it.increment(ec);
        if (ec) {
            return Status::InternalError<false>(
                    "iterate cgroup query path failed, path={}, err={}",
                    _cgroup_v1_cpu_query_path, ec.message());
        }
    }

    // 2 delete unused cgroup path
    // Deletion happens after the scan so the directory is never modified
    // while it is being iterated.
    int failed_count = 0;
    for (const std::string& wg_path : unused_wg_paths) {
        if (rmdir(wg_path.c_str()) < 0) {
            LOG(WARNING) << "rmdir failed, path=" << wg_path;
            failed_count++;
        }
    }
    if (failed_count != 0) {
        return Status::InternalError<false>("error happens when delete unused path, count={}",
                                            failed_count);
    }
    return Status::OK();
}

} // namespace doris
Loading

0 comments on commit 8077cb5

Please sign in to comment.