From d040484bf89e69f273838ca9c8aaf256aed4c31e Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 6 Aug 2024 17:46:42 +0800 Subject: [PATCH] fix(Bulkload): Avoid use reference of `_metadata.files` in download_sst_file (#2006) replica_bulk_loader::clear_bulk_load_states function cannot cancel already downloading sst task, which access `_metadata.files` references. But clear_bulk_load_states function will clear `_metadata.files`. It's cause core dump. I use a copy of `_metadata.files` to solve this problem. --- src/replica/bulk_load/replica_bulk_loader.cpp | 83 +++++++++---------- src/replica/bulk_load/replica_bulk_loader.h | 2 +- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 697083cb76..bda9fa8a97 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -474,11 +474,12 @@ error_code replica_bulk_loader::start_download(const std::string &remote_dir, // start download _is_downloading.store(true); - _download_task = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind( - &replica_bulk_loader::download_files, this, provider_name, remote_dir, local_dir)); + _download_task = + tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs] { + download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); return ERR_OK; } @@ -519,41 +520,39 @@ void replica_bulk_loader::download_files(const std::string &provider_name, } // download sst files asynchronously - if (!_metadata.files.empty()) { - const file_meta &f_meta = _metadata.files[0]; - _download_files_task[f_meta.name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind(&replica_bulk_loader::download_sst_file, this, remote_dir, local_dir, 0, fs)); + std::vector<::dsn::replication::file_meta> download_file_metas; + { + zauto_read_lock l(_lock); + std::copy(_metadata.files.begin(), + _metadata.files.end(), + std::back_inserter(download_file_metas)); + } + if (!download_file_metas.empty()) { + _download_files_task[download_file_metas.back().name] = + tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, + tracker(), + std::bind(&replica_bulk_loader::download_sst_file, + this, + remote_dir, + local_dir, + download_file_metas, + fs)); } } // ThreadPool: THREAD_POOL_DEFAULT -void replica_bulk_loader::download_sst_file(const std::string &remote_dir, - const std::string &local_dir, - int32_t file_index, - dist::block_service::block_filesystem *fs) +void replica_bulk_loader::download_sst_file( + const std::string &remote_dir, + const std::string &local_dir, + std::vector<::dsn::replication::file_meta> &download_file_metas, + dist::block_service::block_filesystem *fs) { if (_status != bulk_load_status::BLS_DOWNLOADING) { - LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. " - "local_dir: {} , file_index is {}.", - enum_to_string(_status), - local_dir, - file_index); - return; - } - file_meta f_meta; - { - zauto_read_lock l(_lock); - if (file_index < _metadata.files.size()) { - f_meta = _metadata.files[file_index]; - } - } - if (f_meta.name.empty()) { - LOG_WARNING_PREFIX("Cannot get file_meta of {}, cancel download_sst_file task.", - file_index); + LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}.", + enum_to_string(_status)); return; } + const file_meta &f_meta = download_file_metas.back(); uint64_t f_size = 0; std::string f_md5; error_code ec = _stub->_block_service_manager.download_file( @@ -604,27 +603,19 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, return; } // download file succeed, update progress + download_file_metas.pop_back(); update_bulk_load_download_progress(f_size, f_meta.name); METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count); METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - { - zauto_read_lock l(_lock); - if (file_index + 1 < _metadata.files.size()) { - f_meta = _metadata.files[file_index + 1]; - } - } - if (!f_meta.name.empty()) { - _download_files_task[f_meta.name] = + if (!download_file_metas.empty()) { + _download_files_task[download_file_metas.back().name] = tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, tracker(), - std::bind(&replica_bulk_loader::download_sst_file, - this, - remote_dir, - local_dir, - file_index + 1, - fs)); + [this, remote_dir, local_dir, download_file_metas, fs] { + download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 92f76810cd..18daa45a19 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -89,7 +89,7 @@ class replica_bulk_loader : replica_base // download sst files from remote provider void download_sst_file(const std::string &remote_dir, const std::string &local_dir, - int32_t file_index, + std::vector<::dsn::replication::file_meta> &download_file_metas, dist::block_service::block_filesystem *fs); // \return ERR_PATH_NOT_FOUND: file not exist