Skip to content

Commit

Permalink
7
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 7, 2024
1 parent 46aa22e commit cc42838
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 172 deletions.
2 changes: 0 additions & 2 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro

size_t merge_small_ranges(size_t off, int range_index) const;

void _collect_profile_at_runtime() override {}

void _collect_profile_before_close() override;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {

*eof = (rows == 0);
*read_rows = rows;
_update_bytes_read(_io_ctx);

return Status::OK();
}
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ class GenericReader : public ProfileCollector {
_push_down_agg_type = push_down_agg_type;
}

void set_query_statistics(QueryStatistics* query_statistics) {
_query_statistics = query_statistics;
}

virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
Expand Down Expand Up @@ -78,7 +74,6 @@ class GenericReader : public ProfileCollector {
/// Whether the underlying FileReader has filled the partition&missing columns
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type;
QueryStatistics* _query_statistics = nullptr;
};

} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
}
++(*read_rows);
}
_update_bytes_read(_io_ctx);

return Status::OK();
}
Expand Down
8 changes: 1 addition & 7 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,6 @@ OrcReader::~OrcReader() {
}

void OrcReader::_collect_profile_before_close() {
if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) {
_query_statistics->add_scan_bytes_from_local_storage(
_io_ctx->file_cache_stats->bytes_read_from_local);
_query_statistics->add_scan_bytes_from_remote_storage(
_io_ctx->file_cache_stats->bytes_read_from_remote);
}

if (_profile != nullptr) {
COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
Expand Down Expand Up @@ -1558,6 +1551,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_string_dict_filter) {
RETURN_IF_ERROR(_string_dict_filter->get_status());
}
_update_bytes_read(_io_ctx);
return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
std::vector<bool> selected_columns) override;

protected:
void _collect_profile_at_runtime() override {};
void _collect_profile_before_close() override;

private:
Expand Down
7 changes: 1 addition & 6 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
*eof = false;
}
}
_update_bytes_read(_io_ctx);
return Status::OK();
}

Expand Down Expand Up @@ -1060,12 +1061,6 @@ void ParquetReader::_collect_profile() {
}

void ParquetReader::_collect_profile_before_close() {
if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) {
_query_statistics->add_scan_bytes_from_local_storage(
_io_ctx->file_cache_stats->bytes_read_from_local);
_query_statistics->add_scan_bytes_from_remote_storage(
_io_ctx->file_cache_stats->bytes_read_from_remote);
}
_collect_profile();
}

Expand Down
20 changes: 13 additions & 7 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,6 @@ void NewOlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(Parent->_total_segment_counter, stats.total_segment_number);

// Update counters for NewOlapScanner
// Update counters from tablet reader's stats
auto& stats = _tablet_reader->stats();

auto* local_state = (pipeline::OlapScanLocalState*)_local_state;
INCR_COUNTER(local_state);

Expand All @@ -667,11 +664,20 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_bytes->increment(_compressed_bytes_read);
tablet->query_scan_rows->increment(_raw_rows_read);
tablet->query_scan_count->increment(1);
}

void NewOlapScanner::_update_bytes_and_rows_read() {
VScanner::_update_bytes_and_rows_read();
if (_query_statistics) {
_query_statistics->add_scan_bytes_from_local_storage(
stats.file_cache_stats.bytes_read_from_local);
_query_statistics->add_scan_bytes_from_remote_storage(
stats.file_cache_stats.bytes_read_from_remote);
auto& stats = _tablet_reader->stats();
int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local;
int64_t delta_remote =
stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote;
_query_statistics->add_scan_bytes_from_local_storage(delta_local);
_query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
_query_statistics->add_scan_bytes(delta_local + delta_remote);
_bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local;
_bytes_read_from_remote = stats.file_cache_stats.bytes_read_from_remote;
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/new_olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class NewOlapScanner : public VScanner {
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
void _collect_profile_before_close() override;

void _update_bytes_read() override;

private:
void _update_realtime_counters();

Expand Down
15 changes: 14 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,6 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
_cur_reader->set_query_statistics(_query_statistics);
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
Expand Down Expand Up @@ -1201,4 +1200,18 @@ void VFileScanner::_collect_profile_before_close() {
}
}

void VFileScanner::_update_bytes_and_rows_read() override;
VScanner::_update_bytes_and_rows_read();
if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) {
int64_t delta_local = _io_ctx->file_cache_stats->bytes_read_from_local - _bytes_read_from_local;
int64_t delta_remote =
_io_ctx->file_cache_stats->bytes_read_from_remote - _bytes_read_from_remote;
_query_statistics->add_scan_bytes_from_local_storage(delta_local);
_query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
_query_statistics->add_scan_bytes(delta_local + delta_remote);
_bytes_read_from_local = _io_ctx->file_cache_stats->bytes_read_from_local;
_bytes_read_from_remote = _io_ctx->file_cache_stats->bytes_read_from_remote;
}
}

} // namespace doris::vectorized
Loading

0 comments on commit cc42838

Please sign in to comment.