diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index e68535a708c49b..76b72f2c9d00ae 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() { return _is_enable_cgroup_v2_in_env ? 100 : 1024; } -std::unique_ptr CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) { +std::shared_ptr CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) { if (_is_enable_cgroup_v2_in_env) { - return std::make_unique(wg_id); + return std::make_shared(wg_id); } else if (_is_enable_cgroup_v1_in_env) { - return std::make_unique(wg_id); + return std::make_shared(wg_id); } return nullptr; } diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index 84e191159f15f1..b23f1f4dd9cadb 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -52,7 +52,7 @@ class CgroupCpuCtl { static Status delete_unused_cgroup_path(std::set& used_wg_ids); - static std::unique_ptr create_cgroup_cpu_ctl(uint64_t wg_id); + static std::shared_ptr create_cgroup_cpu_ctl(uint64_t wg_id); static bool is_a_valid_cgroup_path(std::string cg_path); diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp index f62bdaef0991c9..b470e1534e1c6f 100644 --- a/be/src/agent/topic_subscriber.cpp +++ b/be/src/agent/topic_subscriber.cpp @@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques // eg, update workload info may delay other listener, then we need add a thread here // to handle_topic_info asynchronous std::shared_lock lock(_listener_mtx); - LOG(INFO) << "[topic_publish]begin handle topic info"; for (auto& listener_pair : _registered_listeners) { if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) { - LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first - << ", size=" << topic_request.topic_map.at(listener_pair.first).size(); listener_pair.second->handle_topic_info( topic_request.topic_map.at(listener_pair.first)); - LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first; + LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first + << ", size=" << topic_request.topic_map.at(listener_pair.first).size(); } } } diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index f0f57869f2545a..7b688b7dcdf6ef 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi workload_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - wg->upsert_task_scheduler(&workload_group_info, _exec_env); + wg->upsert_task_scheduler(&workload_group_info); // 5 upsert io throttle wg->upsert_scan_io_throttle(&workload_group_info); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 5d7b445917aa20..dc6abbac31ba1b 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -231,7 +231,7 @@ Result CloudStorageEngine::get_tablet(int64_t tablet_id) { }); } -Status CloudStorageEngine::start_bg_threads() { +Status CloudStorageEngine::start_bg_threads(std::shared_ptr wg_sptr) { RETURN_IF_ERROR(Thread::create( "CloudStorageEngine", "refresh_s3_info_thread", [this]() { this->_refresh_storage_vault_info_thread_callback(); }, @@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() { // compaction tasks producer thread int base_thread_num = get_base_thread_num(); int cumu_thread_num = get_cumu_thread_num(); - RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") - .set_min_threads(base_thread_num) - .set_max_threads(base_thread_num) - .build(&_base_compaction_thread_pool)); - RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") - .set_min_threads(cumu_thread_num) - .set_max_threads(cumu_thread_num) - .build(&_cumu_compaction_thread_pool)); + if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) { + RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(base_thread_num) + .set_max_threads(base_thread_num) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_base_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(cumu_thread_num) + .set_max_threads(cumu_thread_num) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_cumu_compaction_thread_pool)); + } else { + RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(base_thread_num) + .set_max_threads(base_thread_num) + .build(&_base_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(cumu_thread_num) + .set_max_threads(cumu_thread_num) + .build(&_cumu_compaction_thread_pool)); + } RETURN_IF_ERROR(Thread::create( "StorageEngine", "compaction_tasks_producer_thread", [this]() { this->_compaction_tasks_producer_callback(); }, diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 92d2917a916f6a..072b8366542253 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -57,7 +57,7 @@ class CloudStorageEngine final : public BaseStorageEngine { Result get_tablet(int64_t tablet_id) override; - Status start_bg_threads() override; + Status start_bg_threads(std::shared_ptr wg_sptr = nullptr) override; Status set_cluster_id(int32_t cluster_id) override { _effective_cluster_id = cluster_id; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index a0c5a05636bfa2..736bdaa99304d3 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -210,7 +210,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) { return threads_num; } -Status StorageEngine::start_bg_threads() { +Status StorageEngine::start_bg_threads(std::shared_ptr wg_sptr) { RETURN_IF_ERROR(Thread::create( "StorageEngine", "unused_rowset_monitor_thread", [this]() { this->_unused_rowset_monitor_thread_callback(); }, @@ -243,29 +243,60 @@ Status StorageEngine::start_bg_threads() { auto single_replica_compaction_threads = get_single_replica_compaction_threads_num(data_dirs.size()); - RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") - .set_min_threads(base_compaction_threads) - .set_max_threads(base_compaction_threads) - .build(&_base_compaction_thread_pool)); - RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") - .set_min_threads(cumu_compaction_threads) - .set_max_threads(cumu_compaction_threads) - .build(&_cumu_compaction_thread_pool)); - RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool") - .set_min_threads(single_replica_compaction_threads) - .set_max_threads(single_replica_compaction_threads) - .build(&_single_replica_compaction_thread_pool)); - - if (config::enable_segcompaction) { - RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool") - .set_min_threads(config::segcompaction_num_threads) - .set_max_threads(config::segcompaction_num_threads) - .build(&_seg_compaction_thread_pool)); + if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) { + RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool") + .set_min_threads(base_compaction_threads) + .set_max_threads(base_compaction_threads) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_base_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool") + .set_min_threads(cumu_compaction_threads) + .set_max_threads(cumu_compaction_threads) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_cumu_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool") + .set_min_threads(single_replica_compaction_threads) + .set_max_threads(single_replica_compaction_threads) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_single_replica_compaction_thread_pool)); + + if (config::enable_segcompaction) { + RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool") + .set_min_threads(config::segcompaction_num_threads) + .set_max_threads(config::segcompaction_num_threads) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_seg_compaction_thread_pool)); + } + RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool") + .set_min_threads(config::cold_data_compaction_thread_num) + .set_max_threads(config::cold_data_compaction_thread_num) + .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr()) + .build(&_cold_data_compaction_thread_pool)); + } else { + RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(base_compaction_threads) + .set_max_threads(base_compaction_threads) + .build(&_base_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(cumu_compaction_threads) + .set_max_threads(cumu_compaction_threads) + .build(&_cumu_compaction_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool") + .set_min_threads(single_replica_compaction_threads) + .set_max_threads(single_replica_compaction_threads) + .build(&_single_replica_compaction_thread_pool)); + + if (config::enable_segcompaction) { + RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool") + .set_min_threads(config::segcompaction_num_threads) + .set_max_threads(config::segcompaction_num_threads) + .build(&_seg_compaction_thread_pool)); + } + RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") + .set_min_threads(config::cold_data_compaction_thread_num) + .set_max_threads(config::cold_data_compaction_thread_num) + .build(&_cold_data_compaction_thread_pool)); } - RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") - .set_min_threads(config::cold_data_compaction_thread_num) - .set_max_threads(config::cold_data_compaction_thread_num) - .build(&_cold_data_compaction_thread_pool)); // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 421c0eb352d712..a22015898988b3 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -72,6 +72,7 @@ class ReportWorker; class CreateTabletRRIdxCache; struct DirInfo; class SnapshotManager; +class WorkloadGroup; using SegCompactionCandidates = std::vector; using SegCompactionCandidatesSharedPtr = std::shared_ptr; @@ -105,7 +106,7 @@ class BaseStorageEngine { virtual bool stopped() = 0; // start all background threads. This should be call after env is ready. - virtual Status start_bg_threads() = 0; + virtual Status start_bg_threads(std::shared_ptr wg_sptr = nullptr) = 0; virtual Result get_tablet(int64_t tablet_id) = 0; @@ -278,7 +279,7 @@ class StorageEngine final : public BaseStorageEngine { return _default_rowset_type; } - Status start_bg_threads() override; + Status start_bg_threads(std::shared_ptr wg_sptr = nullptr) override; // clear trash and snapshot file // option: update disk usage after sweep diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index bdb5bec1776f58..3c1b08063dfa61 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -43,7 +43,7 @@ namespace doris::pipeline { class TaskScheduler { public: - TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl) + TaskScheduler(int core_num, std::string name, std::shared_ptr cgroup_cpu_ctl) : _task_queue(core_num), _shutdown(false), _name(std::move(name)), @@ -65,7 +65,7 @@ class TaskScheduler { std::vector _markers; bool _shutdown; std::string _name; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; void _do_work(int index); }; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 75ec588aa50c1d..0f0b677bb1ce96 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -276,6 +276,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _pipeline_tracer_ctx = std::make_unique(); // before query RETURN_IF_ERROR(init_pipeline_task_scheduler()); _workload_group_manager = new WorkloadGroupMgr(); + _workload_group_manager->init_internal_workload_group(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, @@ -364,7 +365,8 @@ Status ExecEnv::_init(const std::vector& store_paths, return st; } _storage_engine->set_heartbeat_flags(this->heartbeat_flags()); - if (st = _storage_engine->start_bg_threads(); !st.ok()) { + WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg(); + if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) { LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st; return st; } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index c6a3c07adda1dd..f62179273cfcd4 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; -WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) +WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {} + +WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool) : _id(tg_info.id), _name(tg_info.name), _version(tg_info.version), @@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) _spill_low_watermark(tg_info.spill_low_watermark), _spill_high_watermark(tg_info.spill_high_watermark), _scan_bytes_per_second(tg_info.read_bytes_per_second), - _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) { + _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second), + _need_create_query_thread_pool(need_create_query_thread_pool) { std::vector& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list; for (const auto& data_dir : data_dir_list) { _scan_io_throttle_map[data_dir.path] = @@ -434,54 +437,60 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( .remote_read_bytes_per_second = remote_read_bytes_per_second}; } -void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { - uint64_t tg_id = tg_info->id; - std::string tg_name = tg_info->name; - int cpu_hard_limit = tg_info->cpu_hard_limit; - uint64_t cpu_shares = tg_info->cpu_share; - bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; - int scan_thread_num = tg_info->scan_thread_num; - int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num; - int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num; +std::weak_ptr WorkloadGroup::get_cgroup_cpu_ctl_wptr() { + std::shared_lock rlock(_task_sched_lock); + return _cgroup_cpu_ctl; +} +void WorkloadGroup::create_cgroup_cpu_ctl() { std::lock_guard wlock(_task_sched_lock); + create_cgroup_cpu_ctl_no_lock(); +} + +void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() { if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) { - std::unique_ptr cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id); + std::shared_ptr cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id); if (cgroup_cpu_ctl) { Status ret = cgroup_cpu_ctl->init(); if (ret.ok()) { _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); - LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; + LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << _id; } else { - LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << _id << ", reason=" << ret.to_string(); } } else { - LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed"; + LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl wg_id=" << _id << " failed"; } } +} - CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get(); - +void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, + std::shared_ptr cg_cpu_ctl_ptr) { + uint64_t wg_id = wg_info->id; + std::string wg_name = wg_info->name; + int scan_thread_num = wg_info->scan_thread_num; + int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num; + int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num; if (_task_sched == nullptr) { int32_t executors_size = config::pipeline_executor_size; if (executors_size <= 0) { executors_size = CpuInfo::num_cores(); } std::unique_ptr pipeline_task_scheduler = - std::make_unique(executors_size, "Pipe_" + tg_name, + std::make_unique(executors_size, "Pipe_" + wg_name, cg_cpu_ctl_ptr); Status ret = pipeline_task_scheduler->start(); if (ret.ok()) { _task_sched = std::move(pipeline_task_scheduler); } else { - LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id; + LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << wg_id; } } if (_scan_task_sched == nullptr) { std::unique_ptr scan_scheduler = - std::make_unique("Scan_" + tg_name, + std::make_unique("Scan_" + wg_name, cg_cpu_ctl_ptr); Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_thread_num, @@ -489,7 +498,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e if (ret.ok()) { _scan_task_sched = std::move(scan_scheduler); } else { - LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id; + LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << wg_id; } } if (scan_thread_num > 0 && _scan_task_sched) { @@ -501,7 +510,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e int remote_scan_thread_queue_size = vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr remote_scan_scheduler = - std::make_unique("RScan_" + tg_name, + std::make_unique("RScan_" + wg_name, cg_cpu_ctl_ptr); Status ret = remote_scan_scheduler->start(remote_max_thread_num, config::doris_scanner_min_thread_pool_thread_num, @@ -510,7 +519,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e _remote_scan_task_sched = std::move(remote_scan_scheduler); } else { LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid=" - << tg_id; + << wg_id; } } if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) { @@ -532,7 +541,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e : std::min(num_disk * min_threads, num_cpus * config::wg_flush_thread_num_per_cpu); - std::string pool_name = "wg_flush_" + tg_name; + std::string pool_name = "wg_flush_" + wg_name; auto ret = ThreadPoolBuilder(pool_name) .set_min_threads(min_threads) .set_max_threads(max_threads) @@ -540,17 +549,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e .build(&thread_pool); if (!ret.ok()) { LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid=" - << tg_id; + << wg_id; } else { _memtable_flush_pool = std::move(thread_pool); - LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id + LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << wg_id << ", max thread num=" << max_threads << ", min thread num=" << min_threads; } } } +} + +void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) { + uint64_t wg_id = wg_info->id; + int cpu_hard_limit = wg_info->cpu_hard_limit; + uint64_t cpu_shares = wg_info->cpu_share; + bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit; + create_cgroup_cpu_ctl_no_lock(); - // step 6: update cgroup cpu if needed if (_cgroup_cpu_ctl) { if (enable_cpu_hard_limit) { if (cpu_hard_limit > 0) { @@ -560,15 +576,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e } else { LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is " "illegal: " - << cpu_hard_limit << ", gid=" << tg_id; + << cpu_hard_limit << ", gid=" << wg_id; } } else { _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); _cgroup_cpu_ctl->update_cpu_hard_limit( CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit } - _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), - &(tg_info->cgroup_cpu_hard_limit)); + _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares), + &(wg_info->cgroup_cpu_hard_limit)); + } +} + +void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) { + std::lock_guard wlock(_task_sched_lock); + upsert_cgroup_cpu_ctl_no_lock(wg_info); + + if (_need_create_query_thread_pool) { + upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl); } } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 2ba84ce982b304..96b8a36df1cfda 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -58,6 +58,8 @@ class WorkloadGroup : public std::enable_shared_from_this { public: explicit WorkloadGroup(const WorkloadGroupInfo& tg_info); + explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool); + int64_t version() const { return _version; } uint64_t cpu_share() const { return _cpu_share.load(); } @@ -165,7 +167,7 @@ class WorkloadGroup : public std::enable_shared_from_this { int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc); - void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env); + void upsert_task_scheduler(WorkloadGroupInfo* tg_info); void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, vectorized::SimplifiedScanScheduler** scan_sched, @@ -198,18 +200,21 @@ class WorkloadGroup : public std::enable_shared_from_this { } int64_t get_remote_scan_bytes_per_second(); - CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() { - std::shared_lock rlock(_task_sched_lock); - return _cgroup_cpu_ctl.get(); - } - ThreadPool* get_memtable_flush_pool_ptr() { // no lock here because this is called by memtable flush, // to avoid lock competition with the workload thread pool's update return _memtable_flush_pool.get(); } + void create_cgroup_cpu_ctl(); + + std::weak_ptr get_cgroup_cpu_ctl_wptr(); private: + void create_cgroup_cpu_ctl_no_lock(); + void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info); + void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, + std::shared_ptr cg_cpu_ctl_ptr); + mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; std::string _name; @@ -240,7 +245,10 @@ class WorkloadGroup : public std::enable_shared_from_this { std::unordered_map> _query_ctxs; std::shared_mutex _task_sched_lock; - std::unique_ptr _cgroup_cpu_ctl {nullptr}; + // _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup, + // but also some global background threadpool which not owned by WorkloadGroup, + // so it should be shared ptr; + std::shared_ptr _cgroup_cpu_ctl {nullptr}; std::unique_ptr _task_sched {nullptr}; std::unique_ptr _scan_task_sched {nullptr}; std::unique_ptr _remote_scan_task_sched {nullptr}; @@ -249,6 +257,9 @@ class WorkloadGroup : public std::enable_shared_from_this { std::map> _scan_io_throttle_map; std::shared_ptr _remote_scan_io_throttle {nullptr}; + // for some background workload, it doesn't need to create query thread pool + const bool _need_create_query_thread_pool; + // bvar metric std::unique_ptr> _mem_used_status; std::unique_ptr> _cpu_usage_adder; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 927d4d13814267..4d32fc8700eaa5 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -34,6 +34,25 @@ namespace doris { +void WorkloadGroupMgr::init_internal_workload_group() { + WorkloadGroupPtr internal_wg = nullptr; + { + std::lock_guard w_lock(_group_mutex); + if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) { + WorkloadGroupInfo internal_wg_info { + .id = INTERNAL_WORKLOAD_GROUP_ID, + .name = INTERNAL_WORKLOAD_GROUP_NAME, + .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()}; + internal_wg = std::make_shared(internal_wg_info, false); + _workload_groups[internal_wg_info.id] = internal_wg; + } + } + DCHECK(internal_wg != nullptr); + if (internal_wg) { + internal_wg->create_cgroup_cpu_ctl(); + } +} + WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group( const WorkloadGroupInfo& workload_group_info) { { @@ -86,6 +105,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i old_wg_size = _workload_groups.size(); for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { uint64_t wg_id = iter->first; + // internal workload group created by BE can not be dropped + if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) { + continue; + } auto workload_group_ptr = iter->second; if (used_wg_id.find(wg_id) == used_wg_id.end()) { workload_group_ptr->shutdown(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index f76e98d26063ba..18a0687b373325 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -36,11 +36,18 @@ class TaskScheduler; class MultiCoreTaskQueue; } // namespace pipeline +// internal_group is used for doris internal workload, currently is mainly compaction +const static uint64_t INTERNAL_WORKLOAD_GROUP_ID = + static_cast(TWorkloadType::type::INTERNAL); +const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal"; + class WorkloadGroupMgr { public: WorkloadGroupMgr() = default; ~WorkloadGroupMgr() = default; + void init_internal_workload_group(); + WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info); void get_related_workload_groups(const std::function& pred, @@ -64,6 +71,11 @@ class WorkloadGroupMgr { void get_wg_resource_usage(vectorized::Block* block); + WorkloadGroupPtr get_internal_wg() { + std::shared_lock r_lock(_group_mutex); + return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID]; + } + private: std::shared_mutex _group_mutex; std::unordered_map _workload_groups; diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index 15fb36181d4336..f5ea38515def36 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { return *this; } -ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) { +ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl( + std::weak_ptr cgroup_cpu_ctl) { _cgroup_cpu_ctl = cgroup_cpu_ctl; return *this; } @@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() { _num_threads++; _num_threads_pending_start--; - if (_cgroup_cpu_ctl != nullptr) { - static_cast(_cgroup_cpu_ctl->add_thread_to_cgroup()); + if (std::shared_ptr cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) { + static_cast(cg_cpu_ctl_sptr->add_thread_to_cgroup()); } // Owned by this worker thread and added/removed from _idle_threads as needed. diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 5ce27e2f27b9a5..9bd4a7246fb0b1 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -107,7 +107,7 @@ class ThreadPoolBuilder { ThreadPoolBuilder& set_min_threads(int min_threads); ThreadPoolBuilder& set_max_threads(int max_threads); ThreadPoolBuilder& set_max_queue_size(int max_queue_size); - ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl); + ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr cgroup_cpu_ctl); template ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration& idle_timeout) { _idle_timeout = std::chrono::duration_cast(idle_timeout); @@ -133,7 +133,7 @@ class ThreadPoolBuilder { int _min_threads; int _max_threads; int _max_queue_size; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; std::chrono::milliseconds _idle_timeout; ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; @@ -345,7 +345,7 @@ class ThreadPool { // Protected by _lock. int _total_queued_tasks; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; // All allocated tokens. // diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 56c49368598adc..7731b3ba8f983b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -114,11 +114,8 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) { - _is_stop.store(false); - _cgroup_cpu_ctl = cgroup_cpu_ctl; - _sched_name = sched_name; - } + SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl) + : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {} ~SimplifiedScanScheduler() { stop(); @@ -217,7 +214,7 @@ class SimplifiedScanScheduler { private: std::unique_ptr _scan_thread_pool; std::atomic _is_stop; - CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::weak_ptr _cgroup_cpu_ctl; std::string _sched_name; }; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 432ec1c54b5312..c17b84b2dbe1de 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -107,12 +107,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi force_close(status); } - if (state && state->get_query_ctx()) { - WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group(); - if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) { - Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup(); + if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) { + if (auto cg_ctl_sptr = + state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) { + Status ret = cg_ctl_sptr->add_thread_to_cgroup(); if (ret.ok()) { - std::string wg_tname = "asyc_wr_" + wg_ptr->name(); + std::string wg_tname = + "asyc_wr_" + state->get_query_ctx()->workload_group()->name(); Thread::set_self_name(wg_tname); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java index 8cba792dd39add..4405da6ce13dd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java @@ -25,6 +25,10 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; + +import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -55,14 +59,26 @@ public void analyze(Analyzer analyzer) throws UserException { } if (properties == null || properties.isEmpty()) { - throw new AnalysisException("Resource group properties can't be null"); + throw new AnalysisException("Workload Group properties can't be empty"); + } + + if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) { + throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified "); + } + + String tagStr = properties.get(WorkloadGroup.TAG); + if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName) + || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) { + throw new AnalysisException( + WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME + + " group can not set tag"); } } @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("ALTER RESOURCE GROUP '").append(workloadGroupName).append("' "); + sb.append("ALTER WORKLOAD GROUP '").append(workloadGroupName).append("' "); sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java index fc4f99046d5aa9..dd13542a8361f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java @@ -27,6 +27,9 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; + +import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -68,12 +71,19 @@ public void analyze(Analyzer analyzer) throws UserException { FeNameFormat.checkWorkloadGroupName(workloadGroupName); if (properties == null || properties.isEmpty()) { - throw new AnalysisException("Resource group properties can't be null"); + throw new AnalysisException("Workload Group properties can't be empty"); + } + + if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) { + throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified "); } - String wgTag = properties.get(WorkloadGroup.TAG); - if (wgTag != null) { - FeNameFormat.checkCommonName("workload group tag", wgTag); + String tagStr = properties.get(WorkloadGroup.TAG); + if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName) + || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) { + throw new AnalysisException( + WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME + + " group can not set tag"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java index e4e3055f1280ba..9356c6b5c4b55c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java @@ -20,7 +20,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -50,8 +49,6 @@ public void analyze(Analyzer analyzer) throws UserException { if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } - - FeNameFormat.checkWorkloadGroupName(workloadGroupName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 12ac201eb65f26..7fb12b9621acc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -250,6 +250,7 @@ import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; @@ -1874,6 +1875,7 @@ protected void startMasterOnlyDaemonThreads() { WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this); topicPublisherThread.addToTopicPublisherList(wpPublisher); topicPublisherThread.start(); + new CreateInternalWorkloadGroupThread().start(); // auto analyze related threads. statisticsCleaner.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 2dc6a90d593c80..cfc9d85c6a88c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -244,10 +244,11 @@ public static void createTbl() throws UserException { // statistics Env.getCurrentEnv().getInternalCatalog().createTable( buildStatisticsTblStmt(StatisticConstants.TABLE_STATISTIC_TBL_NAME, - Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id"))); + Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id"))); Env.getCurrentEnv().getInternalCatalog().createTable( buildStatisticsTblStmt(StatisticConstants.PARTITION_STATISTIC_TBL_NAME, - Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id", "col_id"))); + Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id", + "col_id"))); // audit table Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1c24ca69d4f191..214fbe0e41089f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -50,6 +50,9 @@ public class FeConstants { // set to false to disable internal schema db public static boolean enableInternalSchemaDb = true; + // for UT, create internal workload group thread can not start + public static boolean shouldCreateInternalWorkloadGroup = true; + // default scheduler interval is 10 seconds public static int default_scheduler_interval_millisecond = 10000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java new file mode 100644 index 00000000000000..7c6d0e3a080818 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadgroup; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CreateInternalWorkloadGroupThread extends Thread { + + private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class); + + public CreateInternalWorkloadGroupThread() { + super("CreateInternalWorkloadGroupThread"); + } + + public void run() { + if (!FeConstants.shouldCreateInternalWorkloadGroup) { + return; + } + try { + Env env = Env.getCurrentEnv(); + while (!env.isReady()) { + Thread.sleep(5000); + } + if (!env.getWorkloadGroupMgr() + .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) { + env.getWorkloadGroupMgr().createInternalWorkloadGroup(); + LOG.info("create internal workload group succ"); + } else { + LOG.info("internal workload group already exists."); + } + } catch (Throwable t) { + LOG.warn("create internal workload group failed. ", t); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 44fb98e10ef9f2..7d5e792ef71431 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TWorkloadGroupInfo; +import org.apache.doris.thrift.TWorkloadType; import org.apache.doris.thrift.TopicInfo; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; @@ -43,8 +45,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; public class WorkloadGroup implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class); @@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second"; + // it's used to define Doris's internal workload group, + // currently it is internal, only contains compaction + // later more type and workload may be included in the future. + public static final String INTERNAL_TYPE = "internal_type"; + // NOTE(wb): all property is not required, some properties default value is set in be // default value is as followed // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true @@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM) .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK) - .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build(); + .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build(); + + public static final ImmutableMap WORKLOAD_TYPE_MAP = new ImmutableMap.Builder() + .put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build(); public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @@ -420,13 +433,31 @@ private static void checkProperties(Map properties) throws DdlEx String[] tagArr = tagStr.split(","); for (String tag : tagArr) { try { - FeNameFormat.checkCommonName("workload group tag name", tag); + FeNameFormat.checkCommonName("workload group tag", tag); } catch (AnalysisException e) { - throw new DdlException("workload group tag name format is illegal, " + tagStr); + throw new DdlException("tag format is illegal, " + tagStr); } } } + // internal workload group is usually created by Doris. + // If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP. + String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE); + if (!StringUtils.isEmpty(interTypeId)) { + int wid = Integer.valueOf(interTypeId); + if (TWorkloadType.findByValue(wid) == null) { + throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP); + } + } + + } + + Optional getInternalTypeId() { + String typeIdStr = this.properties.get(INTERNAL_TYPE); + if (StringUtils.isEmpty(typeIdStr)) { + return Optional.empty(); + } + return Optional.of(Integer.valueOf(typeIdStr)); } public long getId() { @@ -535,8 +566,18 @@ public int getCpuHardLimit() { return cpuHardLimit; } - public String getTag() { - return properties.get(TAG); + public Optional> getTag() { + String tagStr = properties.get(TAG); + if (StringUtils.isEmpty(tagStr)) { + return Optional.empty(); + } + + Set tagSet = new HashSet<>(); + String[] ss = tagStr.split(","); + for (String str : ss) { + tagSet.add(str); + } + return Optional.of(tagSet); } @Override @@ -550,7 +591,13 @@ public TPipelineWorkloadGroup toThrift() { public TopicInfo toTopicInfo() { TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo(); - tWorkloadGroupInfo.setId(id); + long wgId = this.id; + Optional internalTypeId = getInternalTypeId(); + if (internalTypeId.isPresent()) { + wgId = internalTypeId.get(); + } + tWorkloadGroupInfo.setId(wgId); + tWorkloadGroupInfo.setName(name); tWorkloadGroupInfo.setVersion(version); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 8464d83bdbcfb7..26798bb1ec3396 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -42,6 +42,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TWorkloadType; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; @@ -49,7 +50,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -62,6 +62,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost public static final Long DEFAULT_GROUP_ID = 1L; + public static final String INTERNAL_GROUP_NAME = "_internal"; + + // internal_type_id could be converted to workload group id when Workload published to BE + // refer WorkloadGroup.toTopicInfo + public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue()); + public static final ImmutableList WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder() .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT) .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT) @@ -375,44 +382,84 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio LOG.info("Create workload group success: {}", workloadGroup); } + public void createInternalWorkloadGroup() { + Map properties = Maps.newHashMap(); + // 100 is cgroup v2 default cpu_share value + properties.put(WorkloadGroup.CPU_SHARE, "100"); + properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID)); + WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties); + writeLock(); + try { + if (!nameToWorkloadGroup.containsKey(wg.getName())) { + nameToWorkloadGroup.put(wg.getName(), wg); + idToWorkloadGroup.put(wg.getId(), wg); + Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg); + } + } finally { + writeUnlock(); + } + } + // NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit // when create/alter workload group with same tag. // when oldWg is null it means caller is an alter stmt. private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException { - String wgTag = newWg.getTag(); - double sumOfAllMemLimit = 0; - int sumOfAllCpuHardLimit = 0; - for (Map.Entry entry : idToWorkloadGroup.entrySet()) { - WorkloadGroup wg = entry.getValue(); - if (!StringUtils.equals(wgTag, wg.getTag())) { - continue; - } + Optional> newWgTag = newWg.getTag(); + Set newWgTagSet = null; + if (newWgTag.isPresent()) { + newWgTagSet = newWgTag.get(); + } else { + newWgTagSet = new HashSet<>(); + newWgTagSet.add(null); + } - if (oldWg != null && entry.getKey() == oldWg.getId()) { - continue; - } + for (String newWgOneTag : newWgTagSet) { + double sumOfAllMemLimit = 0; + int sumOfAllCpuHardLimit = 0; - if (wg.getCpuHardLimit() > 0) { - sumOfAllCpuHardLimit += wg.getCpuHardLimit(); - } - if (wg.getMemoryLimitPercent() > 0) { - sumOfAllMemLimit += wg.getMemoryLimitPercent(); + // 1 get sum value of all wg which has same tag without current wg + for (Map.Entry entry : idToWorkloadGroup.entrySet()) { + WorkloadGroup wg = entry.getValue(); + Optional> wgTag = wg.getTag(); + + if (oldWg != null && entry.getKey() == oldWg.getId()) { + continue; + } + + if (newWgOneTag == null) { + if (wgTag.isPresent()) { + continue; + } + } else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) { + continue; + } + + if (wg.getCpuHardLimit() > 0) { + sumOfAllCpuHardLimit += wg.getCpuHardLimit(); + } + if (wg.getMemoryLimitPercent() > 0) { + sumOfAllMemLimit += wg.getMemoryLimitPercent(); + } } - } - sumOfAllMemLimit += newWg.getMemoryLimitPercent(); - sumOfAllCpuHardLimit += newWg.getCpuHardLimit(); + // 2 sum current wg value + sumOfAllMemLimit += newWg.getMemoryLimitPercent(); + sumOfAllCpuHardLimit += newWg.getCpuHardLimit(); - if (sumOfAllMemLimit > 100.0 + 1e-6) { - throw new DdlException( - "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag - + " cannot be greater than 100.0%."); - } + // 3 check total sum + if (sumOfAllMemLimit > 100.0 + 1e-6) { + throw new DdlException( + "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + ( + newWgTag.isPresent() ? newWgTag.get() : "") + + " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit); + } - if (sumOfAllCpuHardLimit > 100) { - throw new DdlException( - "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " - + wgTag + " can not be greater than 100% "); + if (sumOfAllCpuHardLimit > 100) { + throw new DdlException( + "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + ( + newWgTag.isPresent() + ? newWgTag.get() : "") + " can not be greater than 100% "); + } } } @@ -446,8 +493,8 @@ public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException { String workloadGroupName = stmt.getWorkloadGroupName(); - if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) { - throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed"); + if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) { + throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed"); } // if a workload group exists in user property, it should not be dropped diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java index 5f1e35659667ab..d729881358e429 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java @@ -235,4 +235,226 @@ public void testAlterWorkloadGroup() throws UserException { } Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5); } + + @Test + public void testMultiTagCreateWorkloadGroup() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + + { + String name = "empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "50%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + try { + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn3,cn100"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g5"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn5"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g6"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn5"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g7"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn5"); + try { + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + } + + + @Test + public void testMultiTagAlterWorkloadGroup() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + { + String name = "empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "50%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + properties.put(WorkloadGroup.TAG, "cn2,cn100"); + AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties); + try { + workloadGroupMgr.alterWorkloadGroup(alterStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + } + + + @Test + public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + + { + String name = "not_empty_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + // create not tag workload group + { + String name = "no_tag_g1"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g2"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + try { + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + { + String name = "no_tag_g3"; + Map properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 8e25efdfada439..70adbbd7f99f5e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -153,6 +153,7 @@ public Set getEnableNereidsRules() { @BeforeAll public final void beforeAll() throws Exception { FeConstants.enableInternalSchemaDb = false; + FeConstants.shouldCreateInternalWorkloadGroup = false; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); beforeCluster(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index ed0ae243a1ded0..533999a853ff38 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -327,6 +327,10 @@ struct TPublishTopicResult { 1: required Status.TStatus status } +enum TWorkloadType { + INTERNAL = 2 +} + struct TGetRealtimeExecStatusRequest { // maybe query id or other unique id 1: optional Types.TUniqueId id diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 41cc190a017afa..7807578ea81bcf 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -176,6 +176,30 @@ suite("test_crud_wlg") { exception "can not be greater than 100%" } + // test alter tag and type + test { + sql "alter workload group test_group properties ( 'internal_type'='13' );" + + exception "internal_type can not be create or modified" + } + + test { + sql "create workload group inter_wg properties('internal_type'='123');" + exception "internal_type can not be create or modified" + } + + test { + sql "alter workload group normal properties ('tag'='123')" + + exception "_internal and normal group can not set tag" + } + + test { + sql "alter workload group _internal properties ('tag'='123')" + + exception "_internal and normal group can not set tag" + } + sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );" qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """ qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;" @@ -492,6 +516,11 @@ suite("test_crud_wlg") { // test workload group's tag property, cpu_hard_limit + test { + sql "create workload group tag_test properties('tag'=' a, b , c ');" + exception "tag format is illegal" + } + test { sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='101%', 'tag'='tag1')" exception "must be a positive integer"