Skip to content

Commit

Permalink
[opt](scanner) Add scanner metrics apache#40496 (apache#41314)
Browse files Browse the repository at this point in the history
cherry pick from apache#40496
  • Loading branch information
zhiqiang-hhhh authored Sep 27, 2024
1 parent 21b8887 commit e0c9cbd
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 4 deletions.
3 changes: 2 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
register_memory_statistics();
register_cpu_statistics();
DorisMetrics::instance()->query_ctx_cnt->increment(1);
}

void QueryContext::_init_query_mem_tracker() {
Expand Down Expand Up @@ -183,7 +184,7 @@ QueryContext::~QueryContext() {
obj_pool.clear();

_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);

DorisMetrics::instance()->query_ctx_cnt->increment(-1);
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), mem_tracker_msg);
}

Expand Down
16 changes: 16 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_file_open_reading, MetricUnit::FILESYS
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";

Expand Down Expand Up @@ -293,6 +301,14 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);

INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
9 changes: 8 additions & 1 deletion be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,18 @@ class DorisMetrics {
UIntGauge* group_local_scan_thread_pool_queue_size = nullptr;
UIntGauge* group_local_scan_thread_pool_thread_num = nullptr;

IntAtomicCounter* query_ctx_cnt = nullptr;
IntAtomicCounter* scanner_ctx_cnt = nullptr;
IntAtomicCounter* scanner_cnt = nullptr;
IntAtomicCounter* scanner_task_cnt = nullptr;
IntAtomicCounter* scanner_task_queued = nullptr;
IntAtomicCounter* scanner_task_submit_failed = nullptr;
IntAtomicCounter* scanner_task_running = nullptr;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
}

// not thread-safe, call before calling metrics
void initialize(
bool init_system_metrics = false,
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -138,6 +139,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
: ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances, local_state) {
_parent = parent;

DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init function call, should not access _parent
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
_query_thread_context.init();
DorisMetrics::instance()->scanner_task_cnt->increment(1);
}

~ScanTask() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
cached_blocks.clear();
DorisMetrics::instance()->scanner_task_cnt->increment(-1);
}

private:
Expand Down Expand Up @@ -116,6 +118,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
// do nothing
}
block.reset();
DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
}
virtual Status init();

Expand Down Expand Up @@ -155,7 +158,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,

RuntimeState* state() { return _state; }
void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }

std::string parent_name();

virtual bool empty_in_queue(int id);
Expand Down
11 changes: 11 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -140,6 +141,11 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,

scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand Down Expand Up @@ -171,6 +177,11 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
}
auto work_func = [scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

#include "common/status.h"
#include "util/doris_metrics.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"

Expand Down Expand Up @@ -135,7 +137,13 @@ class SimplifiedScanScheduler {

Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
DorisMetrics::instance()->scanner_task_queued->increment(1);
auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
if (!st.ok()) {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
}
return st;
} else {
return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ VScanner::VScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_stat
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
_total_rf_num = _local_state->runtime_filter_num();
DorisMetrics::instance()->scanner_cnt->increment(1);
}

Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class VScanner {
_origin_block.clear();
_common_expr_ctxs_push_down.clear();
_stale_expr_ctxs.clear();
DorisMetrics::instance()->scanner_cnt->increment(-1);
}

virtual Status init() { return Status::OK(); }
Expand Down

0 comments on commit e0c9cbd

Please sign in to comment.