diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index e47cf48fe3..98c0709386 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -60,6 +60,7 @@ import chunk_index_entry; import log_file; import persist_result_handler; import local_file_handle; +import peer_task; namespace infinity { @@ -551,6 +552,10 @@ Catalog::LoadFromFiles(const FullCatalogFileInfo &full_ckp_info, const Vector Catalog::LoadFromFileDelta(const DeltaCatalogFileInfo &delta_ckp_info) { const auto &catalog_path = delta_ckp_info.path_; + if(!VirtualStore::Exists(catalog_path)){ + VirtualStore::DownloadObject(catalog_path, catalog_path); + } + auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead); if (!status.ok()) { UnrecoverableError(status.message()); @@ -946,6 +951,10 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe UniquePtr Catalog::LoadFromFile(const FullCatalogFileInfo &full_ckp_info, BufferManager *buffer_mgr) { const auto &catalog_path = Path(InfinityContext::instance().config()->DataDir()) / full_ckp_info.path_; + if(!VirtualStore::Exists(catalog_path)){ + VirtualStore::DownloadObject(catalog_path, catalog_path); + } + auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead); if (!status.ok()) { UnrecoverableError(status.message()); @@ -1013,6 +1022,9 @@ void Catalog::SaveFullCatalog(TxnTimeStamp max_commit_ts, String &full_catalog_p // Rename temp file to regular catalog file VirtualStore::Rename(catalog_tmp_path, full_path); + if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){ + VirtualStore::UploadObject(full_path, full_path); + } global_catalog_delta_entry_->InitFullCheckpointTs(max_commit_ts); @@ -1097,7 +1109,10 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commi } out_file_handle->Append((reinterpret_cast(buf.data())), act_size); - + out_file_handle->Sync(); + if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){ + VirtualStore::UploadObject(full_path, full_path); + } // { // log for delta op debug // std::stringstream ss; diff --git a/src/storage/persistence/persist_result_handler.cpp b/src/storage/persistence/persist_result_handler.cpp index 2f01aeb2f8..c412035088 100644 --- a/src/storage/persistence/persist_result_handler.cpp +++ b/src/storage/persistence/persist_result_handler.cpp @@ -21,19 +21,32 @@ module persist_result_handler; import infinity_exception; import third_party; +import virtual_store; +import infinity_context; +import peer_task; namespace fs = std::filesystem; namespace infinity { void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) { - for ([[maybe_unused]] const String &persist_key : result.persist_keys_) { - // + for (const String &persist_key : result.persist_keys_) { + String persist_path = pm_->GetObjPath(persist_key); + if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){ + VirtualStore::UploadObject(persist_path, persist_path); + } } for (const String &drop_key : result.drop_keys_) { String drop_path = pm_->GetObjPath(drop_key); fs::remove(drop_path); } + for (const String &drop_key : result.drop_from_remote_keys_) { + String drop_path = pm_->GetObjPath(drop_key); + fs::remove(drop_path); + if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){ + VirtualStore::RemoveObject(drop_path); + } + } } ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) { diff --git a/src/storage/persistence/persistence_manager.cpp b/src/storage/persistence/persistence_manager.cpp index a59740b36d..3daaf86dff 100644 --- a/src/storage/persistence/persistence_manager.cpp +++ b/src/storage/persistence/persistence_manager.cpp @@ -99,7 +99,7 @@ PersistWriteResult PersistenceManager::Persist(const String &file_path, const St std::lock_guard lock(mtx_); auto it = local_path_obj_.find(local_path); if (it != local_path_obj_.end()) { - CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_); + CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_); LOG_TRACE(fmt::format("Persist deleted mapping from local path {} to ObjAddr({}, {}, {})", local_path, it->second.obj_key_, @@ -237,6 +237,14 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) { result.obj_addr_ = it->second; if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) { LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_)); + if(!obj_stat->cached_){ + String read_path = GetObjPath(result.obj_addr_.obj_key_); + VirtualStore::DownloadObject(read_path, read_path); + if(VirtualStore::Exists(read_path)){ + LOG_TRACE(fmt::format("GetObjCache download object {}", read_path)); + obj_stat->cached_ = true; + } + } result.cached_ = obj_stat->cached_; } else { if (it->second.obj_key_ != current_object_key_) { @@ -284,7 +292,7 @@ PersistWriteResult PersistenceManager::PutObjCache(const String &file_path) { if (it->second.part_size_ == 0) { UnrecoverableError(fmt::format("PutObjCache object {} part size is 0", it->second.obj_key_)); } - ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_); + ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_); if (obj_stat == nullptr) { if (it->second.obj_key_ != current_object_key_) { UnrecoverableError(fmt::format("PutObjCache object {} not found", it->second.obj_key_)); @@ -332,7 +340,11 @@ void PersistenceManager::CurrentObjAppendNoLock(const String &tmp_file_path, Siz dstFile.close(); } -void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, Vector &persist_keys, Vector &drop_keys, bool check_ref_count) { +void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, + Vector &persist_keys, + Vector &drop_keys, + Vector &drop_from_remote_keys, + bool check_ref_count) { ObjStat *obj_stat = objects_->GetNoCount(object_addr.obj_key_); if (obj_stat == nullptr) { if (object_addr.obj_key_ == current_object_key_) { @@ -412,7 +424,7 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, VectorInvalidate(object_addr.obj_key_); LOG_TRACE(fmt::format("Deleted object {}", object_addr.obj_key_)); } @@ -483,7 +495,7 @@ PersistWriteResult PersistenceManager::Cleanup(const String &file_path) { LOG_WARN(error_message); return result; } - CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, true); + CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_, true); LOG_TRACE(fmt::format("Deleted mapping from local path {} to ObjAddr({}, {}, {})", local_path, it->second.obj_key_, diff --git a/src/storage/persistence/persistence_manager.cppm b/src/storage/persistence/persistence_manager.cppm index 5299e39a73..8fd9541385 100644 --- a/src/storage/persistence/persistence_manager.cppm +++ b/src/storage/persistence/persistence_manager.cppm @@ -44,15 +44,17 @@ export struct ObjAddr { }; export struct PersistWriteResult { - ObjAddr obj_addr_; // where data is persisted, only returned by Persist - Vector persist_keys_; // object that should be persisted to local disk. because of cleanup current_object - Vector drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit (TODO) 2. object's all parts are deleted + ObjAddr obj_addr_; // where data is persisted, only returned by Persist + Vector persist_keys_; // object that should be persisted to local disk. because of cleanup current_object + Vector drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit + Vector drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted }; export struct PersistReadResult { - ObjAddr obj_addr_; // where data should read from - bool cached_; // whether the object is in localdisk cache - Vector drop_keys_; + ObjAddr obj_addr_; // where data should read from + bool cached_; // whether the object is in localdisk cache + Vector drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit + Vector drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted }; export class PersistenceManager { @@ -104,7 +106,11 @@ private: void CurrentObjFinalizeNoLock(Vector &persist_keys, Vector &drop_keys); // Cleanup - void CleanupNoLock(const ObjAddr &object_addr, Vector &persist_keys, Vector &drop_keys ,bool check_ref_count = false); + void CleanupNoLock(const ObjAddr &object_addr, + Vector &persist_keys, + Vector &drop_keys, + Vector &drop_from_remote_keys, + bool check_ref_count = false); String RemovePrefix(const String &path); @@ -125,7 +131,7 @@ private: mutable std::mutex mtx_; // HashMap objects_; // obj_key -> ObjStat UniquePtr objects_; // obj_key -> ObjStat - HashMap local_path_obj_; // local_file_path -> ObjAddr + HashMap local_path_obj_; // local_file_path -> ObjAddr // Current unsealed object key String current_object_key_; SizeT current_object_size_ = 0; @@ -141,7 +147,7 @@ export struct AddrSerializer { void InitializeValid(PersistenceManager *persistence_manager); SizeT GetSizeInBytes() const; - + void WriteBufAdv(char *&buf) const; Vector ReadBufAdv(const char *&buf);