Skip to content

Commit

Permalink
[enhancement](cloud) clarify codes and make TTL expiration work after…
Browse files Browse the repository at this point in the history
… abnormal cache type transition (apache#40226)

The current TTL implementation embeds both the expiration time and the cache
type into the file name and path. Maintaining both is error-prone because the
two cannot be updated atomically. This change simplifies the scheme by inferring
the type from the expiration time alone, so only the expiration time is needed.

Signed-off-by: freemandealer <[email protected]>
  • Loading branch information
freemandealer authored Sep 12, 2024
1 parent 5eb3c60 commit c71514d
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 217 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,8 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache when full.
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
// rename ttl filename to new format during read, with some performance cost
DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache when full.
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
// rename ttl filename to new format during read, with some performance cost
DECLARE_Bool(translate_to_new_ttl_format_during_read);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
49 changes: 32 additions & 17 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
/// find list [block1, ..., blockN] of blocks which intersect with given range.
auto it = _files.find(hash);
if (it == _files.end()) {
if (_lazy_open_done) {
if (_async_open_done) {
return {};
}
FileCacheKey key;
Expand All @@ -285,11 +285,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
}
for (auto& [_, cell] : file_blocks) {

FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
Status st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
Expand All @@ -309,6 +308,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
if (auto iter = _key_to_time.find(hash);
// TODO(zhengyu): Why the hell the type is NORMAL while context set expiration_time?
(context.cache_type == FileCacheType::NORMAL || context.cache_type == FileCacheType::TTL) &&
iter != _key_to_time.end() && iter->second != context.expiration_time) {
// remove from _time_to_key
Expand All @@ -330,7 +330,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
for (auto& [_, cell] : file_blocks) {
auto cache_type = cell.file_block->cache_type();
if (cache_type != FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
Expand Down Expand Up @@ -699,9 +700,9 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
Status st;
if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::NORMAL);
} else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
}
if (!st.ok()) {
LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
Expand Down Expand Up @@ -912,8 +913,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
if (!_lazy_open_done) {
return try_reserve_for_lazy_load(size, cache_lock);
if (!_async_open_done) {
return try_reserve_during_async_load(size, cache_lock);
}

// use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
Expand Down Expand Up @@ -1022,10 +1023,10 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
LOG_WARNING("Failed to update expiration time to 0").error(st);
}
}
}
for (auto& [_, cell] : _files[file_key]) {

if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
Expand Down Expand Up @@ -1396,6 +1397,21 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
return result.str();
}

/// For debug and UT.
/// Locking wrapper: takes _mutex and then delegates to
/// dump_single_cache_type_unlocked() to report the cache type of the block
/// located at `offset` within the file keyed by `hash`.
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
    std::lock_guard<std::mutex> guard(_mutex);
    return dump_single_cache_type_unlocked(hash, offset, guard);
}

/// For debug and UT.
/// Returns the string form (via cache_type_to_string) of the cache type of the
/// block stored at `offset` in the file keyed by `hash`.
///
/// The unnamed lock_guard parameter is not used in the body; the visible caller
/// passes the guard it holds on _mutex.
///
/// Returns an empty string when no block is registered for (hash, offset),
/// instead of dereferencing an end() iterator.
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
                                                            size_t offset,
                                                            std::lock_guard<std::mutex>&) {
    // Look up with find() rather than operator[]: on a miss operator[] would
    // presumably default-insert an empty entry into _files (standard map
    // semantics -- confirm against the container type declared in the header).
    const auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        return {};
    }

    const auto& cells_by_offset = file_it->second;
    const auto cell_it = cells_by_offset.find(offset);
    if (cell_it == cells_by_offset.end()) {
        return {};
    }

    return cache_type_to_string(cell_it->second.file_block->cache_type());
}

void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
FileCacheType new_type,
std::lock_guard<std::mutex>& cache_lock) {
Expand Down Expand Up @@ -1621,11 +1637,10 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
for (auto& [_, cell] : iter->second) {

FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
Expand Down Expand Up @@ -1672,8 +1687,8 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
return blocks_meta;
}

bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
bool BlockFileCache::try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
Expand Down
12 changes: 8 additions & 4 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ class BlockFileCache {
std::string reset_capacity(size_t new_capacity);

std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
/// For debug.
/// For debug and UT
std::string dump_structure(const UInt128Wrapper& hash);
std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset);

[[nodiscard]] size_t get_used_cache_size(FileCacheType type) const;

Expand All @@ -130,7 +131,7 @@ class BlockFileCache {
[[nodiscard]] std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
get_hot_blocks_meta(const UInt128Wrapper& hash) const;

[[nodiscard]] bool get_lazy_open_success() const { return _lazy_open_done; }
[[nodiscard]] bool get_async_open_success() const { return _async_open_done; }

BlockFileCache& operator=(const BlockFileCache&) = delete;
BlockFileCache(const BlockFileCache&) = delete;
Expand Down Expand Up @@ -338,7 +339,7 @@ class BlockFileCache {
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock);

bool try_reserve_for_lazy_load(size_t size, std::lock_guard<std::mutex>& cache_lock);
bool try_reserve_during_async_load(size_t size, std::lock_guard<std::mutex>& cache_lock);

std::vector<FileCacheType> get_other_cache_type(FileCacheType cur_cache_type);

Expand All @@ -358,6 +359,9 @@ class BlockFileCache {
std::string dump_structure_unlocked(const UInt128Wrapper& hash,
std::lock_guard<std::mutex>& cache_lock);

std::string dump_single_cache_type_unlocked(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& cache_lock);

void fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, const UInt128Wrapper& hash,
const CacheContext& context,
const FileBlock::Range& range,
Expand Down Expand Up @@ -413,7 +417,7 @@ class BlockFileCache {
std::mutex _close_mtx;
std::condition_variable _close_cv;
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {false};
std::atomic_bool _async_open_done {false};
bool _async_clear_file_cache {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
Expand Down
40 changes: 23 additions & 17 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,32 +161,41 @@ Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
}

Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) {
Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_type) {
std::lock_guard block_lock(_mutex);
DCHECK(new_type != _key.meta.type);
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = _key.meta.expiration_time;
new_meta.type = new_type;
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
if (!st.ok()) return st;
bool expr = (new_type == FileCacheType::TTL || _key.meta.type == FileCacheType::TTL);
if (!expr) {
LOG(WARNING) << "none of the cache type is TTL"
<< ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset
<< ", new type: " << BlockFileCache::cache_type_to_string(new_type)
<< ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type);
}
DCHECK(expr);

// changing the cache type between TTL and others doesn't need to rename the filename suffix
_key.meta.type = new_type;
return Status::OK();
}

Status FileBlock::change_cache_type_self(FileCacheType new_type) {
Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_type) {
std::lock_guard cache_lock(_mgr->_mutex);
std::lock_guard block_lock(_mutex);
bool expr = (new_type != FileCacheType::TTL && _key.meta.type != FileCacheType::TTL);
if (!expr) {
LOG(WARNING) << "one of the cache type is TTL"
<< ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset
<< ", new type: " << BlockFileCache::cache_type_to_string(new_type)
<< ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type);
}
DCHECK(expr);
if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) {
return Status::OK();
}
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = _key.meta.expiration_time;
new_meta.type = new_type;
RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta));
Status st;
TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type));
}
_mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock);
_key.meta.type = new_type;
Expand All @@ -196,10 +205,7 @@ Status FileBlock::change_cache_type_self(FileCacheType new_type) {
Status FileBlock::update_expiration_time(uint64_t expiration_time) {
std::lock_guard block_lock(_mutex);
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = expiration_time;
new_meta.type = _key.meta.type;
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
auto st = _mgr->_storage->change_key_meta_expiration(_key, expiration_time);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
return st;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class FileBlock {

std::string get_info_for_log() const;

[[nodiscard]] Status change_cache_type_by_mgr(FileCacheType new_type);
[[nodiscard]] Status change_cache_type_between_ttl_and_others(FileCacheType new_type);

[[nodiscard]] Status change_cache_type_self(FileCacheType new_type);
[[nodiscard]] Status change_cache_type_between_normal_and_index(FileCacheType new_type);

[[nodiscard]] Status update_expiration_time(uint64_t expiration_time);

Expand Down
4 changes: 3 additions & 1 deletion be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class FileCacheStorage {
// remove the block
virtual Status remove(const FileCacheKey& key) = 0;
// change the block meta
virtual Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) = 0;
virtual Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) = 0;
virtual Status change_key_meta_expiration(const FileCacheKey& key,
const uint64_t expiration) = 0;
// use when lazy load cache
virtual void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {}
Expand Down
Loading

0 comments on commit c71514d

Please sign in to comment.