Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrates S3 #2003

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import chunk_index_entry;
import log_file;
import persist_result_handler;
import local_file_handle;
import peer_task;

namespace infinity {

Expand Down Expand Up @@ -551,6 +552,10 @@ Catalog::LoadFromFiles(const FullCatalogFileInfo &full_ckp_info, const Vector<De
UniquePtr<CatalogDeltaEntry> Catalog::LoadFromFileDelta(const DeltaCatalogFileInfo &delta_ckp_info) {
const auto &catalog_path = delta_ckp_info.path_;

if(!VirtualStore::Exists(catalog_path)){
VirtualStore::DownloadObject(catalog_path, catalog_path);
}

auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
Expand Down Expand Up @@ -946,6 +951,10 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
UniquePtr<Catalog> Catalog::LoadFromFile(const FullCatalogFileInfo &full_ckp_info, BufferManager *buffer_mgr) {
const auto &catalog_path = Path(InfinityContext::instance().config()->DataDir()) / full_ckp_info.path_;

if(!VirtualStore::Exists(catalog_path)){
VirtualStore::DownloadObject(catalog_path, catalog_path);
}

auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
Expand Down Expand Up @@ -1013,6 +1022,9 @@ void Catalog::SaveFullCatalog(TxnTimeStamp max_commit_ts, String &full_catalog_p

// Rename temp file to regular catalog file
VirtualStore::Rename(catalog_tmp_path, full_path);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(full_path, full_path);
}

global_catalog_delta_entry_->InitFullCheckpointTs(max_commit_ts);

Expand Down Expand Up @@ -1097,7 +1109,10 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commi
}

out_file_handle->Append((reinterpret_cast<const char *>(buf.data())), act_size);

out_file_handle->Sync();
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(full_path, full_path);
}
// {
// log for delta op debug
// std::stringstream ss;
Expand Down
17 changes: 15 additions & 2 deletions src/storage/persistence/persist_result_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,32 @@ module persist_result_handler;

import infinity_exception;
import third_party;
import virtual_store;
import infinity_context;
import peer_task;

namespace fs = std::filesystem;

namespace infinity {

void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) {
for ([[maybe_unused]] const String &persist_key : result.persist_keys_) {
//
for (const String &persist_key : result.persist_keys_) {
String persist_path = pm_->GetObjPath(persist_key);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(persist_path, persist_path);
}
}
for (const String &drop_key : result.drop_keys_) {
String drop_path = pm_->GetObjPath(drop_key);
fs::remove(drop_path);
}
for (const String &drop_key : result.drop_from_remote_keys_) {
String drop_path = pm_->GetObjPath(drop_key);
fs::remove(drop_path);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::RemoveObject(drop_path);
}
}
}

ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) {
Expand Down
22 changes: 17 additions & 5 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ PersistWriteResult PersistenceManager::Persist(const String &file_path, const St
std::lock_guard<std::mutex> lock(mtx_);
auto it = local_path_obj_.find(local_path);
if (it != local_path_obj_.end()) {
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_);
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_);
LOG_TRACE(fmt::format("Persist deleted mapping from local path {} to ObjAddr({}, {}, {})",
local_path,
it->second.obj_key_,
Expand Down Expand Up @@ -237,6 +237,14 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
result.obj_addr_ = it->second;
if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) {
LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_));
if(!obj_stat->cached_){
String read_path = GetObjPath(result.obj_addr_.obj_key_);
VirtualStore::DownloadObject(read_path, read_path);
if(VirtualStore::Exists(read_path)){
LOG_TRACE(fmt::format("GetObjCache download object {}", read_path));
obj_stat->cached_ = true;
}
}
result.cached_ = obj_stat->cached_;
} else {
if (it->second.obj_key_ != current_object_key_) {
Expand Down Expand Up @@ -284,7 +292,7 @@ PersistWriteResult PersistenceManager::PutObjCache(const String &file_path) {
if (it->second.part_size_ == 0) {
UnrecoverableError(fmt::format("PutObjCache object {} part size is 0", it->second.obj_key_));
}
ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_);
ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_);
if (obj_stat == nullptr) {
if (it->second.obj_key_ != current_object_key_) {
UnrecoverableError(fmt::format("PutObjCache object {} not found", it->second.obj_key_));
Expand Down Expand Up @@ -332,7 +340,11 @@ void PersistenceManager::CurrentObjAppendNoLock(const String &tmp_file_path, Siz
dstFile.close();
}

void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, Vector<String> &persist_keys, Vector<String> &drop_keys, bool check_ref_count) {
void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr,
Vector<String> &persist_keys,
Vector<String> &drop_keys,
Vector<String> &drop_from_remote_keys,
bool check_ref_count) {
ObjStat *obj_stat = objects_->GetNoCount(object_addr.obj_key_);
if (obj_stat == nullptr) {
if (object_addr.obj_key_ == current_object_key_) {
Expand Down Expand Up @@ -412,7 +424,7 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, Vector<String
String error_message = fmt::format("Failed to find object key");
UnrecoverableError(error_message);
}
drop_keys.emplace_back(object_addr.obj_key_);
drop_from_remote_keys.emplace_back(object_addr.obj_key_);
objects_->Invalidate(object_addr.obj_key_);
LOG_TRACE(fmt::format("Deleted object {}", object_addr.obj_key_));
}
Expand Down Expand Up @@ -483,7 +495,7 @@ PersistWriteResult PersistenceManager::Cleanup(const String &file_path) {
LOG_WARN(error_message);
return result;
}
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, true);
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_, true);
LOG_TRACE(fmt::format("Deleted mapping from local path {} to ObjAddr({}, {}, {})",
local_path,
it->second.obj_key_,
Expand Down
24 changes: 15 additions & 9 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ export struct ObjAddr {
};

export struct PersistWriteResult {
ObjAddr obj_addr_; // where data is persisted, only returned by Persist
Vector<String> persist_keys_; // object that should be persisted to local disk. because of cleanup current_object
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit (TODO) 2. object's all parts are deleted
ObjAddr obj_addr_; // where data is persisted, only returned by Persist
Vector<String> persist_keys_; // object that should be persisted to local disk. because of cleanup current_object
Vector<String> drop_keys_; // objects that should be removed from local disk, because disk usage is over the limit
Vector<String> drop_from_remote_keys_; // objects that should be removed from remote storage, because all parts of the object have been deleted
};

export struct PersistReadResult {
ObjAddr obj_addr_; // where data should read from
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_;
ObjAddr obj_addr_; // where data should read from
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_; // objects that should be removed from local disk, because disk usage is over the limit
Vector<String> drop_from_remote_keys_; // objects that should be removed from remote storage, because all parts of the object have been deleted
};

export class PersistenceManager {
Expand Down Expand Up @@ -104,7 +106,11 @@ private:
void CurrentObjFinalizeNoLock(Vector<String> &persist_keys, Vector<String> &drop_keys);

// Cleanup
void CleanupNoLock(const ObjAddr &object_addr, Vector<String> &persist_keys, Vector<String> &drop_keys ,bool check_ref_count = false);
void CleanupNoLock(const ObjAddr &object_addr,
Vector<String> &persist_keys,
Vector<String> &drop_keys,
Vector<String> &drop_from_remote_keys,
bool check_ref_count = false);

String RemovePrefix(const String &path);

Expand All @@ -125,7 +131,7 @@ private:
mutable std::mutex mtx_;
// HashMap<String, ObjStat> objects_; // obj_key -> ObjStat
UniquePtr<ObjectStatAccessorBase> objects_; // obj_key -> ObjStat
HashMap<String, ObjAddr> local_path_obj_; // local_file_path -> ObjAddr
HashMap<String, ObjAddr> local_path_obj_; // local_file_path -> ObjAddr
// Current unsealed object key
String current_object_key_;
SizeT current_object_size_ = 0;
Expand All @@ -141,7 +147,7 @@ export struct AddrSerializer {
void InitializeValid(PersistenceManager *persistence_manager);

SizeT GetSizeInBytes() const;

void WriteBufAdv(char *&buf) const;

Vector<String> ReadBufAdv(const char *&buf);
Expand Down
Loading