Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable reading un-synced data in db stress test #12752

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 1 addition & 17 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,12 +997,7 @@ void StressTest::OperateDb(ThreadState* thread) {
" to %" PRIu64 "\n",
old_wal->LogNumber(), new_wal->LogNumber());
}
// FIXME: FaultInjectionTestFS does not report file sizes that
// reflect what has been flushed. Either that needs to be fixed
// or GetSortedWals/GetLiveWalFile need to stop relying on
// asking the FS for sizes.
if (!fault_fs_guard &&
old_wal->SizeFileBytes() != new_wal->SizeFileBytes()) {
if (old_wal->SizeFileBytes() != new_wal->SizeFileBytes()) {
fprintf(stderr,
"Failed: WAL %" PRIu64
" size changed during LockWAL(): %" PRIu64
Expand Down Expand Up @@ -1953,17 +1948,6 @@ Status StressTest::TestBackupRestore(
}
}

// FIXME: this is only needed as long as db_stress uses
// SetReadUnsyncedData(false), because it will only be able to
// copy the synced portion of the WAL. For correctness validation, that
// needs to include updates to the locked key.
if (s.ok()) {
s = db_->SyncWAL();
if (!s.ok()) {
from = "SyncWAL";
}
}

if (s.ok()) {
if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
TEST_BackupMetaSchemaOptions test_opts;
Expand Down
4 changes: 0 additions & 4 deletions db_stress_tool/db_stress_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ int db_stress_tool(int argc, char** argv) {
// This will be overwritten in StressTest::Open() for open fault injection
// and in RunStressTestImpl() for proper write fault injection setup.
fault_fs_guard->SetFilesystemDirectWritable(true);
// FIXME: For some reason(s), db_stress currently relies on the WRONG
// semantic of reading only synced data from files currently open for
// write.
fault_fs_guard->SetReadUnsyncedData(false);
fault_env_guard =
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get();
Expand Down
72 changes: 58 additions & 14 deletions utilities/fault_injection_fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ TestFSWritableFile::TestFSWritableFile(const std::string& fname,
writable_file_opened_(true),
fs_(fs) {
assert(target_ != nullptr);
state_.pos_ = 0;
state_.pos_at_last_append_ = 0;
}

TestFSWritableFile::~TestFSWritableFile() {
Expand All @@ -166,14 +166,19 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions& options,
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus io_s;
if (target_->use_direct_io()) {
target_->Append(data, options, dbg).PermitUncheckedError();
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
io_s = target_->Append(data, options, dbg);
} else {
state_.buffer_.append(data.data(), data.size());
state_.pos_ += data.size();
}
if (io_s.ok()) {
state_.pos_at_last_append_ += data.size();
fs_->WritableFileAppended(state_);
io_s = fs_->InjectWriteError(state_.filename_);
}
IOStatus io_s = fs_->InjectWriteError(state_.filename_);
return io_s;
}

Expand Down Expand Up @@ -202,14 +207,46 @@ IOStatus TestFSWritableFile::Append(
"current data checksum: " + Slice(checksum).ToString(true);
return IOStatus::Corruption(msg);
}
IOStatus io_s;
if (target_->use_direct_io()) {
target_->Append(data, options, dbg).PermitUncheckedError();
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
io_s = target_->Append(data, options, dbg);
} else {
state_.buffer_.append(data.data(), data.size());
state_.pos_ += data.size();
}
if (io_s.ok()) {
state_.pos_at_last_append_ += data.size();
fs_->WritableFileAppended(state_);
io_s = fs_->InjectWriteError(state_.filename_);
}
return io_s;
}

IOStatus TestFSWritableFile::Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
// TODO(hx235): inject error
IOStatus io_s = target_->Truncate(size, options, dbg);
if (io_s.ok()) {
state_.pos_at_last_append_ = size;
}
return io_s;
}

IOStatus TestFSWritableFile::PositionedAppend(const Slice& data,
uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
// TODO(hx235): inject error
IOStatus io_s = target_->PositionedAppend(data, offset, options, dbg);
if (io_s.ok()) {
state_.pos_at_last_append_ = offset + data.size();
fs_->WritableFileAppended(state_);
}
IOStatus io_s = fs_->InjectWriteError(state_.filename_);
return io_s;
}

Expand All @@ -236,8 +273,14 @@ IOStatus TestFSWritableFile::PositionedAppend(
"current data checksum: " + Slice(checksum).ToString(true);
return IOStatus::Corruption(msg);
}
target_->PositionedAppend(data, offset, options, dbg);
IOStatus io_s = fs_->InjectWriteError(state_.filename_);
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
IOStatus io_s = target_->PositionedAppend(data, offset, options, dbg);
if (io_s.ok()) {
state_.pos_at_last_append_ = offset + data.size();
fs_->WritableFileAppended(state_);
io_s = fs_->InjectWriteError(state_.filename_);
}
return io_s;
}

Expand Down Expand Up @@ -292,7 +335,7 @@ IOStatus TestFSWritableFile::Sync(const IOOptions& options,
state_.buffer_.resize(0);
// Ignore sync errors
target_->Sync(options, dbg).PermitUncheckedError();
state_.pos_at_last_sync_ = state_.pos_;
state_.pos_at_last_sync_ = state_.pos_at_last_append_;
fs_->WritableFileSynced(state_);
return io_s;
}
Expand Down Expand Up @@ -493,10 +536,11 @@ void FaultInjectionTestFS::AddUnsyncedToRead(const std::string& fname,
auto it = db_file_state_.find(fname);
if (it != db_file_state_.end()) {
auto& st = it->second;
if (st.pos_ > static_cast<ssize_t>(pos_after)) {
if (st.pos_at_last_append_ > static_cast<ssize_t>(pos_after)) {
size_t remaining_requested = n - result->size();
size_t to_copy = std::min(remaining_requested,
static_cast<size_t>(st.pos_) - pos_after);
size_t to_copy =
std::min(remaining_requested,
static_cast<size_t>(st.pos_at_last_append_) - pos_after);
size_t buffer_offset = pos_after - static_cast<size_t>(std::max(
st.pos_at_last_sync_, ssize_t{0}));
// Data might have been dropped from buffer
Expand Down Expand Up @@ -805,7 +849,7 @@ IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
MutexLock l(&mutex_);
auto it = db_file_state_.find(f);
if (it != db_file_state_.end()) {
*file_size = it->second.pos_;
*file_size = it->second.pos_at_last_append_;
}
}
return io_s;
Expand Down
18 changes: 8 additions & 10 deletions utilities/fault_injection_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ class FaultInjectionTestFS;

struct FSFileState {
std::string filename_;
ssize_t pos_;
ssize_t pos_at_last_append_;
ssize_t pos_at_last_sync_;
std::string buffer_;

explicit FSFileState(const std::string& filename)
: filename_(filename), pos_(-1), pos_at_last_sync_(-1) {}
: filename_(filename), pos_at_last_append_(-1), pos_at_last_sync_(-1) {}

FSFileState() : pos_(-1), pos_at_last_sync_(-1) {}
FSFileState() : pos_at_last_append_(-1), pos_at_last_sync_(-1) {}

bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
bool IsFullySynced() const {
return pos_at_last_append_ <= 0 || pos_at_last_append_ == pos_at_last_sync_;
}

IOStatus DropUnsyncedData();

Expand All @@ -65,9 +67,7 @@ class TestFSWritableFile : public FSWritableFile {
const DataVerificationInfo& verification_info,
IODebugContext* dbg) override;
IOStatus Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) override {
return target_->Truncate(size, options, dbg);
}
IODebugContext* dbg) override;
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
IOStatus Flush(const IOOptions&, IODebugContext*) override;
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
Expand All @@ -76,9 +76,7 @@ class TestFSWritableFile : public FSWritableFile {
bool IsSyncThreadSafe() const override { return true; }
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) override {
return target_->PositionedAppend(data, offset, options, dbg);
}
IODebugContext* dbg) override;
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
const DataVerificationInfo& verification_info,
Expand Down
Loading