Skip to content

Commit

Permalink
Reduce unnecessary MutableCFOptions copies and parameters
Browse files Browse the repository at this point in the history
Summary: As follow-up to facebook#13239, this change is primarily motivated by
simplifying the calling conventions of LogAndApply. Since it must be
called while holding the DB mutex, it can read safely read
cfd->GetLatestMutableCFOptions(), until it releases the mutex. Before it
releases the mutex, it makes a copy of the mutable options in a new,
unpublished Version object, which can be used when not holding the DB
mutex. This eliminates the need for callers of LogAndApply to copy
mutable options for its sake, or even specify mutable options at all.
And it eliminates the need for *another* copy saved in ManifestWriter.

Other functions that don't need the mutable options parameter:
* ColumnFamilyData::CreateNewMemtable()
* CompactionJob::Install() / InstallCompactionResults()
* MemTableList::*InstallMemtable*()
* Version::PrepareAppend()

Test Plan: existing tests, CI with sanitizers
  • Loading branch information
pdillinger committed Jan 15, 2025
1 parent b333358 commit c0c1bfd
Show file tree
Hide file tree
Showing 24 changed files with 166 additions and 257 deletions.
7 changes: 4 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1160,12 +1160,13 @@ MemTable* ColumnFamilyData::ConstructNewMemtable(
write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
void ColumnFamilyData::CreateNewMemtable(SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
// NOTE: db mutex must be locked for SetMemtable, so safe for
// GetLatestMutableCFOptions
SetMemtable(ConstructNewMemtable(GetLatestMutableCFOptions(), earliest_seq));
mem_->Ref();
}

Expand Down
5 changes: 3 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,11 @@ class ColumnFamilyData {
uint64_t OldestLogToKeep();

// See Memtable constructor for explanation of earliest_seq param.
// `mutable_cf_options` might need to be a saved copy if calling this without
// holding the DB mutex.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
void CreateNewMemtable(SequenceNumber earliest_seq);

TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
Expand Down
18 changes: 8 additions & 10 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,7 @@ Status CompactionJob::Run() {
return status;
}

Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released) {
Status CompactionJob::Install(bool* compaction_released) {
assert(compact_);

AutoThreadOperationStageUpdater stage_updater(
Expand All @@ -919,7 +918,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
compaction_stats_);

if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options, compaction_released);
status = InstallCompactionResults(compaction_released);
}
if (!versions_->io_status().ok()) {
io_status_ = versions_->io_status();
Expand Down Expand Up @@ -1800,8 +1799,7 @@ Status CompactionJob::FinishCompactionOutputFile(
return s;
}

Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options, bool* compaction_released) {
Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
assert(compact_);

db_mutex_->AssertHeld();
Expand Down Expand Up @@ -1890,11 +1888,11 @@ Status CompactionJob::InstallCompactionResults(
*compaction_released = true;
};

return versions_->LogAndApply(
compaction->column_family_data(), mutable_cf_options, read_options,
write_options, edit, db_mutex_, db_directory_,
/*new_descriptor_log=*/false,
/*column_family_options=*/nullptr, manifest_wcb);
return versions_->LogAndApply(compaction->column_family_data(), read_options,
write_options, edit, db_mutex_, db_directory_,
/*new_descriptor_log=*/false,
/*column_family_options=*/nullptr,
manifest_wcb);
}

void CompactionJob::RecordCompactionIOStats() {
Expand Down
6 changes: 2 additions & 4 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ class CompactionJob {
// Add compaction input/output to the current version
// Releases compaction file through Compaction::ReleaseCompactionFiles().
// Sets *compaction_released to true if compaction is released.
Status Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
Status Install(bool* compaction_released);

// Return the IO status
IOStatus io_status() const { return io_status_; }
Expand Down Expand Up @@ -282,8 +281,7 @@ class CompactionJob {
const Slice& next_table_min_key,
const Slice* comp_start_user_key,
const Slice* comp_end_user_key);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
Status InstallCompactionResults(bool* compaction_released);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs);

Expand Down
7 changes: 3 additions & 4 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ class CompactionJobTestBase : public testing::Test {

mutex_.Lock();
EXPECT_OK(versions_->LogAndApply(
versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
read_options_, write_options_, &edit, &mutex_, nullptr));
versions_->GetColumnFamilySet()->GetDefault(), read_options_,
write_options_, &edit, &mutex_, nullptr));
mutex_.Unlock();
}

Expand Down Expand Up @@ -684,8 +684,7 @@ class CompactionJobTestBase : public testing::Test {
ASSERT_OK(compaction_job.io_status());
mutex_.Lock();
bool compaction_released = false;
ASSERT_OK(compaction_job.Install(cfd->GetLatestMutableCFOptions(),
&compaction_released));
ASSERT_OK(compaction_job.Install(&compaction_released));
ASSERT_OK(compaction_job.io_status());
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Expand Down
63 changes: 25 additions & 38 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
assert(cfh);
ColumnFamilyData* cfd = cfh->cfd();
s = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (!s.ok()) {
io_s = versions_->io_status();
if (!io_s.ok()) {
Expand Down Expand Up @@ -1290,7 +1289,7 @@ Status DBImpl::SetOptions(
}

InstrumentedMutexLock ol(&options_mutex_);
MutableCFOptions new_options_copy;
MutableCFOptions new_options_copy; // For logging outside of DB mutex
Status s;
Status persist_options_status;
SuperVersionContext sv_context(/* create_superversion */ true);
Expand All @@ -1302,9 +1301,8 @@ Status DBImpl::SetOptions(
new_options_copy = cfd->GetLatestMutableCFOptions();
// Append new version to recompute compaction score.
VersionEdit dummy_edit;
s = versions_->LogAndApply(cfd, new_options_copy, read_options,
write_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (!versions_->io_status().ok()) {
assert(!s.ok());
error_handler_.SetBGError(versions_->io_status(),
Expand Down Expand Up @@ -3652,9 +3650,9 @@ Status DBImpl::CreateColumnFamilyImpl(const ReadOptions& read_options,
write_thread_.EnterUnbatched(&w, &mutex_);
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir(), false, &cf_options);
s = versions_->LogAndApply(nullptr, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir(), false,
&cf_options);
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
Expand Down Expand Up @@ -3762,9 +3760,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
// we drop column family from a single write thread
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
Expand Down Expand Up @@ -4969,9 +4966,8 @@ Status DBImpl::DEPRECATED_DeleteFile(std::string name) {
}
edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, job_context.superversion_contexts.data());
Expand Down Expand Up @@ -5081,9 +5077,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
return status;
}
input_version->Ref();
status = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, job_context.superversion_contexts.data());
Expand Down Expand Up @@ -6129,14 +6124,12 @@ Status DBImpl::IngestExternalFiles(
ReadOptions read_options;
read_options.fill_cache = args[0].options.fill_cache;
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
cfds_to_commit.push_back(cfd);
mutable_cf_options_list.push_back(&cfd->GetLatestMutableCFOptions());
autovector<VersionEdit*> edit_list;
edit_list.push_back(ingestion_jobs[i].edit());
edit_lists.push_back(edit_list);
Expand All @@ -6151,10 +6144,10 @@ Status DBImpl::IngestExternalFiles(
}
assert(0 == num_entries);
}
status = versions_->LogAndApply(
cfds_to_commit, mutable_cf_options_list, read_options, write_options,
status =
versions_->LogAndApply(cfds_to_commit, read_options, write_options,

edit_lists, &mutex_, directories_.GetDbDir());
edit_lists, &mutex_, directories_.GetDbDir());
// It is safe to update VersionSet last seqno here after LogAndApply since
// LogAndApply persists last sequence number from VersionEdits,
// which are from file's largest seqno and not from VersionSet.
Expand Down Expand Up @@ -6305,11 +6298,9 @@ Status DBImpl::CreateColumnFamilyWithImport(
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number = versions_->FetchAddFileNumber(total_file_num);
MutableCFOptions mutable_cf_options_copy =
cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, mutable_cf_options_copy,
read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
status =
versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx);
}
Expand Down Expand Up @@ -6344,11 +6335,9 @@ Status DBImpl::CreateColumnFamilyWithImport(

// Install job edit [Mutex will be unlocked here]
if (status.ok()) {
MutableCFOptions mutable_cf_options_copy =
cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(
cfd, mutable_cf_options_copy, read_options, write_options,
import_job.edit(), &mutex_, directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options,
import_job.edit(), &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context);
}
Expand Down Expand Up @@ -6768,15 +6757,13 @@ Status DBImpl::ReserveFileNumbersBeforeIngestion(
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
*next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
MutableCFOptions mutable_cf_options_copy = cfd->GetLatestMutableCFOptions();
VersionEdit dummy_edit;
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
s = versions_->LogAndApply(cfd, mutable_cf_options_copy, read_options,
write_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx);
}
Expand Down
2 changes: 0 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,6 @@ class DBImpl : public DB {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(&cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
Expand All @@ -1460,7 +1459,6 @@ class DBImpl : public DB {

std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// All existing data files (SST files and Blob files) found during DB::Open.
std::vector<std::string> existing_data_files_;
Expand Down
Loading

0 comments on commit c0c1bfd

Please sign in to comment.