Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(duplication): deal with plog concurrent problem #2068

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <string_view>
#include "common/duplication_common.h"
#include "common/gpid.h"
#include "duplication_types.h"
#include "load_from_private_log.h"
#include "replica/duplication/mutation_batch.h"
Expand Down Expand Up @@ -156,14 +157,29 @@ void load_from_private_log::find_log_file_to_start()
// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
mutation_log::log_file_map_by_index new_file_map;
for (const auto &pr : file_map) {
decree cleanable_decree = _private_log->get_cleanable_decree();
decree max_decree_gpid = _private_log->max_decree(get_gpid());
CHECK(max_decree_gpid > cleanable_decree,
"plog files all error: max_decree_gpid {} , cleanable_decree {}",
max_decree_gpid,
cleanable_decree);

for (auto it = file_map.crbegin(); it != file_map.crend(); ++it) {
log_file_ptr file;
error_s es = log_utils::open_read(pr.second->path(), file);
error_s es = log_utils::open_read(it->second->path(), file);
if (!es.is_ok()) {
LOG_ERROR_PREFIX("{}", es);
return;
}
new_file_map.emplace(pr.first, file);
new_file_map.emplace(it->first, file);

gpid pid = get_gpid();
decree previous_log_max_decree = file->previous_log_max_decree(pid);
// These plog files has possible be deleted do not open_read() next plog file , otherwise it
// may coredump.
if (previous_log_max_decree <= cleanable_decree) {
break;
}
}

find_log_file_to_start(std::move(new_file_map));
Expand Down
13 changes: 13 additions & 0 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ void mutation_log::init_states()
_private_log_info = {0, 0};
_plog_max_decree_on_disk = 0;
_plog_max_commit_on_disk = 0;
_cleanable_decree = 0;
}

error_code mutation_log::open(replay_callback read_callback,
Expand Down Expand Up @@ -898,6 +899,18 @@ void mutation_log::update_max_commit_on_disk_no_lock(decree d)
}
}

decree mutation_log::get_cleanable_decree() const
{
zauto_lock l(_lock);
return _cleanable_decree;
}

void mutation_log::set_cleanable_decree(decree d)
{
zauto_lock l(_lock);
_cleanable_decree = d;
}

bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state &state) const
{
CHECK(_is_private, "this method is only valid for private logs");
Expand Down
5 changes: 5 additions & 0 deletions src/replica/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ class mutation_log : public ref_counter

task_tracker *tracker() { return &_tracker; }

decree get_cleanable_decree() const;
void set_cleanable_decree(decree target);

protected:
// 'size' is data size to write; the '_global_end_offset' will be updated by 'size'.
// can switch file only when create_new_log_if_needed = true;
Expand Down Expand Up @@ -400,6 +403,8 @@ class mutation_log : public ref_counter
// for plog. Since it is set with mutation.data.header.last_committed_decree, it must
// be less than _plog_max_decree_on_disk.
decree _plog_max_commit_on_disk;

decree _cleanable_decree; // To deal with gc conflict
};

typedef dsn::ref_ptr<mutation_log> mutation_log_ptr;
Expand Down
1 change: 1 addition & 0 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void replica::on_checkpoint_timer()
}
}

_private_log->set_cleanable_decree(cleanable_decree);
tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this, plog, cleanable_decree, valid_start_offset] {
Expand Down