Skip to content

Commit

Permalink
Add snapshot, part2 (#2421)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Jan 3, 2025
1 parent 4b0159b commit 9e0b6cc
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 36 deletions.
2 changes: 2 additions & 0 deletions src/common/utility/utility.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ IdentifierValidationStatus IdentifierValidation(const String &identifier);
bool ParseIPPort(const String &str, String &ip, i64 &port);

String StringTransform(const String &source, const String &from, const String &to);

bool CompressDirectory(const String &source_dir, const String &dest_file);
} // namespace infinity
57 changes: 32 additions & 25 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import periodic_trigger;
import bg_task;
import wal_manager;
import result_cache_manager;
import snapshot;

namespace infinity {

Expand Down Expand Up @@ -456,9 +457,39 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
SnapshotCmd *snapshot_cmd = static_cast<SnapshotCmd *>(command_info_.get());
LOG_INFO(fmt::format("Execute snapshot command"));
SnapshotOp snapshot_operation = snapshot_cmd->operation();
switch(snapshot_operation) {
SnapshotScope snapshot_scope = snapshot_cmd->scope();
const String &snapshot_name = snapshot_cmd->name();
switch (snapshot_operation) {
case SnapshotOp::kCreate: {
LOG_INFO(fmt::format("Execute snapshot create"));
switch (snapshot_scope) {
case SnapshotScope::kSystem: {
LOG_INFO(fmt::format("Execute snapshot system"));
break;
}
case SnapshotScope::kDatabase: {
LOG_INFO(fmt::format("Execute snapshot database"));
break;
}
case SnapshotScope::kTable: {
const String &table_name = snapshot_cmd->object_name();
Status snapshot_status = Snapshot::CreateTableSnapshot(query_context, snapshot_name, table_name);
if (!snapshot_status.ok()) {
RecoverableError(snapshot_status);
}
LOG_INFO(fmt::format("Execute snapshot table"));
break;
}
case SnapshotScope::kIgnore: {
LOG_INFO(fmt::format("Execute snapshot ignore"));
break;
}
default: {
String error_message = "Invalid snapshot scope";
UnrecoverableError(error_message);
break;
}
}
break;
}
case SnapshotOp::kDrop: {
Expand All @@ -476,30 +507,6 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
}
}

SnapshotScope snapshot_scope = snapshot_cmd->scope();
switch(snapshot_scope) {
case SnapshotScope::kSystem: {
LOG_INFO(fmt::format("Execute snapshot system"));
break;
}
case SnapshotScope::kDatabase: {
LOG_INFO(fmt::format("Execute snapshot database"));
break;
}
case SnapshotScope::kTable: {
LOG_INFO(fmt::format("Execute snapshot table"));
break;
}
case SnapshotScope::kIgnore: {
LOG_INFO(fmt::format("Execute snapshot ignore"));
break;
}
default: {
String error_message = "Invalid snapshot scope";
UnrecoverableError(error_message);
break;
}
}
break;
}
default: {
Expand Down
2 changes: 1 addition & 1 deletion src/executor/operator/snapshot/snapshot.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import query_context;

namespace infinity {

class Snapshot {
export class Snapshot {
public:
static Status CreateTableSnapshot(QueryContext *query_context, const String &snapshot_name, const String& table_name);
static Status RestoreTableSnapshot();
Expand Down
17 changes: 14 additions & 3 deletions src/executor/operator/snapshot/table_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,24 @@ import txn;
import query_context;
import table_entry;
import status;
import third_party;
import config;

namespace infinity {

// Creates a snapshot of `table_name` (looked up in the query's current schema)
// and serializes it under the configured snapshot directory with the name
// `snapshot_name`.
//
// Returns the table-lookup status if the table cannot be resolved, otherwise
// Status::OK().
Status Snapshot::CreateTableSnapshot(QueryContext *query_context, const String &snapshot_name, const String &table_name) {
    Txn *txn = query_context->GetTxn();
    const String &schema = query_context->schema_name();

    // Resolve the table through the current transaction; bail out on lookup failure.
    Tuple<TableEntry *, Status> lookup = txn->GetTableByName(schema, table_name);
    if (const Status &lookup_status = std::get<1>(lookup); !lookup_status.ok()) {
        return lookup_status;
    }
    TableEntry *table_entry = std::get<0>(lookup);

    // Collect the snapshot description from the table entry, name it, and write it out.
    SharedPtr<TableSnapshotInfo> snapshot_info = table_entry->GetSnapshotInfo();
    snapshot_info->snapshot_name_ = snapshot_name;
    snapshot_info->Serialize(query_context->global_config()->SnapshotDir());

    return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion src/parser/statement/command_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class TestCmd final : public CommandInfo {
};

enum class SnapshotOp { kCreate, kDrop, kRestore, kInvalid };
enum class SnapshotScope { kTable, kDatabase, kSystem, kIgnore, kInvalid };
enum class SnapshotScope : uint8_t { kTable = 0, kDatabase, kSystem, kIgnore, kInvalid };

class SnapshotCmd final : public CommandInfo {
public:
Expand All @@ -258,6 +258,7 @@ class SnapshotCmd final : public CommandInfo {
[[nodiscard]] std::string ToString() const final;

const std::string &name() { return name_; }
const std::string &object_name() { return object_name_.value(); }
SnapshotOp operation() { return operation_; }
SnapshotScope &scope() { return scope_; }

Expand Down
247 changes: 247 additions & 0 deletions src/storage/common/snapshot_info.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

#include <vector>

module snapshot_info;

import stl;
import status;
import command_statement;
// import index_base;
import third_party;
import constant_expr;
import logger;
import virtual_store;
import local_file_handle;
import infinity_exception;
import infinity_context;
import config;
import persistence_manager;
import persist_result_handler;
import defer_op;

namespace infinity {

// Serializes one block column to JSON: its column id, its file name and,
// when present, the file names of its outline snapshots under "outlines".
// The "outlines" key is only created when at least one outline exists.
nlohmann::json BlockColumnSnapshotInfo::Serialize() {
    nlohmann::json column_json = {{"column_id", column_id_}, {"filename", filename_}};
    for (const auto &outline : outline_snapshots_) {
        column_json["outlines"].emplace_back(outline->filename_);
    }
    return column_json;
}

// Serializes one block to JSON: its block id, its directory and, when
// present, the serialized column snapshots under "columns".
// The "columns" key is only created when at least one column snapshot exists.
nlohmann::json BlockSnapshotInfo::Serialize() {
    nlohmann::json block_json = {{"block_id", block_id_}, {"block_dir", block_dir_}};
    for (const auto &column : column_block_snapshots_) {
        block_json["columns"].emplace_back(column->Serialize());
    }
    return block_json;
}

// Serializes one segment to JSON: its segment id, its directory and, when
// present, the serialized block snapshots under "blocks".
// The "blocks" key is only created when at least one block snapshot exists.
nlohmann::json SegmentSnapshotInfo::Serialize() {
    nlohmann::json segment_json = {{"segment_id", segment_id_}, {"segment_dir", segment_dir_}};
    for (const auto &block : block_snapshots_) {
        segment_json["blocks"].emplace_back(block->Serialize());
    }
    return segment_json;
}

void TableSnapshotInfo::Serialize(const String &save_dir) {

Config *config = InfinityContext::instance().config();
PersistenceManager *persistence_manager = InfinityContext::instance().persistence_manager();

// Get files
Vector<String> original_files = GetFiles();

// Copy files
if (persistence_manager != nullptr) {
PersistResultHandler pm_handler(persistence_manager);
for (const auto &file : original_files) {
PersistReadResult result = persistence_manager->GetObjCache(file);
DeferFn defer_fn([&]() {
auto res = persistence_manager->PutObjCache(file);
pm_handler.HandleWriteResult(res);
});

const ObjAddr &obj_addr = pm_handler.HandleReadResult(result);
if (!obj_addr.Valid()) {
String error_message = fmt::format("Failed to find object for local path {}", file);
UnrecoverableError(error_message);
}
String read_path = persistence_manager->GetObjPath(obj_addr.obj_key_);
LOG_INFO(fmt::format("READ: {} from {}", file, read_path));

auto [reader_handle, reader_open_status] = VirtualStore::Open(read_path, FileAccessMode::kRead);
if (!reader_open_status.ok()) {
UnrecoverableError(reader_open_status.message());
}

auto seek_status = reader_handle->Seek(obj_addr.part_offset_);
if (!seek_status.ok()) {
UnrecoverableError(seek_status.message());
}

auto file_size = obj_addr.part_size_;
auto buffer = std::make_unique<char[]>(file_size);
auto [nread, read_status] = reader_handle->Read(buffer.get(), file_size);

String dst_file_path = fmt::format("{}/{}/{}", save_dir, snapshot_name_, file);
String dst_dir = VirtualStore::GetParentPath(dst_file_path);
if (!VirtualStore::Exists(dst_dir)) {
VirtualStore::MakeDirectory(dst_dir);
}

auto [write_file_handle, writer_open_status] = VirtualStore::Open(dst_file_path, FileAccessMode::kWrite);
if (!writer_open_status.ok()) {
UnrecoverableError(writer_open_status.message());
}

Status write_status = write_file_handle->Append(buffer.get(), file_size);
if (!write_status.ok()) {
UnrecoverableError(write_status.message());
}
write_file_handle->Sync();
}
} else {
String data_dir = config->DataDir();
for (const auto &file : original_files) {
String src_file_path = fmt::format("{}/{}", data_dir, file);
String dst_file_path = fmt::format("{}/{}/{}", save_dir, snapshot_name_, file);
// LOG_INFO(fmt::format("Copy from: {} to {}", src_file_path, dst_file_path));
Status copy_status = VirtualStore::Copy(dst_file_path, src_file_path);
if (!copy_status.ok()) {
RecoverableError(copy_status);
}
}
}

// Compress the directory
String directory = fmt::format("{}/{}", save_dir, snapshot_name_);
String zip_filename = fmt::format("{}/{}.zip", save_dir, snapshot_name_);
String command = fmt::format("zip -r {} {}", directory, zip_filename);
int ret = system(command.c_str());
if (ret != 0) {
Status status = Status::IOError(fmt::format("Failed to compress directory: {}", directory));
RecoverableError(status);
}
// Get the MD5 of compress file
String md5_command = fmt::format("md5sum {}", zip_filename);
FILE *md5_fp = popen(md5_command.c_str(), "r");
if (md5_fp == nullptr) {
Status status = Status::IOError(fmt::format("Failed to get md5 of file: {}", zip_filename));
RecoverableError(status);
}
char md5_buf[1024];
if (fgets(md5_buf, sizeof(md5_buf), md5_fp) == nullptr) {
Status status = Status::IOError(fmt::format("Failed to read md5 of file: {}", zip_filename));
RecoverableError(status);
}
pclose(md5_fp);
String md5 = md5_buf;
md5 = md5.substr(0, md5.find(" "));
LOG_INFO(fmt::format("MD5: {}", md5));

// Remove the directory
VirtualStore::RemoveDirectory(directory);

nlohmann::json json_res;
json_res["version"] = version_;
json_res["snapshot_name"] = snapshot_name_;
json_res["snapshot_scope"] = SnapshotScope::kTable;
json_res["database_name"] = db_name_;
json_res["table_name"] = table_name_;
json_res["table_comment"] = table_comment_;
json_res["md5"] = md5;

json_res["txn_id"] = txn_id_;
json_res["begin_ts"] = begin_ts_;
json_res["commit_ts"] = commit_ts_;
json_res["max_commit_ts"] = max_commit_ts_;

json_res["next_column_id"] = next_column_id_;
json_res["next_segment_id"] = next_segment_id_;

for (const auto &column_def : this->columns_) {
nlohmann::json column_def_json;
column_def_json["column_type"] = column_def->type()->Serialize();
column_def_json["column_id"] = column_def->id();
column_def_json["column_name"] = column_def->name();

for (const auto &column_constraint : column_def->constraints_) {
column_def_json["constraints"].emplace_back(column_constraint);
}

if (!(column_def->comment().empty())) {
column_def_json["column_comment"] = column_def->comment();
}

if (column_def->has_default_value()) {
auto default_expr = dynamic_pointer_cast<ConstantExpr>(column_def->default_expr_);
column_def_json["default"] = default_expr->Serialize();
}

json_res["column_definition"].emplace_back(column_def_json);
}

for (const auto &segment_snapshot_pair : segment_snapshots_) {
json_res["segments"].emplace_back(segment_snapshot_pair.second->Serialize());
}

String json_string = json_res.dump();

Path save_path = Path(save_dir) / fmt::format("{}.json", snapshot_name_);

if (!VirtualStore::Exists(save_dir)) {
VirtualStore::MakeDirectory(save_dir);
}
auto [snapshot_file_handle, status] = VirtualStore::Open(save_path.string(), FileAccessMode::kWrite);
if (!status.ok()) {
UnrecoverableError(fmt::format("{}: {}", save_path.string(), status.message()));
}

status = snapshot_file_handle->Append(json_string.data(), json_string.size());
if (!status.ok()) {
RecoverableError(status);
}
snapshot_file_handle->Sync();

LOG_INFO(fmt::format("{}", json_res.dump()));
}

// Lists every data file referenced by this table snapshot.
// For each segment -> block -> column, emits the column file followed by that
// column's outline files; every path is the block directory joined with the
// file name via VirtualStore::ConcatenatePath.
Vector<String> TableSnapshotInfo::GetFiles() const {
    Vector<String> file_list;
    for (const auto &segment_entry : segment_snapshots_) {
        const auto &segment = segment_entry.second;
        for (const auto &block : segment->block_snapshots_) {
            const auto &block_dir = block->block_dir_;
            for (const auto &column : block->column_block_snapshots_) {
                file_list.emplace_back(VirtualStore::ConcatenatePath(block_dir, column->filename_));
                for (const auto &outline : column->outline_snapshots_) {
                    file_list.emplace_back(VirtualStore::ConcatenatePath(block_dir, outline->filename_));
                }
            }
        }
    }
    return file_list;
}

} // namespace infinity
Loading

0 comments on commit 9e0b6cc

Please sign in to comment.