diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 8cc6ae58a4fb29..ad7f4e6f1847eb 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -186,9 +186,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: revocable_size = revocable_mem_size(state); query_mem_limit = state->get_query_ctx()->get_mem_limit(); LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} eos, need spill: {}, query mem limit: {}, " - "revocable memory: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, " + "revocable memory:{}", + print_id(state->query_id()), node_id(), state->task_id(), local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit), PrettyPrinter::print_bytes(revocable_size)); @@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr& spill_context) { const auto size_to_revoke = _parent->revocable_mem_size(state); LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need spill: {}, revocable " - "memory: {}", - print_id(state->query_id()), state->task_id(), _parent->node_id(), _eos, + "Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need spill:{}, revocable " + "memory:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos, _shared_state->is_spilled, PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); if (!_shared_state->is_spilled) { @@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory( Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " agg node " - << Base::_parent->node_id() - << " revoke_memory error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, agg sink:{}, task:{}, revoke_memory error:{}", + print_id(query_id), Base::_parent->node_id(), state->task_id(), + status); } _shared_state->close(); } else { LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} revoke_memory finish, eos: {}, " - "revocable memory: {}", - print_id(state->query_id()), state->task_id(), _parent->node_id(), + "Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, " + "revocable memory:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos, PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 8e221a1c7e2341..c87ee24dedb222 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -250,8 +250,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " agg node " - << _parent->node_id() << " recover agg data error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover agg data error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); } @@ -305,15 +306,16 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b } } - VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() - << ", task id: " << state->task_id() << " recover partitioned finished, " - << _shared_state->spill_partitions.size() << " partitions left, " - << accumulated_blocks_size - << " bytes read, spill dep: " << (void*)(_spill_dependency.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover partitioned finished, partitions " + "left:{}, bytes read:{}, spill dep:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->spill_partitions.size(), accumulated_blocks_size, + (void*)(_spill_dependency.get())); return status; }; - auto exception_catch_func = [spill_func, query_id]() { + auto exception_catch_func = [this, state, spill_func, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_source " @@ -323,8 +325,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id) - << " recover exception : " << status.to_string(); + LOG_IF(INFO, !status.ok()) << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover exception:{}", print_id(query_id), + _parent->node_id(), state->task_id(), status.to_string()); return status; }; @@ -334,10 +337,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); _spill_dependency->block(); - VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() - << ", task id: " << state->task_id() << " begin to recover, " - << _shared_state->spill_partitions.size() - << " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, agg probe:{}, task:{}, begin to recover, partitions left:{}, " + "_spill_dependency:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->spill_partitions.size(), (void*)(_spill_dependency.get())); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( std::make_shared(state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 04b83e822c114b..ff9c78c5be496b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -230,9 +230,11 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat } COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size)); - VLOG_DEBUG << "Query: " << print_id(query_id) - << " hash probe revoke done, node: " << p.node_id() - << ", task: " << state->task_id(); + + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " spill_probe_blocks done", + print_id(query_id), p.node_id(), state->task_id()); return Status::OK(); }; @@ -275,9 +277,10 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << " recover_build_blocks_from_disk"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recover_build_blocks_from_disk", + print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index); auto& spilled_stream = _shared_state->spilled_streams[partition_index]; has_data = false; if (!spilled_stream) { @@ -291,9 +294,10 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim SCOPED_TIMER(_recovery_build_timer); bool eos = false; - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << ", recoverying build data"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recoverying build data", + print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index); Status status; while (!eos) { vectorized::Block block; @@ -315,7 +319,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } if (UNLIKELY(state->is_cancelled())) { - LOG(INFO) << "recovery build block when canceled."; + LOG(INFO) << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery build data canceled", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + partition_index); break; } @@ -338,9 +346,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim if (eos) { ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); _shared_state->spilled_streams[partition_index].reset(); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) - << ", node: " << _parent->node_id() << ", task id: " << state->task_id() - << ", partition: " << partition_index; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery build data eos", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + partition_index); } return status; }; @@ -365,16 +375,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; _spill_dependency->block(); - { - auto* pipeline_task = state->get_task(); - if (pipeline_task) { - auto& p = _parent->cast(); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << ", dependency: " << _dependency - << ", task debug_string: " << pipeline_task->debug_string(); - } - } DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func", { @@ -386,9 +386,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto spill_runnable = std::make_shared( state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), exception_catch_func); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << " recover_build_blocks_from_disk submit func"; return spill_io_pool->submit(std::move(spill_runnable)); } @@ -429,7 +426,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim auto query_id = state->query_id(); - auto read_func = [this, query_id, &spilled_stream, &blocks] { + auto read_func = [this, query_id, partition_index, &spilled_stream, &blocks] { SCOPED_TIMER(_recovery_probe_timer); vectorized::Block block; @@ -457,8 +454,10 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim } } if (eos) { - VLOG_DEBUG << "Query: " << print_id(query_id) - << ", recovery probe data done: " << spilled_stream->get_spill_dir(); + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery probe data done", + print_id(query_id), _parent->node_id(), _state->task_id(), partition_index); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); } @@ -675,13 +674,13 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(), &block, true)); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) - << ", internal build operator finished, node id: " << node_id() - << ", task id: " << state->task_id() - << ", partition: " << local_state._partition_cursor << "rows: " << block.rows() - << ", usage: " - << _inner_sink_operator->get_memory_usage( - local_state._shared_state->inner_runtime_state.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " internal build operator finished, partition:{}, rows:{}, memory usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), local_state._partition_cursor, + block.rows(), + _inner_sink_operator->get_memory_usage( + local_state._shared_state->inner_runtime_state.get())); COUNTER_SET(local_state._hash_table_memory_usage, sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value()); @@ -734,9 +733,10 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (!has_data) { vectorized::Block block; RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true)); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() - << ", task: " << state->task_id() << "partition: " << partition_index - << " has no data to recovery"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, has no data to recovery", + print_id(state->query_id()), node_id(), state->task_id(), partition_index); break; } else { return Status::OK(); @@ -755,9 +755,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, *eos = false; if (in_mem_eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() - << ", task: " << state->task_id() - << ", partition: " << local_state._partition_cursor; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, probe done", + print_id(state->query_id()), node_id(), state->task_id(), + local_state._partition_cursor); local_state._partition_cursor++; if (local_state._partition_cursor == _partition_count) { *eos = true; @@ -848,8 +850,8 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() - << ", task: " << state->task_id(); + VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory", + print_id(state->query_id()), node_id(), state->task_id()); RETURN_IF_ERROR(local_state.spill_probe_blocks(state)); return Status::OK(); @@ -894,10 +896,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori #ifndef NDEBUG Defer eos_check_defer([&] { if (*eos) { - LOG(INFO) << "Query: " << print_id(state->query_id()) - << ", hash probe node: " << node_id() << ", task: " << state->task_id() - << ", eos with child eos: " << local_state._child_eos - << ", need spill: " << need_to_spill; + LOG(INFO) << fmt::format( + "Query:{}, hash join probe:{}, task:{}, child eos:{}, need spill:{}", + print_id(state->query_id()), node_id(), state->task_id(), + local_state._child_eos, need_to_spill); } }); #endif diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 2e2c38f04c32ec..a227d87aa1bb94 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -178,8 +178,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } if (build_block.rows() <= 1) { - LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id() - << ", task: " << state->task_id(); + LOG(WARNING) << fmt::format( + "Query:{}, hash join sink:{}, task:{}," + " has no data to revoke", + print_id(state->query_id()), _parent->node_id(), state->task_id()); if (spill_context) { spill_context->on_task_finished(); } @@ -270,9 +272,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( }); status = _finish_spilling(); VLOG_DEBUG << fmt::format( - "Query: {}, task {}, hash join sink {} _revoke_unpartitioned_block " + "Query:{}, hash join sink:{}, task:{}, _revoke_unpartitioned_block, " "set_ready_to_read", - print_id(state->query_id()), state->task_id(), _parent->node_id()); + print_id(state->query_id()), _parent->node_id(), state->task_id()); _dependency->set_ready_to_read(); } @@ -303,9 +305,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr& spill_context) { SCOPED_TIMER(_spill_total_timer); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() - << " hash join sink " << _parent->node_id() << " revoke_memory" - << ", eos: " << _child_eos; + VLOG_DEBUG << fmt::format("Query:{}, hash join sink:{}, task:{}, revoke_memory, eos:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + _child_eos); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); if (!_shared_state->need_to_spill) { @@ -322,9 +324,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( auto spill_fin_cb = [this, state, query_id, spill_context]() { Status status; if (_child_eos) { - LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ", task " - << state->task_id() << " hash join sink " << _parent->node_id() - << " finish spilling, set_ready_to_read"; + LOG(INFO) << fmt::format( + "Query:{}, hash join sink:{}, task:{}, finish spilling, set_ready_to_read", + print_id(this->state()->query_id()), _parent->node_id(), state->task_id()); std::for_each(_shared_state->partitioned_build_blocks.begin(), _shared_state->partitioned_build_blocks.end(), [&](auto& block) { if (block) { @@ -565,10 +567,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B revocable_size = revocable_mem_size(state); query_mem_limit = state->get_query_ctx()->get_mem_limit(); LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, need spill: {}, query mem limit: {}, " - "revocable " - "memory: {}", - print_id(state->query_id()), state->task_id(), node_id(), need_to_spill, + "Query:{}, hash join sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, " + "revocable memory:{}", + print_id(state->query_id()), node_id(), state->task_id(), need_to_spill, PrettyPrinter::print_bytes(query_mem_limit), PrettyPrinter::print_bytes(revocable_size)); } @@ -590,9 +591,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "Query:{}, hash join sink:{}, task:{} eos, revoke_memory " "because revocable memory is high", - print_id(state->query_id()), state->task_id(), node_id()); + print_id(state->query_id()), node_id(), state->task_id()); return revoke_memory(state, nullptr); } @@ -601,10 +602,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state._shared_state->inner_runtime_state.get(), in_block, eos)); LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, set_ready_to_read, nonspill " - "memory " - "usage: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill " + "memory usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); } @@ -642,9 +642,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (eos) { if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "Query:{}, hash join sink:{}, task:{}, eos, revoke_memory " "because revocable memory is high", - print_id(state->query_id()), state->task_id(), node_id()); + print_id(state->query_id()), node_id(), state->task_id()); return revoke_memory(state, nullptr); } } @@ -653,9 +653,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state.update_memory_usage(); if (eos) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, set_ready_to_read, nonspill memory " - "usage: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill memory " + "usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); local_state._dependency->set_ready_to_read(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 03c4072f7de9c4..debe1d59710aa9 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -195,9 +195,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, profile()->add_info_string("Spilled", "true"); } - VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " - << Base::_parent->node_id() << " revoke_memory" - << ", eos: " << _eos; + VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke_memory, eos:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + _eos); auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(), @@ -219,13 +219,14 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " sort node " - << _parent->node_id() << " revoke memory error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, sort sink:{}, task:{}, revoke memory error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); } else { - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " revoke memory finish"; + VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke memory finish", + print_id(query_id), _parent->node_id(), state->task_id()); } if (!status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 8a58d0b15040a2..43bb8a65b6e605 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -70,8 +70,8 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const } Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { auto& parent = Base::_parent->template cast(); - VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << _parent->node_id() - << " merge spill data"; + VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data", + print_id(state->query_id()), _parent->node_id(), state->task_id()); _spill_dependency->Dependency::block(); auto query_id = state->query_id(); @@ -82,8 +82,9 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " sort node " - << _parent->node_id() << " merge spill data error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill data error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); for (auto& stream : _current_merging_streams) { @@ -91,18 +92,20 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } _current_merging_streams.clear(); } else { - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " merge spill data finish"; + VLOG_DEBUG << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill data finish", + print_id(query_id), _parent->node_id(), state->task_id()); } }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; while (!state->is_cancelled()) { int max_stream_count = _calc_spill_blocks_to_merge(state); - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " merge spill streams, streams count: " - << _shared_state->sorted_streams.size() - << ", curren merge max stream count: " << max_stream_count; + VLOG_DEBUG << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, " + "curren merge max stream count:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->sorted_streams.size(), max_stream_count); { SCOPED_TIMER(Base::_spill_recover_time); status = _create_intermediate_merger( diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index 3086b1c77d861a..795db9ea85c3b0 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -20,7 +20,6 @@ #include #include -#include #include "common/cast_set.h" #include "common/exception.h" diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index d6cd6fa430656f..c6f3fd24224e86 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -112,12 +112,6 @@ void SpillStreamManager::_init_metrics() { _spill_read_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( _spill_read_bytes_metric.get())); - _spill_running_task_count_metric = std::make_unique( - doris::MetricType::COUNTER, doris::MetricUnit::NOUNIT, "spill_running_task_count"); - _spill_running_task_count_counter = - (IntAtomicCounter*)(_entity->register_metric( - _spill_running_task_count_metric.get())); - INT_UGAUGE_METRIC_REGISTER(_entity, spill_io_thread_pool_max_threads); INT_UGAUGE_METRIC_REGISTER(_entity, spill_io_thread_pool_active_threads); INT_UGAUGE_METRIC_REGISTER(_entity, spill_io_thread_pool_pool_max_queue_size); diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 8005e8de36bce3..2515ee67c2161b 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -18,7 +18,6 @@ #pragma once #include -#include #include #include #include @@ -170,11 +169,9 @@ class SpillStreamManager { std::unique_ptr _spill_write_bytes_metric {nullptr}; std::unique_ptr _spill_read_bytes_metric {nullptr}; - std::unique_ptr _spill_running_task_count_metric {nullptr}; IntAtomicCounter* _spill_write_bytes_counter {nullptr}; IntAtomicCounter* _spill_read_bytes_counter {nullptr}; - IntAtomicCounter* _spill_running_task_count_counter {nullptr}; }; } // namespace vectorized } // namespace doris