Skip to content

Commit

Permalink
fix(Bulkload): Avoid use reference of _metadata.files in download_…
Browse files Browse the repository at this point in the history
…sst_file (#2006)

    replica_bulk_loader::clear_bulk_load_states function cannot cancel already
    downloading sst task, which access `_metadata.files` references.
    But clear_bulk_load_states function will clear `_metadata.files`. It's
    cause core dump. I use a copy of `_metadata.files` to solve this
    problem.
  • Loading branch information
lupengfan1 committed Aug 21, 2024
1 parent 2b90137 commit d040484
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 47 deletions.
83 changes: 37 additions & 46 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,12 @@ error_code replica_bulk_loader::start_download(const std::string &remote_dir,

// start download
_is_downloading.store(true);
_download_task = tasking::enqueue(
LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(
&replica_bulk_loader::download_files, this, provider_name, remote_dir, local_dir));
_download_task =
tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
tracker(),
[this, remote_dir, local_dir, download_file_metas, fs] {
download_sst_file(remote_dir, local_dir, download_file_metas, fs);
});
return ERR_OK;
}

Expand Down Expand Up @@ -519,41 +520,39 @@ void replica_bulk_loader::download_files(const std::string &provider_name,
}

// download sst files asynchronously
if (!_metadata.files.empty()) {
const file_meta &f_meta = _metadata.files[0];
_download_files_task[f_meta.name] = tasking::enqueue(
LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file, this, remote_dir, local_dir, 0, fs));
std::vector<::dsn::replication::file_meta> download_file_metas;
{
zauto_read_lock l(_lock);
std::copy(_metadata.files.begin(),
_metadata.files.end(),
std::back_inserter(download_file_metas));
}
if (!download_file_metas.empty()) {
_download_files_task[download_file_metas.back().name] =
tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file,
this,
remote_dir,
local_dir,
download_file_metas,
fs));
}
}

// ThreadPool: THREAD_POOL_DEFAULT
void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
const std::string &local_dir,
int32_t file_index,
dist::block_service::block_filesystem *fs)
void replica_bulk_loader::download_sst_file(
const std::string &remote_dir,
const std::string &local_dir,
std::vector<::dsn::replication::file_meta> &download_file_metas,
dist::block_service::block_filesystem *fs)
{
if (_status != bulk_load_status::BLS_DOWNLOADING) {
LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. "
"local_dir: {} , file_index is {}.",
enum_to_string(_status),
local_dir,
file_index);
return;
}
file_meta f_meta;
{
zauto_read_lock l(_lock);
if (file_index < _metadata.files.size()) {
f_meta = _metadata.files[file_index];
}
}
if (f_meta.name.empty()) {
LOG_WARNING_PREFIX("Cannot get file_meta of {}, cancel download_sst_file task.",
file_index);
LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}.",
enum_to_string(_status));
return;
}
const file_meta &f_meta = download_file_metas.back();
uint64_t f_size = 0;
std::string f_md5;
error_code ec = _stub->_block_service_manager.download_file(
Expand Down Expand Up @@ -604,27 +603,19 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
return;
}
// download file succeed, update progress
download_file_metas.pop_back();
update_bulk_load_download_progress(f_size, f_meta.name);
METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count);
METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size);

// download next file
{
zauto_read_lock l(_lock);
if (file_index + 1 < _metadata.files.size()) {
f_meta = _metadata.files[file_index + 1];
}
}
if (!f_meta.name.empty()) {
_download_files_task[f_meta.name] =
if (!download_file_metas.empty()) {
_download_files_task[download_file_metas.back().name] =
tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file,
this,
remote_dir,
local_dir,
file_index + 1,
fs));
[this, remote_dir, local_dir, download_file_metas, fs] {
download_sst_file(remote_dir, local_dir, download_file_metas, fs);
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class replica_bulk_loader : replica_base
// download sst files from remote provider
void download_sst_file(const std::string &remote_dir,
const std::string &local_dir,
int32_t file_index,
std::vector<::dsn::replication::file_meta> &download_file_metas,
dist::block_service::block_filesystem *fs);

// \return ERR_PATH_NOT_FOUND: file not exist
Expand Down

0 comments on commit d040484

Please sign in to comment.