Skip to content

Commit

Permalink
Test with copying trace file. Still hit failures
Browse files Browse the repository at this point in the history
  • Loading branch information
archang19 committed Jan 7, 2025
1 parent e9ff3cf commit 08f62cd
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 137 deletions.
31 changes: 19 additions & 12 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -728,19 +728,21 @@ Status FileExpectedStateManager::SetSecondaryExpectedState(DB* db) {
std::to_string(saved_seqno_) + kTraceFilenameSuffix;
std::string trace_file_path = GetPathForFilename(trace_filename);
std::cout << "trace_file_path = " << trace_file_path << std::endl;
Status exists_status = Env::Default()->FileExists(trace_file_path);
if (!exists_status.ok()) {
if (exists_status.IsNotFound()) {
std::cout << "Cannot find trace file" << std::endl;
} else {
std::cout << "Encountered error checking for trace file existence"
<< std::endl;
}
return exists_status;

// Try copying trace file first?
std::string trace_file_temp_path = GetTempPathForFilename(trace_filename);
Status s =
CopyFile(FileSystem::Default(), trace_file_path, Temperature::kUnknown,
trace_file_temp_path, Temperature::kUnknown, 0 /* size */,
false /* use_fsync */, nullptr /* io_tracer */);
if (!s.ok()) {
std::cout << "error copying over trace file" << std::endl;
return s;
}

std::unique_ptr<TraceReader> trace_reader;
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);
s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_temp_path,
&trace_reader);
if (!s.ok()) {
std::cout << "Could not create file trace reader" << std::endl;
return s;
Expand Down Expand Up @@ -918,7 +920,8 @@ Status FileExpectedStateManager::ReplayTrace(
if (s.ok()) {
s = replayer->Prepare();
}
for (; s.ok();) {
size_t replay_count = 0;
for (; s.ok() && !handler->IsDone();) {
std::unique_ptr<TraceRecord> record;
s = replayer->Next(&record);
if (!s.ok()) {
Expand All @@ -936,9 +939,13 @@ Status FileExpectedStateManager::ReplayTrace(
}
break;
}
replay_count++;
std::unique_ptr<TraceRecordResult> res;
s = record->Accept(handler.get(), &res);
}

std::cout << "ReplayTrace: Replayed " << replay_count << " records"
<< std::endl;
return s;
}

Expand Down
205 changes: 80 additions & 125 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,8 @@ class NonBatchedOpsStressTest : public StressTest {

uint64_t start_secondary_scan = clock_->NowMicros();

uint64_t prefix_to_use =
(FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);

std::cout << "Creating cmp_db_ iterator at sequence number "
std::cout << "Checking again on cmp_db_ sequence number "
<< cmp_db_->GetLatestSequenceNumber() << std::endl;
std::unique_ptr<Iterator> iter(
cmp_db_->NewIterator(read_opts, column_families_[0]));

std::string seek_key = Key(0);
iter->Seek(seek_key);

Slice prefix(seek_key.data(), prefix_to_use);

uint64_t num_keys_with_data = 0;

Expand All @@ -403,45 +393,8 @@ class NonBatchedOpsStressTest : public StressTest {
}

const std::string key = Key(i);
const Slice k(key);
const Slice pfx(key.data(), prefix_to_use);

// Reseek when the prefix changes
if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
iter->Seek(k);
seek_key = key;
prefix = Slice(seek_key.data(), prefix_to_use);
}

s = iter->status();

std::string from_db;

if (iter->Valid()) {
const int diff = iter->key().compare(k);

if (diff > 0) {
s = Status::NotFound();
} else if (diff == 0) {
if (!VerifyWideColumns(iter->value(), iter->columns())) {
VerificationAbort(shared, static_cast<int>(0), i, iter->value(),
iter->columns());
}

from_db = iter->value().ToString();
iter->Next();
} else {
assert(diff < 0);

VerificationAbort(shared, "An out of range key was found",
static_cast<int>(0), i);
}
} else {
// The iterator found no value for the key in question, so do not
// move to the next item in the iterator
s = Status::NotFound();
}

s = cmp_db_->Get(read_opts, column_families_[0], key, &from_db);
if (!VerifyOrSyncValue(
0, i, read_opts, shared, shared->GetSecondary(0, i), from_db,
/* msg_prefix */ "Secondary get verification", s)) {
Expand All @@ -460,82 +413,84 @@ class NonBatchedOpsStressTest : public StressTest {
<< std::endl;
}

const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
uint32_t ret = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
}
*checksum = ret;
return iter->status();
};

std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GetNowNanos();
ts = ts_str;
read_opts.timestamp = &ts;
}

static Random64 rand64(shared->GetSeed());

{
uint32_t crc = 0;
std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
s = checksum_column_family(it.get(), &crc);
if (!s.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
s.ToString().c_str());
assert(false);
}
}

for (auto* handle : cmp_cfhs_) {
if (thread->rand.OneInOpt(3)) {
// Use Get()
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
std::string value;
std::string key_ts;
s = cmp_db_->Get(read_opts, handle, key_str, &value,
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
s.PermitUncheckedError();
} else {
// Use range scan
std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
uint32_t rnd = (thread->rand.Next()) % 4;
if (0 == rnd) {
// SeekToFirst() + Next()*5
read_opts.total_order_seek = true;
iter->SeekToFirst();
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
}
} else if (1 == rnd) {
// SeekToLast() + Prev()*5
read_opts.total_order_seek = true;
iter->SeekToLast();
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
} else if (2 == rnd) {
// Seek() +Next()*5
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
iter->Seek(key_str);
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
}
} else {
// SeekForPrev() + Prev()*5
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
iter->SeekForPrev(key_str);
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
}
}
}
// This iterator code seems to result in an assertion failure for
// `iter_.iter()->IsValuePinned()'
// const auto checksum_column_family = [](Iterator* iter,
// uint32_t* checksum) -> Status {
// assert(nullptr != checksum);
// uint32_t ret = 0;
// for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
// ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
// ret = crc32c::Extend(ret, iter->value().data(),
// iter->value().size());
// }
// *checksum = ret;
// return iter->status();
// };

// std::string ts_str;
// Slice ts;
// if (FLAGS_user_timestamp_size > 0) {
// ts_str = GetNowNanos();
// ts = ts_str;
// read_opts.timestamp = &ts;
// }

// static Random64 rand64(shared->GetSeed());

// {
// uint32_t crc = 0;
// std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
// s = checksum_column_family(it.get(), &crc);
// if (!s.ok()) {
// fprintf(stderr, "Computing checksum of default cf: %s\n",
// s.ToString().c_str());
// assert(false);
// }
// }

// for (auto* handle : cmp_cfhs_) {
// if (thread->rand.OneInOpt(3)) {
// // Use Get()
// uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
// std::string key_str = Key(key);
// std::string value;
// std::string key_ts;
// s = cmp_db_->Get(read_opts, handle, key_str, &value,
// FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
// s.PermitUncheckedError();
// } else {
// // Use range scan
// std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts,
// handle)); uint32_t rnd = (thread->rand.Next()) % 4; if (0 == rnd) {
// // SeekToFirst() + Next()*5
// read_opts.total_order_seek = true;
// iter->SeekToFirst();
// for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
// }
// } else if (1 == rnd) {
// // SeekToLast() + Prev()*5
// read_opts.total_order_seek = true;
// iter->SeekToLast();
// for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
// }
// } else if (2 == rnd) {
// // Seek() +Next()*5
// uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
// std::string key_str = Key(key);
// iter->Seek(key_str);
// for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
// }
// } else {
// // SeekForPrev() + Prev()*5
// uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
// std::string key_str = Key(key);
// iter->SeekForPrev(key_str);
// for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
// }
// }
// }
// }
}

void MaybeClearOneColumnFamily(ThreadState* thread) override {
Expand Down

0 comments on commit 08f62cd

Please sign in to comment.