Skip to content

Commit

Permalink
Auto trigger compaction for tiering
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jun 12, 2024
1 parent af50823 commit e91c951
Show file tree
Hide file tree
Showing 35 changed files with 886 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ set(SOURCES
utilities/persistent_cache/volatile_tier_impl.cc
utilities/simulator_cache/cache_simulator.cc
utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_for_tiering_collector.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc
utilities/trace/replayer_impl.cc
Expand Down Expand Up @@ -1481,6 +1482,7 @@ if(WITH_TESTS)
utilities/persistent_cache/persistent_cache_test.cc
utilities/simulator_cache/cache_simulator_test.cc
utilities/simulator_cache/sim_cache_test.cc
utilities/table_properties_collectors/compact_for_tiering_collector_test.cc
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
utilities/transactions/optimistic_transaction_test.cc
utilities/transactions/transaction_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,9 @@ compaction_job_stats_test: $(OBJ_DIR)/db/compaction/compaction_job_stats_test.o
compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compact_for_tiering_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_for_tiering_collector_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"utilities/persistent_cache/volatile_tier_impl.cc",
"utilities/simulator_cache/cache_simulator.cc",
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_for_tiering_collector.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
"utilities/trace/replayer_impl.cc",
Expand Down Expand Up @@ -4626,6 +4627,12 @@ cpp_unittest_wrapper(name="compact_files_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compact_for_tiering_collector_test",
srcs=["utilities/table_properties_collectors/compact_for_tiering_collector_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compact_on_deletion_collector_test",
srcs=["utilities/table_properties_collectors/compact_on_deletion_collector_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
4 changes: 2 additions & 2 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status BuildTable(
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots,
std::vector<SequenceNumber> snapshots, SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats,
Expand Down Expand Up @@ -195,7 +195,7 @@ Status BuildTable(

const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter(
iter, ucmp, &merge, kMaxSequenceNumber, &snapshots,
iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, earliest_snapshot,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
Expand Down
2 changes: 1 addition & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Status BuildTable(
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots,
std::vector<SequenceNumber> snapshots, SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats,
Expand Down
8 changes: 4 additions & 4 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace ROCKSDB_NAMESPACE {
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
Expand All @@ -40,7 +41,7 @@ CompactionIterator::CompactionIterator(
const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
Expand All @@ -54,6 +55,7 @@ CompactionIterator::CompactionIterator(
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
Expand Down Expand Up @@ -91,9 +93,7 @@ CompactionIterator::CompactionIterator(
// snapshots_ cannot be nullptr, but we will assert later in the body of
// the constructor.
visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
earliest_snapshot_(!snapshots_ || snapshots_->empty()
? kMaxSequenceNumber
: snapshots_->at(0)),
earliest_snapshot_(earliest_snapshot),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),
enforce_single_del_contracts_(enforce_single_del_contracts),
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class CompactionIterator {
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
Expand All @@ -222,6 +223,7 @@ class CompactionIterator {
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
snapshots_.empty() ? kMaxSequenceNumber : snapshots_.at(0),
earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(),
Expand Down
41 changes: 16 additions & 25 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ CompactionJob::CompactionJob(
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
earliest_snapshot_(existing_snapshots_.empty()
? kMaxSequenceNumber
: existing_snapshots_.at(0)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
job_context_(job_context),
Expand Down Expand Up @@ -282,6 +285,7 @@ void CompactionJob::Prepare() {

// collect all seqno->time information from the input files which will be used
// to encode seqno->time to the output files.

uint64_t preserve_time_duration =
std::max(c->immutable_options()->preserve_internal_time_seconds,
c->immutable_options()->preclude_last_level_data_seconds);
Expand Down Expand Up @@ -319,28 +323,11 @@ void CompactionJob::Prepare() {
seqno_to_time_mapping_.Enforce();
} else {
seqno_to_time_mapping_.Enforce(_current_time);
uint64_t preserve_time =
static_cast<uint64_t>(_current_time) > preserve_time_duration
? _current_time - preserve_time_duration
: 0;
// GetProximalSeqnoBeforeTime tells us the last seqno known to have been
// written at or before the given time. + 1 to get the minimum we should
// preserve without excluding anything that might have been written on or
// after the given time.
preserve_time_min_seqno_ =
seqno_to_time_mapping_.GetProximalSeqnoBeforeTime(preserve_time) + 1;
if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
uint64_t preclude_last_level_time =
static_cast<uint64_t>(_current_time) >
c->immutable_options()->preclude_last_level_data_seconds
? _current_time -
c->immutable_options()->preclude_last_level_data_seconds
: 0;
preclude_last_level_min_seqno_ =
seqno_to_time_mapping_.GetProximalSeqnoBeforeTime(
preclude_last_level_time) +
1;
}
seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
static_cast<uint64_t>(_current_time),
c->immutable_options()->preserve_internal_time_seconds,
c->immutable_options()->preclude_last_level_data_seconds,
&preserve_time_min_seqno_, &preclude_last_level_min_seqno_);
}
// For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
// limit the capacity after them.
Expand Down Expand Up @@ -1295,8 +1282,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

auto c_iter = std::make_unique<CompactionIterator>(
input, cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
&existing_snapshots_, earliest_snapshot_,
earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, range_del_agg.get(),
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
Expand Down Expand Up @@ -1969,7 +1957,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction,
0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
sub_compact->compaction->max_output_file_size(), file_number);
sub_compact->compaction->max_output_file_size(), file_number,
preclude_last_level_min_seqno_ == kMaxSequenceNumber
? preclude_last_level_min_seqno_
: std::min(earliest_snapshot_, preclude_last_level_min_seqno_));

outputs.NewBuilder(tboptions);

Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ class CompactionJob {
// deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_;

SequenceNumber earliest_snapshot_;

// This is the earliest snapshot that could be used for write-conflict
// checking by a transaction. For any user-key newer than this snapshot, we
// should make sure not to remove evidence that a write occurred.
Expand Down
79 changes: 79 additions & 0 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "rocksdb/iostats_context.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/mock_time_env.h"
#include "utilities/merge_operators.h"

Expand Down Expand Up @@ -1734,6 +1735,84 @@ TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Close();
}

TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) {
const int kNumTrigger = 10;
const int kNumLevels = 7;
const int kNumKeys = 200;

Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preclude_last_level_data_seconds = 60;
options.preserve_internal_time_seconds = 0;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.num_levels = kNumLevels;
options.last_level_temperature = Temperature::kCold;
ConfigOptions config_options;
config_options.ignore_unsupported_options = false;
std::shared_ptr<TablePropertiesCollectorFactory> factory;
std::string id = CompactForTieringCollectorFactory::kClassName();
ASSERT_OK(TablePropertiesCollectorFactory::CreateFromString(
config_options, "compaction_triggers=0:50; enabled=true; id=" + id,
&factory));
auto collector_factory =
factory->CheckedCast<CompactForTieringCollectorFactory>();
options.table_properties_collector_factories.push_back(factory);
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();

Random rnd(301);

dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1));
});

for (int i = 0; i < kNumKeys / 4; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
});
}
// Create one file with regular Put.
ASSERT_OK(Flush());

// Create one file with TimedPut.
// These data are eligible to be put on the last level once written to db
// and compaction will fast track them to the last level.
for (int i = kNumKeys / 4; i < kNumKeys / 2; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());

// TimedPut file moved to the last level via auto triggered compaction.
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

collector_factory->SetCompactionTrigger(0, 51);
for (int i = kNumKeys / 2; i < kNumKeys * 3 / 4; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());

ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2,0,0,0,0,0,1", FilesPerLevel());

collector_factory->SetCompactionTrigger(0, 50);
collector_factory->SetEnabled(false);
for (int i = kNumKeys * 3 / 4; i < kNumKeys; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());

ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("3,0,0,0,0,0,1", FilesPerLevel());

Close();
}

INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest,
TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8));

Expand Down
28 changes: 16 additions & 12 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
SequenceNumber earliest_snapshot =
(snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0));
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
Expand All @@ -1689,6 +1691,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
IOStatus io_s;
const ReadOptions read_option(Env::IOActivity::kDBOpen);
const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen);

TableBuilderOptions tboptions(
*cfd->ioptions(), mutable_cf_options, read_option, write_option,
cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(),
Expand All @@ -1697,21 +1700,22 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, db_id_, db_session_id_,
0 /* target_file_size */, meta.fd.GetNumber());
0 /* target_file_size */, meta.fd.GetNumber(), kMaxSequenceNumber);
Version* version = cfd->current();
version->Ref();
uint64_t num_input_entries = 0;
s = BuildTable(
dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(), iter.get(),
std::move(range_del_iters), &meta, &blob_file_additions,
snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
io_tracer_, BlobFileCreationReason::kRecovery,
nullptr /* seqno_to_time_mapping */, &event_logger_, job_id,
nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_, version,
&num_input_entries);
s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(),
iter.get(), std::move(range_del_iters), &meta,
&blob_file_additions, snapshot_seqs, earliest_snapshot,
earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks,
cfd->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kRecovery,
nullptr /* seqno_to_time_mapping */, &event_logger_,
job_id, nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_, version,
&num_input_entries);
version->Unref();
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
Expand Down
Loading

0 comments on commit e91c951

Please sign in to comment.