Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 12, 2024
1 parent dac58d2 commit f7ce982
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 36 deletions.
62 changes: 27 additions & 35 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,26 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
const FileReaderOptions& opts)
: _remote_file_reader(std::move(remote_file_reader)) {
_is_doris_table = opts.is_doris_table;
_cache_type = opts.cache_type;
if (_cache_type == FileCachePolicy::FILE_BLOCK_CACHE) {
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
_cache_hash = BlockFileCache::hash(unique_path);
if (opts.cache_base_path.empty()) {
// if cache path is not specified by session variable, chose randomly.
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
_cache_hash = BlockFileCache::hash(unique_path);
if (opts.cache_base_path.empty()) {
// if cache path is not specified by session variable, chose randomly.
// from query session variable: file_cache_base_path
_cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
<< ", using random instead.";
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
} else {
// from query session variable: file_cache_base_path
_cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
<< ", using random instead.";
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
}
}
}
}
Expand Down Expand Up @@ -114,21 +111,6 @@ std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, si
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
DCHECK(!closed());
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));

// If cache is not enabled, just call inner file reader to read data
if (_cache_type == FileCachePolicy::NO_CACHE) {
Status st = _remote_file_reader->read_at(offset, result, bytes_read, io_ctx);
return st;
}

DCHECK(io_ctx);
if (offset > size()) {
return Status::InvalidArgument(
Expand All @@ -141,6 +123,16 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
*bytes_read = 0;
return Status::OK();
}

ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));

if (config::enable_read_cache_file_directly) {
// read directly
size_t need_read_size = bytes_req;
Expand Down
1 change: 0 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class CachedRemoteFileReader final : public FileReader {
BlockFileCache* _cache;
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;
FileCachePolicy _cache_type;

// Used to record read/write timer and cache related metrics.
// These metrics will finally be saved in FileCacheStatistics.
Expand Down

0 comments on commit f7ce982

Please sign in to comment.