Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix object storage bugs #2120

Merged
merged 5 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions conf/pytest_parallel_infinity_follower.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
[general]
version = "0.5.0"
time_zone = "utc-8"
server_mode = "cluster"

[network]
server_address = "0.0.0.0"
postgres_port = 5434
http_port = 23822
client_port = 23819
connection_pool_size = 128
peer_ip = "0.0.0.0"
peer_port = 23852

[log]
log_filename = "infinity.log"
log_dir = "/var/infinity/follower/log"
log_to_stdout = true
log_file_max_size = "10GB"
log_file_rotate_count = 10

# trace/debug/info/warning/error/critical 6 log levels, default: info
log_level = "trace"

[storage]
persistence_dir = "/var/infinity/follower/persistence"
data_dir = "/var/infinity/follower/data"
# periodically activates garbage collection:
# 0 means real-time,
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
optimize_interval = "10s"
cleanup_interval = "60s"
compact_interval = "120s"

# dump memory index entry when it reachs the capacity
mem_index_capacity = 1048576

storage_type = "minio"

[storage.object_storage]
url = "127.0.0.1:9000"
bucket_name = "infinity"
access_key = "pk9s2oJFX1qXLYObwIcz"
secret_key = "ho1G9xh2iKup4Xj9Ja3eRgg8bfwMyDv4fvkQGcZl"
enable_https = false

[buffer]
buffer_manager_size = "8GB"
lru_num = 7
temp_dir = "/var/infinity/follower/tmp"
result_cache_mode = "on"

memindex_memory_quota = "1GB"

[wal]
wal_dir = "/var/infinity/follower/wal"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
wal_flush = "only_write"

[resource]
resource_dir = "/var/infinity/follower/resource"
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[general]
version = "0.5.0"
time_zone = "utc-8"
server_mode = "standalone"
server_mode = "cluster"

[network]
server_address = "0.0.0.0"
postgres_port = 5433
postgres_port = 5432
http_port = 23820
client_port = 23817
connection_pool_size = 128
Expand All @@ -20,7 +20,7 @@ log_file_max_size = "10GB"
log_file_rotate_count = 10

# trace/debug/info/warning/error/critical 6 log levels, default: info
log_level = "debug"
log_level = "trace"

[storage]
persistence_dir = "/var/infinity/leader/persistence"
Expand All @@ -36,19 +36,21 @@ compact_interval = "120s"

# dump memory index entry when it reachs the capacity
mem_index_capacity = 1048576

storage_type = "minio"

[storage.object_storage]
url = "192.168.200.165:9000"
url = "127.0.0.1:9000"
bucket_name = "infinity"
access_key = "minioadmin"
secret_key = "minioadmin"
enable_https = false

[buffer]
buffer_manager_size = "4GB"
buffer_manager_size = "8GB"
lru_num = 7
temp_dir = "/var/infinity/leader/tmp"
result_cache_mode = "on"

memindex_memory_quota = "1GB"

Expand All @@ -65,4 +67,4 @@ wal_compact_threshold = "1GB"
wal_flush = "only_write"

[resource]
resource_dir = "/var/infinity/leader/resource"
resource_dir = "/var/infinity/leader/resource"
1 change: 1 addition & 0 deletions python/test_pysdk/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def teardown(self):
res = self.infinity_obj.disconnect()
assert res.error_code == ErrorCode.OK

#@pytest.mark.skip(reason = "cluster fail")
@pytest.mark.usefixtures("skip_if_local_infinity")
def test_connection_pool(self, suffix):
connection_pool = ConnectionPool(uri=self.uri, max_size=8)
Expand Down
12 changes: 8 additions & 4 deletions src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import stream_reader;
import s3_client_minio;
import infinity_context;
import object_storage_task;
import admin_statement;

namespace infinity {

Expand Down Expand Up @@ -448,11 +449,14 @@ Status VirtualStore::InitRemoteStore(StorageType storage_type,
}

bucket_ = bucket;
if (s3_client_->BucketExists(bucket)) {
return Status::OK();
} else {
return s3_client_->MakeBucket(bucket);
if(InfinityContext::instance().GetServerRole() == NodeRole::kLeader or InfinityContext::instance().GetServerRole() == NodeRole::kStandalone) {
if (s3_client_->BucketExists(bucket)) {
return Status::OK();
} else {
return s3_client_->MakeBucket(bucket);
}
}
return Status::OK();
}

Status VirtualStore::UnInitRemoteStore() {
Expand Down
1 change: 0 additions & 1 deletion src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::NewBlockColumnEntry(const BlockEnt
buffer_mgr->persistence_manager());

block_column_entry->buffer_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));

return block_column_entry;
}

Expand Down
5 changes: 4 additions & 1 deletion src/storage/persistence/persist_result_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) {

ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) {
if (!result.cached_) {
UnrecoverableError(fmt::format("HandleReadResult: object {} is not cached", result.obj_addr_.obj_key_));
String read_path = InfinityContext::instance().persistence_manager()->GetObjPath(result.obj_addr_.obj_key_);
VirtualStore::DownloadObject(read_path, result.obj_addr_.obj_key_);
LOG_TRACE(fmt::format("GetObjCache download object {}", read_path));
result.obj_stat_->cached_ = true;
}
return result.obj_addr_;
}
Expand Down
15 changes: 7 additions & 8 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,14 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
result.cached_ = true;
} else if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) {
LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_));
if(!obj_stat->cached_){
String read_path = GetObjPath(result.obj_addr_.obj_key_);
VirtualStore::DownloadObject(read_path, result.obj_addr_.obj_key_);
if(VirtualStore::Exists(read_path)){
LOG_TRACE(fmt::format("GetObjCache download object {}", read_path));
obj_stat->cached_ = true;
}
String read_path = GetObjPath(result.obj_addr_.obj_key_);
if(!VirtualStore::Exists(read_path)){
obj_stat->cached_ = false;
result.cached_ = false;
result.obj_stat_ = obj_stat;
} else {
result.cached_ = true;
}
result.cached_ = obj_stat->cached_;
} else {
if (it->second.obj_key_ != current_object_key_) {
String error_message = fmt::format("GetObjCache object {} not found", it->second.obj_key_);
Expand Down
1 change: 1 addition & 0 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export struct PersistReadResult {
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit
Vector<String> drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted
ObjStat *obj_stat_{nullptr}; // object stat
};

export class PersistenceManager {
Expand Down
12 changes: 12 additions & 0 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import bg_task;
import compact_statement;
import build_fast_rough_filter_task;
import create_index_info;
import persistence_manager;
import infinity_context;
import persist_result_handler;

namespace infinity {

Expand Down Expand Up @@ -635,6 +638,15 @@ void TxnStore::PrepareCommit1() {
for (const auto &[table_name, table_store] : txn_tables_store_) {
table_store->PrepareCommit1(segment_infos);
}
if(!segment_infos.empty()) {
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
if (pm != nullptr) {
PersistResultHandler handler(pm);
PersistWriteResult result = pm->CurrentObjFinalize(true);
handler.HandleWriteResult(result);
}
LOG_TRACE("Finalize current object to ensure PersistenceManager be in a consistent state");
}
}

void TxnStore::PrepareCommit(TransactionID txn_id, TxnTimeStamp commit_ts, BufferManager *buffer_mgr) {
Expand Down
1 change: 1 addition & 0 deletions tools/run_pysdk_remote_infinity_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def python_sdk_test(python_test_dir: str, pytest_mark: str):
"pytest",
# "--capture=tee-sys",
"--tb=short",
#"-v",
"-x",
"-m",
pytest_mark,
Expand Down