diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 483e7753ec2134..f047071139e831 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -219,6 +219,7 @@ DEFINE_Int32(check_consistency_worker_count, "1"); DEFINE_Int32(upload_worker_count, "1"); // the count of thread to download DEFINE_Int32(download_worker_count, "1"); +DEFINE_Int32(num_query_ctx_map_partitions, "128"); // the count of thread to make snapshot DEFINE_Int32(make_snapshot_worker_count, "5"); // the count of thread to release snapshot diff --git a/be/src/common/config.h b/be/src/common/config.h index 3579ce54fce714..29080a56defbcf 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1346,7 +1346,7 @@ DECLARE_Int32(spill_io_thread_pool_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); DECLARE_mBool(check_segment_when_build_rowset_meta); - +DECLARE_Int32(num_query_ctx_map_partitions); // max s3 client retry times DECLARE_mInt32(max_s3_client_retry); // When meet s3 429 error, the "get" request will diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b7bbaf8f206702..0788b5e3206748 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -222,12 +222,95 @@ static std::map> _get_all_running_queries return result; } +inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) { + uint32_t value = HashUtil::hash(&query_id.lo, 8, 0); + value = HashUtil::hash(&query_id.hi, 8, value); + return value % capacity; +} + +inline uint32_t get_map_id(std::pair key, size_t capacity) { + uint32_t value = HashUtil::hash(&key.first.lo, 8, 0); + value = HashUtil::hash(&key.first.hi, 8, value); + return value % capacity; +} + +template +ConcurrentContextMap::ConcurrentContextMap() { + _internal_map.resize(config::num_query_ctx_map_partitions); + for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) { + _internal_map[i] = {std::make_unique(), + phmap::flat_hash_map()}; + } +} + +template +Value ConcurrentContextMap::find(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + return search->second; + } + return std::shared_ptr(nullptr); + } +} + +template +Status ConcurrentContextMap::apply_if_not_exists( + const Key& query_id, std::shared_ptr& query_ctx, ApplyFunction&& function) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::unique_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + query_ctx = search->second; + } + if (!query_ctx) { + return function(map); + } + return Status::OK(); + } +} + +template +void ConcurrentContextMap::erase(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::unique_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + map.erase(query_id); + } +} + +template +void ConcurrentContextMap::insert(const Key& query_id, + std::shared_ptr query_ctx) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::unique_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + map.insert({query_id, query_ctx}); + } +} + +template +void ConcurrentContextMap::clear() { + for (auto& pair : _internal_map) { + std::unique_lock lock(*pair.first); + auto& map = pair.second; + map.clear(); + } +} + FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); REGISTER_HOOK_METRIC(fragment_instance_count, - [this]() { return _fragment_instance_map.size(); }); + [this]() { return _fragment_instance_map.num_items(); }); auto s = Thread::create( "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); }, @@ -268,20 +351,17 @@ void FragmentMgr::stop() { _thread_pool->shutdown(); // Only me can delete - { - std::lock_guard lock(_lock); - _fragment_instance_map.clear(); - for (auto& pipeline : _pipeline_map) { - pipeline.second->close_sink(); - } - _pipeline_map.clear(); - } - - { - std::unique_lock lock(_query_ctx_map_lock); - _query_ctx_map.clear(); - } - + _fragment_instance_map.clear(); + _pipeline_map.apply( + [&](phmap::flat_hash_map>& + map) -> Status { + for (auto& pipeline : map) { + pipeline.second->close_sink(); + } + return Status::OK(); + }); + _pipeline_map.clear(); + _query_ctx_map.clear(); _async_report_thread_pool->shutdown(); } @@ -621,13 +701,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr fragment_ex // remove exec state after this fragment finished { - std::lock_guard lock(_lock); _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id())); } if (all_done && query_ctx) { - std::unique_lock lock(_query_ctx_map_lock); _query_ctx_map.erase(query_ctx->query_id()); LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); } @@ -721,15 +799,13 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r TUniqueId query_id; query_id.__set_hi(request->query_id().hi()); query_id.__set_lo(request->query_id().lo()); - std::shared_lock lock(_query_ctx_map_lock); - auto search = _query_ctx_map.find(query_id); - if (search == _query_ctx_map.end()) { + q_ctx = _query_ctx_map.find(query_id); + if (q_ctx == nullptr) { return Status::InternalError( "Failed to get query fragments context. Query may be " "timeout or be cancelled. host: {}", BackendOptions::get_localhost()); } - q_ctx = search->second; } q_ctx->set_ready_to_execute(false); LOG_INFO("Query {} start execution", print_id(query_id)); @@ -742,7 +818,6 @@ void FragmentMgr::remove_pipeline_context( bool all_done = false; TUniqueId query_id = f_context->get_query_id(); { - std::lock_guard lock(_lock); std::vector ins_ids; f_context->instance_ids(ins_ids); all_done = q_context->countdown(ins_ids.size()); @@ -754,7 +829,6 @@ void FragmentMgr::remove_pipeline_context( } } if (all_done) { - std::unique_lock lock(_query_ctx_map_lock); _query_ctx_map.erase(query_id); LOG_INFO("Query {} finished", print_id(query_id)); } @@ -768,98 +842,90 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::shared_lock lock(_query_ctx_map_lock); - auto search = _query_ctx_map.find(query_id); - if (search == _query_ctx_map.end()) { + query_ctx = _query_ctx_map.find(query_id); + if (query_ctx == nullptr) { return Status::InternalError( "Failed to get query fragments context. Query may be " "timeout or be cancelled. host: {}", BackendOptions::get_localhost()); } - query_ctx = search->second; } else { // Find _query_ctx_map, in case some other request has already // create the query fragments context. - { - std::shared_lock lock(_query_ctx_map_lock); - auto search = _query_ctx_map.find(query_id); - if (search != _query_ctx_map.end()) { - query_ctx = search->second; - return Status::OK(); - } - } - - std::unique_lock lock(_query_ctx_map_lock); - auto search = _query_ctx_map.find(query_id); - if (search != _query_ctx_map.end()) { - query_ctx = search->second; - return Status::OK(); - } - - TNetworkAddress current_connect_fe_addr; - // for gray upragde between 2.1 version, fe may not set current_connect_fe, - // then use coord addr instead - if (params.__isset.current_connect_fe) { - current_connect_fe_addr = params.current_connect_fe; - } else { - current_connect_fe_addr = params.coord; - } + RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists( + query_id, query_ctx, + [&](phmap::flat_hash_map>& map) -> Status { + TNetworkAddress current_connect_fe_addr; + // for gray upragde between 2.1 version, fe may not set current_connect_fe, + // then use coord addr instead + if (params.__isset.current_connect_fe) { + current_connect_fe_addr = params.current_connect_fe; + } else { + current_connect_fe_addr = params.coord; + } - LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord - << ", total fragment num on current host: " << params.fragment_num_on_host - << ", fe process uuid: " << params.query_options.fe_process_uuid - << ", query type: " << params.query_options.query_type - << ", report audit fe:" << current_connect_fe_addr; - - // This may be a first fragment request of the query. - // Create the query fragments context. - query_ctx = QueryContext::create_shared( - query_id, params.fragment_num_on_host, _exec_env, params.query_options, - params.coord, pipeline, params.is_nereids, current_connect_fe_addr, query_source); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); - RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, - &(query_ctx->desc_tbl))); - // set file scan range params - if (params.__isset.file_scan_params) { - query_ctx->file_scan_range_params_map = params.file_scan_params; - } + LOG(INFO) << "query_id: " << print_id(query_id) + << ", coord_addr: " << params.coord + << ", total fragment num on current host: " + << params.fragment_num_on_host + << ", fe process uuid: " << params.query_options.fe_process_uuid + << ", query type: " << params.query_options.query_type + << ", report audit fe:" << current_connect_fe_addr; + + // This may be a first fragment request of the query. + // Create the query fragments context. + query_ctx = QueryContext::create_shared( + query_id, params.fragment_num_on_host, _exec_env, params.query_options, + params.coord, pipeline, params.is_nereids, current_connect_fe_addr, + query_source); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); + RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, + &(query_ctx->desc_tbl))); + // set file scan range params + if (params.__isset.file_scan_params) { + query_ctx->file_scan_range_params_map = params.file_scan_params; + } - query_ctx->query_globals = params.query_globals; + query_ctx->query_globals = params.query_globals; - if (params.__isset.resource_info) { - query_ctx->user = params.resource_info.user; - query_ctx->group = params.resource_info.group; - query_ctx->set_rsc_info = true; - } + if (params.__isset.resource_info) { + query_ctx->user = params.resource_info.user; + query_ctx->group = params.resource_info.group; + query_ctx->set_rsc_info = true; + } - query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline); - _set_scan_concurrency(params, query_ctx.get()); - const bool is_pipeline = std::is_same_v; - - if (params.__isset.workload_groups && !params.workload_groups.empty()) { - uint64_t tg_id = params.workload_groups[0].id; - WorkloadGroupPtr workload_group_ptr = - _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); - if (workload_group_ptr != nullptr) { - RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); - RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), - tg_id); - - LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << ", use workload group: " << workload_group_ptr->debug_string() - << ", is pipeline: " << ((int)is_pipeline); - } else { - LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << " carried group info but can not find group in be"; - } - } - // There is some logic in query ctx's dctor, we could not check if exists and delete the - // temp query ctx now. For example, the query id maybe removed from workload group's queryset. - _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); - LOG(INFO) << "Register query/load memory tracker, query/load id: " - << print_id(query_ctx->query_id()) - << " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES); + query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled( + pipeline); + _set_scan_concurrency(params, query_ctx.get()); + const bool is_pipeline = std::is_same_v; + + if (params.__isset.workload_groups && !params.workload_groups.empty()) { + uint64_t tg_id = params.workload_groups[0].id; + WorkloadGroupPtr workload_group_ptr = + _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); + if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); + RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( + print_id(query_id), tg_id); + + LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) + << ", use workload group: " + << workload_group_ptr->debug_string() + << ", is pipeline: " << ((int)is_pipeline); + } else { + LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) + << " carried group info but can not find group in be"; + } + } + // There is some logic in query ctx's dctor, we could not check if exists and delete the + // temp query ctx now. For example, the query id maybe removed from workload group's queryset. + map.insert({query_id, query_ctx}); + LOG(INFO) << "Register query/load memory tracker, query/load id: " + << print_id(query_ctx->query_id()) << " limit: " + << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES); + return Status::OK(); + })); } return Status::OK(); } @@ -874,9 +940,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, << apache::thrift::ThriftDebugString(params.query_options).c_str(); const TUniqueId& fragment_instance_id = params.params.fragment_instance_id; { - std::lock_guard lock(_lock); auto iter = _fragment_instance_map.find(fragment_instance_id); - if (iter != _fragment_instance_map.end()) { + if (iter != nullptr) { // Duplicated LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id); return Status::OK(); @@ -894,8 +959,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, // Need lock here, because it will modify fragment ids and std::vector may resize and reallocate // memory, but query_is_canncelled will traverse the vector, it will core. // query_is_cancelled is called in allocator, we has to avoid dead lock. - std::lock_guard lock(_lock); - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); + query_ctx->push_instance_ids(fragment_instance_id); } auto fragment_executor = std::make_shared( @@ -921,12 +985,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); { - std::lock_guard lock(_lock); if (handler) { query_ctx->set_merge_controller_handler(handler); } - _fragment_instance_map.insert( - std::make_pair(params.params.fragment_instance_id, fragment_executor)); + _fragment_instance_map.insert(params.params.fragment_instance_id, fragment_executor); } auto st = _thread_pool->submit_func([this, fragment_executor, cb]() { @@ -938,7 +1000,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, if (!st.ok()) { { // Remove the exec state added - std::lock_guard lock(_lock); _fragment_instance_map.erase(params.params.fragment_instance_id); } fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, @@ -957,27 +1018,33 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { auto t = MonotonicNanos(); size_t i = 0; { - std::lock_guard lock(_lock); fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are still running! duration_limit={}\n", - _pipeline_map.size(), duration); + _pipeline_map.num_items(), duration); timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - for (auto& it : _pipeline_map) { - auto elapsed = (t - it.second->create_time()) / 1000000000.0; - if (elapsed < duration) { - // Only display tasks which has been running for more than {duration} seconds. - continue; - } - auto timeout_second = it.second->timeout_second(); - fmt::format_to(debug_string_buffer, - "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id=" - "{}) : {}\n", - i, elapsed, timeout_second, print_id(it.first), - it.second->debug_string()); - i++; - } + _pipeline_map.apply( + [&](phmap::flat_hash_map>& map) + -> Status { + for (auto& it : map) { + auto elapsed = (t - it.second->create_time()) / 1000000000.0; + if (elapsed < duration) { + // Only display tasks which has been running for more than {duration} seconds. + continue; + } + auto timeout_second = it.second->timeout_second(); + fmt::format_to( + debug_string_buffer, + "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id=" + "{}) : {}\n", + i, elapsed, timeout_second, print_id(it.first), + it.second->debug_string()); + i++; + } + return Status::OK(); + }); } return fmt::to_string(debug_string_buffer); } @@ -1037,14 +1104,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, for (const auto& local_param : params.local_params) { const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; - std::lock_guard lock(_lock); auto iter = _pipeline_map.find(fragment_instance_id); - if (iter != _pipeline_map.end()) { + if (iter != nullptr) { return Status::InternalError( "exec_plan_fragment input duplicated fragment_instance_id({})", UniqueId(fragment_instance_id).to_string()); } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); + query_ctx->push_instance_ids(fragment_instance_id); } if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { @@ -1052,13 +1118,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } { - std::lock_guard lock(_lock); std::vector ins_ids; reinterpret_cast(context.get()) ->instance_ids(ins_ids); // TODO: simplify this mapping for (const auto& ins_id : ins_ids) { - _pipeline_map.insert({ins_id, context}); + _pipeline_map.insert(ins_id, context); } } query_ctx->set_pipeline_context(params.fragment_id, context); @@ -1071,13 +1136,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, const TUniqueId& fragment_instance_id = local_params.fragment_instance_id; { - std::lock_guard lock(_lock); - auto iter = _pipeline_map.find(fragment_instance_id); - if (iter != _pipeline_map.end()) { + auto res = _pipeline_map.find(fragment_instance_id); + if (res != nullptr) { // Duplicated return Status::OK(); } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); + query_ctx->push_instance_ids(fragment_instance_id); } int64_t duration_ns = 0; @@ -1115,10 +1179,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, if (i == 0 && handler) { query_ctx->set_merge_controller_handler(handler); } - { - std::lock_guard lock(_lock); - _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); - } + _pipeline_map.insert(fragment_instance_id, context); return context->submit(); }; @@ -1188,13 +1249,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query } std::shared_ptr FragmentMgr::get_query_context(const TUniqueId& query_id) { - std::shared_lock lock(_query_ctx_map_lock); - auto ctx = _query_ctx_map.find(query_id); - if (ctx != _query_ctx_map.end()) { - return ctx->second; - } else { - return nullptr; - } + return _query_ctx_map.find(query_id); } void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, @@ -1202,15 +1257,13 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan std::shared_ptr query_ctx; std::vector all_instance_ids; { - std::shared_lock lock(_query_ctx_map_lock); - auto ctx_iter = _query_ctx_map.find(query_id); + query_ctx = _query_ctx_map.find(query_id); - if (ctx_iter == _query_ctx_map.end()) { + if (query_ctx == nullptr) { LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it"; return; } - query_ctx = ctx_iter->second; // Copy instanceids to avoid concurrent modification. // And to reduce the scope of lock. all_instance_ids = query_ctx->fragment_instance_ids; @@ -1224,10 +1277,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan } query_ctx->cancel(msg, Status::Cancelled(msg)); - { - std::lock_guard state_lock(_lock); - _query_ctx_map.erase(query_id); - } + _query_ctx_map.erase(query_id); LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. Reason: " << msg; } @@ -1236,22 +1286,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id, std::shared_ptr pipeline_ctx; std::shared_ptr non_pipeline_ctx; { - std::lock_guard state_lock(_lock); - const bool is_pipeline_instance = _pipeline_map.contains(instance_id); - if (is_pipeline_instance) { - auto itr = _pipeline_map.find(instance_id); - if (itr != _pipeline_map.end()) { - pipeline_ctx = itr->second; - } else { - LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id) - << " to cancel"; - return; - } - } else { - auto itr = _fragment_instance_map.find(instance_id); - if (itr != _fragment_instance_map.end()) { - non_pipeline_ctx = itr->second; - } else { + pipeline_ctx = _pipeline_map.find(instance_id); + if (!pipeline_ctx) { + non_pipeline_ctx = _fragment_instance_map.find(instance_id); + if (non_pipeline_ctx == nullptr) { LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id) << " to cancel"; return; @@ -1269,14 +1307,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id, void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::shared_lock lock(_query_ctx_map_lock); - auto q_ctx_iter = _query_ctx_map.find(query_id); - if (q_ctx_iter != _query_ctx_map.end()) { + auto res = _query_ctx_map.find(query_id); + if (res != nullptr) { // Has to use value to keep the shared ptr not deconstructed. - std::shared_ptr q_ctx = q_ctx_iter->second; - // the lock should only be used to protect the map, not scope query ctx - lock.unlock(); - WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg), + WARN_IF_ERROR(res->cancel_pipeline_context(fragment_id, reason, msg), "fail to cancel fragment"); } else { LOG(WARNING) << "Could not find the query id:" << print_id(query_id) @@ -1314,141 +1348,161 @@ void FragmentMgr::cancel_worker() { VecDateTimeValue now = VecDateTimeValue::local_time(); std::unordered_map, BrpcItem> brpc_stub_with_queries; { - std::lock_guard lock(_lock); - for (auto& fragment_instance_itr : _fragment_instance_map) { - if (fragment_instance_itr.second->is_timeout(now)) { - queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id()); - } - } - - for (auto& pipeline_itr : _pipeline_map) { - if (pipeline_itr.second->is_timeout(now)) { - std::vector ins_ids; - reinterpret_cast(pipeline_itr.second.get()) - ->instance_ids(ins_ids); - for (auto& ins_id : ins_ids) { - queries_timeout.push_back(ins_id); - } - } else { - pipeline_itr.second->clear_finished_tasks(); - } - } + _fragment_instance_map.apply( + [&](phmap::flat_hash_map>& map) + -> Status { + for (auto& fragment_instance_itr : map) { + if (fragment_instance_itr.second->is_timeout(now)) { + queries_timeout.push_back( + fragment_instance_itr.second->fragment_instance_id()); + } + } + return Status::OK(); + }); + _pipeline_map.apply( + [&](phmap::flat_hash_map< + TUniqueId, std::shared_ptr>& map) + -> Status { + for (auto& pipeline_itr : map) { + if (pipeline_itr.second->is_timeout(now)) { + std::vector ins_ids; + reinterpret_cast( + pipeline_itr.second.get()) + ->instance_ids(ins_ids); + for (auto& ins_id : ins_ids) { + queries_timeout.push_back(ins_id); + } + } else { + pipeline_itr.second->clear_finished_tasks(); + } + } + return Status::OK(); + }); } { - std::unique_lock lock(_query_ctx_map_lock); - for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { - if (it->second->is_timeout(now)) { - LOG_WARNING("Query {} is timeout", print_id(it->first)); - it = _query_ctx_map.erase(it); - } else { - if (config::enable_brpc_connection_check) { - auto brpc_stubs = it->second->get_using_brpc_stubs(); - for (auto& item : brpc_stubs) { - if (!brpc_stub_with_queries.contains(item.second)) { - brpc_stub_with_queries.emplace(item.second, - BrpcItem {item.first, {it->second}}); - } else { - brpc_stub_with_queries[item.second].queries.emplace_back( - it->second); + _query_ctx_map.apply([&](phmap::flat_hash_map>& + map) -> Status { + for (auto it = map.begin(); it != map.end();) { + if (it->second->is_timeout(now)) { + LOG_WARNING("Query {} is timeout", print_id(it->first)); + it = map.erase(it); + } else { + if (config::enable_brpc_connection_check) { + auto brpc_stubs = it->second->get_using_brpc_stubs(); + for (auto& item : brpc_stubs) { + if (!brpc_stub_with_queries.contains(item.second)) { + brpc_stub_with_queries.emplace( + item.second, BrpcItem {item.first, {it->second}}); + } else { + brpc_stub_with_queries[item.second].queries.emplace_back( + it->second); + } } } + ++it; } - ++it; } - } + return Status::OK(); + }); } { - std::shared_lock lock(_query_ctx_map_lock); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel // 2. If same process uuid, do not cancel // 3. If fe has zero process uuid, do not cancel - if (running_fes.empty() && !_query_ctx_map.empty()) { + if (running_fes.empty() && _query_ctx_map.num_items() != 0) { LOG_EVERY_N(WARNING, 10) << "Could not find any running frontends, maybe we are upgrading or " "starting? " << "We will not cancel any outdated queries in this situation."; } else { - for (const auto& q : _query_ctx_map) { - auto q_ctx = q.second; - const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid(); - - if (fe_process_uuid == 0) { - // zero means this query is from a older version fe or - // this fe is starting - continue; - } + _query_ctx_map.apply([&](phmap::flat_hash_map>& map) + -> Status { + for (const auto& q : map) { + auto q_ctx = q.second; + const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid(); + + if (fe_process_uuid == 0) { + // zero means this query is from a older version fe or + // this fe is starting + continue; + } - // If the query is not running on the any frontends, cancel it. - if (auto itr = running_queries_on_all_fes.find(fe_process_uuid); - itr != running_queries_on_all_fes.end()) { - // Query not found on this frontend, and the query arrives before the last check - if (itr->second.find(q_ctx->query_id()) == itr->second.end() && - // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec. - // tv_sec is enough, we do not need to check tv_nsec. - q_ctx->get_query_arrival_timestamp().tv_sec < - check_invalid_query_last_timestamp.tv_sec && - q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { - if (q_ctx->enable_pipeline_x_exec()) { - queries_pipeline_task_leak.push_back(q_ctx->query_id()); - LOG_INFO( - "Query {}, type {} is not found on any frontends, maybe it " - "is leaked.", - print_id(q_ctx->query_id()), - toString(q_ctx->get_query_source())); - continue; + // If the query is not running on the any frontends, cancel it. + if (auto itr = running_queries_on_all_fes.find(fe_process_uuid); + itr != running_queries_on_all_fes.end()) { + // Query not found on this frontend, and the query arrives before the last check + if (itr->second.find(q_ctx->query_id()) == itr->second.end() && + // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec. + // tv_sec is enough, we do not need to check tv_nsec. + q_ctx->get_query_arrival_timestamp().tv_sec < + check_invalid_query_last_timestamp.tv_sec && + q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { + if (q_ctx->enable_pipeline_x_exec()) { + queries_pipeline_task_leak.push_back(q_ctx->query_id()); + LOG_INFO( + "Query {}, type {} is not found on any frontends, " + "maybe it " + "is leaked.", + print_id(q_ctx->query_id()), + toString(q_ctx->get_query_source())); + continue; + } } } - } - auto query_context = q.second; + auto query_context = q.second; - auto itr = running_fes.find(query_context->coord_addr); - if (itr != running_fes.end()) { - if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid || - itr->second.info.process_uuid == 0) { - continue; - } else { - LOG_WARNING("Coordinator of query {} restarted, going to cancel it.", + auto itr = running_fes.find(query_context->coord_addr); + if (itr != running_fes.end()) { + if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid || + itr->second.info.process_uuid == 0) { + continue; + } else { + LOG_WARNING( + "Coordinator of query {} restarted, going to cancel it.", print_id(q.second->query_id())); - } - } else { - // In some rear cases, the rpc port of follower is not updated in time, - // then the port of this follower will be zero, but acutally it is still running, - // and be has already received the query from follower. - // So we need to check if host is in running_fes. - bool fe_host_is_standing = - std::any_of(running_fes.begin(), running_fes.end(), - [query_context](const auto& fe) { - return fe.first.hostname == - query_context->coord_addr.hostname && - fe.first.port == 0; - }); - - if (fe_host_is_standing) { - LOG_WARNING( - "Coordinator {}:{} is not found, but its host is still " - "running with an unstable rpc port, not going to cancel " - "it.", - query_context->coord_addr.hostname, - query_context->coord_addr.port, - print_id(query_context->query_id())); - continue; + } } else { - LOG_WARNING( - "Could not find target coordinator {}:{} of query {}, " - "going to " - "cancel it.", - query_context->coord_addr.hostname, - query_context->coord_addr.port, - print_id(query_context->query_id())); + // In some rear cases, the rpc port of follower is not updated in time, + // then the port of this follower will be zero, but acutally it is still running, + // and be has already received the query from follower. + // So we need to check if host is in running_fes. + bool fe_host_is_standing = std::any_of( + running_fes.begin(), running_fes.end(), + [query_context](const auto& fe) { + return fe.first.hostname == + query_context->coord_addr.hostname && + fe.first.port == 0; + }); + + if (fe_host_is_standing) { + LOG_WARNING( + "Coordinator {}:{} is not found, but its host is still " + "running with an unstable rpc port, not going to cancel " + "it.", + query_context->coord_addr.hostname, + query_context->coord_addr.port, + print_id(query_context->query_id())); + continue; + } else { + LOG_WARNING( + "Could not find target coordinator {}:{} of query {}, " + "going to " + "cancel it.", + query_context->coord_addr.hostname, + query_context->coord_addr.port, + print_id(query_context->query_id())); + } } - } - // Coorninator of this query has already dead. - queries_to_cancel.push_back(q.first); - } + // Coorninator of this query has already dead. + queries_to_cancel.push_back(q.first); + } + return Status::OK(); + }); } } @@ -1493,15 +1547,18 @@ void FragmentMgr::cancel_worker() { void FragmentMgr::debug(std::stringstream& ss) { // Keep things simple - std::lock_guard lock(_lock); - - ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n"; + ss << "FragmentMgr have " << _fragment_instance_map.num_items() << " jobs.\n"; ss << "job_id\t\tstart_time\t\texecute_time(s)\n"; VecDateTimeValue now = VecDateTimeValue::local_time(); - for (auto& it : _fragment_instance_map) { - ss << it.first << "\t" << it.second->start_time().debug_string() << "\t" - << now.second_diff(it.second->start_time()) << "\n"; - } + _fragment_instance_map.apply( + [&](phmap::flat_hash_map>& map) + -> Status { + for (auto& it : map) { + ss << it.first << "\t" << it.second->start_time().debug_string() << "\t" + << now.second_diff(it.second->start_time()) << "\n"; + } + return Status::OK(); + }); } void FragmentMgr::_check_brpc_available(const std::shared_ptr& brpc_stub, @@ -1684,26 +1741,22 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, RuntimeFilterMgr* runtime_filter_mgr = nullptr; if (is_pipeline) { - std::unique_lock lock(_lock); - auto iter = _pipeline_map.find(tfragment_instance_id); - if (iter == _pipeline_map.end()) { + pip_context = _pipeline_map.find(tfragment_instance_id); + if (pip_context == nullptr) { VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); } - pip_context = iter->second; DCHECK(pip_context != nullptr); runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); query_thread_context = {pip_context->get_query_ctx()->query_id(), pip_context->get_query_ctx()->query_mem_tracker}; } else { - std::unique_lock lock(_lock); - auto iter = _fragment_instance_map.find(tfragment_instance_id); - if (iter == _fragment_instance_map.end()) { + fragment_executor = _fragment_instance_map.find(tfragment_instance_id); + if (fragment_executor == nullptr) { VLOG_CRITICAL << "unknown.... fragment instance id:" << print_id(tfragment_instance_id); return Status::InvalidArgument("fragment-id: {}", print_id(tfragment_instance_id)); } - fragment_executor = iter->second; DCHECK(fragment_executor != nullptr); runtime_filter_mgr = @@ -1730,16 +1783,14 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, const auto& fragment_instance_ids = request->fragment_instance_ids(); { - std::unique_lock lock(_lock); for (UniqueId fragment_instance_id : fragment_instance_ids) { TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); if (is_pipeline) { - auto iter = _pipeline_map.find(tfragment_instance_id); - if (iter == _pipeline_map.end()) { + pip_context = _pipeline_map.find(tfragment_instance_id); + if (pip_context == nullptr) { continue; } - pip_context = iter->second; DCHECK(pip_context != nullptr); runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); @@ -1748,11 +1799,10 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, pip_context->get_query_ctx()->query_mem_tracker, pip_context->get_query_ctx()->workload_group()}; } else { - auto iter = _fragment_instance_map.find(tfragment_instance_id); - if (iter == _fragment_instance_map.end()) { + fragment_executor = _fragment_instance_map.find(tfragment_instance_id); + if (fragment_executor == nullptr) { continue; } - fragment_executor = iter->second; DCHECK(fragment_executor != nullptr); runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr(); @@ -1796,14 +1846,11 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::shared_lock lock(_query_ctx_map_lock); - auto iter = _query_ctx_map.find(query_id); - if (iter == _query_ctx_map.end()) { + query_ctx = _query_ctx_map.find(query_id); + if (query_ctx == nullptr) { return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished", queryid.to_string()); } - - query_ctx = iter->second; } std::shared_ptr filter_controller; @@ -1819,13 +1866,10 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::shared_lock lock(_query_ctx_map_lock); - auto iter = _query_ctx_map.find(query_id); - if (iter == _query_ctx_map.end()) { + query_ctx = _query_ctx_map.find(query_id); + if (query_ctx == nullptr) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); } - - query_ctx = iter->second; } return query_ctx->runtime_filter_mgr()->sync_filter_size(request); } @@ -1842,15 +1886,10 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::shared_lock lock(_query_ctx_map_lock); - auto iter = _query_ctx_map.find(query_id); - if (iter == _query_ctx_map.end()) { + query_ctx = _query_ctx_map.find(query_id); + if (query_ctx == nullptr) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); } - - // hold reference to pip_context, or else runtime_state can be destroyed - // when filter_controller->merge is still in progress - query_ctx = iter->second; } SCOPED_ATTACH_TASK(query_ctx.get()); auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf); @@ -1936,17 +1975,19 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag } void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { - { - std::shared_lock lock(_query_ctx_map_lock); - for (const auto& q : _query_ctx_map) { - WorkloadQueryInfo workload_query_info; - workload_query_info.query_id = print_id(q.first); - workload_query_info.tquery_id = q.first; - workload_query_info.wg_id = - q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id(); - query_info_list->push_back(workload_query_info); - } - } + _query_ctx_map.apply( + [&](phmap::flat_hash_map>& map) -> Status { + for (const auto& q : map) { + WorkloadQueryInfo workload_query_info; + workload_query_info.query_id = print_id(q.first); + workload_query_info.tquery_id = q.first; + workload_query_info.wg_id = q.second->workload_group() == nullptr + ? -1 + : q.second->workload_group()->id(); + query_info_list->push_back(workload_query_info); + } + return Status::OK(); + }); } } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 53cea30686fa22..1ceedb5337d459 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -68,6 +68,46 @@ class WorkloadQueryInfo; std::string to_load_error_http_path(const std::string& file_name); +template +class ConcurrentContextMap { +public: + using ApplyFunction = std::function&)>; + ConcurrentContextMap(); + Value find(const Key& query_id); + void insert(const Key& query_id, std::shared_ptr); + void clear(); + void erase(const Key& query_id); + size_t num_items() const { + size_t n = 0; + for (auto& pair : _internal_map) { + std::shared_lock lock(*pair.first); + auto& map = pair.second; + n += map.size(); + } + return n; + } + void apply(ApplyFunction&& function) { + for (auto& pair : _internal_map) { + // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must + // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok + std::unique_lock lock(*pair.first); + static_cast(function(pair.second)); + } + } + + Status apply_if_not_exists(const Key& query_id, std::shared_ptr& query_ctx, + ApplyFunction&& function); + +private: + // The lock should only be used to protect the structures in fragment manager. Has to be + // used in a very small scope because it may dead lock. For example, if the _lock is used + // in prepare stage, the call path is prepare --> expr prepare --> may call allocator + // when allocate failed, allocator may call query_is_cancelled, query is callced will also + // call _lock, so that there is dead lock. + std::vector, phmap::flat_hash_map>> + _internal_map; +}; + // This class used to manage all the fragment execute in this instance class FragmentMgr : public RestMonitorIface { public: @@ -142,10 +182,7 @@ class FragmentMgr : public RestMonitorIface { std::shared_ptr get_query_context(const TUniqueId& query_id); - int32_t running_query_num() { - std::shared_lock ctx_lock(_query_ctx_map_lock); - return _query_ctx_map.size(); - } + int32_t running_query_num() { return _query_ctx_map.num_items(); } std::string dump_pipeline_tasks(int64_t duration = 0); std::string dump_pipeline_tasks(TUniqueId& query_id); @@ -189,21 +226,17 @@ class FragmentMgr : public RestMonitorIface { // This is input params ExecEnv* _exec_env = nullptr; - // The lock should only be used to protect the structures in fragment manager. Has to be - // used in a very small scope because it may dead lock. For example, if the _lock is used - // in prepare stage, the call path is prepare --> expr prepare --> may call allocator - // when allocate failed, allocator may call query_is_cancelled, query is callced will also - // call _lock, so that there is dead lock. - std::mutex _lock; - // Make sure that remove this before no data reference PlanFragmentExecutor - std::unordered_map> _fragment_instance_map; - - std::unordered_map> _pipeline_map; + // (QueryID, FragmentID) -> PipelineFragmentContext + ConcurrentContextMap, PlanFragmentExecutor> + _fragment_instance_map; + // (QueryID, FragmentID) -> PipelineFragmentContext + ConcurrentContextMap, + pipeline::PipelineFragmentContext> + _pipeline_map; - std::shared_mutex _query_ctx_map_lock; // query id -> QueryContext - phmap::flat_hash_map> _query_ctx_map; + ConcurrentContextMap, QueryContext> _query_ctx_map; std::unordered_map> _bf_size_map; CountDownLatch _stop_background_threads_latch; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index dcff789043a748..66623a46fbc4d2 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -325,7 +325,13 @@ class QueryContext { return _using_brpc_stubs; } + void push_instance_ids(const TUniqueId& ins_id) { + std::lock_guard lock(_ins_lock); + fragment_instance_ids.push_back(ins_id); + } + private: + std::mutex _ins_lock; TUniqueId _query_id; ExecEnv* _exec_env = nullptr; VecDateTimeValue _start_time;