Skip to content

Commit

Permalink
Generalize work item definition in BackupEngineImpl (#13228)
Browse files Browse the repository at this point in the history
Summary:
This change refactors existing `CopyOrCreateWorkItem` async task definition to a more generic one (`WorkItem`) with an assigned `type` indicative of intended action. This would allow us to reuse existing, battle-tested async tasks initialization code to handle wider range of incoming use cases in B/R space.

### Motivation
Historically, the two main use cases for `BackupEngineImpl`'s async work items were either creating a file in backup workflow or copying files in restore workflow. However, as we're now exploring opportunities in incremental restore (and potentially speeding up backup verification), we need the work item abstraction to be capable of processing different workflow types concurrently (computing checksum comes to mind).

Pull Request resolved: #13228

Test Plan: Since this is purely cosmetic change where behavior remains intact, existing test collateral will suffice.

Reviewed By: pdillinger

Differential Revision: D67441210

Pulled By: mszeszko-meta

fbshipit-source-id: 78803e8cf3cf40b9d81831fac3a99193e1a30ef0
  • Loading branch information
mszeszko-meta authored and facebook-github-bot committed Dec 20, 2024
1 parent c8bc2b6 commit f7b4216
Showing 1 changed file with 121 additions and 89 deletions.
210 changes: 121 additions & 89 deletions utilities/backup/backup_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,29 @@ class BackupEngineImpl {
Temperature file_temp, RateLimiter* rate_limiter,
std::string* db_id, std::string* db_session_id);

struct CopyOrCreateResult {
~CopyOrCreateResult() {
struct WorkItemResult {
WorkItemResult()
: size(0),
expected_src_temperature(Temperature::kUnknown),
current_src_temperature(Temperature::kUnknown) {}

WorkItemResult(const WorkItemResult& other) = delete;
WorkItemResult& operator=(const WorkItemResult& other) = delete;

WorkItemResult(WorkItemResult&& o) noexcept { *this = std::move(o); }

WorkItemResult& operator=(WorkItemResult&& o) noexcept {
size = o.size;
checksum_hex = std::move(o.checksum_hex);
db_id = std::move(o.db_id);
db_session_id = std::move(o.db_session_id);
io_status = std::move(o.io_status);
expected_src_temperature = o.expected_src_temperature;
current_src_temperature = o.current_src_temperature;
return *this;
}

~WorkItemResult() {
// The Status needs to be ignored here for two reasons.
// First, if the BackupEngineImpl shuts down with jobs outstanding, then
// it is possible that the Status in the future/promise is never read,
Expand All @@ -614,10 +635,14 @@ class BackupEngineImpl {
Temperature current_src_temperature = Temperature::kUnknown;
};

enum WorkItemType : uint64_t {
CopyOrCreate = 1U,
};

// Exactly one of src_path and contents must be non-empty. If src_path is
// non-empty, the file is copied from this pathname. Otherwise, if contents is
// non-empty, the file will be created at dst_path with these contents.
struct CopyOrCreateWorkItem {
struct WorkItem {
std::string src_path;
std::string dst_path;
Temperature src_temperature;
Expand All @@ -630,34 +655,33 @@ class BackupEngineImpl {
RateLimiter* rate_limiter;
uint64_t size_limit;
Statistics* stats;
std::promise<CopyOrCreateResult> result;
std::promise<WorkItemResult> result;
std::function<void()> progress_callback;
std::string src_checksum_func_name;
std::string src_checksum_hex;
std::string db_id;
std::string db_session_id;
WorkItemType type;

CopyOrCreateWorkItem()
WorkItem()
: src_temperature(Temperature::kUnknown),
dst_temperature(Temperature::kUnknown),

src_env(nullptr),
dst_env(nullptr),
src_env_options(),
sync(false),
rate_limiter(nullptr),
size_limit(0),
stats(nullptr),
src_checksum_func_name(kUnknownFileChecksumFuncName) {}
src_checksum_func_name(kUnknownFileChecksumFuncName),
type(WorkItemType::CopyOrCreate) {}

CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
WorkItem(const WorkItem&) = delete;
WorkItem& operator=(const WorkItem&) = delete;

CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept {
*this = std::move(o);
}
WorkItem(WorkItem&& o) noexcept { *this = std::move(o); }

CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept {
WorkItem& operator=(WorkItem&& o) noexcept {
src_path = std::move(o.src_path);
dst_path = std::move(o.dst_path);
src_temperature = std::move(o.src_temperature);
Expand All @@ -677,22 +701,22 @@ class BackupEngineImpl {
db_id = std::move(o.db_id);
db_session_id = std::move(o.db_session_id);
src_temperature = o.src_temperature;
type = std::move(o.type);
return *this;
}

CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
const Temperature _src_temperature,
const Temperature _dst_temperature,
std::string _contents, Env* _src_env, Env* _dst_env,
EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
Statistics* _stats,
std::function<void()> _progress_callback = {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_hex = "",
const std::string& _db_id = "",
const std::string& _db_session_id = "")
WorkItem(std::string _src_path, std::string _dst_path,
const Temperature _src_temperature,
const Temperature _dst_temperature, std::string _contents,
Env* _src_env, Env* _dst_env, EnvOptions _src_env_options,
bool _sync, RateLimiter* _rate_limiter, uint64_t _size_limit,
Statistics* _stats, std::function<void()> _progress_callback = {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_hex = "",
const std::string& _db_id = "",
const std::string& _db_session_id = "",
WorkItemType _type = WorkItemType::CopyOrCreate)
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
src_temperature(_src_temperature),
Expand All @@ -709,11 +733,14 @@ class BackupEngineImpl {
src_checksum_func_name(_src_checksum_func_name),
src_checksum_hex(_src_checksum_hex),
db_id(_db_id),
db_session_id(_db_session_id) {}
db_session_id(_db_session_id),
type(_type) {}

~WorkItem() = default;
};

struct BackupAfterCopyOrCreateWorkItem {
std::future<CopyOrCreateResult> result;
std::future<WorkItemResult> result;
bool shared;
bool needed_to_copy;
Env* backup_env;
Expand All @@ -740,7 +767,7 @@ class BackupEngineImpl {
return *this;
}

BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
BackupAfterCopyOrCreateWorkItem(std::future<WorkItemResult>&& _result,
bool _shared, bool _needed_to_copy,
Env* _backup_env, std::string _dst_path_tmp,
std::string _dst_path,
Expand All @@ -755,15 +782,15 @@ class BackupEngineImpl {
};

using BackupWorkItemPair =
std::pair<CopyOrCreateWorkItem, BackupAfterCopyOrCreateWorkItem>;
std::pair<WorkItem, BackupAfterCopyOrCreateWorkItem>;

struct RestoreAfterCopyOrCreateWorkItem {
std::future<CopyOrCreateResult> result;
std::future<WorkItemResult> result;
std::string from_file;
std::string to_file;
std::string checksum_hex;
RestoreAfterCopyOrCreateWorkItem() {}
RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
RestoreAfterCopyOrCreateWorkItem(std::future<WorkItemResult>&& _result,
const std::string& _from_file,
const std::string& _to_file,
const std::string& _checksum_hex)
Expand All @@ -786,7 +813,7 @@ class BackupEngineImpl {

bool initialized_;
std::mutex byte_report_mutex_;
mutable channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
mutable channel<WorkItem> work_items_;
std::vector<port::Thread> threads_;
std::atomic<CpuPriority> threads_cpu_priority_;

Expand Down Expand Up @@ -1040,7 +1067,7 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
}

BackupEngineImpl::~BackupEngineImpl() {
files_to_copy_or_create_.sendEof();
work_items_.sendEof();
for (auto& t : threads_) {
t.join();
}
Expand Down Expand Up @@ -1239,7 +1266,7 @@ IOStatus BackupEngineImpl::Initialize() {
ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
latest_valid_backup_id_);

// set up threads perform copies from files_to_copy_or_create_ in the
// set up threads perform copies from work_items_ in the
// background
threads_cpu_priority_ = CpuPriority::kNormal;
threads_.reserve(options_.max_background_operations);
Expand All @@ -1251,9 +1278,9 @@ IOStatus BackupEngineImpl::Initialize() {
#endif
#endif
CpuPriority current_priority = CpuPriority::kNormal;
CopyOrCreateWorkItem work_item;
WorkItem work_item;
uint64_t bytes_toward_next_callback = 0;
while (files_to_copy_or_create_.read(work_item)) {
while (work_items_.read(work_item)) {
CpuPriority priority = threads_cpu_priority_;
if (current_priority != priority) {
TEST_SYNC_POINT_CALLBACK(
Expand All @@ -1268,54 +1295,59 @@ IOStatus BackupEngineImpl::Initialize() {
uint64_t prev_bytes_read = IOSTATS(bytes_read);
uint64_t prev_bytes_written = IOSTATS(bytes_written);

CopyOrCreateResult result;
WorkItemResult result;
Temperature temp = work_item.src_temperature;
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.size_limit, work_item.src_env, work_item.dst_env,
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
work_item.progress_callback, &temp, work_item.dst_temperature,
&bytes_toward_next_callback, &result.size, &result.checksum_hex);

RecordTick(work_item.stats, BACKUP_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);

result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
result.expected_src_temperature = work_item.src_temperature;
result.current_src_temperature = temp;
if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.src_checksum_hex not empty means
// backup engine has calculated its crc32c checksum for the table
// file; therefore, we are able to compare the checksums.
if (work_item.src_checksum_func_name ==
kUnknownFileChecksumFuncName ||
work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
if (work_item.src_checksum_hex != result.checksum_hex) {
std::string checksum_info(
"Expected checksum is " + work_item.src_checksum_hex +
" while computed checksum is " + result.checksum_hex);
result.io_status = IOStatus::Corruption(
"Checksum mismatch after copying to " + work_item.dst_path +
": " + checksum_info);
if (work_item.type == WorkItemType::CopyOrCreate) {
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.size_limit, work_item.src_env, work_item.dst_env,
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
work_item.progress_callback, &temp, work_item.dst_temperature,
&bytes_toward_next_callback, &result.size, &result.checksum_hex);

RecordTick(work_item.stats, BACKUP_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);

result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
result.expected_src_temperature = work_item.src_temperature;
result.current_src_temperature = temp;
if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
// unknown checksum function name implies no db table file checksum
// in db manifest; work_item.src_checksum_hex not empty means backup
// engine has calculated its crc32c checksum for the table file;
// therefore, we are able to compare the checksums.
if (work_item.src_checksum_func_name ==
kUnknownFileChecksumFuncName ||
work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
if (work_item.src_checksum_hex != result.checksum_hex) {
std::string checksum_info(
"Expected checksum is " + work_item.src_checksum_hex +
" while computed checksum is " + result.checksum_hex);
result.io_status = IOStatus::Corruption(
"Checksum mismatch after copying to " + work_item.dst_path +
": " + checksum_info);
}
} else {
// FIXME(peterd): dead code?
std::string checksum_function_info(
"Existing checksum function is " +
work_item.src_checksum_func_name +
" while provided checksum function is " +
kBackupFileChecksumFuncName);
ROCKS_LOG_INFO(
options_.info_log,
"Unable to verify checksum after copying to %s: %s\n",
work_item.dst_path.c_str(), checksum_function_info.c_str());
}
} else {
// FIXME(peterd): dead code?
std::string checksum_function_info(
"Existing checksum function is " +
work_item.src_checksum_func_name +
" while provided checksum function is " +
kBackupFileChecksumFuncName);
ROCKS_LOG_INFO(
options_.info_log,
"Unable to verify checksum after copying to %s: %s\n",
work_item.dst_path.c_str(), checksum_function_info.c_str());
}
work_item.result.set_value(std::move(result));
} else {
result.io_status = IOStatus::InvalidArgument(
"Unknown work item type: " + std::to_string(work_item.type));
}
work_item.result.set_value(std::move(result));
}
});
}
Expand Down Expand Up @@ -1404,7 +1436,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(

std::deque<BackupWorkItemPair> excludable_items;
std::deque<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyOrCreateWorkItem to the channel for each live file
// Add a WorkItem to the channel for each live file
Status disabled = db->DisableFileDeletions();
DBOptions db_options = db->GetDBOptions();
Statistics* stats = db_options.statistics.get();
Expand Down Expand Up @@ -1533,7 +1565,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
if (maybe_exclude_files[i].exclude_decision) {
new_backup.get()->AddExcludedFile(e.second.dst_relative);
} else {
files_to_copy_or_create_.write(std::move(e.first));
work_items_.write(std::move(e.first));
backup_items_to_finish.push_back(std::move(e.second));
}
}
Expand Down Expand Up @@ -2000,7 +2032,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(

ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
WorkItem copy_or_create_work_item(
absolute_file, dst, Temperature::kUnknown /* src_temp */,
file_info->temp, "" /* contents */, src_env, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
Expand All @@ -2009,7 +2041,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst,
file_info->checksum_hex);
files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
work_items_.write(std::move(copy_or_create_work_item));
restore_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
}
Expand Down Expand Up @@ -2486,7 +2518,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(

// Step 3: Add work item
if (!contents.empty() || need_to_copy) {
CopyOrCreateWorkItem copy_or_create_work_item(
WorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature,
Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_,
src_env_options, options_.sync, rate_limiter, size_limit, stats,
Expand All @@ -2506,17 +2538,17 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// the checkpoint
ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
copy_dest_path->c_str());
files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
work_items_.write(std::move(copy_or_create_work_item));
backup_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
}
} else {
std::promise<CopyOrCreateResult> promise_result;
std::promise<WorkItemResult> promise_result;
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
promise_result.get_future(), shared, need_to_copy, backup_env_,
temp_dest_path, final_dest_path, dst_relative);
backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
CopyOrCreateResult result;
WorkItemResult result;
result.io_status = IOStatus::OK();
result.size = size_bytes;
result.checksum_hex = std::move(checksum_hex);
Expand Down

0 comments on commit f7b4216

Please sign in to comment.