diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6621b20f64a..2e29cc26cc1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2071,23 +2071,26 @@ class DBImpl : public DB { bool read_only, int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed); + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info); void SetupLogFileProcessing(uint64_t wal_number); - Status InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - std::unique_ptr& reader); + Status InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, + bool* const old_log_record, Status* const reporter_status, + DBOpenLogReporter* reporter, std::unique_ptr& reader); Status ProcessLogRecord( Slice record, const std::unique_ptr& reader, const UnorderedMap& running_ts_sz, uint64_t wal_number, const std::string& fname, bool read_only, int job_id, std::function logFileDropped, DBOpenLogReporter* reporter, - uint64_t* record_checksum, SequenceNumber* next_sequence, - bool* stop_replay_for_corruption, Status* status, - bool* stop_replay_by_wal_filter, + uint64_t* record_checksum, SequenceNumber* last_seqno_observed, + SequenceNumber* next_sequence, bool* stop_replay_for_corruption, + Status* status, bool* stop_replay_by_wal_filter, std::unordered_map* version_edits, bool* flushed); Status InitializeWriteBatchForLogRecord( @@ -2116,8 +2119,13 @@ class DBImpl : public DB { bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number, bool* corrupted_wal_found); - void FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status); + Status UpdatePredecessorWALInfo(uint64_t wal_number, + const SequenceNumber last_seqno_observed, + const std::string& fname, + PredecessorWALInfo& predecessor_wal_info); + + void FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence); // Return `Status::Corruption()` when `stop_replay_for_corruption == true` and // exits inconsistency between SST and WAL data @@ -2309,7 +2317,8 @@ class DBImpl : public DB { const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, @@ -2554,6 +2563,7 @@ class DBImpl : public DB { IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log); // Validate self-consistency of DB options diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d3c472640c6..065ea4446d8 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -754,6 +754,11 @@ Status DBImpl::Recover( } } + if (immutable_db_options_.track_and_verify_wals && !is_new_db && + !immutable_db_options_.best_efforts_recovery && wal_files.empty()) { + return Status::Corruption("Opening an existing DB with no WAL files"); + } + if (immutable_db_options_.track_and_verify_wals_in_manifest) { if (!immutable_db_options_.best_efforts_recovery) { // Verify WALs in MANIFEST. @@ -1189,14 +1194,15 @@ Status DBImpl::ProcessLogFiles( bool stop_replay_for_corruption = false; bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; + PredecessorWALInfo predecessor_wal_info; for (auto wal_number : wal_numbers) { if (status.ok()) { - status = - ProcessLogFile(wal_number, min_wal_number, is_retry, read_only, - job_id, next_sequence, &stop_replay_for_corruption, - &stop_replay_by_wal_filter, &corrupted_wal_number, - corrupted_wal_found, version_edits, &flushed); + status = ProcessLogFile( + wal_number, min_wal_number, is_retry, read_only, job_id, + next_sequence, &stop_replay_for_corruption, + &stop_replay_by_wal_filter, &corrupted_wal_number, + corrupted_wal_found, version_edits, &flushed, predecessor_wal_info); } } @@ -1217,7 +1223,8 @@ Status DBImpl::ProcessLogFile( int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed) { + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info) { assert(stop_replay_by_wal_filter); // Variable initialization starts @@ -1244,6 +1251,11 @@ Status DBImpl::ProcessLogFile( uint64_t record_checksum; const UnorderedMap& running_ts_sz = versions_->GetRunningColumnFamiliesTimestampSize(); + + // We need to track `last_seqno_observed` in addition to `next_sequence` since + // `last_seqno_observed != *next_sequence` when there are multiple key-value + // pairs in one WAL entry + SequenceNumber last_seqno_observed = 0; // Variable initialization ends if (wal_number < min_wal_number) { @@ -1264,7 +1276,10 @@ Status DBImpl::ProcessLogFile( } Status init_status = InitializeLogReader( - wal_number, is_retry, fname, &old_log_record, &status, &reporter, reader); + wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number, + predecessor_wal_info, &old_log_record, &status, &reporter, reader); + + // FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases if (!init_status.ok()) { assert(status.ok()); status.PermitUncheckedError(); @@ -1272,6 +1287,8 @@ Status DBImpl::ProcessLogFile( } else if (reader == nullptr) { // TODO(hx235): remove this case since it's confusing assert(status.ok()); + // Fail initializing log reader for one log file with an ok status. + // Try next one. return status; } @@ -1296,9 +1313,9 @@ Status DBImpl::ProcessLogFile( // FIXME(hx235): consolidate `process_status` and `status` Status process_status = ProcessLogRecord( record, reader, running_ts_sz, wal_number, fname, read_only, job_id, - logFileDropped, &reporter, &record_checksum, next_sequence, - stop_replay_for_corruption, &status, stop_replay_by_wal_filter, - version_edits, flushed); + logFileDropped, &reporter, &record_checksum, &last_seqno_observed, + next_sequence, stop_replay_for_corruption, &status, + stop_replay_by_wal_filter, version_edits, flushed); if (!process_status.ok()) { return process_status; @@ -1311,13 +1328,19 @@ Status DBImpl::ProcessLogFile( "Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number, *next_sequence); + if (status.ok()) { + status = UpdatePredecessorWALInfo(wal_number, last_seqno_observed, fname, + predecessor_wal_info); + } + if (!status.ok() || old_log_record) { status = HandleNonOkStatusOrOldLogRecord( wal_number, next_sequence, status, &old_log_record, stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found); } - FinishLogFileProcessing(next_sequence, status); + FinishLogFileProcessing(status, next_sequence); + return status; } @@ -1332,12 +1355,12 @@ void DBImpl::SetupLogFileProcessing(uint64_t wal_number) { static_cast(immutable_db_options_.wal_recovery_mode)); } -Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, - bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - std::unique_ptr& reader) { +Status DBImpl::InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record, + Status* const reporter_status, DBOpenLogReporter* reporter, + std::unique_ptr& reader) { assert(old_log_record); assert(reporter_status); assert(reporter); @@ -1375,9 +1398,11 @@ Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). - reader.reset(new log::Reader(immutable_db_options_.info_log, - std::move(file_reader), reporter, - true /*checksum*/, wal_number)); + reader.reset(new log::Reader( + immutable_db_options_.info_log, std::move(file_reader), reporter, + true /*checksum*/, wal_number, + immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption, + min_wal_number, predecessor_wal_info)); return status; } @@ -1386,11 +1411,12 @@ Status DBImpl::ProcessLogRecord( const UnorderedMap& running_ts_sz, uint64_t wal_number, const std::string& fname, bool read_only, int job_id, std::function logFileDropped, DBOpenLogReporter* reporter, - uint64_t* record_checksum, SequenceNumber* next_sequence, - bool* stop_replay_for_corruption, Status* status, - bool* stop_replay_by_wal_filter, + uint64_t* record_checksum, SequenceNumber* last_seqno_observed, + SequenceNumber* next_sequence, bool* stop_replay_for_corruption, + Status* status, bool* stop_replay_by_wal_filter, std::unordered_map* version_edits, bool* flushed) { assert(reporter); + assert(last_seqno_observed); assert(stop_replay_for_corruption); assert(status); assert(stop_replay_by_wal_filter); @@ -1416,17 +1442,18 @@ Status DBImpl::ProcessLogRecord( } assert(batch_to_use); - SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use); - if (sequence > kMaxSequenceNumber) { + *last_seqno_observed = WriteBatchInternal::Sequence(batch_to_use); + + if (*last_seqno_observed > kMaxSequenceNumber) { reporter->Corruption( record.size(), - Status::Corruption("sequence " + std::to_string(sequence) + + Status::Corruption("sequence " + std::to_string(*last_seqno_observed) + " is too large")); assert(process_status.ok()); return process_status; } - MaybeReviseStopReplayForCorruption(sequence, next_sequence, + MaybeReviseStopReplayForCorruption(*last_seqno_observed, next_sequence, stop_replay_for_corruption); if (*stop_replay_for_corruption) { logFileDropped(); @@ -1630,8 +1657,30 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord( return status; } } -void DBImpl::FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status) { + +Status DBImpl::UpdatePredecessorWALInfo( + uint64_t wal_number, const SequenceNumber last_seqno_observed, + const std::string& fname, PredecessorWALInfo& predecessor_wal_info) { + uint64_t bytes; + + Status s = env_->GetFileSize(fname, &bytes); + if (!s.ok()) { + return s; + } + + SequenceNumber mock_seqno = kMaxSequenceNumber; + [[maybe_unused]] std::pair pair = + std::make_pair(wal_number, &mock_seqno); + TEST_SYNC_POINT_CALLBACK("DBImpl::UpdatePredecessorWALInfo", &pair); + predecessor_wal_info = PredecessorWALInfo( + wal_number, bytes, + mock_seqno != kMaxSequenceNumber ? mock_seqno : last_seqno_observed); + + return s; +} + +void DBImpl::FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence) { if (status.ok()) { assert(next_sequence); flush_scheduler_.Clear(); @@ -2193,6 +2242,7 @@ Status DB::OpenAndTrimHistory( IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log) { IOStatus io_s; std::unique_ptr lfile; @@ -2236,9 +2286,15 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush, - immutable_db_options_.wal_compression); + immutable_db_options_.wal_compression, + immutable_db_options_.track_and_verify_wals); io_s = (*new_log)->AddCompressionTypeRecord(write_options); + if (io_s.ok()) { + io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options, + predecessor_wal_info); + } } + return io_s; } @@ -2331,8 +2387,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, log::Writer* new_log = nullptr; const size_t preallocate_block_size = impl->GetWalPreallocateBlockSize(max_write_buffer_size); + // TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL + // created during DB open with predecessor WALs from previous DB session due + // to `avoid_flush_during_recovery == true`. This can protect the last WAL + // recovered. s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/, - preallocate_block_size, &new_log); + preallocate_block_size, + PredecessorWALInfo() /* predecessor_wal_info */, + &new_log); if (s.ok()) { // Prevent log files created by previous instance from being recycled. // They might be in alive_log_file_, and might get recycled otherwise. @@ -2367,7 +2429,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, recovered_seq); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(write_options, false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 9f627430d68..39b03dc7e3a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1558,7 +1558,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); @@ -1584,7 +1585,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (!io_s.ok()) { return io_s; } - io_s = log_writer->AddRecord(write_options, log_entry); + io_s = log_writer->AddRecord(write_options, log_entry, sequence); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); @@ -1634,7 +1635,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1760,7 +1761,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -2443,10 +2444,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); if (creating_new_log) { + PredecessorWALInfo info; + log_write_mutex_.Lock(); + if (!logs_.empty()) { + log::Writer* cur_log_writer = logs_.back().writer; + info = PredecessorWALInfo(cur_log_writer->get_log_number(), + cur_log_writer->file()->GetFileSize(), + cur_log_writer->GetLastSeqnoRecorded()); + } + log_write_mutex_.Unlock(); // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. io_s = CreateWAL(write_options, new_log_number, recycle_log_number, - preallocate_block_size, &new_log); + preallocate_block_size, info, &new_log); if (s.ok()) { s = io_s; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9ac09082a07..6d875007e84 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1838,9 +1838,194 @@ class RecoveryTestHelper { } }; -class DBWALTestWithParams : public DBWALTestBase, - public ::testing::WithParamInterface< - std::tuple> { +TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.recycle_log_file_num = 1; + options.track_and_verify_wals = true; + + DestroyAndReopen(options); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + ASSERT_OK(Flush()); + + // Stop background flush to avoid deleting any WAL + options.env->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::HIGH); + + // Recycle the first WAL + ASSERT_OK(Put("key1", "old_value")); + // Recycle the second WAL + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + // Create a WAL hole on sequence number by truncating the first WAL to 0 byte + VectorWalPtr log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_EQ(log_files.size(), 2); + std::string log_name = LogFileName(dbname_, log_files.front()->LogNumber()); + Close(); + // Drop `Put("key1", "old_value")` in the first WAL + ASSERT_OK(test::TruncateFile(options.env, log_name, 0 /* new_length */)); + + Status s = DB::Open(options, dbname_, &db_); + + ASSERT_OK(s); + + ASSERT_EQ("wal_to_recycle", Get("key_ignore2")); + ASSERT_EQ("NOT_FOUND", Get("key1")); + + Close(); +} + +class DBWALTrackAndVerifyWALsWithParamsTest + : public DBWALTestBase, + public ::testing::WithParamInterface { + public: + DBWALTrackAndVerifyWALsWithParamsTest() + : DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {} +}; + +INSTANTIATE_TEST_CASE_P( + DBWALTrackAndVerifyWALsWithParamsTest, + DBWALTrackAndVerifyWALsWithParamsTest, + ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, + WALRecoveryMode::kAbsoluteConsistency, + WALRecoveryMode::kPointInTimeRecovery, + WALRecoveryMode::kSkipAnyCorruptedRecords)); + +TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.track_and_verify_wals = true; + options.wal_recovery_mode = GetParam(); + + // Stop background flush to avoid deleting any WAL + options.env->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::HIGH); + + for (int i = 0; i < 5; i++) { + DestroyAndReopen(options); + + ASSERT_OK(Put("key1", "old_value")); + SequenceNumber last_seqno_recorded_in_fist_wal = + dbfull()->GetLatestSequenceNumber(); + + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + VectorWalPtr log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_EQ(log_files.size(), 2); + uint64_t first_log_number = log_files.front()->LogNumber(); + std::string first_log_name = LogFileName(dbname_, first_log_number); + std::string second_log_name = + LogFileName(dbname_, log_files.back()->LogNumber()); + + if (i == 0) { + // Delete the obsolete WAL and verify it will not be seen as a WAL hole + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); + // Stop background flush to avoid deleting any WAL + sleeping_task.Reset(); + options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::HIGH); + Close(); + } else if (i == 1) { + // Create a WAL hole on WAL number by deleting the first WAL and verify + // the hole will be detected + Close(); + ASSERT_OK(options.env->DeleteFile(first_log_name)); + } else if (i == 2) { + // Create a WAL hole on sequence number by truncating the first WAL and + // verify the hole will be detected + Close(); + ASSERT_OK( + test::TruncateFile(options.env, first_log_name, 0 /* new_length */)); + } else if (i == 3) { + // Create a WAL hole on size difference by truncating the first WAL and + // mocking a correct sequence number to force triggering corruption based + // on size instead of sequence number and verify the hole will be detected + Close(); + ASSERT_OK( + test::TruncateFile(options.env, first_log_name, 0 /* new_length */)); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::UpdatePredecessorWALInfo", [&](void* arg) { + std::pair* pair = + static_cast*>(arg); + if (pair->first == first_log_number) { + *(pair->second) = last_seqno_recorded_in_fist_wal; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + } else if (i == 4) { + // Delete all wals and verify opening a DB with no WAL will be detected + Close(); + ASSERT_OK(options.env->DeleteFile(first_log_name)); + ASSERT_OK(options.env->DeleteFile(second_log_name)); + } + + Status s = DB::Open(options, dbname_, &db_); + + if (i == 0) { + ASSERT_OK(s); + ASSERT_EQ("new_value", Get("key1")); + continue; + } else if (i == 3) { + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + } else if (i == 4) { + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE( + s.ToString().find("Opening an existing DB with no WAL files") != + std::string::npos); + Close(); + continue; + } + + if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { + ASSERT_OK(s); + ASSERT_EQ("NOT_FOUND", Get("key1")); + } else if (options.wal_recovery_mode == + WALRecoveryMode::kAbsoluteConsistency || + options.wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + ASSERT_TRUE(s.IsCorruption()); + std::string msg; + if (i == 1) { + msg = "Missing WAL"; + } else if (i == 2) { + msg = "Mismatched last sequence number recorded in the WAL"; + } else if (i == 3) { + msg = "Mismatched size of the WAL"; + } + ASSERT_TRUE(s.ToString().find(msg) != std::string::npos); + } else { + ASSERT_OK(s); + ASSERT_EQ("new_value", Get("key1")); + } + + Close(); + } +} + +class DBWALTestWithParams + : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple> { public: DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} }; @@ -1853,7 +2038,8 @@ INSTANTIATE_TEST_CASE_P( RecoveryTestHelper::kWALFilesCount, 1), ::testing::Values(CompressionType::kNoCompression, - CompressionType::kZSTD))); + CompressionType::kZSTD), + ::testing::Bool())); class DBWALTestWithParamsVaryingRecoveryMode : public DBWALTestBase, @@ -1891,6 +2077,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t row_count = RecoveryTestHelper::FillData(this, &options); // test checksum failure or parsing RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, @@ -1914,6 +2101,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { // Verify clean slate behavior Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t row_count = RecoveryTestHelper::FillData(this, &options); options.create_if_missing = false; ASSERT_OK(TryReopen(options)); @@ -2164,6 +2352,7 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); @@ -2221,6 +2410,7 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); diff --git a/db/dbformat.h b/db/dbformat.h index f3a9b9a1a52..76039fc0d5d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -1179,4 +1179,64 @@ struct ParsedInternalKeyComparator { const InternalKeyComparator* cmp; }; +class PredecessorWALInfo { + public: + PredecessorWALInfo() + : log_number_(0), + size_bytes_(0), + last_seqno_recorded_(0), + initialized_(false) {} + + explicit PredecessorWALInfo(uint64_t log_number, uint64_t size_bytes, + SequenceNumber last_seqno_recorded) + : log_number_(log_number), + size_bytes_(size_bytes), + last_seqno_recorded_(last_seqno_recorded), + initialized_(true) {} + + uint64_t GetLogNumber() const { + assert(initialized_); + return log_number_; + } + + uint64_t GetSizeBytes() const { + assert(initialized_); + return size_bytes_; + } + + SequenceNumber GetLastSeqnoRecorded() const { + assert(initialized_); + return last_seqno_recorded_; + } + + bool IsInitialized() const { return initialized_; } + + inline void EncodeTo(std::string* dst) const { + assert(dst != nullptr); + assert(initialized_); + PutFixed64(dst, log_number_); + PutFixed64(dst, size_bytes_); + PutFixed64(dst, last_seqno_recorded_); + } + + inline Status DecodeFrom(Slice* src) { + if (!GetFixed64(src, &log_number_)) { + return Status::Corruption("Error decoding log number"); + } + if (!GetFixed64(src, &size_bytes_)) { + return Status::Corruption("Error decoding size bytes"); + } + if (!GetFixed64(src, &last_seqno_recorded_)) { + return Status::Corruption("Error decoding last seqno recorded"); + } + initialized_ = true; + return Status::OK(); + } + + private: + uint64_t log_number_; + uint64_t size_bytes_; + SequenceNumber last_seqno_recorded_; + bool initialized_; +}; } // namespace ROCKSDB_NAMESPACE diff --git a/db/log_format.h b/db/log_format.h index 9b691eeb5d7..b49b2c09e99 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -38,13 +38,18 @@ enum RecordType : uint8_t { // Compression Type kSetCompressionType = 9, + // For all the values >= 10, the 1 bit indicates whether it's recyclable // User-defined timestamp sizes kUserDefinedTimestampSizeType = 10, kRecyclableUserDefinedTimestampSizeType = 11, + + // For WAL verification + kPredecessorWALInfoType = 130, + kRecyclePredecessorWALInfoType = 131, }; // Unknown type of value with the 8-th bit set will be ignored constexpr uint8_t kRecordTypeSafeIgnoreMask = 1 << 7; -constexpr uint8_t kMaxRecordType = kRecyclableUserDefinedTimestampSizeType; +constexpr uint8_t kMaxRecordType = kRecyclePredecessorWALInfoType; constexpr unsigned int kBlockSize = 32768; diff --git a/db/log_reader.cc b/db/log_reader.cc index cae4fd7739d..3a73e4600c9 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,7 +24,10 @@ Reader::Reporter::~Reporter() = default; Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) + Reporter* reporter, bool checksum, uint64_t log_num, + bool track_and_verify_wals, bool stop_replay_for_corruption, + uint64_t min_wal_number_to_keep, + const PredecessorWALInfo& observed_predecessor_wal_info) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -37,6 +40,10 @@ Reader::Reader(std::shared_ptr info_log, last_record_offset_(0), end_of_buffer_offset_(0), log_number_(log_num), + track_and_verify_wals_(track_and_verify_wals), + stop_replay_for_corruption_(stop_replay_for_corruption), + min_wal_number_to_keep_(min_wal_number_to_keep), + observed_predecessor_wal_info_(observed_predecessor_wal_info), recycled_(false), first_record_read_(false), compression_type_(kNoCompression), @@ -65,6 +72,9 @@ Reader::~Reader() { // // TODO krad: Evaluate if we need to move to a more strict mode where we // restrict the inconsistency to only the last log +// TODO (hx235): move `wal_recovery_mode` to be a member data like other +// information (e.g, `stop_replay_for_corruption`) to decide whether to +// check for and surface corruption in `ReadRecord()` bool Reader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode wal_recovery_mode, uint64_t* record_checksum) { @@ -185,6 +195,23 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + + PredecessorWALInfo recorded_predecessor_wal_info; + Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + recorded_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record && !scratch->empty()) { @@ -329,6 +356,57 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, return false; } +void Reader::MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& recorded_predecessor_wal_info) { + if (!track_and_verify_wals_ || + wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords || + stop_replay_for_corruption_) { + return; + } + assert(recorded_predecessor_wal_info.IsInitialized()); + uint64_t recorded_predecessor_log_number = + recorded_predecessor_wal_info.GetLogNumber(); + + // This is the first WAL recovered thus with no predecessor WAL info has been + // initialized + if (!observed_predecessor_wal_info_.IsInitialized()) { + if (recorded_predecessor_log_number >= min_wal_number_to_keep_) { + std::string reason = "Missing WAL of log number " + + std::to_string(recorded_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } + } else { + if (observed_predecessor_wal_info_.GetLogNumber() != + recorded_predecessor_log_number) { + std::string reason = "Missing WAL of log number " + + std::to_string(recorded_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() != + recorded_predecessor_wal_info.GetLastSeqnoRecorded()) { + std::string reason = + "Mismatched last sequence number recorded in the WAL of log number " + + std::to_string(recorded_predecessor_log_number) + ". Recorded " + + std::to_string(recorded_predecessor_wal_info.GetLastSeqnoRecorded()) + + ". Observed " + + std::to_string( + observed_predecessor_wal_info_.GetLastSeqnoRecorded()) + + ". (Last sequence number equal to 0 indicates no WAL records)"; + ReportCorruption(fragment.size(), reason.c_str()); + } else if (observed_predecessor_wal_info_.GetSizeBytes() != + recorded_predecessor_wal_info.GetSizeBytes()) { + std::string reason = + "Mismatched size of the WAL of log number " + + std::to_string(recorded_predecessor_log_number) + ". Recorded " + + std::to_string(recorded_predecessor_wal_info.GetSizeBytes()) + + " bytes. Observed " + + std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) + + " bytes."; + ReportCorruption(fragment.size(), reason.c_str()); + } + } +} + uint64_t Reader::LastRecordOffset() { return last_record_offset_; } uint64_t Reader::LastRecordEnd() { @@ -486,7 +564,8 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, int header_size = kHeaderSize; const bool is_recyclable_type = ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType); + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType); if (is_recyclable_type) { header_size = kRecyclableHeaderSize; if (first_record_read_ && !recycled_) { @@ -551,6 +630,8 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *result = Slice(header + header_size, length); @@ -640,7 +721,9 @@ Status Reader::UpdateRecordedTimestampSize( } bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, - WALRecoveryMode /*unused*/, + WALRecoveryMode wal_recovery_mode + + , uint64_t* /* checksum */) { assert(record != nullptr); assert(scratch != nullptr); @@ -730,7 +813,24 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + fragments_.clear(); + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + PredecessorWALInfo recorded_predecessor_wal_info; + Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + recorded_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record_ && !scratch->empty()) { @@ -871,7 +971,8 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, const uint32_t length = a | (b << 8); int header_size = kHeaderSize; if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType) { + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType) { if (first_record_read_ && !recycled_) { // A recycled log should have started with a recycled record *fragment_type_or_err = kBadRecord; @@ -927,6 +1028,8 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *fragment = Slice(header + header_size, length); diff --git a/db/log_reader.h b/db/log_reader.h index a39f5b9cbb3..2b6f6cd6622 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -58,9 +58,15 @@ class Reader { // live while this Reader is in use. // // If "checksum" is true, verify checksums if available. + // TODO(hx235): seperate WAL related parameters from general `Reader` + // parameters Reader(std::shared_ptr info_log, std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num); + bool checksum, uint64_t log_num, bool track_and_verify_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& observed_predecessor_wal_info = + PredecessorWALInfo()); // No copying allowed Reader(const Reader&) = delete; void operator=(const Reader&) = delete; @@ -148,6 +154,17 @@ class Reader { // which log number this is uint64_t const log_number_; + // See `Optinos::track_and_verify_wals` + bool track_and_verify_wals_; + // Below variables are used for WAL verification + // TODO(hx235): To revise `stop_replay_for_corruption_` inside `LogReader` + // since we have `observed_predecessor_wal_info_` to verify against the + // `recorded_predecessor_wal_info_` recorded in current WAL. If there is no + // WAL hole, we can revise `stop_replay_for_corruption_` to be false. + bool stop_replay_for_corruption_; + uint64_t min_wal_number_to_keep_; + PredecessorWALInfo observed_predecessor_wal_info_; + // Whether this is a recycled log file bool recycled_; @@ -211,14 +228,25 @@ class Reader { Status UpdateRecordedTimestampSize( const std::vector>& cf_to_ts_sz); + + void MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& recorded_predecessor_wal_info); }; class FragmentBufferedReader : public Reader { public: - FragmentBufferedReader(std::shared_ptr info_log, - std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) - : Reader(info_log, std::move(_file), reporter, checksum, log_num), + FragmentBufferedReader( + std::shared_ptr info_log, + std::unique_ptr&& _file, Reporter* reporter, + bool checksum, uint64_t log_num, bool verify_and_track_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& observed_predecessor_wal_info = + PredecessorWALInfo()) + : Reader(info_log, std::move(_file), reporter, checksum, log_num, + verify_and_track_wals, stop_replay_for_corruption, + min_wal_number_to_keep, observed_predecessor_wal_info), fragments_(), in_fragmented_record_(false) {} ~FragmentBufferedReader() override {} diff --git a/db/log_test.cc b/db/log_test.cc index 1264c2a594a..2e26ad8f003 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -465,7 +465,7 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); - // Truncated last record is ignored, not treated as an error + // Truncated last record is not ignored, treated as an error ASSERT_GT(DroppedBytes(), 0U); ASSERT_EQ("OK", MatchError("Corruption: truncated header")); } diff --git a/db/log_writer.cc b/db/log_writer.cc index f178d6281b5..ae4cb821184 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -22,7 +22,7 @@ namespace ROCKSDB_NAMESPACE::log { Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush, - CompressionType compression_type) + CompressionType compression_type, bool track_and_verify_wals) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), @@ -31,7 +31,9 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize), manual_flush_(manual_flush), compression_type_(compression_type), - compress_(nullptr) { + compress_(nullptr), + track_and_verify_wals_(track_and_verify_wals), + last_seqno_recorded_(0) { for (uint8_t i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); @@ -52,19 +54,12 @@ Writer::~Writer() { } IOStatus Writer::WriteBuffer(const WriteOptions& write_options) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); + s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (!s.ok()) { return s; } @@ -92,17 +87,10 @@ bool Writer::PublishIfClosed() { } IOStatus Writer::AddRecord(const WriteOptions& write_options, - const Slice& slice) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. Skip writing buffer."); + const Slice& slice, const SequenceNumber& seqno) { + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } const char* ptr = slice.data(); size_t left = slice.size(); @@ -118,7 +106,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, compress_start = true; } - IOStatus s; IOOptions opts; s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (s.ok()) { @@ -196,6 +183,10 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, } } + if (s.ok()) { + last_seqno_recorded_ = std::max(last_seqno_recorded_, seqno); + } + return s; } @@ -208,23 +199,16 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return IOStatus::OK(); } - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } CompressionTypeRecord record(compression_type_); std::string encode; record.EncodeTo(&encode); - IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType, - encode.data(), encode.size()); + s = EmitPhysicalRecord(write_options, kSetCompressionType, encode.data(), + encode.size()); if (s.ok()) { if (!manual_flush_) { IOOptions io_opts; @@ -251,6 +235,44 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return s; } +IOStatus Writer::MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info) { + IOStatus s = MaybeHandleSeenFileWriterError(); + + if (!s.ok()) { + return s; + } + + if (!track_and_verify_wals_ || !info.IsInitialized()) { + return IOStatus::OK(); + } + + std::string encode; + info.EncodeTo(&encode); + + s = MaybeSwitchToNewBlock(write_options, encode); + if (!s.ok()) { + return s; + } + + RecordType type = recycle_log_files_ ? kRecyclePredecessorWALInfoType + : kPredecessorWALInfoType; + s = EmitPhysicalRecord(write_options, type, encode.data(), encode.size()); + + if (!s.ok()) { + return s; + } + + if (!manual_flush_) { + IOOptions io_opts; + s = WritableFileWriter::PrepareIOOptions(write_options, io_opts); + if (s.ok()) { + s = dest_->Flush(io_opts); + } + } + return s; +} + IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( const WriteOptions& write_options, const UnorderedMap& cf_to_ts_sz) { @@ -275,22 +297,9 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType : kUserDefinedTimestampSizeType; - // If there's not enough space for this record, switch to a new block. - const int64_t leftover = kBlockSize - block_offset_; - if (leftover < header_size_ + (int)encoded.size()) { - IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); - if (!s.ok()) { - return s; - } - - std::vector trailer(leftover, '\x00'); - s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); - if (!s.ok()) { - return s; - } - - block_offset_ = 0; + IOStatus s = MaybeSwitchToNewBlock(write_options, encoded); + if (!s.ok()) { + return s; } return EmitPhysicalRecord(write_options, type, encoded.data(), @@ -313,7 +322,7 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, uint32_t crc = type_crc_[t]; if (t < kRecyclableFullType || t == kSetCompressionType || - t == kUserDefinedTimestampSizeType) { + t == kPredecessorWALInfoType || t == kUserDefinedTimestampSizeType) { // Legacy record format assert(block_offset_ + kHeaderSize + n <= kBlockSize); header_size = kHeaderSize; @@ -352,4 +361,42 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, return s; } +IOStatus Writer::MaybeHandleSeenFileWriterError() { + if (dest_->seen_error()) { +#ifndef NDEBUG + if (dest_->seen_injected_error()) { + std::stringstream msg; + msg << "Seen " << FaultInjectionTestFS::kInjected + << " error. Skip writing buffer."; + return IOStatus::IOError(msg.str()); + } +#endif // NDEBUG + return IOStatus::IOError("Seen error. Skip writing buffer."); + } + return IOStatus::OK(); +} + +IOStatus Writer::MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write) { + IOStatus s; + const int64_t leftover = kBlockSize - block_offset_; + // If there's not enough space for this record, switch to a new block. + if (leftover < header_size_ + (int)content_to_write.size()) { + IOOptions opts; + s = WritableFileWriter::PrepareIOOptions(write_options, opts); + if (!s.ok()) { + return s; + } + + std::vector trailer(leftover, '\x00'); + s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); + if (!s.ok()) { + return s; + } + + block_offset_ = 0; + } + return s; +} + } // namespace ROCKSDB_NAMESPACE::log diff --git a/db/log_writer.h b/db/log_writer.h index 7cae52dd51c..f7aef75197d 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include #include +#include "db/dbformat.h" #include "db/log_format.h" #include "rocksdb/compression_type.h" #include "rocksdb/env.h" @@ -76,18 +77,24 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. + // TODO(hx235): seperate WAL related parameters from general `Reader` + // parameters explicit Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush = false, - CompressionType compressionType = kNoCompression); + CompressionType compressionType = kNoCompression, + bool track_and_verify_wals = false); // No copying allowed Writer(const Writer&) = delete; void operator=(const Writer&) = delete; ~Writer(); - IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice); + IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice, + const SequenceNumber& seqno = 0); IOStatus AddCompressionTypeRecord(const WriteOptions& write_options); + IOStatus MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info); // If there are column families in `cf_to_ts_sz` not included in // `recorded_cf_to_ts_sz_` and its user-defined timestamp size is non-zero, @@ -116,6 +123,8 @@ class Writer { size_t TEST_block_offset() const { return block_offset_; } + SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; }; + private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block @@ -131,6 +140,11 @@ class Writer { IOStatus EmitPhysicalRecord(const WriteOptions& write_options, RecordType type, const char* ptr, size_t length); + IOStatus MaybeHandleSeenFileWriterError(); + + IOStatus MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write); + // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() bool manual_flush_; @@ -145,6 +159,11 @@ class Writer { // Since the user-defined timestamp size cannot be changed while the DB is // running, existing entry in this map cannot be updated. UnorderedMap recorded_cf_to_ts_sz_; + + // See `Options::track_and_verify_wals` + bool track_and_verify_wals_; + + SequenceNumber last_seqno_recorded_; }; } // namespace log diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 62c79ac0e5e..0c1ef4e1136 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -421,6 +421,7 @@ DECLARE_int32(test_ingest_standalone_range_deletion_one_in); DECLARE_bool(allow_unprepared_value); DECLARE_string(file_temperature_age_thresholds); DECLARE_uint32(commit_bypass_memtable_one_in); +DECLARE_bool(track_and_verify_wals); constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index c49846b9f19..1859e6940fb 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -847,6 +847,10 @@ DEFINE_bool(allow_unprepared_value, ROCKSDB_NAMESPACE::ReadOptions().allow_unprepared_value, "Allow lazy loading of values for range scans"); +DEFINE_bool(track_and_verify_wals, + ROCKSDB_NAMESPACE::Options().track_and_verify_wals, + "See Options::track_and_verify_wals"); + static bool ValidateInt32Percent(const char* flagname, int32_t value) { if (value < 0 || value > 100) { fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname, diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 52123749a71..f5ff66cff20 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -4127,6 +4127,7 @@ void InitializeOptionsFromFlags( options.level_compaction_dynamic_level_bytes = FLAGS_level_compaction_dynamic_level_bytes; options.track_and_verify_wals_in_manifest = true; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; options.verify_sst_unique_id_in_manifest = FLAGS_verify_sst_unique_id_in_manifest; options.memtable_protection_bytes_per_key = diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b27f53b4a84..f34e34e59cc 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -632,6 +632,26 @@ struct DBOptions { // Default: false bool track_and_verify_wals_in_manifest = false; + // EXPERIMENTAL + // + // If true, each new WAL will record various information about its predecessor + // WAL for verification on the predecessor WAL during WAL recovery. + // + // It verifies the following: + // 1. There exists at least some WAL in the DB + // - It's not compatible with `RepairDB()` since this option imposes a + // stricter requirement on WAL than the DB went through `RepariDB()` can + // normally meet + // 2. There exists no WAL hole where new WAL data presents while some old WAL + // data not yet obsolete is missing. The DB manifest indicates which WALs are + // obsolete. + // + // This is intended to be a better replacement to + // `track_and_verify_wals_in_manifest`. + // + // Default: false + bool track_and_verify_wals = false; + // If true, verifies the SST unique id between MANIFEST and actual file // each time an SST file is opened. This check ensures an SST file is not // overwritten or misplaced. A corruption error will be reported if mismatch diff --git a/options/db_options.cc b/options/db_options.cc index 967bb9b964a..29e7632473a 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -230,6 +230,10 @@ static std::unordered_map track_and_verify_wals_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"track_and_verify_wals", + {offsetof(struct ImmutableDBOptions, track_and_verify_wals), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"verify_sst_unique_id_in_manifest", {offsetof(struct ImmutableDBOptions, verify_sst_unique_id_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -716,6 +720,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) compaction_verify_record_count(options.compaction_verify_record_count), track_and_verify_wals_in_manifest( options.track_and_verify_wals_in_manifest), + track_and_verify_wals(options.track_and_verify_wals), verify_sst_unique_id_in_manifest( options.verify_sst_unique_id_in_manifest), env(options.env), @@ -820,6 +825,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { " " "Options.track_and_verify_wals_in_manifest: %d", track_and_verify_wals_in_manifest); + ROCKS_LOG_HEADER(log, + " " + "Options.track_and_verify_wals: %d", + track_and_verify_wals); ROCKS_LOG_HEADER(log, " Options.verify_sst_unique_id_in_manifest: %d", verify_sst_unique_id_in_manifest); ROCKS_LOG_HEADER(log, " Options.env: %p", diff --git a/options/db_options.h b/options/db_options.h index ac76ea40d8e..25318ec1a61 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -27,6 +27,7 @@ struct ImmutableDBOptions { bool flush_verify_memtable_count; bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; + bool track_and_verify_wals; bool verify_sst_unique_id_in_manifest; Env* env; std::shared_ptr rate_limiter; diff --git a/options/options_helper.cc b/options/options_helper.cc index f05d90f7c1c..0e9b0199a44 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -71,6 +71,7 @@ void BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.compaction_verify_record_count; options.track_and_verify_wals_in_manifest = immutable_db_options.track_and_verify_wals_in_manifest; + options.track_and_verify_wals = immutable_db_options.track_and_verify_wals; options.verify_sst_unique_id_in_manifest = immutable_db_options.verify_sst_unique_id_in_manifest; options.env = immutable_db_options.env; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 6036d051312..d5d92838308 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -406,6 +406,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "flush_verify_memtable_count=true;" "compaction_verify_record_count=true;" "track_and_verify_wals_in_manifest=true;" + "track_and_verify_wals=true;" "verify_sst_unique_id_in_manifest=true;" "is_fd_close_on_exec=false;" "bytes_per_sync=4295013613;" diff --git a/options/options_test.cc b/options/options_test.cc index 2f9b12d3a1d..ef5e62c3fed 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -146,6 +146,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -329,6 +330,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); @@ -901,6 +903,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"daily_offpeak_time_utc", "06:30-23:30"}, @@ -916,6 +919,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); db_options_map["unknown_option"] = "1"; @@ -2450,6 +2454,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -2638,6 +2643,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); ASSERT_EQ(new_db_opt.use_fsync, true); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 49ab2ebfd7d..35884a7b378 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -308,6 +308,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { db_opt->is_fd_close_on_exec = rnd->Uniform(2); db_opt->paranoid_checks = rnd->Uniform(2); db_opt->track_and_verify_wals_in_manifest = rnd->Uniform(2); + db_opt->track_and_verify_wals = rnd->Uniform(2); db_opt->verify_sst_unique_id_in_manifest = rnd->Uniform(2); db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f643552608f..66aacd23098 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1808,6 +1808,8 @@ DEFINE_bool(build_info, false, DEFINE_bool(track_and_verify_wals_in_manifest, false, "If true, enable WAL tracking in the MANIFEST"); +DEFINE_bool(track_and_verify_wals, false, "See Options.track_and_verify_wals"); + namespace ROCKSDB_NAMESPACE { namespace { static Status CreateMemTableRepFactory( @@ -4721,6 +4723,7 @@ class Benchmark { options.allow_data_in_errors = FLAGS_allow_data_in_errors; options.track_and_verify_wals_in_manifest = FLAGS_track_and_verify_wals_in_manifest; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index f9943a2c764..59912dbe9da 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -343,6 +343,7 @@ "universal_max_read_amp": lambda: random.choice([-1] * 3 + [0, 4, 10]), "paranoid_memory_checks": lambda: random.choice([0] * 7 + [1]), "allow_unprepared_value": lambda: random.choice([0, 1]), + "track_and_verify_wals": lambda: random.choice([0, 1]), } _TEST_DIR_ENV_VAR = "TEST_TMPDIR" # If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR diff --git a/unreleased_history/new_features/track_and_verify_wals_api.md b/unreleased_history/new_features/track_and_verify_wals_api.md new file mode 100644 index 00000000000..b0889d0f9bb --- /dev/null +++ b/unreleased_history/new_features/track_and_verify_wals_api.md @@ -0,0 +1 @@ +Provide a new option `track_and_verify_wals` to track and verify various information about WAL during WAL recovery. This is intended to be a better replacement to `track_and_verify_wals_in_manifest`.