Skip to content

Commit

Permalink
Add config options for s3 storage (#1990)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
- Add config options for s3 storage
- Add unit test case for s3 storage

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
  • Loading branch information
Ami11111 authored Oct 9, 2024
1 parent 8f4bcdc commit 69a2ce7
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/common/third_party.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ namespace minio {
export using minio::s3::UploadObjectResponse;
export using minio::s3::PutObjectArgs;
export using minio::s3::PutObjectResponse;
export using minio::s3::BucketExistsArgs;
export using minio::s3::BucketExistsResponse;
} // namespace s3

namespace creds {
Expand Down
73 changes: 73 additions & 0 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,51 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
global_options_.AddOption(std::move(object_bucket_option));
break;
}
case GlobalOptionIndex::kObjectStorageAccessKey: {
String object_storage_access_key_str;
if (elem.second.is_string()) {
Optional<String> str_optional = elem.second.value<std::string>();
if (!str_optional.has_value()) {
return Status::InvalidConfig("'access_key' field in [storage.object_storage] isn't string");
}
object_storage_access_key_str = *str_optional;
} else {
return Status::InvalidConfig("'access_key' field in [storage.object_storage] isn't string");
}
auto object_storage_access_key_option = MakeUnique<StringOption>(OBJECT_STORAGE_ACCESS_KEY_OPTION_NAME, object_storage_access_key_str);
global_options_.AddOption(std::move(object_storage_access_key_option));
break;
}
case GlobalOptionIndex::kObjectStorageSecretKey: {
String object_storage_secret_key_str;
if (elem.second.is_string()) {
Optional<String> str_optional = elem.second.value<std::string>();
if (!str_optional.has_value()) {
return Status::InvalidConfig("'secret_key' field in [storage.object_storage] isn't string");
}
object_storage_secret_key_str = *str_optional;
} else {
return Status::InvalidConfig("'secret_key' field in [storage.object_storage] isn't string");
}
auto object_storage_secret_key_option = MakeUnique<StringOption>(OBJECT_STORAGE_SECRET_KEY_OPTION_NAME, object_storage_secret_key_str);
global_options_.AddOption(std::move(object_storage_secret_key_option));
break;
}
case GlobalOptionIndex::kObjectStorageHttps: {
bool https = false;
if (elem.second.is_boolean()) {
https = elem.second.value_or(https);
} else {
return Status::InvalidConfig("'enable_https' field isn't boolean.");
}

UniquePtr<BooleanOption> object_storage_https_option = MakeUnique<BooleanOption>(OBJECT_STORAGE_ENABLE_HTTPS_OPTION_NAME, https);
Status status = global_options_.AddOption(std::move(object_storage_https_option));
if(!status.ok()) {
UnrecoverableError(status.message());
}
break;
}
default: {
return Status::InvalidConfig(fmt::format("Unrecognized config parameter: {} in 'storage.object_storage' field", var_name));
}
Expand All @@ -1421,6 +1466,15 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
UnrecoverableError(status.message());
}
}
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kObjectStorageAccessKey) == nullptr) {
return Status::InvalidConfig("No 'access_key' field in [storage.object_storage]");
}
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kObjectStorageSecretKey) == nullptr) {
return Status::InvalidConfig("No 'secret_key' field in [storage.object_storage]");
}
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kObjectStorageHttps) == nullptr) {
return Status::InvalidConfig("No 'enable_https' field in [storage.object_storage]");
}
break;
}
default: {
Expand Down Expand Up @@ -2161,6 +2215,21 @@ String Config::ObjectStorageBucket() {
return global_options_.GetStringValue(GlobalOptionIndex::kObjectStorageBucket);
}

String Config::ObjectStorageAccessKey() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetStringValue(GlobalOptionIndex::kObjectStorageAccessKey);
}

String Config::ObjectStorageSecretKey() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetStringValue(GlobalOptionIndex::kObjectStorageSecretKey);
}

bool Config::ObjectStorageHttps() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetBoolValue(GlobalOptionIndex::kObjectStorageHttps);
}

// Persistence
String Config::PersistenceDir() {
std::lock_guard<std::mutex> guard(mutex_);
Expand Down Expand Up @@ -2317,6 +2386,10 @@ void Config::PrintAll() {
case StorageType::kMinio: {
fmt::print(" - object_storage_url: {}\n", ObjectStorageUrl());
fmt::print(" - object_storage_bucket: {}\n", ObjectStorageBucket());
fmt::print(" - object_storage_access_key: {}\n", ObjectStorageAccessKey());
fmt::print(" - object_storage_secret_key: {}\n", ObjectStorageSecretKey());
fmt::print(" - object_storage_enable_https: {}\n", ObjectStorageHttps());
break;
}
default: {
break;
Expand Down
3 changes: 3 additions & 0 deletions src/main/config.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public:
StorageType StorageType();
String ObjectStorageUrl();
String ObjectStorageBucket();
String ObjectStorageAccessKey();
String ObjectStorageSecretKey();
bool ObjectStorageHttps();

// Persistence
String PersistenceDir();
Expand Down
3 changes: 3 additions & 0 deletions src/main/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ GlobalOptions::GlobalOptions() {
name2index_[String(OBJECT_STORAGE_OPTION_NAME)] = GlobalOptionIndex::kObjectStorage;
name2index_[String(OBJECT_STORAGE_URL_OPTION_NAME)] = GlobalOptionIndex::kObjectStorageUrl;
name2index_[String(OBJECT_STORAGE_BUCKET_OPTION_NAME)] = GlobalOptionIndex::kObjectStorageBucket;
name2index_[String(OBJECT_STORAGE_ACCESS_KEY_OPTION_NAME)] = GlobalOptionIndex::kObjectStorageAccessKey;
name2index_[String(OBJECT_STORAGE_SECRET_KEY_OPTION_NAME)] = GlobalOptionIndex::kObjectStorageSecretKey;
name2index_[String(OBJECT_STORAGE_ENABLE_HTTPS_OPTION_NAME)] = GlobalOptionIndex::kObjectStorageHttps;

name2index_[String(BUFFER_MANAGER_SIZE_OPTION_NAME)] = GlobalOptionIndex::kBufferManagerSize;
name2index_[String(LRU_NUM_OPTION_NAME)] = GlobalOptionIndex::kLRUNum;
Expand Down
5 changes: 4 additions & 1 deletion src/main/options.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ export enum class GlobalOptionIndex : i8 {
kObjectStorage = 39,
kObjectStorageUrl = 40,
kObjectStorageBucket = 41,
kObjectStorageAccessKey = 42,
kObjectStorageSecretKey = 43,
kObjectStorageHttps = 44,

kInvalid = 42,
kInvalid = 45,
};

export struct GlobalOptions {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/io/s3_client.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public:
virtual Status
CopyObject(const String &src_bucket_name, const String &src_object_name, const String &dst_bucket_name, const String &dst_object_name) = 0;

virtual bool BucketExists(const String &bucket_name) = 0;

protected:
String url;
bool https;
Expand Down
17 changes: 17 additions & 0 deletions src/storage/io/s3_client_minio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,21 @@ Status S3ClientMinio::CopyObject(const String &src_bucket_name,
return Status::OK();
}

bool S3ClientMinio::BucketExists(const String &bucket_name) {
// Create bucket exists arguments.
minio::s3::BucketExistsArgs args;
args.bucket = bucket_name;

// Call bucket exists.
minio::s3::BucketExistsResponse resp = client_->BucketExists(args);
// Handle response.
if (resp && resp.exist) {
return true;
} else {
return false;
}

return false;
}

} // namespace infinity
5 changes: 4 additions & 1 deletion src/storage/io/s3_client_minio.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ namespace infinity {
export class S3ClientMinio final : public S3Client {
public:
S3ClientMinio(String _url = "http://localhost:9000", bool _https = false, String _access_key = "minioadmin", String _secret_key = "minioadmin")
: S3Client(_url, _https, _access_key, _secret_key), base_url(_url, _https), provider(_access_key, _secret_key) {}
: S3Client(_url, _https, _access_key, _secret_key), base_url(_url, _https), provider(_access_key, _secret_key) {
client_ = MakeUnique<minio::s3::Client>(base_url, &provider);
}

~S3ClientMinio() = default;

Expand All @@ -23,6 +25,7 @@ public:
Status UploadObject(const String &bucket_name, const String &object_name, const String &file_path) final;
Status RemoveObject(const String &bucket_name, const String &object_name) final;
Status CopyObject(const String &src_bucket_name, const String &src_object_name, const String &dst_bucket_name, const String &dst_object_name) final;
bool BucketExists(const String &bucket_name) final;

private:
minio::s3::BaseUrl base_url;
Expand Down
19 changes: 18 additions & 1 deletion src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ i32 VirtualStore::MunmapFile(const String &file_path) {

// Remote storage
StorageType VirtualStore::storage_type_ = StorageType::kInvalid;
String VirtualStore::bucket_ = "infinity_bucket";
String VirtualStore::bucket_ = "infinity";
UniquePtr<S3Client> VirtualStore::s3_client_ = nullptr;

Status VirtualStore::InitRemoteStore(StorageType storage_type,
Expand All @@ -435,6 +435,7 @@ Status VirtualStore::InitRemoteStore(StorageType storage_type,
case StorageType::kMinio: {
storage_type_ = StorageType::kMinio;
s3_client_ = MakeUnique<S3ClientMinio>(URL, HTTPS, access_key, secret_key);
break;
}
default: {
return Status::NotSupport("Not support storage type");
Expand Down Expand Up @@ -518,4 +519,20 @@ Status VirtualStore::CopyObject(const String &src_object_name, const String &dst
return Status::OK();
}

bool VirtualStore::BucketExists() {
if (VirtualStore::storage_type_ == StorageType::kLocal) {
return false;
}
switch (VirtualStore::storage_type_) {
case StorageType::kMinio: {
return s3_client_->BucketExists(VirtualStore::bucket_);
}
default: {
return false;
}
}

return false;
}

} // namespace infinity
4 changes: 3 additions & 1 deletion src/storage/io/virtual_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public:
bool HTTPS = false,
const String &access_key = "minioadmin",
const String &secret_key = "minioadmin",
const String &bucket = "infinity_bucket");
const String &bucket = "infinity");

static Status UnInitRemoteStore();

Expand All @@ -84,6 +84,8 @@ public:
static Status UploadObject(const String &file_dir, const String& object_name);
static Status RemoveObject(const String &object_name);
static Status CopyObject(const String &src_object_name, const String &dst_object_name);
//
static bool BucketExists();

private:
static std::mutex mtx_;
Expand Down
8 changes: 7 additions & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ void Storage::SetStorageMode(StorageMode target_mode) {
if (VirtualStore::IsInit()) {
UnrecoverableError("remote storage system was initialized before.");
}
Status status = VirtualStore::InitRemoteStore(StorageType::kMinio, config_ptr_->ObjectStorageUrl());
Status status = VirtualStore::InitRemoteStore(
StorageType::kMinio,
config_ptr_->ObjectStorageUrl(),
config_ptr_->ObjectStorageHttps(),
config_ptr_->ObjectStorageAccessKey(),
config_ptr_->ObjectStorageSecretKey(),
config_ptr_->ObjectStorageBucket());
if (!status.ok()) {
UnrecoverableError(status.message());
}
Expand Down
3 changes: 3 additions & 0 deletions src/unit_test/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ TEST_F(ConfigTest, test2) {
EXPECT_EQ(config.StorageType(), StorageType::kLocal);
EXPECT_EQ(config.ObjectStorageUrl(), "0.0.0.0:9000");
EXPECT_EQ(config.ObjectStorageBucket(), "infinity");
EXPECT_EQ(config.ObjectStorageAccessKey(), "minioadmin");
EXPECT_EQ(config.ObjectStorageSecretKey(), "minioadmin");
EXPECT_EQ(config.ObjectStorageHttps(), false);

// buffer
EXPECT_EQ(config.BufferManagerSize(), 3 * 1024l * 1024l * 1024l);
Expand Down
34 changes: 34 additions & 0 deletions src/unit_test/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,38 @@ TEST_F(VirtualStoreTest, TestCleanDir) {
EXPECT_FALSE(VirtualStore::Exists(file_path1));
EXPECT_FALSE(VirtualStore::Exists(file_path2));
EXPECT_TRUE(VirtualStore::Exists(dir));
}

TEST_F(VirtualStoreTest, minio_upload) {
using namespace infinity;

VirtualStore::InitRemoteStore();

if(VirtualStore::BucketExists()){
String path = String(GetFullTmpDir()) + "/test_minio_upload.abc";
auto [file_handle, status] = VirtualStore::Open(path, FileAccessMode::kWrite);
if (!status.ok()) {
UnrecoverableError(status.message());
}
SizeT len = 10;
UniquePtr<char[]> data_array = MakeUnique<char[]>(len);
for (SizeT i = 0; i < len; ++i) {
data_array[i] = i + 1;
}
file_handle->Append(data_array.get(), len);
file_handle->Sync();

auto status1 = VirtualStore::UploadObject(path, path);
EXPECT_TRUE(status1.ok());

status1 = VirtualStore::RemoveObject(path);
EXPECT_TRUE(status1.ok());

VirtualStore::DeleteFile(path);
EXPECT_FALSE(VirtualStore::Exists(path));
} else {
LOG_INFO("bucket existence check failed, skip the test");
}

VirtualStore::UnInitRemoteStore();
}
3 changes: 3 additions & 0 deletions test/data/config/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ storage_type = "local"
[storage.object_storage]
url = "0.0.0.0:9000"
bucket_name = "infinity"
access_key = "minioadmin"
secret_key = "minioadmin"
enable_https = false

[buffer]
buffer_manager_size = "3GB"
Expand Down
3 changes: 3 additions & 0 deletions test/data/config/test_minio_s3_storage.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ mem_index_capacity = 1048576
[storage.object_storage]
url = "127.0.0.1:9000"
bucket_name = "infinity"
access_key = "minioadmin"
secret_key = "minioadmin"
enable_https = false

[buffer]
buffer_manager_size = "4GB"
Expand Down

0 comments on commit 69a2ce7

Please sign in to comment.