From 76779ad92c0dc965dcae19da76cd9741ee9962e1 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 6 Sep 2024 23:43:02 +0800 Subject: [PATCH] Report file cache local/remote scan bytes in query statistics --- be/src/io/cache/cached_remote_file_reader.cpp | 58 +++++++++++-------- be/src/io/cache/cached_remote_file_reader.h | 1 + be/src/io/fs/file_reader.cpp | 2 +- be/src/vec/exec/format/generic_reader.h | 6 ++ .../exec/format/parquet/vparquet_reader.cpp | 6 ++ be/src/vec/exec/scan/vfile_scanner.cpp | 1 + 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 0a46c98390e70f8..d04d6a561db9e21 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -50,26 +50,29 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader const FileReaderOptions& opts) : _remote_file_reader(std::move(remote_file_reader)) { _is_doris_table = opts.is_doris_table; - if (_is_doris_table) { - _cache_hash = BlockFileCache::hash(path().filename().native()); - _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); - if (config::enable_read_cache_file_directly) { - _cache_file_readers = _cache->get_blocks_by_key(_cache_hash); - } - } else { - // Use path and modification time to build cache key - std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); - _cache_hash = BlockFileCache::hash(unique_path); - if (opts.cache_base_path.empty()) { - // if cache path is not specified by session variable, chose randomly. 
+ _cache_type = opts.cache_type; + if (_cache_type == FileCachePolicy::FILE_BLOCK_CACHE) { + if (_is_doris_table) { + _cache_hash = BlockFileCache::hash(path().filename().native()); _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); + if (config::enable_read_cache_file_directly) { + _cache_file_readers = _cache->get_blocks_by_key(_cache_hash); + } } else { - // from query session variable: file_cache_base_path - _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); - if (_cache == nullptr) { - LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path - << ", using random instead."; + // Use path and modification time to build cache key + std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); + _cache_hash = BlockFileCache::hash(unique_path); + if (opts.cache_base_path.empty()) { + // if cache path is not specified by session variable, chose randomly. _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); + } else { + // from query session variable: file_cache_base_path + _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); + if (_cache == nullptr) { + LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path + << ", using random instead."; + _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); + } } } } @@ -111,6 +114,20 @@ std::pair CachedRemoteFileReader::s_align_size(size_t offset, si Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { DCHECK(!closed()); + ReadStatistics stats; + auto defer_func = [&](int*) { + if (io_ctx->file_cache_stats) { + _update_state(stats, io_ctx->file_cache_stats); + io::FileCacheProfile::instance().update(io_ctx->file_cache_stats); + } + }; + if (_cache_type == FileCachePolicy::NO_CACHE) { + Status st = _remote_file_reader->read_at(offset, result, bytes_read, io_ctx); + stats.hit_cache = false; + stats.bytes_read += *bytes_read; + return 
st; + } + DCHECK(io_ctx); if (offset > size()) { return Status::InvalidArgument( @@ -123,13 +140,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* *bytes_read = 0; return Status::OK(); } - ReadStatistics stats; - auto defer_func = [&](int*) { - if (io_ctx->file_cache_stats) { - _update_state(stats, io_ctx->file_cache_stats); - io::FileCacheProfile::instance().update(io_ctx->file_cache_stats); - } - }; std::unique_ptr defer((int*)0x01, std::move(defer_func)); stats.bytes_read += bytes_req; if (config::enable_read_cache_file_directly) { diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index b3efb83c0803c8e..979530cf81ea084 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -66,6 +66,7 @@ class CachedRemoteFileReader final : public FileReader { BlockFileCache* _cache; std::shared_mutex _mtx; std::map _cache_file_readers; + FileCachePolicy _cache_type; struct ReadStatistics { bool hit_cache = true; diff --git a/be/src/io/fs/file_reader.cpp b/be/src/io/fs/file_reader.cpp index 86596fd88f7020c..3c41e10e96d7f9c 100644 --- a/be/src/io/fs/file_reader.cpp +++ b/be/src/io/fs/file_reader.cpp @@ -42,7 +42,7 @@ Result create_cached_file_reader(FileReaderSPtr raw_reader, const FileReaderOptions& opts) { switch (opts.cache_type) { case io::FileCachePolicy::NO_CACHE: - return raw_reader; + // return raw_reader; case FileCachePolicy::FILE_BLOCK_CACHE: return std::make_shared(std::move(raw_reader), opts); default: diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index e32928e4b95de4c..927ee154e6af42f 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -34,10 +34,15 @@ class Block; class GenericReader : public ProfileCollector { public: GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} + void 
set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { _push_down_agg_type = push_down_agg_type; } + void set_query_statistics(QueryStatistics* query_statistics) { + _query_statistics = query_statistics; + } + virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; virtual Status get_columns(std::unordered_map* name_to_type, @@ -73,6 +78,7 @@ class GenericReader : public ProfileCollector { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; TPushAggOp::type _push_down_agg_type; + QueryStatistics* _query_statistics = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 6c4e4983c70a43f..3464b22e1d58cb4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -1060,6 +1060,12 @@ void ParquetReader::_collect_profile() { } void ParquetReader::_collect_profile_before_close() { + if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) { + _query_statistics->add_scan_bytes_from_local_storage( + _io_ctx->file_cache_stats->bytes_read_from_local); + _query_statistics->add_scan_bytes_from_remote_storage( + _io_ctx->file_cache_stats->bytes_read_from_remote); + } _collect_profile(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e2809433b3f0971..3c9dc086100c9d1 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -979,6 +979,7 @@ Status VFileScanner::_get_next_reader() { _missing_cols.clear(); RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); _cur_reader->set_push_down_agg_type(_get_push_down_agg_type()); + _cur_reader->set_query_statistics(_query_statistics); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { 
fmt::memory_buffer col_buf;