Skip to content

Commit

Permalink
[WIP] Delete duplicated key/value pairs recursively
Browse files Browse the repository at this point in the history
This is to implement the idea: http://pad.ceph.com/p/rocksdb-wal-improvement
Add a new flush style, kFlushStyleDedup, which users enable by setting
flush_style=kFlushStyleDedup together with write_buffer_number_to_flush. A flush
is triggered once the number of non-flushed memtables is >= min_write_buffer_number_to_merge.
When flushing memtables to L0, it merges write_buffer_number_to_flush memtables
and dedups them against the other non-flushed memtables.

Using the bluestore fio plugin to run a 4k random-write test, I compared two RocksDB stats,
db.get.micros and db.write.micros, sampled every ten minutes. db.get.micros
is the average time of get operations, and db.write.micros is the average time of put/delete
operations. In these results, RocksDB read performance improves by up to 38%
and write performance by up to 15%. For more detailed data, see the spreadsheet:
https://drive.google.com/drive/folders/0B6jqFc7e2yxVdUQ2aEpCR3ItbG8

In addition, the I/O performance of bluestore fio does not improve much. Judging by the time spent in
txc states, most of the time (up to 80%) is spent in kv_state_commiting_lat, while submit_transaction and
submit_transaction_sync take comparatively little time.

Signed-off-by: Xiaoyan Li [email protected]
  • Loading branch information
lixiaoy1 committed Aug 22, 2017
1 parent 2e64f45 commit bf29157
Show file tree
Hide file tree
Showing 27 changed files with 249 additions and 31 deletions.
62 changes: 52 additions & 10 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level) {
TableProperties* table_properties, int level,
InternalIterator* compare_iter) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
uint64_t num_duplicated = 0, num_total = 0;
meta->fd.file_size = 0;
iter->SeekToFirst();
std::unique_ptr<RangeDelAggregator> range_del_agg(
Expand Down Expand Up @@ -138,17 +140,56 @@ Status BuildTable(
&snapshots, earliest_write_conflict_snapshot, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();

ParsedInternalKey c_ikey, comp_ikey;
if (compare_iter != nullptr) {
compare_iter->SeekToFirst(); //find the first one
}
const rocksdb::Comparator *comp = internal_comparator.user_comparator();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
bool skip = false;
num_total++;

// Check whether the key is duplicated in later mems
c_ikey = c_iter.ikey();
if (compare_iter != nullptr ) {
while (compare_iter->Valid()) {
if (!ParseInternalKey(compare_iter->key(), &comp_ikey)) {
compare_iter->Next();
continue;
}
int result = comp->Compare(c_ikey.user_key, comp_ikey.user_key);
if (result == 0) {
// Never drop the entry when either side is a merge operand (kTypeMerge).
if (c_ikey.type != kTypeMerge && comp_ikey.type != kTypeMerge) {
skip = true;
}
break;
}
else if (result > 0) {
compare_iter->Next();
}
else {
break;
}
}
}
if (skip) {
num_duplicated++;
continue;
}
else {
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
}
// nullptr for table_{min,max} so all range tombstones will be flushed
Expand Down Expand Up @@ -218,9 +259,10 @@ Status BuildTable(
}

// Output to event logger and fire events.
bool show_num = (compare_iter != nullptr)?true:false;
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
job_id, meta->fd, tp, reason, s, show_num, num_total, num_duplicated);

return s;
}
Expand Down
3 changes: 2 additions & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extern Status BuildTable(
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1);
TableProperties* table_properties = nullptr, int level = -1,
InternalIterator* compare_iter = nullptr);

} // namespace rocksdb
8 changes: 8 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,14 @@ void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t* opt, int n)
opt->rep.max_write_buffer_number = n;
}

// C API setter for the flush_style option of `opt`. `style` is converted to
// rocksdb::FlushStyle with a plain static_cast; no range check is performed.
void rocksdb_options_set_flush_style(rocksdb_options_t* opt, int style) {
  const auto flush_style = static_cast<rocksdb::FlushStyle>(style);
  opt->rep.flush_style = flush_style;
}

// C API setter: stores `n` into the write_buffer_number_to_flush option of
// `opt`. The value is written as given, without validation.
void rocksdb_options_set_write_buffer_number_to_flush(rocksdb_options_t* opt,
                                                      int n) {
  auto& options = opt->rep;
  options.write_buffer_number_to_flush = n;
}

// C API setter: stores `n` into the min_write_buffer_number_to_merge option
// of `opt`. The value is written as given, without validation.
void rocksdb_options_set_min_write_buffer_number_to_merge(
    rocksdb_options_t* opt, int n) {
  auto& options = opt->rep;
  options.min_write_buffer_number_to_merge = n;
}
Expand Down
13 changes: 12 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.min_write_buffer_number_to_merge = 1;
}

if (result.flush_style == kFlushStyleDedup) {
if (result.write_buffer_number_to_flush < 1 ||
(result.write_buffer_number_to_flush >
result.min_write_buffer_number_to_merge)) {
result.write_buffer_number_to_flush = 1;
}
}

if (result.num_levels < 1) {
result.num_levels = 1;
}
Expand All @@ -181,6 +189,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}

if (result.max_write_buffer_number_to_maintain < 0) {
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
}
Expand Down Expand Up @@ -359,7 +368,9 @@ ColumnFamilyData::ColumnFamilyData(
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
ioptions_.max_write_buffer_number_to_maintain),
ioptions_.max_write_buffer_number_to_maintain,
ioptions_.write_buffer_number_to_flush),
stop(false),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
Expand Down
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class ColumnFamilyData {
// Accessor for the mem_ member.
MemTable* mem() {
  return mem_;
}
// Accessor for the current_ member.
Version* current() {
  return current_;
}
// Accessor for the dummy_versions_ member.
Version* dummy_versions() {
  return dummy_versions_;
}
// Sets the `stop` flag to true. There is no way to clear it again through
// this interface.
void set_stop() {
  stop = true;
}
// Reports whether the `stop` flag has been set via set_stop().
bool is_stop() {
  return stop;
}
// True when this column family's flush_style option is kFlushStyleDedup.
bool is_flush_recursive_dedup() {
  const bool dedup_enabled = (ioptions_.flush_style == kFlushStyleDedup);
  return dedup_enabled;
}
void SetCurrent(Version* _current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
Expand Down Expand Up @@ -371,6 +374,7 @@ class ColumnFamilyData {

MemTable* mem_;
MemTableList imm_;
bool stop;
SuperVersion* super_version_;

// An ordinal representing the current SuperVersion. Updated by
Expand Down
1 change: 1 addition & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
default_cf.arena_block_size = 4 * 4096;
default_cf.max_write_buffer_number = 10;
default_cf.min_write_buffer_number_to_merge = 1;
default_cf.write_buffer_number_to_flush = 0;
default_cf.max_write_buffer_number_to_maintain = 0;
one.write_buffer_size = 200000;
one.arena_block_size = 4 * 4096;
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class CompactionPickerTest : public testing::Test {
DeleteVersionStorage();
options_.num_levels = num_levels;
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
style, nullptr, false));
kFlushStyleMerge, style, nullptr, false));
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
}

Expand Down
1 change: 1 addition & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
cfd->set_stop();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
mutex_.Lock();
Expand Down
5 changes: 4 additions & 1 deletion db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s) {
const Status& s, bool show_num, int total_num, int dup_num) {
if (s.ok() && event_logger) {
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
Expand Down Expand Up @@ -73,6 +73,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< "num_data_blocks" << table_properties.num_data_blocks
<< "num_entries" << table_properties.num_entries
<< "filter_policy_name" << table_properties.filter_policy_name;
if (show_num) {
jwriter << "total_paris" << total_num << "duplicated_pairs" << dup_num;
}

// user collected properties
for (const auto& prop : table_properties.readable_properties) {
Expand Down
2 changes: 1 addition & 1 deletion db/event_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EventHelpers {
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s);
const Status& s, bool show_num = false, int total_num = 0, int duplicated_num = 0);
static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
Expand Down
33 changes: 29 additions & 4 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ void FlushJob::PickMemTable() {
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);

if (cfd_->is_flush_recursive_dedup() && !cfd_->is_stop()) {
cfd_->imm()->PickMemtablesToFlush(&mems_, &compare_mems_);
}
else {
cfd_->imm()->PickMemtablesToFlush(&mems_);
}
if (mems_.empty()) {
return;
}

ReportFlushInputSize(mems_);

// entries mems are (implicitly) sorted in ascending order by their created
Expand Down Expand Up @@ -257,6 +262,7 @@ Status FlushJob::WriteLevel0Table() {
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
Arena arena1;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems_) {
Expand All @@ -274,17 +280,36 @@ Status FlushJob::WriteLevel0Table() {
total_memory_usage += m->ApproximateMemoryUsage();
}

std::vector<InternalIterator*> compare_memtables;
int comp_entries = 0;
for (MemTable *m: compare_mems_) {
compare_memtables.push_back(m->NewIterator(ro, &arena1));
comp_entries += m->num_entries();
}

event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
<< total_memory_usage << "mems table count "
<< mems_.size() << " compare mems count "
<< compare_mems_.size() << " compare mems total "
<< comp_entries;

{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
InternalIterator *compare_iter = nullptr;
ScopedArenaIterator compare_scope_iter;
if (compare_mems_.size() > 0) {
compare_scope_iter.set(
NewMergingIterator(&cfd_->internal_comparator(), &compare_memtables[0],
static_cast<int>(compare_memtables.size()), &arena1));
compare_iter = compare_scope_iter.get();
}

std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(),
range_del_iters.empty() ? nullptr : &range_del_iters[0],
Expand All @@ -308,7 +333,7 @@ Status FlushJob::WriteLevel0Table() {
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */);
Env::IO_HIGH, &table_properties_, 0, compare_iter);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
autovector<MemTable*> compare_mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
Expand Down
27 changes: 27 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,33 @@ bool MemTableList::IsFlushPending() const {
return false;
}

// Selects memtables for a dedup-style flush.
//
// Walks current_->memlist_ in reverse order and considers only memtables
// whose flush has not started yet:
//   - the first write_buffer_number_to_flush_ of them are marked
//     flush_in_progress_ and appended to `ret` (these will be flushed);
//   - every remaining one is appended to `compare_ret` untouched, so the
//     caller can use it for comparison without scheduling it for flush.
//
// num_flush_not_started_ is decremented once per memtable placed in `ret`,
// and imm_flush_needed is cleared when that counter reaches zero. The
// pending flush request flag is always reset before returning.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret,
                                        autovector<MemTable*>* compare_ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  assert(write_buffer_number_to_flush_ > 0);

  const auto& memlist = current_->memlist_;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* const candidate = *it;
    if (candidate->flush_in_progress_) {
      // Already owned by another flush; neither flushed nor compared here.
      continue;
    }
    assert(!candidate->flush_completed_);

    if (ret->size() >= write_buffer_number_to_flush_) {
      // Flush quota is full: keep this memtable only as a comparison source.
      compare_ret->push_back(candidate);
      continue;
    }

    --num_flush_not_started_;
    if (num_flush_not_started_ == 0) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
    candidate->flush_in_progress_ = true;  // flushing will start very soon
    ret->push_back(candidate);
  }

  flush_requested_ = false;  // start-flush request is complete
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
Expand Down
7 changes: 6 additions & 1 deletion db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,11 @@ class MemTableList {
public:
// A list of memtables.
explicit MemTableList(int min_write_buffer_number_to_merge,
int max_write_buffer_number_to_maintain)
int max_write_buffer_number_to_maintain,
int write_buffer_number_to_flush = 1)
: imm_flush_needed(false),
min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
write_buffer_number_to_flush_(write_buffer_number_to_flush),
current_(new MemTableListVersion(&current_memory_usage_,
max_write_buffer_number_to_maintain)),
num_flush_not_started_(0),
Expand Down Expand Up @@ -194,6 +196,7 @@ class MemTableList {

// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(autovector<MemTable*>* mems, autovector<MemTable*>* compare_mems);
void PickMemtablesToFlush(autovector<MemTable*>* mems);

// Reset status of the given memtable list back to pending state so that
Expand Down Expand Up @@ -242,6 +245,8 @@ class MemTableList {

const int min_write_buffer_number_to_merge_;

const unsigned int write_buffer_number_to_flush_;

MemTableListVersion* current_;

// the number of elements that still need flushing
Expand Down
Loading

0 comments on commit bf29157

Please sign in to comment.