From f7b42166283223d29b6876ecb6acfb6373f07eaf Mon Sep 17 00:00:00 2001 From: Maciej Szeszko Date: Thu, 19 Dec 2024 16:57:03 -0800 Subject: [PATCH] Generalize work item definition in BackupEngineImpl (#13228) Summary: This change refactors existing `CopyOrCreateWorkItem` async task definition to a more generic one (`WorkItem`) with an assigned `type` indicative of intended action. This would allow us to reuse existing, battle-tested async tasks initialization code to handle wider range of incoming use cases in B/R space. ### Motivation Historically, the two main use cases for `BackupEngineImpl`'s async work items were either creating a file in backup workflow or copying files in restore workflow. However, as we're now exploring opportunities in incremental restore (and potentially speeding up backup verification), we need the work item abstraction to be capable of processing different workflow types concurrently (computing checksum comes to mind). Pull Request resolved: https://github.com/facebook/rocksdb/pull/13228 Test Plan: Since this is purely cosmetic change where behavior remains intact, existing test collateral will suffice. Reviewed By: pdillinger Differential Revision: D67441210 Pulled By: mszeszko-meta fbshipit-source-id: 78803e8cf3cf40b9d81831fac3a99193e1a30ef0 --- utilities/backup/backup_engine.cc | 210 +++++++++++++++++------------- 1 file changed, 121 insertions(+), 89 deletions(-) diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index e6cbae48c01..ed43ff0b89e 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -594,8 +594,29 @@ class BackupEngineImpl { Temperature file_temp, RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id); - struct CopyOrCreateResult { - ~CopyOrCreateResult() { + struct WorkItemResult { + WorkItemResult() + : size(0), + expected_src_temperature(Temperature::kUnknown), + current_src_temperature(Temperature::kUnknown) {} + + WorkItemResult(const WorkItemResult& other) = delete; + WorkItemResult& operator=(const WorkItemResult& other) = delete; + + WorkItemResult(WorkItemResult&& o) noexcept { *this = std::move(o); } + + WorkItemResult& operator=(WorkItemResult&& o) noexcept { + size = o.size; + checksum_hex = std::move(o.checksum_hex); + db_id = std::move(o.db_id); + db_session_id = std::move(o.db_session_id); + io_status = std::move(o.io_status); + expected_src_temperature = o.expected_src_temperature; + current_src_temperature = o.current_src_temperature; + return *this; + } + + ~WorkItemResult() { // The Status needs to be ignored here for two reasons. // First, if the BackupEngineImpl shuts down with jobs outstanding, then // it is possible that the Status in the future/promise is never read, @@ -614,10 +635,14 @@ class BackupEngineImpl { Temperature current_src_temperature = Temperature::kUnknown; }; + enum WorkItemType : uint64_t { + CopyOrCreate = 1U, + }; + // Exactly one of src_path and contents must be non-empty. If src_path is // non-empty, the file is copied from this pathname. Otherwise, if contents is // non-empty, the file will be created at dst_path with these contents. - struct CopyOrCreateWorkItem { + struct WorkItem { std::string src_path; std::string dst_path; Temperature src_temperature; @@ -630,17 +655,17 @@ class BackupEngineImpl { RateLimiter* rate_limiter; uint64_t size_limit; Statistics* stats; - std::promise result; + std::promise result; std::function progress_callback; std::string src_checksum_func_name; std::string src_checksum_hex; std::string db_id; std::string db_session_id; + WorkItemType type; - CopyOrCreateWorkItem() + WorkItem() : src_temperature(Temperature::kUnknown), dst_temperature(Temperature::kUnknown), - src_env(nullptr), dst_env(nullptr), src_env_options(), @@ -648,16 +673,15 @@ class BackupEngineImpl { rate_limiter(nullptr), size_limit(0), stats(nullptr), - src_checksum_func_name(kUnknownFileChecksumFuncName) {} + src_checksum_func_name(kUnknownFileChecksumFuncName), + type(WorkItemType::CopyOrCreate) {} - CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete; - CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete; + WorkItem(const WorkItem&) = delete; + WorkItem& operator=(const WorkItem&) = delete; - CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept { - *this = std::move(o); - } + WorkItem(WorkItem&& o) noexcept { *this = std::move(o); } - CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept { + WorkItem& operator=(WorkItem&& o) noexcept { src_path = std::move(o.src_path); dst_path = std::move(o.dst_path); src_temperature = std::move(o.src_temperature); @@ -677,22 +701,22 @@ class BackupEngineImpl { db_id = std::move(o.db_id); db_session_id = std::move(o.db_session_id); src_temperature = o.src_temperature; + type = std::move(o.type); return *this; } - CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path, - const Temperature _src_temperature, - const Temperature _dst_temperature, - std::string _contents, Env* _src_env, Env* _dst_env, - EnvOptions _src_env_options, bool _sync, - RateLimiter* _rate_limiter, uint64_t _size_limit, - Statistics* _stats, - std::function _progress_callback = {}, - const std::string& _src_checksum_func_name = - kUnknownFileChecksumFuncName, - const std::string& _src_checksum_hex = "", - const std::string& _db_id = "", - const std::string& _db_session_id = "") + WorkItem(std::string _src_path, std::string _dst_path, + const Temperature _src_temperature, + const Temperature _dst_temperature, std::string _contents, + Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, + bool _sync, RateLimiter* _rate_limiter, uint64_t _size_limit, + Statistics* _stats, std::function _progress_callback = {}, + const std::string& _src_checksum_func_name = + kUnknownFileChecksumFuncName, + const std::string& _src_checksum_hex = "", + const std::string& _db_id = "", + const std::string& _db_session_id = "", + WorkItemType _type = WorkItemType::CopyOrCreate) : src_path(std::move(_src_path)), dst_path(std::move(_dst_path)), src_temperature(_src_temperature), @@ -709,11 +733,14 @@ class BackupEngineImpl { src_checksum_func_name(_src_checksum_func_name), src_checksum_hex(_src_checksum_hex), db_id(_db_id), - db_session_id(_db_session_id) {} + db_session_id(_db_session_id), + type(_type) {} + + ~WorkItem() = default; }; struct BackupAfterCopyOrCreateWorkItem { - std::future result; + std::future result; bool shared; bool needed_to_copy; Env* backup_env; @@ -740,7 +767,7 @@ class BackupEngineImpl { return *this; } - BackupAfterCopyOrCreateWorkItem(std::future&& _result, + BackupAfterCopyOrCreateWorkItem(std::future&& _result, bool _shared, bool _needed_to_copy, Env* _backup_env, std::string _dst_path_tmp, std::string _dst_path, @@ -755,15 +782,15 @@ class BackupEngineImpl { }; using BackupWorkItemPair = - std::pair; + std::pair; struct RestoreAfterCopyOrCreateWorkItem { - std::future result; + std::future result; std::string from_file; std::string to_file; std::string checksum_hex; RestoreAfterCopyOrCreateWorkItem() {} - RestoreAfterCopyOrCreateWorkItem(std::future&& _result, + RestoreAfterCopyOrCreateWorkItem(std::future&& _result, const std::string& _from_file, const std::string& _to_file, const std::string& _checksum_hex) @@ -786,7 +813,7 @@ class BackupEngineImpl { bool initialized_; std::mutex byte_report_mutex_; - mutable channel files_to_copy_or_create_; + mutable channel work_items_; std::vector threads_; std::atomic threads_cpu_priority_; @@ -1040,7 +1067,7 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options, } BackupEngineImpl::~BackupEngineImpl() { - files_to_copy_or_create_.sendEof(); + work_items_.sendEof(); for (auto& t : threads_) { t.join(); } @@ -1239,7 +1266,7 @@ IOStatus BackupEngineImpl::Initialize() { ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", latest_valid_backup_id_); - // set up threads perform copies from files_to_copy_or_create_ in the + // set up threads perform copies from work_items_ in the // background threads_cpu_priority_ = CpuPriority::kNormal; threads_.reserve(options_.max_background_operations); @@ -1251,9 +1278,9 @@ IOStatus BackupEngineImpl::Initialize() { #endif #endif CpuPriority current_priority = CpuPriority::kNormal; - CopyOrCreateWorkItem work_item; + WorkItem work_item; uint64_t bytes_toward_next_callback = 0; - while (files_to_copy_or_create_.read(work_item)) { + while (work_items_.read(work_item)) { CpuPriority priority = threads_cpu_priority_; if (current_priority != priority) { TEST_SYNC_POINT_CALLBACK( @@ -1268,54 +1295,59 @@ IOStatus BackupEngineImpl::Initialize() { uint64_t prev_bytes_read = IOSTATS(bytes_read); uint64_t prev_bytes_written = IOSTATS(bytes_written); - CopyOrCreateResult result; + WorkItemResult result; Temperature temp = work_item.src_temperature; - result.io_status = CopyOrCreateFile( - work_item.src_path, work_item.dst_path, work_item.contents, - work_item.size_limit, work_item.src_env, work_item.dst_env, - work_item.src_env_options, work_item.sync, work_item.rate_limiter, - work_item.progress_callback, &temp, work_item.dst_temperature, - &bytes_toward_next_callback, &result.size, &result.checksum_hex); - - RecordTick(work_item.stats, BACKUP_READ_BYTES, - IOSTATS(bytes_read) - prev_bytes_read); - RecordTick(work_item.stats, BACKUP_WRITE_BYTES, - IOSTATS(bytes_written) - prev_bytes_written); - - result.db_id = work_item.db_id; - result.db_session_id = work_item.db_session_id; - result.expected_src_temperature = work_item.src_temperature; - result.current_src_temperature = temp; - if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { - // unknown checksum function name implies no db table file checksum in - // db manifest; work_item.src_checksum_hex not empty means - // backup engine has calculated its crc32c checksum for the table - // file; therefore, we are able to compare the checksums. - if (work_item.src_checksum_func_name == - kUnknownFileChecksumFuncName || - work_item.src_checksum_func_name == kDbFileChecksumFuncName) { - if (work_item.src_checksum_hex != result.checksum_hex) { - std::string checksum_info( - "Expected checksum is " + work_item.src_checksum_hex + - " while computed checksum is " + result.checksum_hex); - result.io_status = IOStatus::Corruption( - "Checksum mismatch after copying to " + work_item.dst_path + - ": " + checksum_info); + if (work_item.type == WorkItemType::CopyOrCreate) { + result.io_status = CopyOrCreateFile( + work_item.src_path, work_item.dst_path, work_item.contents, + work_item.size_limit, work_item.src_env, work_item.dst_env, + work_item.src_env_options, work_item.sync, work_item.rate_limiter, + work_item.progress_callback, &temp, work_item.dst_temperature, + &bytes_toward_next_callback, &result.size, &result.checksum_hex); + + RecordTick(work_item.stats, BACKUP_READ_BYTES, + IOSTATS(bytes_read) - prev_bytes_read); + RecordTick(work_item.stats, BACKUP_WRITE_BYTES, + IOSTATS(bytes_written) - prev_bytes_written); + + result.db_id = work_item.db_id; + result.db_session_id = work_item.db_session_id; + result.expected_src_temperature = work_item.src_temperature; + result.current_src_temperature = temp; + if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { + // unknown checksum function name implies no db table file checksum + // in db manifest; work_item.src_checksum_hex not empty means backup + // engine has calculated its crc32c checksum for the table file; + // therefore, we are able to compare the checksums. + if (work_item.src_checksum_func_name == + kUnknownFileChecksumFuncName || + work_item.src_checksum_func_name == kDbFileChecksumFuncName) { + if (work_item.src_checksum_hex != result.checksum_hex) { + std::string checksum_info( + "Expected checksum is " + work_item.src_checksum_hex + + " while computed checksum is " + result.checksum_hex); + result.io_status = IOStatus::Corruption( + "Checksum mismatch after copying to " + work_item.dst_path + + ": " + checksum_info); + } + } else { + // FIXME(peterd): dead code? + std::string checksum_function_info( + "Existing checksum function is " + + work_item.src_checksum_func_name + + " while provided checksum function is " + + kBackupFileChecksumFuncName); + ROCKS_LOG_INFO( + options_.info_log, + "Unable to verify checksum after copying to %s: %s\n", + work_item.dst_path.c_str(), checksum_function_info.c_str()); } - } else { - // FIXME(peterd): dead code? - std::string checksum_function_info( - "Existing checksum function is " + - work_item.src_checksum_func_name + - " while provided checksum function is " + - kBackupFileChecksumFuncName); - ROCKS_LOG_INFO( - options_.info_log, - "Unable to verify checksum after copying to %s: %s\n", - work_item.dst_path.c_str(), checksum_function_info.c_str()); } + work_item.result.set_value(std::move(result)); + } else { + result.io_status = IOStatus::InvalidArgument( + "Unknown work item type: " + std::to_string(work_item.type)); } - work_item.result.set_value(std::move(result)); } }); } @@ -1404,7 +1436,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( std::deque excludable_items; std::deque backup_items_to_finish; - // Add a CopyOrCreateWorkItem to the channel for each live file + // Add a WorkItem to the channel for each live file Status disabled = db->DisableFileDeletions(); DBOptions db_options = db->GetDBOptions(); Statistics* stats = db_options.statistics.get(); @@ -1533,7 +1565,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( if (maybe_exclude_files[i].exclude_decision) { new_backup.get()->AddExcludedFile(e.second.dst_relative); } else { - files_to_copy_or_create_.write(std::move(e.first)); + work_items_.write(std::move(e.first)); backup_items_to_finish.push_back(std::move(e.second)); } } @@ -2000,7 +2032,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); - CopyOrCreateWorkItem copy_or_create_work_item( + WorkItem copy_or_create_work_item( absolute_file, dst, Temperature::kUnknown /* src_temp */, file_info->temp, "" /* contents */, src_env, db_env_, EnvOptions() /* src_env_options */, options_.sync, @@ -2009,7 +2041,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file, dst, file_info->checksum_hex); - files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + work_items_.write(std::move(copy_or_create_work_item)); restore_items_to_finish.push_back( std::move(after_copy_or_create_work_item)); } @@ -2486,7 +2518,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( // Step 3: Add work item if (!contents.empty() || need_to_copy) { - CopyOrCreateWorkItem copy_or_create_work_item( + WorkItem copy_or_create_work_item( src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature, Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_, src_env_options, options_.sync, rate_limiter, size_limit, stats, @@ -2506,17 +2538,17 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( // the checkpoint ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(), copy_dest_path->c_str()); - files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + work_items_.write(std::move(copy_or_create_work_item)); backup_items_to_finish.push_back( std::move(after_copy_or_create_work_item)); } } else { - std::promise promise_result; + std::promise promise_result; BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( promise_result.get_future(), shared, need_to_copy, backup_env_, temp_dest_path, final_dest_path, dst_relative); backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); - CopyOrCreateResult result; + WorkItemResult result; result.io_status = IOStatus::OK(); result.size = size_bytes; result.checksum_hex = std::move(checksum_hex);