Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Feb 11, 2025
1 parent d48af21 commit da73398
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 43 deletions.
42 changes: 28 additions & 14 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,25 @@ class Directories {
std::unique_ptr<FSDirectory> wal_dir_;
};

struct DBOpenLogReporter : public log::Reader::Reporter {
// log::Reader::Reporter implementation used while WAL records are read during
// DB open. In addition to the regular corruption callback it remembers the
// log number passed to PredecessorLogCorruption(), so the caller can tell a
// corruption attributed to a predecessor WAL apart from one in the WAL that is
// currently being read.
struct DBOpenLogRecordReadReporter : public log::Reader::Reporter {
  // All pointer members default to nullptr. Corruption() and OldLogRecord()
  // compare `status` and `old_log_record` against nullptr, so they must never
  // hold indeterminate values on a freshly constructed reporter.
  // NOTE(review): callers are expected to assign these fields before the
  // reporter is handed to a log::Reader -- confirm against the setup code.
  Env* env = nullptr;
  Logger* info_log = nullptr;
  const char* fname = nullptr;
  Status* status = nullptr;  // nullptr if immutable_db_options_.paranoid_checks==false
  bool* old_log_record = nullptr;

  // Reports `bytes` dropped from `fname` because of `s`; a warning is written
  // to `info_log`.
  void Corruption(size_t bytes, const Status& s) override;

  // Same reporting as Corruption(), and additionally records
  // `predecessor_log_number` for GetCorruptedPredecessorLogNumber().
  void PredecessorLogCorruption(size_t bytes, const Status& s,
                                uint64_t predecessor_log_number) override;

  // Sets `*old_log_record` to true when `old_log_record` is non-null.
  void OldLogRecord(size_t bytes) override;

  // Returns the log number recorded by PredecessorLogCorruption(), or
  // kMaxSequenceNumber when no predecessor corruption has been reported.
  uint64_t GetCorruptedPredecessorLogNumber() const {
    return corrupted_predecessor_log_number_;
  }

 private:
  // kMaxSequenceNumber doubles as the "nothing recorded" sentinel.
  uint64_t corrupted_predecessor_log_number_ = kMaxSequenceNumber;
};

// While DB is the public interface of RocksDB, and DBImpl is the actual
Expand Down Expand Up @@ -2067,21 +2077,25 @@ class DBImpl : public DB {

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname,

bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record, Status* const reporter_status,
DBOpenLogReporter* reporter, std::unique_ptr<log::Reader>& reader);
bool stop_replay_for_corruption,
uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record,
Status* const reporter_status,
DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
std::function<void()> logFileDropped,
DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Expand All @@ -2106,9 +2120,9 @@ class DBImpl : public DB {

Status HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status log_read_status, bool* old_log_record,
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found);

Status UpdatePredecessorWALInfo(uint64_t wal_number,
const SequenceNumber last_seqno_observed,
Expand Down
33 changes: 24 additions & 9 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
return true;
}

void DBOpenLogReporter::Corruption(size_t bytes, const Status& s) {
void DBOpenLogRecordReadReporter::Corruption(size_t bytes, const Status& s) {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
Expand All @@ -1115,7 +1115,16 @@ void DBOpenLogReporter::Corruption(size_t bytes, const Status& s) {
}
}

void DBOpenLogReporter::OldLogRecord(size_t bytes) {
void DBOpenLogRecordReadReporter::PredecessorLogCorruption(
size_t bytes, const Status& s, uint64_t predecessor_log_number) {
assert(predecessor_log_number != kMaxSequenceNumber);
assert(corrupted_predecessor_log_number_ == kMaxSequenceNumber);

Corruption(bytes, s);
corrupted_predecessor_log_number_ = predecessor_log_number;
}

void DBOpenLogRecordReadReporter::OldLogRecord(size_t bytes) {
if (old_log_record != nullptr) {
*old_log_record = true;
}
Expand Down Expand Up @@ -1229,7 +1238,7 @@ Status DBImpl::ProcessLogFile(
Status status;
bool old_log_record = false;

DBOpenLogReporter reporter;
DBOpenLogRecordReadReporter reporter;
std::unique_ptr<log::Reader> reader;

std::string fname =
Expand Down Expand Up @@ -1323,7 +1332,7 @@ Status DBImpl::ProcessLogFile(
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
"Recovered to log #%" PRIu64 " next seq #%" PRIu64, wal_number,
*next_sequence);

if (status.ok()) {
Expand All @@ -1333,7 +1342,7 @@ Status DBImpl::ProcessLogFile(

if (!status.ok() || old_log_record) {
status = HandleNonOkStatusOrOldLogRecord(
wal_number, next_sequence, status, &old_log_record,
wal_number, next_sequence, status, reporter, &old_log_record,
stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
}

Expand All @@ -1357,7 +1366,7 @@ Status DBImpl::InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
Status* const reporter_status, DBOpenLogReporter* reporter,
Status* const reporter_status, DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
assert(old_log_record);
assert(reporter_status);
Expand Down Expand Up @@ -1408,7 +1417,7 @@ Status DBImpl::ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
std::function<void()> logFileDropped, DBOpenLogRecordReadReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
Expand Down Expand Up @@ -1607,7 +1616,8 @@ Status DBImpl::MaybeWriteLevel0TableForRecovery(

Status DBImpl::HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status status, bool* old_log_record, bool* stop_replay_for_corruption,
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found) {
assert(!status.ok() || *old_log_record);

Expand Down Expand Up @@ -1641,7 +1651,12 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord(
// We should ignore the error but not continue replaying
*old_log_record = false;
*stop_replay_for_corruption = true;
*corrupted_wal_number = wal_number;
uint64_t reporter_corrupted_predecessor_wal_number =
reporter.GetCorruptedPredecessorLogNumber();
*corrupted_wal_number =
reporter_corrupted_predecessor_wal_number != kMaxSequenceNumber
? reporter_corrupted_predecessor_wal_number
: wal_number;
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
Expand Down
28 changes: 20 additions & 8 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,16 @@ void Reader::MaybeVerifyPredecessorWALInfo(
if (recorded_predecessor_log_number >= min_wal_number_to_keep_) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
} else {
if (observed_predecessor_wal_info_.GetLogNumber() !=
recorded_predecessor_log_number) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() !=
recorded_predecessor_wal_info.GetLastSeqnoRecorded()) {
std::string reason =
Expand All @@ -392,7 +394,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
std::to_string(
observed_predecessor_wal_info_.GetLastSeqnoRecorded()) +
". (Last sequence number equal to 0 indicates no WAL records)";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetSizeBytes() !=
recorded_predecessor_wal_info.GetSizeBytes()) {
std::string reason =
Expand All @@ -402,7 +405,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
" bytes. Observed " +
std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) +
" bytes.";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
}
}
Expand Down Expand Up @@ -483,13 +487,21 @@ void Reader::UnmarkEOFInternal() {
}
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
void Reader::ReportCorruption(size_t bytes, const char* reason,
uint64_t corrupted_predecessor_log_number) {
ReportDrop(bytes, Status::Corruption(reason),
corrupted_predecessor_log_number);
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
// Forwards a drop of `bytes` bytes to the reporter, if one is installed.
//
// Exactly one callback is invoked per drop:
//  - PredecessorLogCorruption() when `corrupted_predecessor_log_number` names
//    a log, i.e. differs from the kMaxSequenceNumber sentinel;
//  - Corruption() otherwise.
//
// The unconditional Corruption() call that used to precede this dispatch is
// gone: with it, every drop reached the reporter twice.
void Reader::ReportDrop(size_t bytes, const Status& reason,
                        uint64_t corrupted_predecessor_log_number) {
  if (reporter_ == nullptr) {
    return;
  }

  if (corrupted_predecessor_log_number != kMaxSequenceNumber) {
    reporter_->PredecessorLogCorruption(bytes, reason,
                                        corrupted_predecessor_log_number);
  } else {
    reporter_->Corruption(bytes, reason);
  }
}

Expand Down
29 changes: 17 additions & 12 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class Reader {
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;

virtual void PredecessorLogCorruption(size_t /*bytes*/,
const Status& /*status*/,
uint64_t /*predecessor_log_number*/) {
}
// Optional hook with a no-op default; the byte count is unused here.
// Subclasses may override it to take note of the event.
virtual void OldLogRecord(size_t /*bytes*/) {}
};

Expand Down Expand Up @@ -220,8 +224,12 @@ class Reader {

// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
void ReportCorruption(
size_t bytes, const char* reason,
uint64_t corrupted_predecessor_log_number = kMaxSequenceNumber);
void ReportDrop(
size_t bytes, const Status& reason,
uint64_t corrupted_predecessor_log_number = kMaxSequenceNumber);
void ReportOldLogRecord(size_t bytes);

void InitCompression(const CompressionTypeRecord& compression_record);
Expand All @@ -236,17 +244,14 @@ class Reader {

class FragmentBufferedReader : public Reader {
public:
FragmentBufferedReader(
std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
bool checksum, uint64_t log_num, bool verify_and_track_wals = false,
bool stop_replay_for_corruption = false,
uint64_t min_wal_number_to_keep = std::numeric_limits<uint64_t>::max(),
const PredecessorWALInfo& observed_predecessor_wal_info =
PredecessorWALInfo())
FragmentBufferedReader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num)
: Reader(info_log, std::move(_file), reporter, checksum, log_num,
verify_and_track_wals, stop_replay_for_corruption,
min_wal_number_to_keep, observed_predecessor_wal_info),
false /*verify_and_track_wals*/,
false /*stop_replay_for_corruption*/,
std::numeric_limits<uint64_t>::max() /*min_wal_number_to_keep*/,
PredecessorWALInfo() /*observed_predecessor_wal_info*/),
fragments_(),
in_fragmented_record_(false) {}
~FragmentBufferedReader() override {}
Expand Down

0 comments on commit da73398

Please sign in to comment.