diff --git a/be/src/common/exception.h b/be/src/common/exception.h
index 0ec8d334e8b192..ce44e6587499b4 100644
--- a/be/src/common/exception.h
+++ b/be/src/common/exception.h
@@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
         return Status::Error(e.code(), e.to_string()); \
     } \
     } while (0)
+
+#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_) \
+    do { \
+        try { \
+            doris::enable_thread_catch_bad_alloc++; \
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \
+            { stmt; } \
+        } catch (const doris::Exception& e) { \
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \
+                status_ = Status::MemoryLimitExceeded(fmt::format( \
+                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
+                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
+            } else { \
+                status_ = e.to_status(); \
+            } \
+        } \
+    } while (0);
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 7a4a9d9c951c27..8e34de9bf98443 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -151,7 +151,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
     // this could cause unable to get JVM
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
         // _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
+        RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key));
         _shared_state->agg_data_created_without_key = true;
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 73ce8ce5fb4cbb..4390bebcfddd41 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -203,8 +203,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
     _distinct_row.reserve(rows);
 
     if (!_stop_emplace_flag) {
-        RETURN_IF_CATCH_EXCEPTION(
-                _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
+        _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
     }
 
     bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index dc2df872bd5f87..b4d511fe2dd162 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -294,37 +294,35 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
     Status st;
     if (local_state._probe_index < local_state._probe_block.rows()) {
         DCHECK(local_state._has_set_need_null_map_for_probe);
-        RETURN_IF_CATCH_EXCEPTION({
-            std::visit(
-                    [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
-                        auto ignore_null) {
-                        using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
-                        if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
-                            using HashTableCtxType = std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                                st = process_hashtable_ctx.template process<
-                                        need_null_map_for_probe, ignore_null>(
-                                        arg,
-                                        need_null_map_for_probe
-                                                ? &local_state._null_map_column->get_data()
-                                                : nullptr,
-                                        mutable_join_block, &temp_block,
-                                        local_state._probe_block.rows(), _is_mark_join,
-                                        _have_other_join_conjunct);
-                                local_state._mem_tracker->set_consumption(
-                                        arg.serialized_keys_size(false));
-                            } else {
-                                st = Status::InternalError("uninited hash table");
-                            }
+        std::visit(
+                [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
+                    auto ignore_null) {
+                    using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
+                    if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
+                            st = process_hashtable_ctx
+                                         .template process<need_null_map_for_probe, ignore_null>(
+                                                 arg,
+                                                 need_null_map_for_probe
+                                                         ? &local_state._null_map_column->get_data()
+                                                         : nullptr,
+                                                 mutable_join_block, &temp_block,
+                                                 local_state._probe_block.rows(), _is_mark_join,
+                                                 _have_other_join_conjunct);
+                            local_state._mem_tracker->set_consumption(
+                                    arg.serialized_keys_size(false));
                         } else {
-                            st = Status::InternalError("uninited hash table probe");
+                            st = Status::InternalError("uninited hash table");
                         }
-                    },
-                    *local_state._shared_state->hash_table_variants,
-                    *local_state._process_hashtable_ctx_variants,
-                    vectorized::make_bool_variant(local_state._need_null_map_for_probe),
-                    vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
-        });
+                    } else {
+                        st = Status::InternalError("uninited hash table probe");
+                    }
+                },
+                *local_state._shared_state->hash_table_variants,
+                *local_state._process_hashtable_ctx_variants,
+                vectorized::make_bool_variant(local_state._need_null_map_for_probe),
+                vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
     } else if (local_state._probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
             std::visit(
@@ -457,7 +455,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
                                                    temp_block->columns()));
     }
 
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_output_block(temp_block, output_block, false));
+    RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
     _reset_tuple_is_null_column();
     reached_limit(output_block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 5e023f2c86158f..3ffdb9cb990c32 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -283,10 +283,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
             return true;
         };
 
-        RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
+        probe_side_output_column(
                 mcol, *_left_output_slot_flags, current_offset, last_probe_index,
                 check_all_match_one(_probe_indexs, last_probe_index, current_offset),
-                with_other_conjuncts));
+                with_other_conjuncts);
     }
 
     output_block->swap(mutable_block.to_block());
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 7dc31cabddb438..84112151e63432 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -165,7 +165,7 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
         }
 
         if constexpr (set_probe_side_flag) {
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+            RETURN_IF_ERROR(
                     (_do_filtering_and_update_visited_flags<set_build_side_flag,
                                                             set_probe_side_flag, ignore_null>(
                             &_join_block, !p._is_left_semi_anti)));
@@ -185,10 +185,9 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
         }
 
         if constexpr (!set_probe_side_flag) {
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                    (_do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag,
-                                                            ignore_null>(&_join_block,
-                                                                         !p._is_right_semi_anti)));
+            RETURN_IF_ERROR((_do_filtering_and_update_visited_flags<
+                    set_build_side_flag, set_probe_side_flag, ignore_null>(
+                    &_join_block, !p._is_right_semi_anti)));
             _update_additional_flags(&_join_block);
         }
 
@@ -499,8 +498,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                local_state._build_output_block(local_state._child_block.get(), block));
+        RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
     } else {
@@ -522,8 +520,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
             RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
                     local_state._conjuncts, &tmp_block, tmp_block.columns()));
         }
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                local_state._build_output_block(&tmp_block, block, false));
+        RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false));
         local_state._reset_tuple_is_null_column();
     }
     local_state._join_block.clear_column_data();
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index 8926282870861d..fa891196151345 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,8 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-            local_state._shared_state->sorter->get_next(state, block, eos));
+    RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 40b63783c12cdf..85cf8487575d63 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -716,7 +716,7 @@ Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
             _agg_data->method_variant));
 
     if (!ret_flag) {
-        RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows));
+        _emplace_into_hash_table(_places.data(), key_columns, rows);
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 72c06721d89299..7a78c2551704e9 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -306,18 +306,17 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     SCOPED_TIMER(_build_pipelines_timer);
     // 2. Build pipelines with operators in this fragment.
     auto root_pipeline = add_pipeline();
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), request,
-                                                        *_query_ctx->desc_tbl, &_root_op,
-                                                        root_pipeline));
+    RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl,
+                                     &_root_op, root_pipeline));
 
     // 3. Create sink operator
     if (!request.fragment.__isset.output_sink) {
         return Status::InternalError("No output sink in this fragment!");
     }
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-            _runtime_state->obj_pool(), request.fragment.output_sink,
-            request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
-            _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+    RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), request.fragment.output_sink,
+                                      request.fragment.output_exprs, request,
+                                      root_pipeline->output_row_desc(), _runtime_state.get(),
+                                      *_desc_tbl, root_pipeline->id()));
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c43410e68a4785..09f32d9d23ef2b 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -343,7 +343,7 @@ Status PipelineTask::execute(bool* eos) {
         } else {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, block, eos));
+            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
         }
 
         if (_block->rows() != 0 || *eos) {
@@ -353,7 +353,7 @@ Status PipelineTask::execute(bool* eos) {
             // return error status with EOF, it is special, could not return directly.
             auto sink_function = [&]() -> Status {
                 Status internal_st;
-                RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state, block, *eos));
+                internal_st = _sink->sink(_state, block, *eos);
                 return internal_st;
             };
             status = sink_function();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index c45186190b7f69..3b846b60fa8d1c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -127,31 +127,28 @@ void TaskScheduler::_do_work(size_t index) {
 
         bool eos = false;
         auto status = Status::OK();
-        try {
-            //TODO: use a better enclose to abstracting these
-            if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
-                TUniqueId query_id = task->query_context()->query_id();
-                std::string task_name = task->task_name();
 #ifdef __APPLE__
-                uint32_t core_id = 0;
+        uint32_t core_id = 0;
 #else
-                uint32_t core_id = sched_getcpu();
+        uint32_t core_id = sched_getcpu();
 #endif
-                std::thread::id tid = std::this_thread::get_id();
-                uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
-                uint64_t start_time = MonotonicMicros();
-
-                status = task->execute(&eos);
-
-                uint64_t end_time = MonotonicMicros();
-                ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                        {query_id, task_name, core_id, thread_id, start_time, end_time});
-            } else {
-                status = task->execute(&eos);
-            }
-        } catch (const Exception& e) {
-            status = e.to_status();
-        }
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+                //TODO: use a better enclose to abstracting these
+                if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+                    TUniqueId query_id = task->query_context()->query_id();
+                    std::string task_name = task->task_name();
+
+                    std::thread::id tid = std::this_thread::get_id();
+                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+                    uint64_t start_time = MonotonicMicros();
+
+                    status = task->execute(&eos);
+
+                    uint64_t end_time = MonotonicMicros();
+                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+                            {query_id, task_name, core_id, thread_id, start_time, end_time});
+                } else { status = task->execute(&eos); },
+                status);
 
         task->set_previous_core_id(index);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2b0625207e25c2..ead6922ae6b042 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -718,7 +718,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                     this, std::placeholders::_1, std::placeholders::_2));
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        auto prepare_st = context->prepare(params);
+        Status prepare_st = Status::OK();
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
         if (!prepare_st.ok()) {
             query_ctx->cancel(prepare_st, params.fragment_id);
             query_ctx->set_execution_dependency_ready();
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 668397eff3b9c7..1cfa0ff0965a45 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1463,7 +1463,22 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
                                           google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        Status st = _fold_constant_expr(request->request(), response);
+        TFoldConstantParams t_request;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const uint8_t*)request->request().data();
+            uint32_t len = request->request().size();
+            st = deserialize_thrift_msg(buf, &len, false, &t_request);
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
+        st = _fold_constant_expr(request->request(), response);
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
@@ -1481,12 +1496,8 @@ Status PInternalService::_fold_constant_expr(const std::string& ser_request,
         RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
     }
     std::unique_ptr<FoldConstantExecutor> fold_executor = std::make_unique<FoldConstantExecutor>();
-    Status st = fold_executor->fold_constant_vexpr(t_request, response);
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
-                     << " .and query_id_is: " << fold_executor->query_id_string();
-    }
-    return st;
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(fold_executor->fold_constant_vexpr(t_request, response));
+    return Status::OK();
 }
 
 void PInternalService::transmit_block(google::protobuf::RpcController* controller,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e13ebf7c209ed2..6c1f02530b2efe 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -246,70 +246,76 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     scanner->start_scan_cpu_timer();
     Status status = Status::OK();
     bool eos = false;
-    RuntimeState* state = ctx->state();
-    DCHECK(nullptr != state);
-    if (!scanner->is_init()) {
-        status = scanner->init();
-        if (!status.ok()) {
-            eos = true;
-        }
-    }
-
-    if (!eos && !scanner->is_open()) {
-        status = scanner->open(state);
-        if (!status.ok()) {
-            eos = true;
-        }
-        scanner->set_opened();
-    }
+    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+            RuntimeState* state = ctx->state();
+            DCHECK(nullptr != state);
+            if (!scanner->is_init()) {
+                status = scanner->init();
+                if (!status.ok()) {
+                    eos = true;
+                }
+            }
 
-    Status rf_status = scanner->try_append_late_arrival_runtime_filter();
-    if (!rf_status.ok()) {
-        LOG(WARNING) << "Failed to append late arrival runtime filter: " << rf_status.to_string();
-    }
+            if (!eos && !scanner->is_open()) {
+                status = scanner->open(state);
+                if (!status.ok()) {
+                    eos = true;
+                }
+                scanner->set_opened();
+            }
 
-    size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-    size_t raw_bytes_read = 0;
-    bool first_read = true;
-    while (!eos && raw_bytes_read < raw_bytes_threshold) {
-        if (UNLIKELY(ctx->done())) {
-            eos = true;
-            break;
-        }
-        BlockUPtr free_block = ctx->get_free_block(first_read);
-        if (free_block == nullptr) {
-            break;
-        }
-        status = scanner->get_block_after_projects(state, free_block.get(), &eos);
-        first_read = false;
-        if (!status.ok()) {
-            LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
-            break;
-        }
-        auto free_block_bytes = free_block->allocated_bytes();
-        raw_bytes_read += free_block_bytes;
-        if (!scan_task->cached_blocks.empty() &&
-            scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
-                    ctx->batch_size()) {
-            size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
-            vectorized::MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
-            status = mutable_block.merge(*free_block);
-            if (!status.ok()) {
-                LOG(WARNING) << "Block merge failed: " << status.to_string();
-                break;
+            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
+            if (!rf_status.ok()) {
+                LOG(WARNING) << "Failed to append late arrival runtime filter: "
+                             << rf_status.to_string();
             }
-            scan_task->cached_blocks.back().first.get()->set_columns(
-                    std::move(mutable_block.mutable_columns()));
-            ctx->return_free_block(std::move(free_block));
-            ctx->inc_free_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
-                                      block_size);
-        } else {
-            ctx->inc_free_block_usage(free_block->allocated_bytes());
-            scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
-        }
-    } // end for while
-    if (UNLIKELY(!status.ok())) {
+            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+            size_t raw_bytes_read = 0; bool first_read = true;
+            while (!eos && raw_bytes_read < raw_bytes_threshold) {
+                if (UNLIKELY(ctx->done())) {
+                    eos = true;
+                    break;
+                }
+                BlockUPtr free_block = ctx->get_free_block(first_read);
+                if (free_block == nullptr) {
+                    break;
+                }
+                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
+                first_read = false;
+                if (!status.ok()) {
+                    LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
+                    break;
+                }
+                auto free_block_bytes = free_block->allocated_bytes();
+                raw_bytes_read += free_block_bytes;
+                if (!scan_task->cached_blocks.empty() &&
+                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
+                            ctx->batch_size()) {
+                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
+                    vectorized::MutableBlock mutable_block(
+                            scan_task->cached_blocks.back().first.get());
+                    status = mutable_block.merge(*free_block);
+                    if (!status.ok()) {
+                        LOG(WARNING) << "Block merge failed: " << status.to_string();
+                        break;
+                    }
+                    scan_task->cached_blocks.back().first.get()->set_columns(
+                            std::move(mutable_block.mutable_columns()));
+                    ctx->return_free_block(std::move(free_block));
+                    ctx->inc_free_block_usage(
+                            scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+                } else {
+                    ctx->inc_free_block_usage(free_block->allocated_bytes());
+                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
+                }
+            } // end for while
+
+            if (UNLIKELY(!status.ok())) {
+                scan_task->set_status(status);
+                eos = true;
+            },
+            status);
     if (status.is()) {
         scan_task->set_status(status);
         eos = true;
     }