diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 560efec94e1d18..93117fa71a0513 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators( LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { source_deps.resize(num_instances, nullptr); - mem_trackers.resize(num_instances, nullptr); + mem_counters.resize(num_instances, nullptr); } vectorized::MutableColumns AggSharedState::_get_keys_hash_table() { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 619dd2d2aa3c4d..01344e6fe7a193 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -324,11 +324,6 @@ struct AggSharedState : public BasicSharedState { vectorized::Sizes offsets_of_aggregate_states; std::vector make_nullable_keys; - struct MemoryRecord { - int64_t used_in_arena {}; - int64_t used_in_state {}; - }; - MemoryRecord mem_usage_record; bool agg_data_created_without_key = false; bool enable_spill = false; bool reach_limit = false; @@ -825,7 +820,7 @@ struct LocalExchangeSharedState : public BasicSharedState { LocalExchangeSharedState(int num_instances); ~LocalExchangeSharedState() override; std::unique_ptr exchanger {}; - std::vector mem_trackers; + std::vector mem_counters; std::atomic mem_usage = 0; // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. std::mutex le_lock; @@ -861,13 +856,15 @@ struct LocalExchangeSharedState : public BasicSharedState { } void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) { - mem_trackers[channel_id]->consume(delta); + mem_counters[channel_id]->update(delta); if (update_total_mem_usage) { add_total_mem_usage(delta, channel_id); } } - void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); } + void sub_mem_usage(int channel_id, size_t delta) { + mem_counters[channel_id]->update(-(int64_t)delta); + } virtual void add_total_mem_usage(size_t delta, int channel_id) { if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 627dbf4ef410be..f5caaf30bfcdad 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _agg_data = Base::_shared_state->agg_data.get(); _agg_arena_pool = Base::_shared_state->agg_arena_pool.get(); _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); - _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); + _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); @@ -223,24 +223,17 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() { }, [&](auto& agg_method) -> void { auto& data = *agg_method.hash_table; - auto arena_memory_usage = + int64_t arena_memory_usage = _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage() - - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - Base::_mem_tracker->consume( - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - COUNTER_UPDATE( - _hash_table_memory_usage, - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - Base::_shared_state->mem_usage_record.used_in_state = - data.get_buffer_size_in_bytes(); - Base::_shared_state->mem_usage_record.used_in_arena = - _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage(); + _shared_state->aggregate_data_container->memory_usage(); + int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); + + COUNTER_SET(_memory_used_counter, + arena_memory_usage + hash_table_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); + + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); + COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); }}, _agg_data->method_variant); } @@ -419,11 +412,10 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { } void AggSinkLocalState::_update_memusage_without_key() { - auto arena_memory_usage = - _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size(); + int64_t arena_memory_usage = _agg_arena_pool->size(); + COUNTER_SET(_memory_used_counter, arena_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) { @@ -875,8 +867,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { std::vector tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state + - Base::_shared_state->mem_usage_record.used_in_arena); return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 975b04477f203f..e5209425f768be 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -106,7 +106,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { RuntimeProfile::Counter* _deserialize_data_timer = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; - RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; + RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr; bool _should_limit_output = false; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index a406bdc329ef50..fbc71ac06ad177 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -450,8 +450,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { vectorized::Block::filter_block_internal(block, _shared_state->need_computes); if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } else { reached_limit(block, eos); @@ -459,8 +457,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { } else { if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } } diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 839a485f2d98c7..7c59c9543687d5 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -33,6 +33,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime"); _compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime"); _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime"); + _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1); return Status::OK(); } @@ -322,8 +323,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block } } - COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); + int64_t block_mem_usage = input_block->allocated_bytes(); + COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage); COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value()); + COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage); //TODO: if need improvement, the is a tips to maintain a free queue, //so the memory could reuse, no need to new/delete again; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 084998d2c36cdc..5b08f0a6e3d252 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -61,6 +61,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState _agg_expr_ctxs; }; diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 06a6374bbae468..205e6e7d3e2c0a 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -444,7 +444,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) { Status AnalyticLocalState::output_current_block(vectorized::Block* block) { block->swap(std::move(_shared_state->input_blocks[_output_block_index])); _blocks_memory_usage->add(-block->allocated_bytes()); - mem_tracker()->consume(-block->allocated_bytes()); if (_shared_state->origin_cols.size() < block->columns()) { block->erase_not_in(_shared_state->origin_cols); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 563c4bf49ca41c..f4ad31a6f90c0d 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -115,8 +115,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::Cancelled("Expected {} {} to be returned by expression {}", to_string_lambda(_assertion), _desired_num_rows, _subquery_string); } - COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); return Status::OK(); diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index e98a18b76a3a98..1387de351d11ef 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -159,7 +159,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b local_state._current_query_cache_rows += output_block->rows(); auto mem_consume = output_block->allocated_bytes(); local_state._current_query_cache_bytes += mem_consume; - local_state._mem_tracker->consume(mem_consume); if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes || _cache_param.entry_max_rows < local_state._current_query_cache_rows) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 75b26c3ed18b76..2f4a046003f24d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -461,7 +461,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc block->columns())); } local_state.add_num_rows_returned(block->rows()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); // If the limit is not reached, it is important to ensure that _aggregated_block is empty // because it may still contain data. // However, if the limit is reached, there is no need to output data even if some exists. diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1753991fe52386..45aeda8d9d3d1a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -156,6 +156,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); + COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong()); + COUNTER_SET(_parent->peak_memory_usage_counter(), + _parent->memory_used_counter()->value()); } _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; @@ -291,6 +294,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } if (request.block) { static_cast(brpc_request->release_block()); + COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong()); } q.pop(); _total_queue_size--; @@ -416,12 +420,24 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { _turn_off_channel(id, lock); std::queue>& broadcast_q = _instance_to_broadcast_package_queue[id]; + for (; !broadcast_q.empty(); broadcast_q.pop()) { + if (broadcast_q.front().block_holder->get_block()) { + COUNTER_UPDATE(_parent->memory_used_counter(), + -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + } + } { std::queue> empty; swap(empty, broadcast_q); } std::queue>& q = _instance_to_package_queue[id]; + for (; !q.empty(); q.pop()) { + if (q.front().block) { + COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); + } + } + { std::queue> empty; swap(empty, q); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 85c87df8f4dbc5..1561c1052983c6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -91,7 +91,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]); } } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Make sure brpc stub is ready before execution. for (int i = 0; i < channels.size(); ++i) { @@ -126,7 +125,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { @@ -348,7 +346,6 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { Status ExchangeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); _state = state; - _mem_tracker = std::make_unique("ExchangeSinkOperatorX:"); _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { if (_output_tuple_id == -1) { @@ -377,7 +374,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); SCOPED_TIMER(local_state.exec_time_counter()); - local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto& channel : local_state.channels) { if (!channel->is_receiver_eof()) { @@ -389,6 +385,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block return Status::EndOfFile("all data stream channels EOF"); } + Defer defer([&]() { + COUNTER_SET(local_state._peak_memory_usage_counter, + local_state._memory_used_counter->value()); + }); if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { // 1. serialize depends on it is not local exchange // 2. send block @@ -412,7 +412,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else { auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); bool serialized = false; RETURN_IF_ERROR(local_state._serializer.next_serialized_block( block, block_holder->get_block(), local_state._rpc_channels_num, @@ -444,7 +443,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block idx == local_state._last_local_channel_idx); moved = idx == local_state._last_local_channel_idx; } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); @@ -470,7 +468,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto status = current_channel->send_local_block(block, eos, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); auto pblock = std::make_unique(); RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); auto status = current_channel->send_remote_block(std::move(pblock), eos); @@ -484,8 +481,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto rows = block->rows(); { SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR( - local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); + } + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + old_channel_mem_usage += channel->mem_usage(); } if (_part_type == TPartitionType::HASH_PARTITIONED) { SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); @@ -498,6 +498,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block state, local_state.channels, local_state._partition_count, local_state._partitioner->get_channel_ids().get(), rows, block, eos)); } + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state.memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + COUNTER_SET(local_state.peak_memory_usage_counter(), + local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { int64_t old_channel_mem_usage = 0; for (const auto& channel : local_state.channels) { @@ -549,10 +557,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block COUNTER_SET(local_state.peak_memory_usage_counter(), local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + old_channel_mem_usage += channel->mem_usage(); + } { SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR( - local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); } std::vector> assignments = local_state.scale_writer_partitioning_exchanger->accept(block); @@ -563,6 +574,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block block, eos)); } + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state.memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + COUNTER_SET(local_state.peak_memory_usage_counter(), + local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { // Control the number of channels according to the flow, thereby controlling the number of table sink writers. // 1. select channel @@ -573,7 +592,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto status = current_channel->send_local_block(block, eos, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); auto pblock = std::make_unique(); RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); auto status = current_channel->send_remote_block(std::move(pblock), eos); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index bee34ad1a854ed..e85deb1a679478 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -240,7 +240,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _dests; - std::unique_ptr _mem_tracker; // Identifier of the destination plan node. const PlanNodeId _dest_node_id; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index deafb361c5702a..572e848d569dcb 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -63,8 +63,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( - state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), - profile(), p.is_merging()); + state, this, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), + p.num_senders(), profile(), p.is_merging()); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); @@ -186,8 +186,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block block->set_num_rows(limit); local_state.set_num_rows_returned(_limit); } - COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); } return Status::OK(); } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 8da335f4fa2c0e..9b0598318428b7 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -46,8 +46,6 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { _vpartition = std::make_unique(p._schema, p._partition); RETURN_IF_ERROR(_vpartition->init()); _state = state; - // profile must add to state's object pool - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); _block_convertor = std::make_unique(p._output_tuple_desc); _block_convertor->init_autoinc_info(p._schema->db_id(), p._schema->table_id(), @@ -288,7 +286,6 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get()); if (!local_state._load_block_queue) { RETURN_IF_ERROR(local_state._initialize_load_queue()); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 991127d83fe26f..5fadfbc1a650af 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -74,11 +74,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime"); _build_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1); _hash_table_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _build_arena_memory_usage = - profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildKeyArena", TUnit::BYTES, 1); // Build phase auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); @@ -289,41 +289,41 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, // Get the key column that needs to be built Status st = _extract_join_column(block, null_map_val, raw_ptrs, _build_col_ids); - st = std::visit( - vectorized::Overload { - [&](std::monostate& arg, auto join_op, auto has_null_value, - auto short_circuit_for_null_in_build_side, - auto with_other_conjuncts) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - return Status::OK(); - }, - [&](auto&& arg, auto&& join_op, auto has_null_value, - auto short_circuit_for_null_in_build_side, - auto with_other_conjuncts) -> Status { - using HashTableCtxType = std::decay_t; - using JoinOpType = std::decay_t; - ProcessHashTableBuild hash_table_build_process( - rows, raw_ptrs, this, state->batch_size(), state); - auto old_hash_table_size = arg.hash_table->get_byte_size(); - auto old_key_size = arg.serialized_keys_size(true); - auto st = hash_table_build_process.template run< - JoinOpType::value, has_null_value, - short_circuit_for_null_in_build_side, with_other_conjuncts>( - arg, - has_null_value || short_circuit_for_null_in_build_side - ? &null_map_val->get_data() - : nullptr, - &_shared_state->_has_null_in_build_side); - _mem_tracker->consume(arg.hash_table->get_byte_size() - - old_hash_table_size); - _mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size); - return st; - }}, - *_shared_state->hash_table_variants, _shared_state->join_op_variants, - vectorized::make_bool_variant(_build_side_ignore_null), - vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side), - vectorized::make_bool_variant((p._have_other_join_conjunct))); + st = std::visit(vectorized::Overload { + [&](std::monostate& arg, auto join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side, + auto with_other_conjuncts) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + return Status::OK(); + }, + [&](auto&& arg, auto&& join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side, + auto with_other_conjuncts) -> Status { + using HashTableCtxType = std::decay_t; + using JoinOpType = std::decay_t; + ProcessHashTableBuild hash_table_build_process( + rows, raw_ptrs, this, state->batch_size(), state); + auto st = hash_table_build_process.template run< + JoinOpType::value, has_null_value, + short_circuit_for_null_in_build_side, with_other_conjuncts>( + arg, + has_null_value || short_circuit_for_null_in_build_side + ? &null_map_val->get_data() + : nullptr, + &_shared_state->_has_null_in_build_side); + COUNTER_SET(_memory_used_counter, + _build_blocks_memory_usage->value() + + (int64_t)(arg.hash_table->get_byte_size() + + arg.serialized_keys_size(true))); + COUNTER_SET(_peak_memory_usage_counter, + _memory_used_counter->value()); + return st; + }}, + *_shared_state->hash_table_variants, _shared_state->join_op_variants, + vectorized::make_bool_variant(_build_side_ignore_null), + vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side), + vectorized::make_bool_variant((p._have_other_join_conjunct))); return st; } @@ -520,7 +520,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. - local_state._build_side_mem_used += in_block->allocated_bytes(); if (local_state._build_side_mutable_block.empty()) { auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename( @@ -547,12 +546,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* std::to_string(std::numeric_limits::max())); } - local_state._mem_tracker->consume(in_block->bytes()); - COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes()); - SCOPED_TIMER(local_state._build_side_merge_block_timer); RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow( std::move(*in_block))); + int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes(); + COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage); + COUNTER_SET(local_state._peak_memory_usage_counter, blocks_mem_usage); + COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage); } } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 4833bee5488088..f8634ac4c49816 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -74,8 +74,6 @@ class HashJoinBuildSinkLocalState final std::vector _key_columns_holder; bool _should_build_hash_table = true; - int64_t _build_side_mem_used = 0; - int64_t _build_side_last_mem_used = 0; size_t _build_side_rows = 0; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 756a151394b41e..0d5dcfad664f04 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) _construct_mutable_join_block(); _probe_column_disguise_null.reserve(_probe_expr_ctxs.size()); _probe_arena_memory_usage = - profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1); // Probe phase _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime"); _search_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenSearchHashTableTime"); @@ -301,8 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc mutable_join_block, &temp_block, local_state._probe_block.rows(), _is_mark_join, _have_other_join_conjunct); - local_state._mem_tracker->set_consumption( - arg.serialized_keys_size(false)); } else { st = Status::InternalError("uninited hash table"); } @@ -498,6 +496,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu if (&local_state._probe_block != input_block) { input_block->swap(local_state._probe_block); + COUNTER_SET(local_state._memory_used_counter, + (int64_t)local_state._probe_block.allocated_bytes()); + COUNTER_SET(local_state._peak_memory_usage_counter, + local_state._memory_used_counter->value()); } } return Status::OK(); diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 2ccc9aec8c7e01..7a5c34fb845937 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -131,7 +131,6 @@ struct ProcessHashTableProbe { bool _need_calculate_build_index_has_zero = true; bool* _has_null_in_build_side; - RuntimeProfile::Counter* _rows_returned_counter = nullptr; RuntimeProfile::Counter* _search_hashtable_timer = nullptr; RuntimeProfile::Counter* _init_probe_side_timer = nullptr; RuntimeProfile::Counter* _build_side_output_timer = nullptr; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 5de033b63e8aad..8123badb72641f 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -51,7 +51,6 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeLocalState _left_output_slot_flags(parent->left_output_slot_flags()), _right_output_slot_flags(parent->right_output_slot_flags()), _has_null_in_build_side(parent->has_null_in_build_side()), - _rows_returned_counter(parent->_rows_returned_counter), _search_hashtable_timer(parent->_search_hashtable_timer), _init_probe_side_timer(parent->_init_probe_side_timer), _build_side_output_timer(parent->_build_side_output_timer), @@ -177,8 +176,10 @@ typename HashTableType::State ProcessHashTableProbe::_init_probe_sid false, hash_table_ctx.hash_table->get_bucket_size()); hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums, need_judge_null ? null_map : nullptr); - COUNTER_SET(_parent->_probe_arena_memory_usage, - (int64_t)hash_table_ctx.serialized_keys_size(false)); + int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false); + COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage); + COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage); + COUNTER_SET(_parent->_peak_memory_usage_counter, _parent->_memory_used_counter->value()); } return typename HashTableType::State(_parent->_probe_columns); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 304e8e96f0c79c..c91dccb873ffbe 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -106,7 +106,6 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, local_state._output_expr_contexts, *output_block, block, true)); vectorized::materialize_block_inplace(*block); } - COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 79f3cbdf17b51c..3b5174d87c0f7f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -322,17 +322,28 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized:: } }); + Status status; auto* local_state = state->get_local_state(operator_id()); + Defer defer([&]() { + if (status.ok()) { + if (auto rows = block->rows()) { + COUNTER_UPDATE(local_state->_rows_returned_counter, rows); + COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); + } + } + }); if (_output_row_descriptor) { local_state->clear_origin_block(); - auto status = get_block(state, &local_state->_origin_block, eos); + status = get_block(state, &local_state->_origin_block, eos); if (UNLIKELY(!status.ok())) { return status; } - return do_projections(state, &local_state->_origin_block, block); + status = do_projections(state, &local_state->_origin_block, block); + return status; } - local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption()); - return get_block(state, block, eos); + status = get_block(state, block, eos); + local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value()); + return status; } void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) { @@ -353,8 +364,6 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } @@ -475,10 +484,9 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); - _mem_tracker = std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); - _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); - _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( - "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); + _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1); + _peak_memory_usage_counter = + _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); return Status::OK(); } @@ -511,11 +519,8 @@ Status PipelineXLocalState::close(RuntimeState* state) { if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - if (_rows_returned_counter != nullptr) { - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - } if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + _peak_memory_usage_counter->set(_memory_used_counter->value()); } _closed = true; // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). @@ -555,10 +560,9 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSink _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); - _mem_tracker = std::make_unique(_parent->get_name()); - _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1); + _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1); _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); + _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); return Status::OK(); } @@ -571,7 +575,7 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + _peak_memory_usage_counter->set(_memory_used_counter->value()); } _closed = true; return Status::OK(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 301f7599737989..25c3a68a9f2dc3 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -163,10 +163,9 @@ class PipelineXLocalStateBase { void reached_limit(vectorized::Block* block, bool* eos); RuntimeProfile* profile() { return _runtime_profile.get(); } - MemTracker* mem_tracker() { return _mem_tracker.get(); } - RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; } - RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } + RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } + RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; } OperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } @@ -188,16 +187,14 @@ class PipelineXLocalStateBase { protected: friend class OperatorXBase; + template + friend class ScanOperatorX; ObjectPool* _pool = nullptr; int64_t _num_rows_returned {0}; std::unique_ptr _runtime_profile; - // Record this node memory size. it is expected that artificial guarantees are accurate, - // which will providea reference for operator memory. - std::unique_ptr _mem_tracker; - std::shared_ptr _query_statistics = nullptr; RuntimeProfile::Counter* _rows_returned_counter = nullptr; @@ -342,7 +339,6 @@ class PipelineXSinkLocalStateBase { DataSinkOperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } RuntimeProfile* profile() { return _profile; } - MemTracker* mem_tracker() { return _mem_tracker.get(); } [[nodiscard]] RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); } @@ -350,9 +346,7 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } - RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() { - return _peak_memory_usage_counter; - } + RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; } virtual std::vector dependencies() const { return {nullptr}; } // override in exchange sink , AsyncWriterSink @@ -364,7 +358,6 @@ class PipelineXSinkLocalStateBase { DataSinkOperatorXBase* _parent = nullptr; RuntimeState* _state = nullptr; RuntimeProfile* _profile = nullptr; - std::unique_ptr _mem_tracker; // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index fbabdbdc8f85fe..eba9e8472d3158 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -113,9 +113,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _partition_exprs_num = p._partition_exprs_num; _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); _serialize_key_arena_memory_usage = - _profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _profile->AddHighWaterMarkCounter("MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _hash_table_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageHashTable", TUnit::BYTES, 1); _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index f2cd8dea0b943c..6d355477ab871c 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -58,7 +58,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: } } if (!output_block->empty()) { - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); local_state._num_rows_returned += output_block->rows(); } return Status::OK(); @@ -80,7 +79,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); } if (!output_block->empty()) { - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); local_state._num_rows_returned += output_block->rows(); } return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 469716b7a22182..ab0a43f4a635cf 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -81,10 +81,10 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat void PartitionedAggSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique("internal_profile"); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); @@ -110,8 +110,8 @@ void PartitionedAggSinkLocalState::_init_counters() { } while (false) void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_hash_table_memory_usage, "HashTable"); - UPDATE_PROFILE(_serialize_key_arena_memory_usage, "SerializeKeyArena"); + UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable"); + UPDATE_PROFILE(_serialize_key_arena_memory_usage, "MemoryUsageSerializeKeyArena"); UPDATE_PROFILE(_build_timer, "BuildTime"); UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime"); UPDATE_PROFILE(_merge_timer, "MergeTime"); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 018d63a6deebb1..0e56acc1c574b2 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -557,8 +557,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } { SCOPED_TIMER(local_state._partition_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, input_block, - local_state._mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, input_block)); } std::vector> partition_indexes(_partition_count); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 32af13ba548b32..e3ca74c2d53197 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -80,7 +80,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state if (inner_sink_state_) { auto inner_sink_state = assert_cast(inner_sink_state_); - return inner_sink_state->_build_side_mem_used; + return inner_sink_state->_build_blocks_memory_usage->value(); } } return 0; @@ -161,7 +161,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta { SCOPED_TIMER(_partition_timer); - (void)_partitioner->do_partitioning(state, &sub_block, _mem_tracker.get()); + (void)_partitioner->do_partitioning(state, &sub_block); } const auto* channel_ids = _partitioner->get_channel_ids().get(); @@ -334,7 +334,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, { /// TODO: DO NOT execute build exprs twice(when partition and building hash table) SCOPED_TIMER(_partition_timer); - RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block, _mem_tracker.get())); + RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block)); } auto& p = _parent->cast(); diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index cd707ccc49f8c2..5c94d43f0d1e05 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -248,7 +248,6 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); - COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 7c774a5aaa0dbc..5d41c800383bd0 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -358,7 +358,15 @@ class ScanOperatorX : public OperatorX { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) override { - return get_block(state, block, eos); + Status status = get_block(state, block, eos); + if (status.ok()) { + if (auto rows = block->rows()) { + auto* local_state = state->get_local_state(operator_id()); + COUNTER_UPDATE(local_state->_rows_returned_counter, rows); + COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); + } + } + return status; } [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 6f67262ef1f3ed..ad7d72971094fe 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -31,7 +31,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _sort_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); _append_blocks_timer = ADD_TIMER(profile(), "AppendBlockTime"); _update_runtime_predicate_timer = ADD_TIMER(profile(), "UpdateRuntimePredicateTime"); return Status::OK(); @@ -121,12 +121,14 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { - COUNTER_UPDATE(local_state._sort_blocks_memory_usage, (int64_t)in_block->bytes()); { SCOPED_TIMER(local_state._append_blocks_timer); RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); } - local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size()); + int64_t data_size = local_state._shared_state->sorter->data_size(); + COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); + COUNTER_SET(local_state._memory_used_counter, data_size); + COUNTER_SET(local_state._peak_memory_usage_counter, data_size); RETURN_IF_CANCELLED(state); if (state->get_query_ctx()->has_runtime_predicate(_node_id)) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 4bf1ab04efb628..130affd2f4b7aa 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -51,7 +51,7 @@ void SpillSortSinkLocalState::_init_counters() { _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime"); _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime"); _sort_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); _spill_merge_sort_timer = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1); @@ -70,7 +70,7 @@ void SpillSortSinkLocalState::_init_counters() { void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime"); UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime"); - UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks"); + UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks"); } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { @@ -156,8 +156,11 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink", { return Status::InternalError("fault_inject spill_sort_sink sink failed"); }); RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(), in_block, false)); - local_state._mem_tracker->set_consumption( - local_state._shared_state->in_mem_shared_state->sorter->data_size()); + int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size(); + COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); + COUNTER_SET(local_state._memory_used_counter, data_size); + COUNTER_SET(local_state._peak_memory_usage_counter, data_size); + if (eos) { if (local_state._shared_state->is_spilled) { if (revocable_mem_size(state) > 0) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 9ead9c37b1752a..ab969a129e6023 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -87,10 +87,10 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_init_timer); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); @@ -350,10 +350,10 @@ Status StreamingAggLocalState::_merge_without_key(vectorized::Block* block) { } void StreamingAggLocalState::_update_memusage_without_key() { - auto arena_memory_usage = _agg_arena_pool->size() - _mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - _mem_usage_record.used_in_arena = _agg_arena_pool->size(); + int64_t arena_memory_usage = _agg_arena_pool->size(); + COUNTER_SET(_memory_used_counter, arena_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* block) { @@ -365,28 +365,25 @@ Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* b } void StreamingAggLocalState::_update_memusage_with_serialized_key() { - std::visit( - vectorized::Overload { - [&](std::monostate& arg) -> void { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); - }, - [&](auto& agg_method) -> void { - auto& data = *agg_method.hash_table; - auto arena_memory_usage = _agg_arena_pool->size() + - _aggregate_data_container->memory_usage() - - _mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() - - _mem_usage_record.used_in_state); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - COUNTER_UPDATE( - _hash_table_memory_usage, - data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state); - _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes(); - _mem_usage_record.used_in_arena = - _agg_arena_pool->size() + _aggregate_data_container->memory_usage(); - }}, - _agg_data->method_variant); + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + }, + [&](auto& agg_method) -> void { + auto& data = *agg_method.hash_table; + int64_t arena_memory_usage = _agg_arena_pool->size() + + _aggregate_data_container->memory_usage(); + int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); + + COUNTER_SET(_memory_used_counter, + arena_memory_usage + hash_table_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, + arena_memory_usage + hash_table_memory_usage); + + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); + COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); + }}, + _agg_data->method_variant); } template @@ -508,7 +505,6 @@ Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block, // pre stream agg need use _num_row_return to decide whether to do pre stream agg _cur_num_rows_returned += output_block->rows(); _make_nullable_output_key(output_block); - // COUNTER_SET(_rows_returned_counter, _num_rows_returned); _executor->update_memusage(this); return Status::OK(); } @@ -1247,7 +1243,6 @@ Status StreamingAggLocalState::close(RuntimeState* state) { std::vector tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - Base::_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); /// _hash_table_size_counter may be null if prepare failed. if (_hash_table_size_counter) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 59d5491d10c12f..b695880ac2857b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -167,12 +167,6 @@ class StreamingAggLocalState final : public PipelineXLocalState }; std::unique_ptr _executor = nullptr; - struct MemoryRecord { - MemoryRecord() : used_in_arena(0), used_in_state(0) {} - int64_t used_in_arena; - int64_t used_in_state; - }; - MemoryRecord _mem_usage_record; std::unique_ptr _child_block = nullptr; bool _child_eos = false; std::unique_ptr _pre_aggregated_block = nullptr; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 2d20b8f365cd7d..c4832b9958c00d 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -26,7 +26,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _channel_id = info.task_idx; - _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); + _shared_state->mem_counters[_channel_id] = _memory_used_counter; _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); _get_block_failed_counter = @@ -105,8 +105,8 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c _exchanger->data_queue_debug_string(_channel_id)); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); - for (auto* mem_tracker : _shared_state->mem_trackers) { - fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption()); + for (auto* mem_counter : _shared_state->mem_counters) { + fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_counter->value()); i++; } return fmt::to_string(debug_string_buffer); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index fa34b6a4040b6c..23f91cca631882 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -118,8 +118,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, } { SCOPED_TIMER(local_state._compute_hash_value_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, - local_state.mem_tracker())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block)); } { SCOPED_TIMER(local_state._distribute_timer); diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index 0d6165b75556f6..d0c8f2d8fcdcad 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -25,8 +25,8 @@ namespace doris::vectorized { template -Status Partitioner::do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const { +Status Partitioner::do_partitioning(RuntimeState* state, + Block* block) const { int rows = block->rows(); if (rows > 0) { @@ -38,10 +38,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta _hash_vals.resize(rows); std::fill(_hash_vals.begin(), _hash_vals.end(), 0); auto* __restrict hashes = _hash_vals.data(); - { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker); - RETURN_IF_ERROR(_get_partition_column_result(block, result)); - } + RETURN_IF_ERROR(_get_partition_column_result(block, result)); for (int j = 0; j < result_size; ++j) { _do_hash(unpack_if_const(block->get_by_position(result[j]).column).first, hashes, j); } @@ -50,10 +47,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta hashes[i] = ChannelIds()(hashes[i], _partition_count); } - { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker); - Block::erase_useless_column(block, column_to_keep); - } + Block::erase_useless_column(block, column_to_keep); } return Status::OK(); } diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 3152edb5cb57c7..5607a83327b119 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -48,8 +48,7 @@ class PartitionerBase { virtual Status open(RuntimeState* state) = 0; - virtual Status do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const = 0; + virtual Status do_partitioning(RuntimeState* state, Block* block) const = 0; virtual ChannelField get_channel_ids() const = 0; @@ -75,8 +74,7 @@ class Partitioner : public PartitionerBase { Status open(RuntimeState* state) override { return VExpr::open(_partition_expr_ctxs, state); } - Status do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const override; + Status do_partitioning(RuntimeState* state, Block* block) const override; ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(HashValueType)}; diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index c14d119e0fee25..78067b9b18106c 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc } std::shared_ptr VDataStreamMgr::create_recvr( - RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging) { + RuntimeState* state, pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; - std::shared_ptr recvr(new VDataStreamRecvr(this, state, row_desc, + std::shared_ptr recvr(new VDataStreamRecvr(this, parent, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 09e347fcfb2a7a..bd5e6f9b91ee57 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -40,6 +40,9 @@ class RuntimeState; class RowDescriptor; class RuntimeProfile; class PTransmitDataParams; +namespace pipeline { +class ExchangeLocalState; +} namespace vectorized { class VDataStreamRecvr; @@ -50,6 +53,7 @@ class VDataStreamMgr { ~VDataStreamMgr(); std::shared_ptr create_recvr(RuntimeState* state, + pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1ca6bb7f2c5931..f1dfbbf304763f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -96,6 +96,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); _block_queue.pop_front(); + _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -207,6 +208,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } + _recvr->_parent->memory_used_counter()->update(block_byte_size); + _recvr->_parent->peak_memory_usage_counter()->set( + _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_byte_size); return Status::OK(); } @@ -245,6 +249,9 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { _record_debug_info(); try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); + _recvr->_parent->memory_used_counter()->update(block_mem_size); + _recvr->_parent->peak_memory_usage_counter()->set( + _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_mem_size); } } @@ -315,12 +322,13 @@ void VDataStreamRecvr::SenderQueue::close() { _block_queue.clear(); } -VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, - const RowDescriptor& row_desc, +VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent, + RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, RuntimeProfile* profile) : HasTaskExecutionCtx(state), _mgr(stream_mgr), + _parent(parent), _query_thread_context(state->query_id(), state->query_mem_tracker(), state->get_query_ctx()->workload_group()), _fragment_instance_id(fragment_instance_id), @@ -352,9 +360,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta } // Initialize the counters - _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); - _peak_memory_usage_counter = - _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES); _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES); @@ -417,7 +422,6 @@ std::shared_ptr VDataStreamRecvr::get_local_channel_depend } Status VDataStreamRecvr::get_next(Block* block, bool* eos) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); if (!_is_merging) { block->clear(); return _sender_queues[0]->get_batch(block, eos); @@ -492,9 +496,6 @@ void VDataStreamRecvr::close() { _mgr = nullptr; _merger.reset(); - if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); - } } void VDataStreamRecvr::set_sink_dep_always_ready() const { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e8dcfdedba5fb9..b2d76590ba2717 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -69,7 +69,8 @@ class VDataStreamRecvr; class VDataStreamRecvr : public HasTaskExecutionCtx { public: class SenderQueue; - VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc, + VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent, + RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, RuntimeProfile* profile); @@ -120,6 +121,8 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { // DataStreamMgr instance used to create this recvr. (Not owned) VDataStreamMgr* _mgr = nullptr; + pipeline::ExchangeLocalState* _parent = nullptr; + QueryThreadContext _query_thread_context; // Fragment and node id of the destination exchange node this receiver is used by. @@ -152,8 +155,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { RuntimeProfile::Counter* _data_arrival_timer = nullptr; RuntimeProfile::Counter* _decompress_timer = nullptr; RuntimeProfile::Counter* _decompress_bytes = nullptr; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; - RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; // Number of rows received RuntimeProfile::Counter* _rows_produced_counter = nullptr; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index a458d7f5ef16a7..e169d57e05e58b 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -253,12 +253,10 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu bool* serialized, bool eos, const std::vector* rows) { if (_mutable_block == nullptr) { - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); _mutable_block = MutableBlock::create_unique(block->clone_empty()); } { - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (rows) { if (!rows->empty()) { const auto* begin = rows->data();