Skip to content

Commit

Permalink
Integrates S3 (#2003)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change
- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: shenyushi <[email protected]>
  • Loading branch information
Ami11111 and small-turtle-1 authored Oct 10, 2024
1 parent 2add773 commit db1fab5
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 17 deletions.
17 changes: 16 additions & 1 deletion src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import chunk_index_entry;
import log_file;
import persist_result_handler;
import local_file_handle;
import peer_task;

namespace infinity {

Expand Down Expand Up @@ -551,6 +552,10 @@ Catalog::LoadFromFiles(const FullCatalogFileInfo &full_ckp_info, const Vector<De
UniquePtr<CatalogDeltaEntry> Catalog::LoadFromFileDelta(const DeltaCatalogFileInfo &delta_ckp_info) {
const auto &catalog_path = delta_ckp_info.path_;

if(!VirtualStore::Exists(catalog_path)){
VirtualStore::DownloadObject(catalog_path, catalog_path);
}

auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
Expand Down Expand Up @@ -946,6 +951,10 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
UniquePtr<Catalog> Catalog::LoadFromFile(const FullCatalogFileInfo &full_ckp_info, BufferManager *buffer_mgr) {
const auto &catalog_path = Path(InfinityContext::instance().config()->DataDir()) / full_ckp_info.path_;

if(!VirtualStore::Exists(catalog_path)){
VirtualStore::DownloadObject(catalog_path, catalog_path);
}

auto [catalog_file_handle, status] = VirtualStore::Open(catalog_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
Expand Down Expand Up @@ -1013,6 +1022,9 @@ void Catalog::SaveFullCatalog(TxnTimeStamp max_commit_ts, String &full_catalog_p

// Rename temp file to regular catalog file
VirtualStore::Rename(catalog_tmp_path, full_path);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(full_path, full_path);
}

global_catalog_delta_entry_->InitFullCheckpointTs(max_commit_ts);

Expand Down Expand Up @@ -1097,7 +1109,10 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commi
}

out_file_handle->Append((reinterpret_cast<const char *>(buf.data())), act_size);

out_file_handle->Sync();
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(full_path, full_path);
}
// {
// log for delta op debug
// std::stringstream ss;
Expand Down
17 changes: 15 additions & 2 deletions src/storage/persistence/persist_result_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,32 @@ module persist_result_handler;

import infinity_exception;
import third_party;
import virtual_store;
import infinity_context;
import peer_task;

namespace fs = std::filesystem;

namespace infinity {

void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) {
for ([[maybe_unused]] const String &persist_key : result.persist_keys_) {
//
for (const String &persist_key : result.persist_keys_) {
String persist_path = pm_->GetObjPath(persist_key);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::UploadObject(persist_path, persist_path);
}
}
for (const String &drop_key : result.drop_keys_) {
String drop_path = pm_->GetObjPath(drop_key);
fs::remove(drop_path);
}
for (const String &drop_key : result.drop_from_remote_keys_) {
String drop_path = pm_->GetObjPath(drop_key);
fs::remove(drop_path);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader){
VirtualStore::RemoveObject(drop_path);
}
}
}

ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) {
Expand Down
22 changes: 17 additions & 5 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ PersistWriteResult PersistenceManager::Persist(const String &file_path, const St
std::lock_guard<std::mutex> lock(mtx_);
auto it = local_path_obj_.find(local_path);
if (it != local_path_obj_.end()) {
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_);
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_);
LOG_TRACE(fmt::format("Persist deleted mapping from local path {} to ObjAddr({}, {}, {})",
local_path,
it->second.obj_key_,
Expand Down Expand Up @@ -237,6 +237,14 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
result.obj_addr_ = it->second;
if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) {
LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_));
if(!obj_stat->cached_){
String read_path = GetObjPath(result.obj_addr_.obj_key_);
VirtualStore::DownloadObject(read_path, read_path);
if(VirtualStore::Exists(read_path)){
LOG_TRACE(fmt::format("GetObjCache download object {}", read_path));
obj_stat->cached_ = true;
}
}
result.cached_ = obj_stat->cached_;
} else {
if (it->second.obj_key_ != current_object_key_) {
Expand Down Expand Up @@ -284,7 +292,7 @@ PersistWriteResult PersistenceManager::PutObjCache(const String &file_path) {
if (it->second.part_size_ == 0) {
UnrecoverableError(fmt::format("PutObjCache object {} part size is 0", it->second.obj_key_));
}
ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_);
ObjStat *obj_stat = objects_->Release(it->second.obj_key_, result.drop_keys_);
if (obj_stat == nullptr) {
if (it->second.obj_key_ != current_object_key_) {
UnrecoverableError(fmt::format("PutObjCache object {} not found", it->second.obj_key_));
Expand Down Expand Up @@ -332,7 +340,11 @@ void PersistenceManager::CurrentObjAppendNoLock(const String &tmp_file_path, Siz
dstFile.close();
}

void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, Vector<String> &persist_keys, Vector<String> &drop_keys, bool check_ref_count) {
void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr,
Vector<String> &persist_keys,
Vector<String> &drop_keys,
Vector<String> &drop_from_remote_keys,
bool check_ref_count) {
ObjStat *obj_stat = objects_->GetNoCount(object_addr.obj_key_);
if (obj_stat == nullptr) {
if (object_addr.obj_key_ == current_object_key_) {
Expand Down Expand Up @@ -412,7 +424,7 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, Vector<String
String error_message = fmt::format("Failed to find object key");
UnrecoverableError(error_message);
}
drop_keys.emplace_back(object_addr.obj_key_);
drop_from_remote_keys.emplace_back(object_addr.obj_key_);
objects_->Invalidate(object_addr.obj_key_);
LOG_TRACE(fmt::format("Deleted object {}", object_addr.obj_key_));
}
Expand Down Expand Up @@ -483,7 +495,7 @@ PersistWriteResult PersistenceManager::Cleanup(const String &file_path) {
LOG_WARN(error_message);
return result;
}
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, true);
CleanupNoLock(it->second, result.persist_keys_, result.drop_keys_, result.drop_from_remote_keys_, true);
LOG_TRACE(fmt::format("Deleted mapping from local path {} to ObjAddr({}, {}, {})",
local_path,
it->second.obj_key_,
Expand Down
24 changes: 15 additions & 9 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ export struct ObjAddr {
};

export struct PersistWriteResult {
ObjAddr obj_addr_; // where data is persisted, only returned by Persist
Vector<String> persist_keys_; // object that should be persisted to local disk. because of cleanup current_object
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit (TODO) 2. object's all parts are deleted
ObjAddr obj_addr_; // where data is persisted, only returned by Persist
Vector<String> persist_keys_; // object that should be persisted to local disk. because of cleanup current_object
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit
Vector<String> drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted
};

export struct PersistReadResult {
ObjAddr obj_addr_; // where data should read from
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_;
ObjAddr obj_addr_; // where data should read from
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit
Vector<String> drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted
};

export class PersistenceManager {
Expand Down Expand Up @@ -104,7 +106,11 @@ private:
void CurrentObjFinalizeNoLock(Vector<String> &persist_keys, Vector<String> &drop_keys);

// Cleanup
void CleanupNoLock(const ObjAddr &object_addr, Vector<String> &persist_keys, Vector<String> &drop_keys ,bool check_ref_count = false);
void CleanupNoLock(const ObjAddr &object_addr,
Vector<String> &persist_keys,
Vector<String> &drop_keys,
Vector<String> &drop_from_remote_keys,
bool check_ref_count = false);

String RemovePrefix(const String &path);

Expand All @@ -125,7 +131,7 @@ private:
mutable std::mutex mtx_;
// HashMap<String, ObjStat> objects_; // obj_key -> ObjStat
UniquePtr<ObjectStatAccessorBase> objects_; // obj_key -> ObjStat
HashMap<String, ObjAddr> local_path_obj_; // local_file_path -> ObjAddr
HashMap<String, ObjAddr> local_path_obj_; // local_file_path -> ObjAddr
// Current unsealed object key
String current_object_key_;
SizeT current_object_size_ = 0;
Expand All @@ -141,7 +147,7 @@ export struct AddrSerializer {
void InitializeValid(PersistenceManager *persistence_manager);

SizeT GetSizeInBytes() const;

void WriteBufAdv(char *&buf) const;

Vector<String> ReadBufAdv(const char *&buf);
Expand Down

0 comments on commit db1fab5

Please sign in to comment.