Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 6, 2024
1 parent 9f46335 commit 76779ad
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 25 deletions.
58 changes: 34 additions & 24 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,29 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
const FileReaderOptions& opts)
: _remote_file_reader(std::move(remote_file_reader)) {
_is_doris_table = opts.is_doris_table;
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
_cache_hash = BlockFileCache::hash(unique_path);
if (opts.cache_base_path.empty()) {
// if cache path is not specified by session variable, chose randomly.
_cache_type = opts.cache_type;
if (_cache_type == FileCachePolicy::FILE_BLOCK_CACHE) {
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
// from query session variable: file_cache_base_path
_cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
<< ", using random instead.";
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
_cache_hash = BlockFileCache::hash(unique_path);
if (opts.cache_base_path.empty()) {
// if cache path is not specified by session variable, chose randomly.
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
} else {
// from query session variable: file_cache_base_path
_cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
<< ", using random instead.";
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
}
}
}
}
Expand Down Expand Up @@ -111,6 +114,20 @@ std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, si
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
DCHECK(!closed());
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
if (_cache_type == FileCachePolicy::NO_CACHE) {
Status st = _remote_file_reader->read_at(offset, result, bytes_read, io_ctx);
stats.hit_cache = false;
stats.bytes_read += *bytes_read;
return st;
}

DCHECK(io_ctx);
if (offset > size()) {
return Status::InvalidArgument(
Expand All @@ -123,13 +140,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
*bytes_read = 0;
return Status::OK();
}
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));
stats.bytes_read += bytes_req;
if (config::enable_read_cache_file_directly) {
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CachedRemoteFileReader final : public FileReader {
BlockFileCache* _cache;
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;
FileCachePolicy _cache_type;

struct ReadStatistics {
bool hit_cache = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Result<FileReaderSPtr> create_cached_file_reader(FileReaderSPtr raw_reader,
const FileReaderOptions& opts) {
switch (opts.cache_type) {
case io::FileCachePolicy::NO_CACHE:
return raw_reader;
// return raw_reader;
case FileCachePolicy::FILE_BLOCK_CACHE:
return std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), opts);
default:
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ class Block;
class GenericReader : public ProfileCollector {
public:
GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}

void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
_push_down_agg_type = push_down_agg_type;
}

void set_query_statistics(QueryStatistics* query_statistics) {
_query_statistics = query_statistics;
}

virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
Expand Down Expand Up @@ -73,6 +78,7 @@ class GenericReader : public ProfileCollector {
/// Whether the underlying FileReader has filled the partition&missing columns
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type;
QueryStatistics* _query_statistics = nullptr;
};

} // namespace doris::vectorized
6 changes: 6 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,12 @@ void ParquetReader::_collect_profile() {
}

void ParquetReader::_collect_profile_before_close() {
if (_query_statistics && _io_ctx && _io_ctx->file_cache_stats) {
_query_statistics->add_scan_bytes_from_local_storage(
_io_ctx->file_cache_stats->bytes_read_from_local);
_query_statistics->add_scan_bytes_from_remote_storage(
_io_ctx->file_cache_stats->bytes_read_from_remote);
}
_collect_profile();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
_cur_reader->set_query_statistics(_query_statistics);
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
Expand Down

0 comments on commit 76779ad

Please sign in to comment.