Skip to content

Commit

Permalink
[fix](clone) Fix engine_clone file exist (apache#27361) (apache#27536)
Browse files Browse the repository at this point in the history
  • Loading branch information
JackDrogon authored Nov 24, 2023
1 parent 47ba995 commit 9f2d88b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ inline std::string Status::to_string() const {
template <typename T>
using Result = expected<T, Status>;

using ResultError = unexpected<Status>;

#define RETURN_IF_ERROR_RESULT(stmt) \
do { \
Status _status_ = (stmt); \
Expand Down
79 changes: 72 additions & 7 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,58 @@ using strings::SkipWhitespace;
namespace doris {
using namespace ErrorCode;

namespace {
/// if binlog file exist, then check if binlog file md5sum equal
/// if equal, then skip link file
/// if not equal, then return error
/// return value: if binlog file not exist, then return to binlog file path
Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
const std::string& clone_file, bool* skip_link_file) {
// change clone_file suffix .binlog to .dat
std::string new_clone_file = clone_file;
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);

// check to to file exist
bool exists = true;
auto status = io::global_local_filesystem()->exists(to, &exists);
if (!status.ok()) {
return ResultError(std::move(status));
}

if (!exists) {
return to;
}

LOG(WARNING) << "binlog file already exist. "
<< "tablet_dir=" << tablet_dir << ", clone_file=" << clone_file;

std::string clone_file_md5sum;
status = io::global_local_filesystem()->md5sum(clone_file, &clone_file_md5sum);
if (!status.ok()) {
return ResultError(std::move(status));
}
std::string to_file_md5sum;
status = io::global_local_filesystem()->md5sum(to, &to_file_md5sum);
if (!status.ok()) {
return ResultError(std::move(status));
}

if (clone_file_md5sum == to_file_md5sum) {
// if md5sum equal, then skip link file
*skip_link_file = true;
return to;
} else {
auto err_msg = fmt::format(
"binlog file already exist, but md5sum not equal. "
"tablet_dir={}, clone_file={}",
tablet_dir, clone_file);
LOG(WARNING) << err_msg;
return ResultError(Status::InternalError(std::move(err_msg)));
}
}
} // namespace

#define RETURN_IF_ERROR_(status, stmt) \
do { \
status = (stmt); \
Expand Down Expand Up @@ -603,6 +655,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
/// Traverse all downloaded clone files in CLONE dir.
/// If it does not exist in local tablet dir, link the file to local tablet dir
/// And save all linked files in linked_success_files.
/// if binlog exist in clone dir and md5sum equal, then skip link file
bool skip_link_file = false;
for (const string& clone_file : clone_file_names) {
if (local_file_names.find(clone_file) != local_file_names.end()) {
VLOG_NOTICE << "find same file when clone, skip it. "
Expand All @@ -619,19 +673,30 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
break;
}

// change clone_file suffix .binlog to .dat
std::string new_clone_file = clone_file;
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
if (auto&& result = check_dest_binlog_valid(tablet_dir, clone_file, &skip_link_file);
result) {
to = std::move(result.value());
} else {
status = std::move(result.error());
return status;
}
} else {
to = fmt::format("{}/{}", tablet_dir, clone_file);
}

RETURN_IF_ERROR(io::global_local_filesystem()->link_file(from, to));
linked_success_files.emplace_back(std::move(to));
if (!skip_link_file) {
status = io::global_local_filesystem()->link_file(from, to);
if (!status.ok()) {
return status;
}
linked_success_files.emplace_back(std::move(to));
}
}
if (contain_binlog) {
RETURN_IF_ERROR(tablet->ingest_binlog_metas(&rowset_binlog_metas_pb));
status = tablet->ingest_binlog_metas(&rowset_binlog_metas_pb);
if (!status.ok()) {
return status;
}
}

// clone and compaction operation should be performed sequentially
Expand Down

0 comments on commit 9f2d88b

Please sign in to comment.