Skip to content

Commit

Permalink
[fix](counters) fix MemoryUsage and PeakMemoryUsage counters of some …
Browse files Browse the repository at this point in the history
…operators (apache#41602)
  • Loading branch information
jacktengg committed Jan 2, 2025
1 parent c548055 commit 558fac6
Show file tree
Hide file tree
Showing 44 changed files with 246 additions and 246 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators(

LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
source_deps.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
mem_counters.resize(num_instances, nullptr);
}

vectorized::MutableColumns AggSharedState::_get_keys_hash_table() {
Expand Down
13 changes: 5 additions & 8 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ struct AggSharedState : public BasicSharedState {
vectorized::Sizes offsets_of_aggregate_states;
std::vector<size_t> make_nullable_keys;

struct MemoryRecord {
int64_t used_in_arena {};
int64_t used_in_state {};
};
MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
bool enable_spill = false;
bool reach_limit = false;
Expand Down Expand Up @@ -825,7 +820,7 @@ struct LocalExchangeSharedState : public BasicSharedState {
LocalExchangeSharedState(int num_instances);
~LocalExchangeSharedState() override;
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::vector<RuntimeProfile::Counter*> mem_counters;
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
Expand Down Expand Up @@ -861,13 +856,15 @@ struct LocalExchangeSharedState : public BasicSharedState {
}

void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
mem_trackers[channel_id]->consume(delta);
mem_counters[channel_id]->update(delta);
if (update_total_mem_usage) {
add_total_mem_usage(delta, channel_id);
}
}

void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); }
void sub_mem_usage(int channel_id, size_t delta) {
mem_counters[channel_id]->update(-(int64_t)delta);
}

virtual void add_total_mem_usage(size_t delta, int channel_id) {
if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
Expand Down
46 changes: 18 additions & 28 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
_hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
_hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
TUnit::BYTES, "MemoryUsage", 1);
_serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
Expand Down Expand Up @@ -223,24 +223,17 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
int64_t arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_UPDATE(
_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
_shared_state->aggregate_data_container->memory_usage();
int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes();

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
}},
_agg_data->method_variant);
}
Expand Down Expand Up @@ -419,11 +412,10 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
}

void AggSinkLocalState::_update_memusage_without_key() {
auto arena_memory_usage =
_agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
Expand Down Expand Up @@ -875,8 +867,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {

std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state +
Base::_shared_state->mem_usage_record.used_in_arena);
return Base::close(state, exec_status);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;

bool _should_limit_output = false;

Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,17 +450,13 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
if (auto rows = block->rows()) {
_num_rows_returned += rows;
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
} else {
reached_limit(block, eos);
}
} else {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
_compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime");
_compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1);
return Status::OK();
}

Expand Down Expand Up @@ -322,8 +323,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
}
}

COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
int64_t block_mem_usage = input_block->allocated_bytes();
COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());
COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);

//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
RuntimeProfile::Counter* _blocks_memory_usage = nullptr;

std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
};
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) {
Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
mem_tracker()->consume(-block->allocated_bytes());
if (_shared_state->origin_cols.size() < block->columns()) {
block->erase_not_in(_shared_state->origin_cols);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::Cancelled("Expected {} {} to be returned by expression {}",
to_string_lambda(_assertion), _desired_num_rows, _subquery_string);
}
COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
local_state._current_query_cache_rows += output_block->rows();
auto mem_consume = output_block->allocated_bytes();
local_state._current_query_cache_bytes += mem_consume;
local_state._mem_tracker->consume(mem_consume);

if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes ||
_cache_param.entry_max_rows < local_state._current_query_cache_rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
// If the limit is not reached, it is important to ensure that _aggregated_block is empty
// because it may still contain data.
// However, if the limit is reached, there is no need to output data even if some exists.
Expand Down
16 changes: 16 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
Expand Down Expand Up @@ -291,6 +294,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
if (request.block) {
static_cast<void>(brpc_request->release_block());
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
}
q.pop();
_total_queue_size--;
Expand Down Expand Up @@ -416,12 +420,24 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
Expand Down
Loading

0 comments on commit 558fac6

Please sign in to comment.