Skip to content

Commit

Permalink
Detect
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 26, 2024
1 parent e3024e7 commit 370ad0a
Show file tree
Hide file tree
Showing 24 changed files with 704 additions and 120 deletions.
34 changes: 22 additions & 12 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2071,23 +2071,26 @@ class DBImpl : public DB {
bool read_only, int job_id, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
PredecessorWALInfo& predecessor_wal_info);

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname, bool* const old_log_record,
Status* const reporter_status,
DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,

bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record, Status* const reporter_status,
DBOpenLogReporter* reporter, std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Expand Down Expand Up @@ -2116,8 +2119,13 @@ class DBImpl : public DB {
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);

void FinishLogFileProcessing(SequenceNumber const* const next_sequence,
const Status& status);
Status UpdatePredecessorWALInfo(uint64_t wal_number,
const SequenceNumber last_seqno_observed,
const std::string& fname,
PredecessorWALInfo& predecessor_wal_info);

void FinishLogFileProcessing(const Status& status,
const SequenceNumber* next_sequence);

// Return `Status::Corruption()` when `stop_replay_for_corruption == true` and
// exits inconsistency between SST and WAL data
Expand Down Expand Up @@ -2309,7 +2317,8 @@ class DBImpl : public DB {
const WriteOptions& write_options,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size,
SequenceNumber sequence);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down Expand Up @@ -2554,6 +2563,7 @@ class DBImpl : public DB {

IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num,
uint64_t recycle_log_number, size_t preallocate_block_size,
const PredecessorWALInfo& predecessor_wal_info,
log::Writer** new_log);

// Validate self-consistency of DB options
Expand Down
126 changes: 94 additions & 32 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ Status DBImpl::Recover(
}
}

if (immutable_db_options_.track_and_verify_wals && !is_new_db &&
!immutable_db_options_.best_efforts_recovery && wal_files.empty()) {
return Status::Corruption("Opening an existing DB with no WAL files");
}

if (immutable_db_options_.track_and_verify_wals_in_manifest) {
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
Expand Down Expand Up @@ -1189,14 +1194,15 @@ Status DBImpl::ProcessLogFiles(
bool stop_replay_for_corruption = false;
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
PredecessorWALInfo predecessor_wal_info;

for (auto wal_number : wal_numbers) {
if (status.ok()) {
status =
ProcessLogFile(wal_number, min_wal_number, is_retry, read_only,
job_id, next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed);
status = ProcessLogFile(
wal_number, min_wal_number, is_retry, read_only, job_id,
next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed, predecessor_wal_info);
}
}

Expand All @@ -1217,7 +1223,8 @@ Status DBImpl::ProcessLogFile(
int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
PredecessorWALInfo& predecessor_wal_info) {
assert(stop_replay_by_wal_filter);

// Variable initialization starts
Expand All @@ -1244,6 +1251,11 @@ Status DBImpl::ProcessLogFile(
uint64_t record_checksum;
const UnorderedMap<uint32_t, size_t>& running_ts_sz =
versions_->GetRunningColumnFamiliesTimestampSize();

// We need to track `last_seqno_observed` in addition to `next_sequence` since
// `last_seqno_observed != *next_sequence` when there are multiple key-value
// pairs in one WAL entry
SequenceNumber last_seqno_observed = 0;
// Variable initialization ends

if (wal_number < min_wal_number) {
Expand All @@ -1264,14 +1276,19 @@ Status DBImpl::ProcessLogFile(
}

Status init_status = InitializeLogReader(
wal_number, is_retry, fname, &old_log_record, &status, &reporter, reader);
wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number,
predecessor_wal_info, &old_log_record, &status, &reporter, reader);

// FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases
if (!init_status.ok()) {
assert(status.ok());
status.PermitUncheckedError();
return init_status;
} else if (reader == nullptr) {
// TODO(hx235): remove this case since it's confusing
assert(status.ok());
// Fail initializing log reader for one log file with an ok status.
// Try next one.
return status;
}

Expand All @@ -1296,9 +1313,9 @@ Status DBImpl::ProcessLogFile(
// FIXME(hx235): consolidate `process_status` and `status`
Status process_status = ProcessLogRecord(
record, reader, running_ts_sz, wal_number, fname, read_only, job_id,
logFileDropped, &reporter, &record_checksum, next_sequence,
stop_replay_for_corruption, &status, stop_replay_by_wal_filter,
version_edits, flushed);
logFileDropped, &reporter, &record_checksum, &last_seqno_observed,
next_sequence, stop_replay_for_corruption, &status,
stop_replay_by_wal_filter, version_edits, flushed);

if (!process_status.ok()) {
return process_status;
Expand All @@ -1311,13 +1328,19 @@ Status DBImpl::ProcessLogFile(
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
*next_sequence);

if (status.ok()) {
status = UpdatePredecessorWALInfo(wal_number, last_seqno_observed, fname,
predecessor_wal_info);
}

if (!status.ok() || old_log_record) {
status = HandleNonOkStatusOrOldLogRecord(
wal_number, next_sequence, status, &old_log_record,
stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
}

FinishLogFileProcessing(next_sequence, status);
FinishLogFileProcessing(status, next_sequence);

return status;
}

Expand All @@ -1332,12 +1355,12 @@ void DBImpl::SetupLogFileProcessing(uint64_t wal_number) {
static_cast<int>(immutable_db_options_.wal_recovery_mode));
}

Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname,
bool* const old_log_record,
Status* const reporter_status,
DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
Status DBImpl::InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
Status* const reporter_status, DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
assert(old_log_record);
assert(reporter_status);
assert(reporter);
Expand Down Expand Up @@ -1375,9 +1398,11 @@ Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
reader.reset(new log::Reader(immutable_db_options_.info_log,
std::move(file_reader), reporter,
true /*checksum*/, wal_number));
reader.reset(new log::Reader(
immutable_db_options_.info_log, std::move(file_reader), reporter,
true /*checksum*/, wal_number,
immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption,
min_wal_number, predecessor_wal_info));
return status;
}

Expand All @@ -1386,11 +1411,12 @@ Status DBImpl::ProcessLogRecord(
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
assert(reporter);
assert(last_seqno_observed);
assert(stop_replay_for_corruption);
assert(status);
assert(stop_replay_by_wal_filter);
Expand All @@ -1416,17 +1442,18 @@ Status DBImpl::ProcessLogRecord(
}
assert(batch_to_use);

SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);
if (sequence > kMaxSequenceNumber) {
*last_seqno_observed = WriteBatchInternal::Sequence(batch_to_use);

if (*last_seqno_observed > kMaxSequenceNumber) {
reporter->Corruption(
record.size(),
Status::Corruption("sequence " + std::to_string(sequence) +
Status::Corruption("sequence " + std::to_string(*last_seqno_observed) +
" is too large"));
assert(process_status.ok());
return process_status;
}

MaybeReviseStopReplayForCorruption(sequence, next_sequence,
MaybeReviseStopReplayForCorruption(*last_seqno_observed, next_sequence,
stop_replay_for_corruption);
if (*stop_replay_for_corruption) {
logFileDropped();
Expand Down Expand Up @@ -1630,8 +1657,30 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord(
return status;
}
}
void DBImpl::FinishLogFileProcessing(SequenceNumber const* const next_sequence,
const Status& status) {

Status DBImpl::UpdatePredecessorWALInfo(
uint64_t wal_number, const SequenceNumber last_seqno_observed,
const std::string& fname, PredecessorWALInfo& predecessor_wal_info) {
uint64_t bytes;

Status s = env_->GetFileSize(fname, &bytes);
if (!s.ok()) {
return s;
}

SequenceNumber mock_seqno = kMaxSequenceNumber;
[[maybe_unused]] std::pair<uint64_t, SequenceNumber*> pair =
std::make_pair(wal_number, &mock_seqno);
TEST_SYNC_POINT_CALLBACK("DBImpl::UpdatePredecessorWALInfo", &pair);
predecessor_wal_info = PredecessorWALInfo(
wal_number, bytes,
mock_seqno != kMaxSequenceNumber ? mock_seqno : last_seqno_observed);

return s;
}

void DBImpl::FinishLogFileProcessing(const Status& status,
const SequenceNumber* next_sequence) {
if (status.ok()) {
assert(next_sequence);
flush_scheduler_.Clear();
Expand Down Expand Up @@ -2193,6 +2242,7 @@ Status DB::OpenAndTrimHistory(
IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
const PredecessorWALInfo& predecessor_wal_info,
log::Writer** new_log) {
IOStatus io_s;
std::unique_ptr<FSWritableFile> lfile;
Expand Down Expand Up @@ -2236,9 +2286,15 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush,
immutable_db_options_.wal_compression);
immutable_db_options_.wal_compression,
immutable_db_options_.track_and_verify_wals);
io_s = (*new_log)->AddCompressionTypeRecord(write_options);
if (io_s.ok()) {
io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options,
predecessor_wal_info);
}
}

return io_s;
}

Expand Down Expand Up @@ -2331,8 +2387,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
log::Writer* new_log = nullptr;
const size_t preallocate_block_size =
impl->GetWalPreallocateBlockSize(max_write_buffer_size);
// TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL
// created during DB open with predecessor WALs from previous DB session due
// to `avoid_flush_during_recovery == true`. This can protect the last WAL
// recovered.
s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/,
preallocate_block_size, &new_log);
preallocate_block_size,
PredecessorWALInfo() /* predecessor_wal_info */,
&new_log);
if (s.ok()) {
// Prevent log files created by previous instance from being recycled.
// They might be in alive_log_file_, and might get recycled otherwise.
Expand Down Expand Up @@ -2367,7 +2429,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, recovered_seq);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(write_options, false);
Expand Down
20 changes: 15 additions & 5 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
const WriteOptions& write_options,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
SequenceNumber sequence) {
assert(log_size != nullptr);

Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
Expand All @@ -1584,7 +1585,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
if (!io_s.ok()) {
return io_s;
}
io_s = log_writer->AddRecord(write_options, log_entry);
io_s = log_writer->AddRecord(write_options, log_entry, sequence);

if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
Expand Down Expand Up @@ -1634,7 +1635,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -1760,7 +1761,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -2443,10 +2444,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
mutex_.Unlock();
if (creating_new_log) {
PredecessorWALInfo info;
log_write_mutex_.Lock();
if (!logs_.empty()) {
log::Writer* cur_log_writer = logs_.back().writer;
info = PredecessorWALInfo(cur_log_writer->get_log_number(),
cur_log_writer->file()->GetFileSize(),
cur_log_writer->GetLastSeqnoRecorded());
}
log_write_mutex_.Unlock();
// TODO: Write buffer size passed in should be max of all CF's instead
// of mutable_cf_options.write_buffer_size.
io_s = CreateWAL(write_options, new_log_number, recycle_log_number,
preallocate_block_size, &new_log);
preallocate_block_size, info, &new_log);
if (s.ok()) {
s = io_s;
}
Expand Down
Loading

0 comments on commit 370ad0a

Please sign in to comment.