Skip to content

Commit

Permalink
Handle object persist/cleanup/read outside persistence_manager. (#1931)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

PersistenceManager::Persist, PersistenceManager::Finalize,
PersistenceManager::Cleanup now return PersistWriteResult.
PersistenceManager::GetObj now returns PersistReadResult.

PersistResultHandler is used to handle the result.
If a remote filesystem is supported, the evict and recover keys are inside
the PersistWriteResult, and the PersistReadResult will contain whether the cache
is valid. Sending requests to the remote filesystem and updating the cache is the job of
PersistResultHandler.

### Type of change

- [x] Refactoring
  • Loading branch information
small-turtle-1 authored Sep 27, 2024
1 parent 2bb1ecb commit 0077b4a
Show file tree
Hide file tree
Showing 13 changed files with 247 additions and 64 deletions.
24 changes: 19 additions & 5 deletions src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import local_file_system;
import persistence_manager;
import infinity_context;
import logger;
import persist_result_handler;

namespace infinity {

Expand Down Expand Up @@ -65,8 +66,13 @@ bool FileWorker::WriteToFile(bool to_spill, const FileWorkerSaveCtx &ctx) {
}

fs.SyncFile(*file_handler_);
obj_addr_ = persistence_manager_->Persist(write_path, tmp_write_path);
fs.DeleteFile(tmp_write_path);

PersistResultHandler handler(persistence_manager_);
PersistWriteResult persist_result = persistence_manager_->Persist(write_path, tmp_write_path);
handler.HandleWriteResult(persist_result);

obj_addr_ = persist_result.obj_addr_;

return all_save;
} else {
String write_dir = ChooseFileDir(to_spill);
Expand Down Expand Up @@ -109,7 +115,9 @@ void FileWorker::ReadFromFile(bool from_spill) {
bool use_object_cache = !from_spill && persistence_manager_ != nullptr;
String read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_);
if (use_object_cache) {
obj_addr_ = persistence_manager_->GetObjCache(read_path);
PersistResultHandler handler(persistence_manager_);
PersistReadResult result = persistence_manager_->GetObjCache(read_path);
obj_addr_ = handler.HandleReadResult(result);
if (!obj_addr_.Valid()) {
String error_message = fmt::format("Failed to find object for local path {}", read_path);
UnrecoverableError(error_message);
Expand Down Expand Up @@ -159,7 +167,11 @@ void FileWorker::MoveFile() {
// }
fs.Rename(src_path, dest_path);
} else {
obj_addr_ = persistence_manager_->Persist(dest_path, src_path);
PersistResultHandler handler(persistence_manager_);
PersistWriteResult persist_result = persistence_manager_->Persist(dest_path, src_path);
handler.HandleWriteResult(persist_result);

obj_addr_ = persist_result.obj_addr_;
}
}

Expand All @@ -173,8 +185,10 @@ String FileWorker::ChooseFileDir(bool spill) const {

void FileWorker::CleanupFile() const {
if (persistence_manager_ != nullptr) {
PersistResultHandler handler(persistence_manager_);
String path = fmt::format("{}/{}", ChooseFileDir(false), *file_name_);
persistence_manager_->Cleanup(path);
PersistWriteResult result = persistence_manager_->Cleanup(path);
handler.HandleWriteResult(result);
return;
}
LocalFileSystem fs;
Expand Down
10 changes: 8 additions & 2 deletions src/storage/invertedindex/column_index_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import local_file_system;
import third_party;
import infinity_context;
import persistence_manager;
import persist_result_handler;

namespace infinity {

Expand All @@ -31,10 +32,15 @@ ColumnIndexIterator::ColumnIndexIterator(const String &index_dir, const String &
posting_file.append(POSTING_SUFFIX);

if (use_object_cache) {
PersistResultHandler handler(pm);
dict_file_path_ = dict_file;
posting_file_path_ = posting_file;
dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_);
posting_file = pm->GetObjPath(pm->GetObjCache(posting_file).obj_key_);
PersistReadResult result = pm->GetObjCache(dict_file);
const ObjAddr &obj_addr = handler.HandleReadResult(result);
dict_file = pm->GetObjPath(obj_addr.obj_key_);
PersistReadResult result2 = pm->GetObjCache(posting_file);
const ObjAddr &obj_addr2 = handler.HandleReadResult(result2);
posting_file = pm->GetObjPath(obj_addr2.obj_key_);
}

dict_reader_ = MakeShared<DictionaryReader>(dict_file, PostingFormatOption(flag));
Expand Down
16 changes: 12 additions & 4 deletions src/storage/invertedindex/column_index_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import persistence_manager;
import infinity_context;
import defer_op;
import utility;
import persist_result_handler;

namespace infinity {
// Stores the index directory and the option flag; no other work happens at construction.
ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag)
    : index_dir_(index_dir), flag_(flag) {
}
Expand Down Expand Up @@ -96,7 +97,10 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
u32 id_offset = base_row_id - merge_base_rowid;

if (use_object_cache) {
column_len_file = pm->GetObjPath(pm->GetObjCache(column_len_file).obj_key_);
PersistResultHandler handler(pm);
PersistReadResult result = pm->GetObjCache(column_len_file);
const ObjAddr &obj_addr = handler.HandleReadResult(result);
column_len_file = pm->GetObjPath(obj_addr.obj_key_);
}

auto [file_handler, status] = fs_.OpenFile(column_len_file, FileFlags::READ_FLAG, FileLockType::kNoLock);
Expand Down Expand Up @@ -147,9 +151,13 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
fs_.AppendFile(tmp_dict_file, tmp_fst_file);
fs_.DeleteFile(tmp_fst_file);
if (use_object_cache) {
pm->Persist(dict_file, tmp_dict_file, false);
pm->Persist(posting_file, tmp_posting_file, false);
pm->Persist(column_length_file, tmp_column_length_file, false);
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Persist(dict_file, tmp_dict_file, false);
PersistWriteResult result2 = pm->Persist(posting_file, tmp_posting_file, false);
PersistWriteResult result3 = pm->Persist(column_length_file, tmp_column_length_file, false);
handler.HandleWriteResult(result1);
handler.HandleWriteResult(result2);
handler.HandleWriteResult(result3);
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/storage/invertedindex/disk_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import status;
import logger;
import persistence_manager;
import infinity_context;
import persist_result_handler;

namespace infinity {

Expand All @@ -49,7 +50,9 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St
posting_file_.append(POSTING_SUFFIX);
String posting_file = posting_file_;
if (nullptr != pm) {
ObjAddr obj_addr = pm->GetObjCache(posting_file);
PersistResultHandler handler(pm);
PersistReadResult result = pm->GetObjCache(posting_file);
const ObjAddr &obj_addr = handler.HandleReadResult(result);
if (!obj_addr.Valid()) {
// Empty posting
return;
Expand All @@ -72,7 +75,10 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St
dict_file_.append(DICT_SUFFIX);
String dict_file = dict_file_;
if (nullptr != pm) {
dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_);
PersistResultHandler handler(pm);
PersistReadResult result = pm->GetObjCache(dict_file);
const ObjAddr &obj_addr = handler.HandleReadResult(result);
dict_file = pm->GetObjPath(obj_addr.obj_key_);
}
dict_reader_ = MakeShared<DictionaryReader>(dict_file, PostingFormatOption(flag));
}
Expand Down
21 changes: 15 additions & 6 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import blocking_queue;
import segment_index_entry;
import persistence_manager;
import utility;
import persist_result_handler;

namespace infinity {
constexpr int MAX_TUPLE_LENGTH = 1024; // we assume that analyzed term, together with docid/offset info, will never exceed such length
Expand Down Expand Up @@ -318,9 +319,13 @@ void MemoryIndexer::Dump(bool offline, bool spill) {
fs.Write(*file_handler, &column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size());
fs.Close(*file_handler);
if (use_object_cache) {
pm->Persist(posting_file, tmp_posting_file, false);
pm->Persist(dict_file, tmp_dict_file, false);
pm->Persist(column_length_file, tmp_column_length_file, false);
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false);
PersistWriteResult result2 = pm->Persist(dict_file, tmp_dict_file, false);
PersistWriteResult result3 = pm->Persist(column_length_file, tmp_column_length_file, false);
handler.HandleWriteResult(result1);
handler.HandleWriteResult(result2);
handler.HandleWriteResult(result3);
}

is_spilled_ = spill;
Expand Down Expand Up @@ -493,9 +498,13 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtr<SortMergerTermTuple<TermTuple
fs.Write(*file_handler, &unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size());
fs.Close(*file_handler);
if (use_object_cache) {
pm->Persist(posting_file, tmp_posting_file, false);
pm->Persist(dict_file, tmp_dict_file, false);
pm->Persist(column_length_file, tmp_column_length_file, false);
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false);
PersistWriteResult result2 = pm->Persist(dict_file, tmp_dict_file, false);
PersistWriteResult result3 = pm->Persist(column_length_file, tmp_column_length_file, false);
handler.HandleWriteResult(result1);
handler.HandleWriteResult(result2);
handler.HandleWriteResult(result3);
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import block_column_entry;
import segment_index_entry;
import chunk_index_entry;
import log_file;
import persist_result_handler;

namespace infinity {

Expand Down Expand Up @@ -512,8 +513,10 @@ nlohmann::json Catalog::Serialize(TxnTimeStamp max_commit_ts) {

PersistenceManager *pm = InfinityContext::instance().persistence_manager();
if (pm != nullptr) {
PersistResultHandler handler(pm);
// Finalize current object to ensure PersistenceManager be in a consistent state
pm->CurrentObjFinalize(true);
PersistWriteResult result = pm->CurrentObjFinalize(true);
handler.HandleWriteResult(result);

json_res["obj_addr_map"] = pm->Serialize();
}
Expand Down Expand Up @@ -1061,7 +1064,9 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commi
// Finalize current object to ensure PersistenceManager be in a consistent state
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
if (pm != nullptr) {
pm->CurrentObjFinalize(true);
PersistResultHandler handler(pm);
PersistWriteResult result = pm->CurrentObjFinalize(true);
handler.HandleWriteResult(result);
for (auto &op : flush_delta_entry->operations()) {
op->InitializeAddrSerializer();
}
Expand Down
8 changes: 6 additions & 2 deletions src/storage/meta/entry/chunk_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import column_def;
import internal_types;
import infinity_context;
import persistence_manager;
import persist_result_handler;

namespace infinity {

Expand Down Expand Up @@ -399,8 +400,11 @@ void ChunkIndexEntry::Cleanup(CleanupInfoTracer *info_tracer, bool dropped) {
String posting_file = index_prefix + POSTING_SUFFIX;
String dict_file = index_prefix + DICT_SUFFIX;
if (pm != nullptr) {
pm->Cleanup(posting_file);
pm->Cleanup(dict_file);
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Cleanup(posting_file);
PersistWriteResult result2 = pm->Cleanup(dict_file);
handler.HandleWriteResult(result1);
handler.HandleWriteResult(result2);
LOG_DEBUG(fmt::format("Cleaned chunk index entry {}, posting: {}, dictionary file: {}", index_prefix, posting_file, dict_file));
} else {
String absolute_posting_file = fmt::format("{}/{}", InfinityContext::instance().config()->DataDir(), posting_file);
Expand Down
46 changes: 46 additions & 0 deletions src/storage/persistence/persist_result_handler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

#include <vector>
#include <filesystem>

module persist_result_handler;

import infinity_exception;
import third_party;

namespace fs = std::filesystem;

namespace infinity {

// Applies the follow-up actions carried by a write result.
//
// result.persist_keys_ is walked but nothing is done with it yet.
// For every entry in result.drop_keys_, the key is resolved to a path through
// the persistence manager and that path is removed with std::filesystem::remove
// (the throwing overload; its return value is ignored).
void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) {
    for ([[maybe_unused]] const String &key_to_persist : result.persist_keys_) {
        // Intentionally empty: no action is taken for persisted keys here.
    }

    for (const String &key_to_drop : result.drop_keys_) {
        const String obj_path = pm_->GetObjPath(key_to_drop);
        fs::remove(obj_path);
    }
}

// Validates a read result and hands back the object address it carries.
//
// When result.cached_ is false, UnrecoverableError is raised with a message
// naming the object key; otherwise result.obj_addr_ is returned as a copy.
ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) {
    const auto &addr = result.obj_addr_;
    if (result.cached_) {
        return addr;
    }
    UnrecoverableError(fmt::format("HandleReadResult: object {} is not cached", addr.obj_key_));
    return addr;
}

}
36 changes: 36 additions & 0 deletions src/storage/persistence/persist_result_handler.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module persist_result_handler;

import stl;
import persistence_manager;

namespace infinity {

// Processes the result objects returned by PersistenceManager calls
// (PersistWriteResult / PersistReadResult) on behalf of the caller.
export class PersistResultHandler {
public:
    // Stores pm for later use. The class declares no destructor, so the handler
    // never releases the pointer itself.
    // Marked explicit so that a PersistenceManager* is never silently converted
    // into a handler.
    explicit PersistResultHandler(PersistenceManager *pm) : pm_(pm) {}

    // Performs the follow-up work described by a write result
    // (e.g. from Persist / Cleanup / CurrentObjFinalize).
    void HandleWriteResult(const PersistWriteResult &result);

    // Checks a read result and returns the object address it contains.
    ObjAddr HandleReadResult(const PersistReadResult &result);

private:
    // Persistence manager used to resolve object keys to paths.
    PersistenceManager *pm_;
};

}
Loading

0 comments on commit 0077b4a

Please sign in to comment.