Skip to content

Commit

Permalink
[fix](memory) Refactor memory allocated failure processing (apache#36090
Browse files Browse the repository at this point in the history
)

In this PR,
1. Do not catch exception in operators.
2. Catch exception where we will get an exception.
  • Loading branch information
Gabriel39 authored and dataroaring committed Jun 13, 2024
1 parent 27496b5 commit 4b8ca34
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 146 deletions.
17 changes: 17 additions & 0 deletions be/src/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
return Status::Error<false>(e.code(), e.to_string()); \
} \
} while (0)

#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_) \
do { \
try { \
doris::enable_thread_catch_bad_alloc++; \
Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \
{ stmt; } \
} catch (const doris::Exception& e) { \
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \
status_ = Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
} else { \
status_ = e.to_status(); \
} \
} \
} while (0);
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
// this could cause unable to get JVM
if (Base::_shared_state->probe_expr_ctxs.empty()) {
// _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key));
_shared_state->agg_data_created_without_key = true;
}
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
_distinct_row.reserve(rows);

if (!_stop_emplace_flag) {
RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
}

bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
Expand Down
58 changes: 28 additions & 30 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,37 +294,35 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
DCHECK(local_state._has_set_need_null_map_for_probe);
RETURN_IF_CATCH_EXCEPTION({
std::visit(
[&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
auto ignore_null) {
using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
st = process_hashtable_ctx.template process<need_null_map_for_probe,
ignore_null>(
arg,
need_null_map_for_probe
? &local_state._null_map_column->get_data()
: nullptr,
mutable_join_block, &temp_block,
local_state._probe_block.rows(), _is_mark_join,
_have_other_join_conjunct);
local_state._mem_tracker->set_consumption(
arg.serialized_keys_size(false));
} else {
st = Status::InternalError("uninited hash table");
}
std::visit(
[&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
auto ignore_null) {
using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
st = process_hashtable_ctx
.template process<need_null_map_for_probe, ignore_null>(
arg,
need_null_map_for_probe
? &local_state._null_map_column->get_data()
: nullptr,
mutable_join_block, &temp_block,
local_state._probe_block.rows(), _is_mark_join,
_have_other_join_conjunct);
local_state._mem_tracker->set_consumption(
arg.serialized_keys_size(false));
} else {
st = Status::InternalError("uninited hash table probe");
st = Status::InternalError("uninited hash table");
}
},
*local_state._shared_state->hash_table_variants,
*local_state._process_hashtable_ctx_variants,
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
});
} else {
st = Status::InternalError("uninited hash table probe");
}
},
*local_state._shared_state->hash_table_variants,
*local_state._process_hashtable_ctx_variants,
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
Expand Down Expand Up @@ -457,7 +455,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
temp_block->columns()));
}

RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_output_block(temp_block, output_block, false));
RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return true;
};

RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
probe_side_output_column(
mcol, *_left_output_slot_flags, current_offset, last_probe_index,
check_all_match_one(_probe_indexs, last_probe_index, current_offset),
with_other_conjuncts));
with_other_conjuncts);
}

output_block->swap(mutable_block.to_block());
Expand Down
15 changes: 6 additions & 9 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
}

if constexpr (set_probe_side_flag) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
RETURN_IF_ERROR(
(_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag, ignore_null>(
&_join_block, !p._is_left_semi_anti)));
Expand All @@ -185,10 +185,9 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
}

if constexpr (!set_probe_side_flag) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
(_do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag,
ignore_null>(&_join_block,
!p._is_right_semi_anti)));
RETURN_IF_ERROR((_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag, ignore_null>(
&_join_block, !p._is_right_semi_anti)));
_update_additional_flags(&_join_block);
}

Expand Down Expand Up @@ -499,8 +498,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
bool* eos) const {
auto& local_state = get_local_state(state);
if (_is_output_left_side_only) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
local_state._build_output_block(local_state._child_block.get(), block));
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
*eos = local_state._shared_state->left_side_eos;
local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
} else {
Expand All @@ -522,8 +520,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &tmp_block, tmp_block.columns()));
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
local_state._build_output_block(&tmp_block, block, false));
RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false));
local_state._reset_tuple_is_null_column();
}
local_state._join_block.clear_column_data();
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
local_state._shared_state->sorter->get_next(state, block, eos));
RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
local_state.reached_limit(block, eos);
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
_agg_data->method_variant));

if (!ret_flag) {
RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows));
_emplace_into_hash_table(_places.data(), key_columns, rows);

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
Expand Down
13 changes: 6 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,18 +306,17 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
SCOPED_TIMER(_build_pipelines_timer);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), request,
*_query_ctx->desc_tbl, &_root_op,
root_pipeline));
RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl,
&_root_op, root_pipeline));

// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
_runtime_state.get(), *_desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request,
root_pipeline->output_row_desc(), _runtime_state.get(),
*_desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ Status PipelineTask::execute(bool* eos) {
} else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, block, eos));
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (_block->rows() != 0 || *eos) {
Expand All @@ -353,7 +353,7 @@ Status PipelineTask::execute(bool* eos) {
// return error status with EOF, it is special, could not return directly.
auto sink_function = [&]() -> Status {
Status internal_st;
RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state, block, *eos));
internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
Expand Down
41 changes: 19 additions & 22 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,31 +127,28 @@ void TaskScheduler::_do_work(size_t index) {
bool eos = false;
auto status = Status::OK();

try {
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
TUniqueId query_id = task->query_context()->query_id();
std::string task_name = task->task_name();
#ifdef __APPLE__
uint32_t core_id = 0;
uint32_t core_id = 0;
#else
uint32_t core_id = sched_getcpu();
uint32_t core_id = sched_getcpu();
#endif
std::thread::id tid = std::this_thread::get_id();
uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
uint64_t start_time = MonotonicMicros();

status = task->execute(&eos);

uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
} else {
status = task->execute(&eos);
}
} catch (const Exception& e) {
status = e.to_status();
}
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
TUniqueId query_id = task->query_context()->query_id();
std::string task_name = task->task_name();

std::thread::id tid = std::this_thread::get_id();
uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
uint64_t start_time = MonotonicMicros();

status = task->execute(&eos);

uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
} else { status = task->execute(&eos); },
status);

task->set_previous_core_id(index);

Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
this, std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
Expand Down
25 changes: 18 additions & 7 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,22 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
Status st = _fold_constant_expr(request->request(), response);
TFoldConstantParams t_request;
Status st = Status::OK();
{
const uint8_t* buf = (const uint8_t*)request->request().data();
uint32_t len = request->request().size();
st = deserialize_thrift_msg(buf, &len, false, &t_request);
}
if (!st.ok()) {
LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
<< " .and query_id_is: " << t_request.query_id;
}
st = _fold_constant_expr(request->request(), response);
if (!st.ok()) {
LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
<< " .and query_id_is: " << t_request.query_id;
}
st.to_protobuf(response->mutable_status());
});
if (!ret) {
Expand All @@ -1481,12 +1496,8 @@ Status PInternalService::_fold_constant_expr(const std::string& ser_request,
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
}
std::unique_ptr<FoldConstantExecutor> fold_executor = std::make_unique<FoldConstantExecutor>();
Status st = fold_executor->fold_constant_vexpr(t_request, response);
if (!st.ok()) {
LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
<< " .and query_id_is: " << fold_executor->query_id_string();
}
return st;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(fold_executor->fold_constant_vexpr(t_request, response));
return Status::OK();
}

void PInternalService::transmit_block(google::protobuf::RpcController* controller,
Expand Down
Loading

0 comments on commit 4b8ca34

Please sign in to comment.