From c71514dd4060b1855fab3ca125534a46b30bb521 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 12 Sep 2024 17:42:34 +0800 Subject: [PATCH] [enhancement](cloud) clarify codes and make TTL expiration work after abnormal cache type transition (#40226) Currently, TTL embeds both the expiration time and the cache type into the file name and path. Maintaining both is error-prone because the two cannot be updated atomically. This change simplifies that by inferring the type from the expiration time alone, so that only the expiration time needs to be maintained. Signed-off-by: freemandealer --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/io/cache/block_file_cache.cpp | 49 ++-- be/src/io/cache/block_file_cache.h | 12 +- be/src/io/cache/file_block.cpp | 40 +-- be/src/io/cache/file_block.h | 4 +- be/src/io/cache/file_cache_storage.h | 4 +- be/src/io/cache/fs_file_cache_storage.cpp | 256 +++++++++++------- be/src/io/cache/fs_file_cache_storage.h | 15 +- .../olap/rowset/segment_v2/segment_writer.cpp | 3 +- be/test/io/cache/block_file_cache_test.cpp | 246 ++++++++++++----- be/test/io/fs/s3_file_writer_test.cpp | 4 +- 12 files changed, 420 insertions(+), 217 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9a554230ce7624..06144dd3142b25 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -997,6 +997,8 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not // If true, evict the ttl cache using LRU when full. // Otherwise, only expiration can evict ttl and new data won't add to cache when full. 
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true"); +// rename ttl filename to new format during read, with some performance cost +DEFINE_mBool(translate_to_new_ttl_format_during_read, "false"); DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 5bca9ac280a348..cc26f52abbab23 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1046,6 +1046,8 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second); // If true, evict the ttl cache using LRU when full. // Otherwise, only expiration can evict ttl and new data won't add to cache when full. DECLARE_Bool(enable_ttl_cache_evict_using_lru); +// rename ttl filename to new format during read, with some performance cost +DECLARE_Bool(translate_to_new_ttl_format_during_read); // inverted index searcher cache // cache entry stay time after lookup diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index c253130cf3b485..f5c0a7c79bff49 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -261,7 +261,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte /// find list [block1, ..., blockN] of blocks which intersect with given range. 
auto it = _files.find(hash); if (it == _files.end()) { - if (_lazy_open_done) { + if (_async_open_done) { return {}; } FileCacheKey key; @@ -285,11 +285,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (!st.ok()) { LOG_WARNING("Failed to change key meta").error(st); } - } - for (auto& [_, cell] : file_blocks) { + FileCacheType origin_type = cell.file_block->cache_type(); if (origin_type == FileCacheType::TTL) continue; - Status st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL); + st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL); if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); @@ -309,6 +308,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte _time_to_key.insert(std::make_pair(context.expiration_time, hash)); } if (auto iter = _key_to_time.find(hash); + // TODO(zhengyu): Why the hell the type is NORMAL while context set expiration_time? 
(context.cache_type == FileCacheType::NORMAL || context.cache_type == FileCacheType::TTL) && iter != _key_to_time.end() && iter->second != context.expiration_time) { // remove from _time_to_key @@ -330,7 +330,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte for (auto& [_, cell] : file_blocks) { auto cache_type = cell.file_block->cache_type(); if (cache_type != FileCacheType::TTL) continue; - auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + auto st = cell.file_block->change_cache_type_between_ttl_and_others( + FileCacheType::NORMAL); if (st.ok()) { if (config::enable_ttl_cache_evict_using_lru) { auto& ttl_queue = get_queue(FileCacheType::TTL); @@ -699,9 +700,9 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha FileBlockCell cell(std::make_shared(key, size, this, state), cache_lock); Status st; if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) { - st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::NORMAL); } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) { - st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL); + st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL); } if (!st.ok()) { LOG(WARNING) << "Cannot change cache type. 
expiration_time=" << context.expiration_time @@ -912,8 +913,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard& cache_lock) { - if (!_lazy_open_done) { - return try_reserve_for_lazy_load(size, cache_lock); + if (!_async_open_done) { + return try_reserve_during_async_load(size, cache_lock); } // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining @@ -1022,10 +1023,10 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b LOG_WARNING("Failed to update expiration time to 0").error(st); } } - } - for (auto& [_, cell] : _files[file_key]) { + if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue; - auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + auto st = cell.file_block->change_cache_type_between_ttl_and_others( + FileCacheType::NORMAL); if (st.ok()) { if (config::enable_ttl_cache_evict_using_lru) { ttl_queue.remove(cell.queue_iterator.value(), cache_lock); @@ -1396,6 +1397,21 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, return result.str(); } +std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) { + std::lock_guard cache_lock(_mutex); + return dump_single_cache_type_unlocked(hash, offset, cache_lock); +} + +std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash, + size_t offset, + std::lock_guard&) { + std::stringstream result; + const auto& cells_by_offset = _files[hash]; + const auto& cell = cells_by_offset.find(offset); + + return cache_type_to_string(cell->second.file_block->cache_type()); +} + void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset, FileCacheType new_type, std::lock_guard& cache_lock) { @@ -1621,11 +1637,10 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, if (!st.ok()) { LOG_WARNING("").error(st); } - } - for (auto& [_, cell] : 
iter->second) { + FileCacheType origin_type = cell.file_block->cache_type(); if (origin_type == FileCacheType::TTL) continue; - auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL); + st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL); if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); @@ -1672,8 +1687,8 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { return blocks_meta; } -bool BlockFileCache::try_reserve_for_lazy_load(size_t size, - std::lock_guard& cache_lock) { +bool BlockFileCache::try_reserve_during_async_load(size_t size, + std::lock_guard& cache_lock) { size_t removed_size = 0; size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index cd44e77eaa381f..def354b155b2fe 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -104,8 +104,9 @@ class BlockFileCache { std::string reset_capacity(size_t new_capacity); std::map get_blocks_by_key(const UInt128Wrapper& hash); - /// For debug. 
+ /// For debug and UT std::string dump_structure(const UInt128Wrapper& hash); + std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset); [[nodiscard]] size_t get_used_cache_size(FileCacheType type) const; @@ -130,7 +131,7 @@ class BlockFileCache { [[nodiscard]] std::vector> get_hot_blocks_meta(const UInt128Wrapper& hash) const; - [[nodiscard]] bool get_lazy_open_success() const { return _lazy_open_done; } + [[nodiscard]] bool get_async_open_success() const { return _async_open_done; } BlockFileCache& operator=(const BlockFileCache&) = delete; BlockFileCache(const BlockFileCache&) = delete; @@ -338,7 +339,7 @@ class BlockFileCache { const CacheContext& context, size_t offset, size_t size, std::lock_guard& cache_lock); - bool try_reserve_for_lazy_load(size_t size, std::lock_guard& cache_lock); + bool try_reserve_during_async_load(size_t size, std::lock_guard& cache_lock); std::vector get_other_cache_type(FileCacheType cur_cache_type); @@ -358,6 +359,9 @@ class BlockFileCache { std::string dump_structure_unlocked(const UInt128Wrapper& hash, std::lock_guard& cache_lock); + std::string dump_single_cache_type_unlocked(const UInt128Wrapper& hash, size_t offset, + std::lock_guard& cache_lock); + void fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, const UInt128Wrapper& hash, const CacheContext& context, const FileBlock::Range& range, @@ -413,7 +417,7 @@ class BlockFileCache { std::mutex _close_mtx; std::condition_variable _close_cv; std::thread _cache_background_thread; - std::atomic_bool _lazy_open_done {false}; + std::atomic_bool _async_open_done {false}; bool _async_clear_file_cache {false}; // disk space or inode is less than the specified value bool _disk_resource_limit_mode {false}; diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 6586dcf589bdde..b015cbd61110d2 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -161,32 +161,41 @@ Status FileBlock::read(Slice buffer, 
size_t read_offset) { return _mgr->_storage->read(_key, read_offset, buffer); } -Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) { +Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_type) { std::lock_guard block_lock(_mutex); DCHECK(new_type != _key.meta.type); - if (_download_state == State::DOWNLOADED) { - KeyMeta new_meta; - new_meta.expiration_time = _key.meta.expiration_time; - new_meta.type = new_type; - auto st = _mgr->_storage->change_key_meta(_key, new_meta); - TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st); - if (!st.ok()) return st; + bool expr = (new_type == FileCacheType::TTL || _key.meta.type == FileCacheType::TTL); + if (!expr) { + LOG(WARNING) << "none of the cache type is TTL" + << ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset + << ", new type: " << BlockFileCache::cache_type_to_string(new_type) + << ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type); } + DCHECK(expr); + + // Changing the cache type between TTL and other types does not require renaming the filename suffix _key.meta.type = new_type; return Status::OK(); } -Status FileBlock::change_cache_type_self(FileCacheType new_type) { +Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_type) { std::lock_guard cache_lock(_mgr->_mutex); std::lock_guard block_lock(_mutex); + bool expr = (new_type != FileCacheType::TTL && _key.meta.type != FileCacheType::TTL); + if (!expr) { + LOG(WARNING) << "one of the cache type is TTL" + << ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset + << ", new type: " << BlockFileCache::cache_type_to_string(new_type) + << ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type); + } + DCHECK(expr); if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) { return Status::OK(); } if (_download_state == State::DOWNLOADED) { - KeyMeta new_meta; - new_meta.expiration_time = _key.meta.expiration_time; - new_meta.type = 
new_type; - RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta)); + Status st; + TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st); + RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type)); } _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock); _key.meta.type = new_type; @@ -196,10 +205,7 @@ Status FileBlock::change_cache_type_self(FileCacheType new_type) { Status FileBlock::update_expiration_time(uint64_t expiration_time) { std::lock_guard block_lock(_mutex); if (_download_state == State::DOWNLOADED) { - KeyMeta new_meta; - new_meta.expiration_time = expiration_time; - new_meta.type = _key.meta.type; - auto st = _mgr->_storage->change_key_meta(_key, new_meta); + auto st = _mgr->_storage->change_key_meta_expiration(_key, expiration_time); if (!st.ok() && !st.is()) { return st; } diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index b4044786dc73df..6e49a597b7b95c 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -115,9 +115,9 @@ class FileBlock { std::string get_info_for_log() const; - [[nodiscard]] Status change_cache_type_by_mgr(FileCacheType new_type); + [[nodiscard]] Status change_cache_type_between_ttl_and_others(FileCacheType new_type); - [[nodiscard]] Status change_cache_type_self(FileCacheType new_type); + [[nodiscard]] Status change_cache_type_between_normal_and_index(FileCacheType new_type); [[nodiscard]] Status update_expiration_time(uint64_t expiration_time); diff --git a/be/src/io/cache/file_cache_storage.h b/be/src/io/cache/file_cache_storage.h index 64639356f148bf..4120fe0ca5af78 100644 --- a/be/src/io/cache/file_cache_storage.h +++ b/be/src/io/cache/file_cache_storage.h @@ -40,7 +40,9 @@ class FileCacheStorage { // remove the block virtual Status remove(const FileCacheKey& key) = 0; // change the block meta - virtual Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) = 0; + virtual Status 
change_key_meta_type(const FileCacheKey& key, const FileCacheType type) = 0; + virtual Status change_key_meta_expiration(const FileCacheKey& key, + const uint64_t expiration) = 0; // use when lazy load cache virtual void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key, std::lock_guard& cache_lock) {} diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 34e62d6fe6f672..d2662ba36d013e 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -100,10 +100,10 @@ size_t FDCache::file_reader_cache_size() { Status FSFileCacheStorage::init(BlockFileCache* _mgr) { _cache_base_path = _mgr->_cache_base_path; - RETURN_IF_ERROR(rebuild_data_structure()); + RETURN_IF_ERROR(upgrade_cache_dir_if_necessary()); _cache_background_load_thread = std::thread([this, mgr = _mgr]() { load_cache_info_into_memory(mgr); - mgr->_lazy_open_done = true; + mgr->_async_open_done = true; LOG_INFO("FileCache {} lazy load done.", _cache_base_path); }); return Status::OK(); @@ -159,7 +159,27 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl std::string file = get_path_in_local_cache(get_path_in_local_cache(key.hash, key.meta.expiration_time), key.offset, key.meta.type); - RETURN_IF_ERROR(fs->open_file(file, &file_reader)); + Status s = fs->open_file(file, &file_reader); + if (!s.ok()) { + if (!s.is() || key.meta.type != FileCacheType::TTL) { + return s; + } + std::string file_old_format = get_path_in_local_cache_old_ttl_format( + get_path_in_local_cache(key.hash, key.meta.expiration_time), key.offset, + key.meta.type); + if (config::translate_to_new_ttl_format_during_read) { + // try to rename the file with old ttl format to new and retry + VLOG(7) << "try to rename the file with old ttl format to new and retry" + << " oldformat=" << file_old_format << " original=" << file; + RETURN_IF_ERROR(fs->rename(file_old_format, file)); + 
RETURN_IF_ERROR(fs->open_file(file, &file_reader)); + } else { + // try to open the file with old ttl format + VLOG(7) << "try to open the file with old ttl format" + << " oldformat=" << file_old_format << " original=" << file; + RETURN_IF_ERROR(fs->open_file(file_old_format, &file_reader)); + } + } FDCache::instance()->insert_file_reader(fd_key, file_reader); } size_t bytes_read = 0; @@ -173,6 +193,19 @@ Status FSFileCacheStorage::remove(const FileCacheKey& key) { std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type); FDCache::instance()->remove_file_reader(std::make_pair(key.hash, key.offset)); RETURN_IF_ERROR(fs->delete_file(file)); + // A return of OK does not mean the file was deleted; it may not have existed. + // So for TTL, we make sure the old-format file is removed as well + if (key.meta.type == FileCacheType::TTL) { + bool exists {false}; + // try to detect the file with old ttl format + file = get_path_in_local_cache_old_ttl_format(dir, key.offset, key.meta.type); + RETURN_IF_ERROR(fs->exists(file, &exists)); + if (exists) { + VLOG(7) << "try to remove the file with old ttl format" + << " file=" << file; + RETURN_IF_ERROR(fs->delete_file(file)); + } + } std::vector files; bool exists {false}; RETURN_IF_ERROR(fs->list(dir, true, &files, &exists)); @@ -183,29 +216,58 @@ Status FSFileCacheStorage::remove(const FileCacheKey& key) { return Status::OK(); } -Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) { - // TTL change - if (key.meta.expiration_time != new_meta.expiration_time) { +Status FSFileCacheStorage::change_key_meta_type(const FileCacheKey& key, const FileCacheType type) { + // file operation + if (key.meta.type != type) { + // TTL type file does not need to change the suffix + bool expr = (key.meta.type != FileCacheType::TTL && type != FileCacheType::TTL); + if (!expr) { + LOG(WARNING) << "TTL type file dose not need to change the suffix" + << " key=" << key.hash.to_string() << " offset=" << 
key.offset + << " old_type=" << BlockFileCache::cache_type_to_string(key.meta.type) + << " new_type=" << BlockFileCache::cache_type_to_string(type); + } + DCHECK(expr); + std::string dir = get_path_in_local_cache(key.hash, key.meta.expiration_time); + std::string original_file = get_path_in_local_cache(dir, key.offset, key.meta.type); + std::string new_file = get_path_in_local_cache(dir, key.offset, type); + RETURN_IF_ERROR(fs->rename(original_file, new_file)); + } + return Status::OK(); +} + +Status FSFileCacheStorage::change_key_meta_expiration(const FileCacheKey& key, + const uint64_t expiration) { + // directory operation + if (key.meta.expiration_time != expiration) { std::string original_dir = get_path_in_local_cache(key.hash, key.meta.expiration_time); - std::string new_dir = get_path_in_local_cache(key.hash, new_meta.expiration_time); + std::string new_dir = get_path_in_local_cache(key.hash, expiration); // It will be concurrent, but we don't care who rename Status st = fs->rename(original_dir, new_dir); if (!st.ok() && !st.is()) { return st; } - } else if (key.meta.type != new_meta.type) { - std::string dir = get_path_in_local_cache(key.hash, key.meta.expiration_time); - std::string original_file = get_path_in_local_cache(dir, key.offset, key.meta.type); - std::string new_file = get_path_in_local_cache(dir, key.offset, new_meta.type); - RETURN_IF_ERROR(fs->rename(original_file, new_file)); } return Status::OK(); } std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& dir, size_t offset, FileCacheType type, bool is_tmp) { - return Path(dir) / (std::to_string(offset) + - (is_tmp ? 
"_tmp" : BlockFileCache::cache_type_to_string(type))); + if (is_tmp) { + return Path(dir) / (std::to_string(offset) + "_tmp"); + } else if (type == FileCacheType::TTL) { + return Path(dir) / std::to_string(offset); + } else { + return Path(dir) / (std::to_string(offset) + BlockFileCache::cache_type_to_string(type)); + } +} + +std::string FSFileCacheStorage::get_path_in_local_cache_old_ttl_format(const std::string& dir, + size_t offset, + FileCacheType type, + bool is_tmp) { + DCHECK(type == FileCacheType::TTL); + return Path(dir) / (std::to_string(offset) + BlockFileCache::cache_type_to_string(type)); } std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& value, @@ -227,7 +289,7 @@ std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& va } } -Status FSFileCacheStorage::rebuild_data_structure() const { +Status FSFileCacheStorage::upgrade_cache_dir_if_necessary() const { /// version 1.0: cache_base_path / key / offset /// version 2.0: cache_base_path / key_prefix / key / offset std::string version; @@ -338,6 +400,72 @@ std::string FSFileCacheStorage::get_version_path() const { return Path(_cache_base_path) / "version"; } +Status FSFileCacheStorage::parse_filename_suffix_to_cache_type( + const std::shared_ptr& fs, const Path& file_path, long expiration_time, + size_t size, size_t* offset, bool* is_tmp, FileCacheType* cache_type) const { + std::error_code ec; + std::string offset_with_suffix = file_path.native(); + auto delim_pos1 = offset_with_suffix.find('_'); + bool parsed = true; + + try { + if (delim_pos1 == std::string::npos) { + // same as type "normal" + *offset = stoull(offset_with_suffix); + } else { + *offset = stoull(offset_with_suffix.substr(0, delim_pos1)); + std::string suffix = offset_with_suffix.substr(delim_pos1 + 1); + // not need persistent anymore + // if suffix is equals to "tmp", it should be removed too. 
+ if (suffix == "tmp") [[unlikely]] { + *is_tmp = true; + } else { + *cache_type = BlockFileCache::string_to_cache_type(suffix); + } + } + } catch (...) { + parsed = false; + } + + // Files in a dir with expiration time > 0 should all be TTL type, + // while those with expiration time == 0 should all be NORMAL type. + // However, past bugs could break this consistency, e.g. when + // BEs shut down during a cache type transition. + // Nowadays, we only use the expiration time to decide the type, + // i.e. whenever expiration time > 0, it IS TTL; otherwise + // it is NORMAL or INDEX depending on its suffix. + // From now on, the ttl type encoding in the file name is only for + // compatibility. It won't be built into the filename, and existing + // ones will be ignored. + if (expiration_time > 0) { + *cache_type = FileCacheType::TTL; + } else if (*cache_type == FileCacheType::TTL && expiration_time == 0) { + *cache_type = FileCacheType::NORMAL; + } + + if (!parsed) { + LOG(WARNING) << "parse offset err, path=" << file_path.native(); + return Status::InternalError("parse offset err, path={}", file_path.native()); + } + TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE", &offset_with_suffix); + + if (ec) { + LOG(WARNING) << "failed to file_size: file_name=" << offset_with_suffix + << "error=" << ec.message(); + return Status::InternalError("failed to file_size: file_name={}, error={}", + offset_with_suffix, ec.message()); + } + + if (size == 0 && !(*is_tmp)) { + auto st = fs->delete_file(file_path); + if (!st.ok()) { + LOG_WARNING("delete file {} error", file_path.native()).error(st); + } + return Status::InternalError("file size is 0, file_name={}", offset_with_suffix); + } + return Status::OK(); +} + void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const { int scan_length = 10000; std::vector batch_load_buffer; @@ -383,50 +511,16 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const } CacheContext context; context.query_id = 
TUniqueId(); - context.expiration_time = std::stoul(expiration_time_str); + long expiration_time = std::stoul(expiration_time_str); + context.expiration_time = expiration_time; for (; offset_it != std::filesystem::directory_iterator(); ++offset_it) { - std::string offset_with_suffix = offset_it->path().filename().native(); - auto delim_pos1 = offset_with_suffix.find('_'); - FileCacheType cache_type = FileCacheType::NORMAL; - bool parsed = true; - bool is_tmp = false; - size_t offset = 0; - try { - if (delim_pos1 == std::string::npos) { - // same as type "normal" - offset = stoull(offset_with_suffix); - } else { - offset = stoull(offset_with_suffix.substr(0, delim_pos1)); - std::string suffix = offset_with_suffix.substr(delim_pos1 + 1); - // not need persistent anymore - // if suffix is equals to "tmp", it should be removed too. - if (suffix == "tmp") [[unlikely]] { - is_tmp = true; - } else { - cache_type = BlockFileCache::string_to_cache_type(suffix); - } - } - } catch (...) { - parsed = false; - } - - if (!parsed) { - LOG(WARNING) << "parse offset err, path=" << offset_it->path().native(); - continue; - } - TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE_2", &offset_with_suffix); size_t size = offset_it->file_size(ec); - if (ec) { - LOG(WARNING) << "failed to file_size: file_name=" << offset_with_suffix - << "error=" << ec.message(); - continue; - } - - if (size == 0 && !is_tmp) { - auto st = fs->delete_file(offset_it->path()); - if (!st.ok()) { - LOG_WARNING("delete file {} error", offset_it->path().native()).error(st); - } + size_t offset = 0; + bool is_tmp = false; + FileCacheType cache_type = FileCacheType::NORMAL; + if (!parse_filename_suffix_to_cache_type(fs, offset_it->path().filename().native(), + expiration_time, size, &offset, &is_tmp, + &cache_type)) { continue; } context.cache_type = cache_type; @@ -450,6 +544,7 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const }; std::error_code ec; if constexpr 
(USE_CACHE_VERSION2) { + TEST_SYNC_POINT_CALLBACK("BlockFileCache::BeforeScan"); std::filesystem::directory_iterator key_prefix_it {_cache_base_path, ec}; if (ec) { LOG(WARNING) << ec.message(); @@ -457,7 +552,7 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const } for (; key_prefix_it != std::filesystem::directory_iterator(); ++key_prefix_it) { if (!key_prefix_it->is_directory()) { - // maybe version hits file + // skip version file continue; } if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) { @@ -516,46 +611,13 @@ void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons return; } for (; check_it != std::filesystem::directory_iterator(); ++check_it) { - uint64_t offset = 0; - std::string offset_with_suffix = check_it->path().filename().native(); - auto delim_pos1 = offset_with_suffix.find('_'); - FileCacheType cache_type = FileCacheType::NORMAL; - bool parsed = true; - bool is_tmp = false; - try { - if (delim_pos1 == std::string::npos) { - // same as type "normal" - offset = stoull(offset_with_suffix); - } else { - offset = stoull(offset_with_suffix.substr(0, delim_pos1)); - std::string suffix = offset_with_suffix.substr(delim_pos1 + 1); - if (suffix == "tmp") [[unlikely]] { - is_tmp = true; - } else { - cache_type = BlockFileCache::string_to_cache_type(suffix); - } - } - } catch (...) 
{ - parsed = false; - } - - if (!parsed) [[unlikely]] { - LOG(WARNING) << "parse offset err, path=" << offset_with_suffix; - continue; - } - - TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE_1", &offset_with_suffix); - std::error_code ec; size_t size = check_it->file_size(ec); - if (ec) { - LOG(WARNING) << "failed to file_size: error=" << ec.message(); - continue; - } - if (size == 0 && !is_tmp) [[unlikely]] { - auto st = fs->delete_file(check_it->path()); - if (!st.ok()) { - LOG_WARNING("Failed to delete file {}", check_it->path().native()).error(st); - } + size_t offset = 0; + bool is_tmp = false; + FileCacheType cache_type = FileCacheType::NORMAL; + if (!parse_filename_suffix_to_cache_type(fs, check_it->path().filename().native(), + context_original.expiration_time, size, &offset, + &is_tmp, &cache_type)) { continue; } if (!mgr->_files.contains(key.hash) || !mgr->_files[key.hash].contains(offset)) { diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index d3299c6af0e99f..352b4e21f3f1b0 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -65,7 +65,8 @@ class FSFileCacheStorage : public FileCacheStorage { Status finalize(const FileCacheKey& key) override; Status read(const FileCacheKey& key, size_t value_offset, Slice buffer) override; Status remove(const FileCacheKey& key) override; - Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) override; + Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) override; + Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t expiration) override; void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key, std::lock_guard& cache_lock) override; @@ -73,14 +74,24 @@ class FSFileCacheStorage : public FileCacheStorage { FileCacheType type, bool is_tmp = false); + [[nodiscard]] static std::string get_path_in_local_cache_old_ttl_format(const 
std::string& dir, + size_t offset, + FileCacheType type, + bool is_tmp = false); + [[nodiscard]] std::string get_path_in_local_cache(const UInt128Wrapper&, uint64_t expiration_time) const; private: - Status rebuild_data_structure() const; + Status upgrade_cache_dir_if_necessary() const; Status read_file_cache_version(std::string* buffer) const; + Status parse_filename_suffix_to_cache_type(const std::shared_ptr& fs, + const Path& file_path, long expiration_time, + size_t size, size_t* offset, bool* is_tmp, + FileCacheType* cache_type) const; + Status write_file_cache_version() const; [[nodiscard]] std::string get_version_path() const; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 759913bcaeaa5f..84fa6c9e0041ad 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -1032,7 +1032,8 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size auto size = *index_size + *segment_file_size; auto holder = cache_builder->allocate_cache_holder(index_start, size); for (auto& segment : holder->file_blocks) { - static_cast(segment->change_cache_type_self(io::FileCacheType::INDEX)); + static_cast( + segment->change_cache_type_between_normal_and_index(io::FileCacheType::INDEX)); } } return Status::OK(); diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 77cf48ac8e74f0..4c3285a27b3eca 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -248,7 +248,7 @@ void test_file_cache(io::FileCacheType cache_type) { ASSERT_TRUE(mgr.initialize().ok()); for (int i = 0; i < 100; i++) { - if (mgr.get_lazy_open_success()) { + if (mgr.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -586,7 +586,7 @@ void test_file_cache(io::FileCacheType cache_type) { io::BlockFileCache 
cache2(cache_base_path, settings); ASSERT_TRUE(cache2.initialize().ok()); for (int i = 0; i < 100; i++) { - if (cache2.get_lazy_open_success()) { + if (cache2.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -624,7 +624,7 @@ void test_file_cache(io::FileCacheType cache_type) { io::BlockFileCache cache2(cache_path2, settings2); ASSERT_TRUE(cache2.initialize().ok()); for (int i = 0; i < 100; i++) { - if (cache2.get_lazy_open_success()) { + if (cache2.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -677,7 +677,7 @@ TEST_F(BlockFileCacheTest, resize) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -711,12 +711,12 @@ TEST_F(BlockFileCacheTest, max_ttl_size) { ASSERT_TRUE(cache.initialize()); int i = 0; for (; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - ASSERT_TRUE(cache.get_lazy_open_success()); + ASSERT_TRUE(cache.get_async_open_success()); int64_t offset = 0; for (; offset < 100000000; offset += 100000) { auto holder = cache.get_or_set(key1, offset, 100000, context); @@ -759,7 +759,7 @@ TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -844,7 +844,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) 
{ + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -961,7 +961,7 @@ TEST_F(BlockFileCacheTest, reset_range) { EXPECT_EQ(cache.capacity(), 15); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1011,7 +1011,7 @@ TEST_F(BlockFileCacheTest, change_cache_type) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1030,7 +1030,8 @@ TEST_F(BlockFileCacheTest, change_cache_type) { std::string data(size, '0'); Slice result(data.data(), size); ASSERT_TRUE(blocks[0]->append(result).ok()); - ASSERT_TRUE(blocks[0]->change_cache_type_self(io::FileCacheType::INDEX)); + ASSERT_TRUE( + blocks[0]->change_cache_type_between_normal_and_index(io::FileCacheType::INDEX)); ASSERT_TRUE(blocks[0]->finalize().ok()); auto key_str = key.to_string(); auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) / @@ -1061,7 +1062,7 @@ TEST_F(BlockFileCacheTest, fd_cache_remove) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1143,7 +1144,7 @@ TEST_F(BlockFileCacheTest, fd_cache_evict) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1289,7 +1290,7 @@ void test_file_cache_run_in_resource_limit(io::FileCacheType 
cache_type) { cache._index_queue.hot_data_interval = 0; ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1421,7 +1422,7 @@ TEST_F(BlockFileCacheTest, fix_tmp_file) { } } -TEST_F(BlockFileCacheTest, test_lazy_load) { +TEST_F(BlockFileCacheTest, test_async_load) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -1471,7 +1472,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) { ASSERT_TRUE(blocks[0]->finalize()); flag1 = true; for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1482,7 +1483,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) { } } -TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) { +TEST_F(BlockFileCacheTest, test_async_load_with_limit) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -1533,7 +1534,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) { ASSERT_TRUE(blocks[0]->finalize()); flag1 = true; for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1576,7 +1577,7 @@ TEST_F(BlockFileCacheTest, ttl_normal) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1671,7 +1672,7 @@ TEST_F(BlockFileCacheTest, ttl_modify) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; 
std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1746,7 +1747,7 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1810,7 +1811,7 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1873,12 +1874,12 @@ TEST_F(BlockFileCacheTest, ttl_reverse) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - ASSERT_TRUE(cache.get_lazy_open_success()); + ASSERT_TRUE(cache.get_async_open_success()); for (size_t offset = 0; offset < 30; offset += 6) { auto holder = cache.get_or_set(key2, offset, 6, context); auto blocks = fromHolder(holder); @@ -1925,7 +1926,7 @@ TEST_F(BlockFileCacheTest, io_error) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2081,7 +2082,7 @@ TEST_F(BlockFileCacheTest, remove_directly_when_normal_change_to_ttl) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; 
std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2157,7 +2158,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) { sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2228,7 +2229,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) { sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2291,7 +2292,7 @@ TEST_F(BlockFileCacheTest, remove_directly) { context.expiration_time = UnixSeconds() + 3600; ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2368,7 +2369,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) { auto cache = FileCacheFactory::instance()->get_by_path(key1); int i = 0; while (i++ < 1000) { - if (cache->get_lazy_open_success()) { + if (cache->get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2434,7 +2435,7 @@ TEST_F(BlockFileCacheTest, test_factory_2) { auto cache = FileCacheFactory::instance()->get_by_path(key); int i = 0; while (i++ < 1000) { - if (cache->get_lazy_open_success()) { + if (cache->get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2469,7 +2470,7 @@ TEST_F(BlockFileCacheTest, test_factory_3) { auto cache = FileCacheFactory::instance()->get_by_path(key); int i = 0; while (i++ < 1000) { - if (cache->get_lazy_open_success()) { + if (cache->get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2545,7 +2546,7 @@ TEST_F(BlockFileCacheTest, 
test_disposable) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2593,7 +2594,7 @@ TEST_F(BlockFileCacheTest, test_query_limit) { auto cache = FileCacheFactory::instance()->get_by_path(key); int i = 0; while (i++ < 1000) { - if (cache->get_lazy_open_success()) { + if (cache->get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2656,7 +2657,7 @@ TEST_F(BlockFileCacheTest, append_many_time) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2676,7 +2677,8 @@ TEST_F(BlockFileCacheTest, append_many_time) { auto holder = cache.get_or_set(key, 0, 5, context); auto blocks = fromHolder(holder); assert_range(1, blocks[0], io::FileBlock::Range(0, 4), io::FileBlock::State::DOWNLOADED); - ASSERT_TRUE(blocks[0]->change_cache_type_self(FileCacheType::INDEX).ok()); + ASSERT_TRUE( + blocks[0]->change_cache_type_between_normal_and_index(FileCacheType::INDEX).ok()); if (auto storage = dynamic_cast(cache._storage.get()); storage != nullptr) { auto dir = storage->get_path_in_local_cache(blocks[0]->get_hash_value(), @@ -2684,7 +2686,8 @@ TEST_F(BlockFileCacheTest, append_many_time) { EXPECT_TRUE(fs::exists(storage->get_path_in_local_cache(dir, blocks[0]->offset(), blocks[0]->cache_type()))); } - ASSERT_TRUE(blocks[0]->change_cache_type_self(FileCacheType::INDEX).ok()); + ASSERT_TRUE( + blocks[0]->change_cache_type_between_normal_and_index(FileCacheType::INDEX).ok()); auto sp = SyncPoint::get_instance(); sp->enable_processing(); SyncPoint::CallbackGuard guard1; @@ -2697,15 +2700,9 @@ 
TEST_F(BlockFileCacheTest, append_many_time) { }, &guard1); { - ASSERT_FALSE(blocks[0]->change_cache_type_self(FileCacheType::NORMAL).ok()); - EXPECT_EQ(blocks[0]->cache_type(), FileCacheType::INDEX); - std::string buffer; - buffer.resize(5); - EXPECT_TRUE(blocks[0]->read(Slice(buffer.data(), 5), 0).ok()); - EXPECT_EQ(buffer, std::string(5, '0')); - } - { - EXPECT_FALSE(blocks[0]->change_cache_type_by_mgr(FileCacheType::NORMAL).ok()); + ASSERT_FALSE(blocks[0] + ->change_cache_type_between_normal_and_index(FileCacheType::NORMAL) + .ok()); EXPECT_EQ(blocks[0]->cache_type(), FileCacheType::INDEX); std::string buffer; buffer.resize(5); @@ -2776,7 +2773,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2787,7 +2784,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -2844,7 +2841,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3085,7 +3082,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); auto cache = FileCacheFactory::instance()->_caches[0].get(); for (int i = 0; i < 100; i++) { - if (cache->get_lazy_open_success()) { + if 
(cache->get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3356,7 +3353,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3418,7 +3415,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) { EXPECT_EQ(cache.get_hot_blocks_meta(key2).size(), 1); } -TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) { +TEST_F(BlockFileCacheTest, test_async_load_with_error_file_1) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -3439,7 +3436,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3466,7 +3463,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) { ASSERT_TRUE(writer->append(Slice("111", 3)).ok()); ASSERT_TRUE(writer->close().ok()); }); - sp->set_call_back("BlockFileCache::REMOVE_FILE_2", [&](auto&& args) { + sp->set_call_back("BlockFileCache::REMOVE_FILE", [&](auto&& args) { if (*try_any_cast(args[0]) == "30086_idx") { static_cast(global_local_filesystem()->delete_file(dir / "30086_idx")); } @@ -3491,7 +3488,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) { } } -TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_2) { +TEST_F(BlockFileCacheTest, test_async_load_with_error_file_2) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -3538,7 +3535,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_2) { while (!flag1) { } }); - sp->set_call_back("BlockFileCache::REMOVE_FILE_1", 
[&](auto&& args) { + sp->set_call_back("BlockFileCache::REMOVE_FILE", [&](auto&& args) { if (*try_any_cast(args[0]) == "30086_idx") { static_cast(global_local_filesystem()->delete_file(dir / "30086_idx")); } @@ -3562,7 +3559,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_2) { ASSERT_TRUE(blocks[0]->finalize()); flag1 = true; for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3588,7 +3585,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_1) { config::file_cache_exit_disk_resource_limit_mode_percent = 50; ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3618,7 +3615,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_2) { config::file_cache_exit_disk_resource_limit_mode_percent = 1; ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3649,7 +3646,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_3) { config::file_cache_exit_disk_resource_limit_mode_percent = 98; ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3749,7 +3746,7 @@ TEST_F(BlockFileCacheTest, remove_if_cached_when_isnt_releasable) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3898,7 +3895,7 @@ 
TEST_F(BlockFileCacheTest, remove_from_other_queue_1) { ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -3969,7 +3966,7 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) { ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4088,7 +4085,7 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) { sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4143,7 +4140,7 @@ TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4206,7 +4203,7 @@ TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) { ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4275,7 +4272,7 @@ TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru_meet_max_ttl_cache_r ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4350,7 +4347,7 @@ TEST_F(BlockFileCacheTest, reset_capacity) { sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if 
(cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4419,7 +4416,7 @@ TEST_F(BlockFileCacheTest, change_cache_type1) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4444,7 +4441,7 @@ TEST_F(BlockFileCacheTest, change_cache_type1) { ASSERT_EQ(segments.size(), 1); assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::DOWNLOADED); - EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); EXPECT_EQ(segments[0]->expiration_time(), 0); } sp->clear_call_back("FileBlock::change_cache_type"); @@ -4493,7 +4490,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4518,7 +4515,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) { ASSERT_EQ(segments.size(), 1); assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::DOWNLOADED); - EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time); } sp->clear_call_back("FileBlock::change_cache_type"); @@ -4552,6 +4549,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) { } } +/* TEST_F(BlockFileCacheTest, load_cache1) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); @@ -4577,7 +4575,7 @@ TEST_F(BlockFileCacheTest, load_cache1) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for 
(int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4624,7 +4622,7 @@ TEST_F(BlockFileCacheTest, load_cache2) { io::BlockFileCache cache(cache_base_path, settings); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { - if (cache.get_lazy_open_success()) { + if (cache.get_async_open_success()) { break; }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -4644,5 +4642,105 @@ TEST_F(BlockFileCacheTest, load_cache2) { key1.to_string() + "_0/" + std::to_string(offset)); } } +*/ + +TEST_F(BlockFileCacheTest, test_load) { + // test both path formats when loading file cache into memory + // old file path format, [hash]_[expiration]/[offset]_ttl + // new file path format, [hash]_[expiration]/[offset] + const int64_t expiration = 1987654321; + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + auto sp = SyncPoint::get_instance(); + Defer defer {[sp] { sp->clear_all_call_backs(); }}; + io::FileCacheSettings settings; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.capacity = 30; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::CacheContext context; + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = expiration; + auto key = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + std::string dir = cache_base_path + key.to_string().substr(0, 3) + "/" + key.to_string() + "_" + + std::to_string(expiration); + std::cout << dir << std::endl; + auto st = global_local_filesystem()->create_directory(dir, false); + if (!st.ok()) { + std::cout << dir << " create failed"; + ASSERT_TRUE(false); + } + sp->set_call_back("BlockFileCache::BeforeScan", [&](auto&&) { + FileWriterPtr writer; + ASSERT_TRUE(global_local_filesystem()->create_file(dir / 
"10086_ttl", &writer).ok()); + ASSERT_TRUE(writer->append(Slice("111", 3)).ok()); + ASSERT_TRUE(writer->close().ok()); + + // no suffix, but it is not NORMAL, instead it is TTL because the + // dirname contains non-zero expiration time + ASSERT_TRUE(global_local_filesystem()->create_file(dir / "20086", &writer).ok()); + ASSERT_TRUE(writer->append(Slice("222", 3)).ok()); + ASSERT_TRUE(writer->close().ok()); + + ASSERT_TRUE(global_local_filesystem()->create_file(dir / "30086_idx", &writer).ok()); + ASSERT_TRUE(writer->append(Slice("333", 3)).ok()); + ASSERT_TRUE(writer->close().ok()); + }); + sp->enable_processing(); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + { + auto type = cache.dump_single_cache_type(key, 10086); + ASSERT_TRUE(type == "_ttl"); + auto holder = cache.get_or_set(key, 10086, 3, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10086, 10086 + 3 - 1), + io::FileBlock::State::DOWNLOADED); + ASSERT_TRUE(blocks[0]->cache_type() == io::FileCacheType::TTL); + // OK, looks like old format is correctly loaded, let's read it + std::string buffer; + buffer.resize(3); + ASSERT_TRUE(blocks[0]->read(Slice(buffer.data(), buffer.size()), 0).ok()); + ASSERT_EQ(buffer, "111"); + // OK, read successfully, let's try removing it + std::mutex m1, m2; + std::lock_guard cache_lock(m1); + std::lock_guard block_lock(m2); + cache.remove(blocks[0], cache_lock, block_lock); + ASSERT_FALSE(fs::exists(dir / "10086_ttl")); + } + { + auto type = cache.dump_single_cache_type(key, 20086); + ASSERT_TRUE(type == "_ttl"); + auto holder = cache.get_or_set(key, 20086, 3, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(20086, 20086 + 3 - 1), + io::FileBlock::State::DOWNLOADED); + 
ASSERT_TRUE(blocks[0]->cache_type() == io::FileCacheType::TTL); + // OK, looks like old format is correctly loaded, let's read it + std::string buffer; + buffer.resize(3); + ASSERT_TRUE(blocks[0]->read(Slice(buffer.data(), buffer.size()), 0).ok()); + ASSERT_EQ(buffer, "222"); + // OK, read successfully, let's try removing it + std::mutex m1, m2; + std::lock_guard cache_lock(m1); + std::lock_guard block_lock(m2); + cache.remove(blocks[0], cache_lock, block_lock); + ASSERT_FALSE(fs::exists(dir / "20086")); + } +} } // namespace doris::io diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index ab76fb54347d27..7021346a7043e9 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -625,7 +625,7 @@ TEST_F(S3FileWriterTest, multi_part_open_error) { // auto cache = std::make_unique(cache_base_path, settings); // ASSERT_TRUE(cache->initialize()); // while (true) { -// if (cache->get_lazy_open_success()) { +// if (cache->get_async_open_success()) { // break; // }; // std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -710,7 +710,7 @@ TEST_F(S3FileWriterTest, multi_part_open_error) { // auto cache = std::make_unique(cache_base_path, settings); // ASSERT_TRUE(cache->initialize()); // while (true) { -// if (cache->get_lazy_open_success()) { +// if (cache->get_async_open_success()) { // break; // }; // std::this_thread::sleep_for(std::chrono::milliseconds(1));