From cc42838c188ed037252ee8f54f5108abe347b50d Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 7 Sep 2024 23:36:01 +0800 Subject: [PATCH] 7 --- be/src/io/fs/buffered_reader.h | 2 - be/src/vec/exec/format/csv/csv_reader.cpp | 1 + be/src/vec/exec/format/generic_reader.h | 5 - .../vec/exec/format/json/new_json_reader.cpp | 1 + be/src/vec/exec/format/orc/vorc_reader.cpp | 8 +- be/src/vec/exec/format/orc/vorc_reader.h | 1 - .../exec/format/parquet/vparquet_reader.cpp | 7 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 20 +- be/src/vec/exec/scan/new_olap_scanner.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 15 +- be/src/vec/exec/scan/vfile_scanner.h | 284 +++++++++--------- be/src/vec/exec/scan/vscanner.cpp | 15 +- be/src/vec/exec/scan/vscanner.h | 12 +- 13 files changed, 201 insertions(+), 172 deletions(-) diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 70c8445db233e63..7afb8cfec22de86 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -355,8 +355,6 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t merge_small_ranges(size_t off, int range_index) const; - void _collect_profile_at_runtime() override {} - void _collect_profile_before_close() override; }; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 82781cddf2465ea..29ef0a67394f67c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -541,6 +541,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { *eof = (rows == 0); *read_rows = rows; + _update_bytes_read(_io_ctx); return Status::OK(); } diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 927ee154e6af42f..cdec5085a7f5b53 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -39,10 +39,6 @@ class GenericReader : public ProfileCollector { 
_push_down_agg_type = push_down_agg_type; } - void set_query_statistics(QueryStatistics* query_statistics) { - _query_statistics = query_statistics; - } - virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; virtual Status get_columns(std::unordered_map* name_to_type, @@ -78,7 +74,6 @@ class GenericReader : public ProfileCollector { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; TPushAggOp::type _push_down_agg_type; - QueryStatistics* _query_statistics = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 2aff2cb4e7efdbd..bf0a74a03fc3e0b 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -232,6 +232,7 @@ Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) } ++(*read_rows); } + _update_bytes_read(_io_ctx); return Status::OK(); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index e1b3f562436c21c..2ecf6a92ce597d1 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -185,13 +185,6 @@ OrcReader::~OrcReader() { } void OrcReader::_collect_profile_before_close() { - if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) { - _query_statistics->add_scan_bytes_from_local_storage( - _io_ctx->file_cache_stats->bytes_read_from_local); - _query_statistics->add_scan_bytes_from_remote_storage( - _io_ctx->file_cache_stats->bytes_read_from_remote); - } - if (_profile != nullptr) { COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time); COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls); @@ -1558,6 +1551,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_string_dict_filter) { 
RETURN_IF_ERROR(_string_dict_filter->get_status()); } + _update_bytes_read(_io_ctx); return Status::OK(); }
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index c0b372dfcea5ee0..77ca92446266ec4 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -657,7 +657,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
                           std::vector selected_columns) override;
 
 protected:
-    void _collect_profile_at_runtime() override {};
     void _collect_profile_before_close() override;
 
 private:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 3464b22e1d58cb4..9879cadcfe5cb27 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -604,6 +604,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
             *eof = false;
         }
     }
+    _update_bytes_read(_io_ctx);
     return Status::OK();
 }
 
@@ -1060,12 +1061,6 @@ void ParquetReader::_collect_profile() {
 }
 
 void ParquetReader::_collect_profile_before_close() {
-    if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) {
-        _query_statistics->add_scan_bytes_from_local_storage(
-                _io_ctx->file_cache_stats->bytes_read_from_local);
-        _query_statistics->add_scan_bytes_from_remote_storage(
-                _io_ctx->file_cache_stats->bytes_read_from_remote);
-    }
     _collect_profile();
 }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index a01bfaa92d0fd3e..8044580c216c1d1 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -667,11 +667,20 @@ void NewOlapScanner::_collect_profile_before_close() {
     tablet->query_scan_bytes->increment(_compressed_bytes_read);
     tablet->query_scan_rows->increment(_raw_rows_read);
     tablet->query_scan_count->increment(1);
+}
+
+void NewOlapScanner::_update_bytes_and_rows_read() {
+    VScanner::_update_bytes_and_rows_read();
     if (_query_statistics) {
-        _query_statistics->add_scan_bytes_from_local_storage(
-                stats.file_cache_stats.bytes_read_from_local);
-        _query_statistics->add_scan_bytes_from_remote_storage(
-                stats.file_cache_stats.bytes_read_from_remote);
+        auto& stats = _tablet_reader->stats();
+        int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local;
+        int64_t delta_remote =
+                stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote;
+        _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+        _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+        _query_statistics->add_scan_bytes(delta_local + delta_remote);
+        _bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local;
+        _bytes_read_from_remote = stats.file_cache_stats.bytes_read_from_remote;
     }
 }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 52c664c026afd82..1c3b3edbee6180e 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -81,6 +81,8 @@ class NewOlapScanner : public VScanner {
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
     void _collect_profile_before_close() override;
 
+    void _update_bytes_and_rows_read() override;
+
 private:
     void _update_realtime_counters();
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 3c9dc086100c9d1..b407e45241f720e 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ 
b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -979,7 +979,6 @@ Status VFileScanner::_get_next_reader() {
         _missing_cols.clear();
         RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
         _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
-        _cur_reader->set_query_statistics(_query_statistics);
         RETURN_IF_ERROR(_generate_fill_columns());
         if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
             fmt::memory_buffer col_buf;
@@ -1201,4 +1200,19 @@ void VFileScanner::_collect_profile_before_close() {
     }
 }
 
+void VFileScanner::_update_bytes_and_rows_read() {
+    VScanner::_update_bytes_and_rows_read();
+    if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) {
+        int64_t delta_local =
+                _io_ctx->file_cache_stats->bytes_read_from_local - _bytes_read_from_local;
+        int64_t delta_remote =
+                _io_ctx->file_cache_stats->bytes_read_from_remote - _bytes_read_from_remote;
+        _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+        _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+        _query_statistics->add_scan_bytes(delta_local + delta_remote);
+        _bytes_read_from_local = _io_ctx->file_cache_stats->bytes_read_from_local;
+        _bytes_read_from_remote = _io_ctx->file_cache_stats->bytes_read_from_remote;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 82fecd9e67524e2..e6444dd0f0b6bd9 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -91,5 +91,7 @@ class VFileScanner : public VScanner {
 
     void _collect_profile_before_close() override;
 
+    void _update_bytes_and_rows_read() override;
+
 protected:
     const TFileScanRangeParams* _params = nullptr;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 43d791caffae026..da9296472841140 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -102,8 +102,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
         }
     }
 
-    int64_t old_scan_rows = _num_rows_read;
-    int64_t old_scan_bytes = _num_byte_read;
+    _prev_num_rows_read = _num_rows_read;
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to get next rows,
@@ -122,7 +121,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
                 break;
             }
             _num_rows_read += block->rows();
-            _num_byte_read += block->allocated_bytes();
         }
 
         // 2. Filter the output block finally.
@@ -137,10 +135,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
-    if (_query_statistics) {
-        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
-        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
-    }
+    _update_bytes_and_rows_read();
 
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
@@ -266,4 +261,11 @@ void VScanner::update_scan_cpu_timer() {
     }
 }
 
+void VScanner::_update_bytes_and_rows_read() {
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read);
+        _prev_num_rows_read = _num_rows_read;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29ad37e926984ea..3496da4ca757bc2 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -163,6 +163,10 @@ class VScanner {
         _conjuncts.clear();
     }
 
+    // Push the rows read since the last call into the query statistics so that runtime
+    // statistics are available while the query runs. Subclasses extend it with bytes read.
+    virtual void _update_bytes_and_rows_read();
+
     RuntimeState* _state = nullptr;
     pipeline::ScanLocalStateBase* _local_state = nullptr;
     QueryStatistics* _query_statistics = nullptr;
@@ -204,8 +208,12 @@ class VScanner {
 
     // num of rows read from scanner
     int64_t _num_rows_read = 0;
-
-    int64_t _num_byte_read = 0;
+    // value of _num_rows_read at the last statistics update,
+    // so that only the delta rows of each round are reported.
+    int64_t _prev_num_rows_read = 0;
+    // bytes read from local and remote fs already reported to query statistics
+    int64_t _bytes_read_from_local = 0;
+    int64_t _bytes_read_from_remote = 0;
 
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;