diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index a3c27e99e4..b071bb9496 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -533,7 +533,30 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, int32_t file_index, dist::block_service::block_filesystem *fs) { - const file_meta &f_meta = _metadata.files[file_index]; + if (_status != bulk_load_status::BLS_DOWNLOADING) { + LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. " + "local_dir: {} , file_index is {}.", + enum_to_string(_status), + local_dir, + file_index); + return; + } + file_meta f_meta; + bool get_f_meta = true; + { + zauto_read_lock l(_lock); + if (file_index < _metadata.files.size()) { + f_meta = _metadata.files[file_index]; + } else { + get_f_meta = false; + } + } + if (!get_f_meta) { + LOG_WARNING_PREFIX("sst file index {} exceeds number of bulkload sst files, Cancel " + "download_sst_file task.", + file_index); + return; + } uint64_t f_size = 0; std::string f_md5; error_code ec = _stub->_block_service_manager.download_file( @@ -589,9 +612,17 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - if (file_index + 1 < _metadata.files.size()) { - const file_meta &next_f_meta = _metadata.files[file_index + 1]; - _download_files_task[next_f_meta.name] = + get_f_meta = true; + { + zauto_read_lock l(_lock); + if (file_index + 1 < _metadata.files.size()) { + f_meta = _metadata.files[file_index + 1]; + } else { + get_f_meta = false; + } + } + if (get_f_meta) { + _download_files_task[f_meta.name] = tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, tracker(), std::bind(&replica_bulk_loader::download_sst_file,