diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index b0e501409f..2a2d492580 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -22,6 +22,7 @@ #include #include "common/duplication_common.h" +#include "common/gpid.h" #include "duplication_types.h" #include "load_from_private_log.h" #include "replica/duplication/mutation_batch.h" @@ -156,14 +157,29 @@ void load_from_private_log::find_log_file_to_start() // Reopen the files. Because the internal file handle of `file_map` // is cleared once WAL replay finished. They are unable to read. mutation_log::log_file_map_by_index new_file_map; - for (const auto &pr : file_map) { + decree cleanable_decree = _private_log->get_cleanable_decree(); + decree max_decree_gpid = _private_log->max_decree(get_gpid()); + CHECK(max_decree_gpid > cleanable_decree, + "plog files all error: max_decree_gpid {} , cleanable_decree {}", + max_decree_gpid, + cleanable_decree); + + for (auto it = file_map.crbegin(); it != file_map.crend(); ++it) { log_file_ptr file; - error_s es = log_utils::open_read(pr.second->path(), file); + error_s es = log_utils::open_read(it->second->path(), file); if (!es.is_ok()) { LOG_ERROR_PREFIX("{}", es); return; } - new_file_map.emplace(pr.first, file); + new_file_map.emplace(it->first, file); + + gpid pid = get_gpid(); + decree previous_log_max_decree = file->previous_log_max_decree(pid); + // These plog files may have been deleted; do not open_read() the next plog file, + // otherwise it may coredump. 
+ if (previous_log_max_decree <= cleanable_decree) { + break; + } } find_log_file_to_start(std::move(new_file_map)); diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp index df3ca2e6b2..23b78efe36 100644 --- a/src/replica/mutation_log.cpp +++ b/src/replica/mutation_log.cpp @@ -361,6 +361,7 @@ void mutation_log::init_states() _private_log_info = {0, 0}; _plog_max_decree_on_disk = 0; _plog_max_commit_on_disk = 0; + _cleanable_decree = 0; } error_code mutation_log::open(replay_callback read_callback, @@ -898,6 +899,18 @@ void mutation_log::update_max_commit_on_disk_no_lock(decree d) } } +decree mutation_log::get_cleanable_decree() const +{ + zauto_lock l(_lock); + return _cleanable_decree; +} + +void mutation_log::set_cleanable_decree(decree d) +{ + zauto_lock l(_lock); + _cleanable_decree = d; +} + bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state &state) const { CHECK(_is_private, "this method is only valid for private logs"); diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h index 7689004aae..b1d3c8e6a0 100644 --- a/src/replica/mutation_log.h +++ b/src/replica/mutation_log.h @@ -301,6 +301,9 @@ class mutation_log : public ref_counter task_tracker *tracker() { return &_tracker; } + decree get_cleanable_decree() const; + void set_cleanable_decree(decree target); + protected: // 'size' is data size to write; the '_global_end_offset' will be updated by 'size'. // can switch file only when create_new_log_if_needed = true; @@ -400,6 +403,8 @@ class mutation_log : public ref_counter // for plog. Since it is set with mutation.data.header.last_committed_decree, it must // be less than _plog_max_decree_on_disk. 
decree _plog_max_commit_on_disk; + + decree _cleanable_decree; // To deal with gc conflict }; typedef dsn::ref_ptr mutation_log_ptr; diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp index b975626b61..ffeee67978 100644 --- a/src/replica/replica_chkpt.cpp +++ b/src/replica/replica_chkpt.cpp @@ -171,6 +171,7 @@ void replica::on_checkpoint_timer() } } + _private_log->set_cleanable_decree(cleanable_decree); tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, &_tracker, [this, plog, cleanable_decree, valid_start_offset] {