Skip to content

Commit

Permalink
Some tiered storage refactorings setting up more work (facebook#13230)
Browse files Browse the repository at this point in the history
Summary:
* Simplify some testing callbacks for tiered_compaction_test ahead of some significant functional updates.
* Refactor CompactionJob::Prepare() for sharing with CompactionServiceCompactionJob. This is a minor functional change in computing preserve/preclude sequence numbers for remote compaction, but it is a start toward support for tiered storage with remote compaction. A test is added that is only partly working but does check that outputs are being split (just not to the correct levels).

Pull Request resolved: facebook#13230

Test Plan: mostly test changes and additions. Arguably makes tiered storage + remote compaction MORE broken as a step toward supporting it.

Reviewed By: jaykorean

Differential Revision: D67493682

Pulled By: pdillinger

fbshipit-source-id: fd6db74e08ef0e4fc7fdd599ff8555aab0c8ddc4
  • Loading branch information
pdillinger authored and facebook-github-bot committed Dec 20, 2024
1 parent cf768a2 commit 54b614d
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 51 deletions.
3 changes: 1 addition & 2 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ CompactionIterator::CompactionIterator(
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction ? std::make_unique<RealCompaction>(compaction) : nullptr,
must_count_input_entries, compaction_filter, shutting_down, info_log,
full_history_ts_low, preserve_time_min_seqno,
preclude_last_level_min_seqno) {}
Expand Down
30 changes: 25 additions & 5 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
}

void CompactionJob::Prepare() {
void CompactionJob::Prepare(
std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
known_single_subcompact) {
db_mutex_->AssertHeld();
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PREPARE);

Expand All @@ -260,11 +263,12 @@ void CompactionJob::Prepare() {
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();

if (c->ShouldFormSubcompactions()) {
if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) {
StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
GenSubcompactionBoundaries();
}
if (boundaries_.size() >= 1) {
assert(!known_single_subcompact.has_value());
for (size_t i = 0; i <= boundaries_.size(); i++) {
compact_->sub_compact_states.emplace_back(
c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
Expand All @@ -280,13 +284,20 @@ void CompactionJob::Prepare() {
RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
} else {
compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
std::optional<Slice> start_key;
std::optional<Slice> end_key;
if (known_single_subcompact.has_value()) {
start_key = known_single_subcompact.value().first;
end_key = known_single_subcompact.value().second;
} else {
assert(!start_key.has_value() && !end_key.has_value());
}
compact_->sub_compact_states.emplace_back(c, start_key, end_key,
/*sub_job_id*/ 0);
}

// collect all seqno->time information from the input files which will be used
// to encode seqno->time to the output files.

uint64_t preserve_time_duration =
std::max(c->mutable_cf_options()->preserve_internal_time_seconds,
c->mutable_cf_options()->preclude_last_level_data_seconds);
Expand Down Expand Up @@ -341,6 +352,16 @@ void CompactionJob::Prepare() {
// larger than kMaxSeqnoTimePairsPerSST.
seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
}
#ifndef NDEBUG
assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_);
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
static_cast<void*>(&preclude_last_level_min_seqno_));
// Restore the invariant asserted above, in case it was broken under the
// callback
preserve_time_min_seqno_ =
std::min(preclude_last_level_min_seqno_, preserve_time_min_seqno_);
#endif
}

uint64_t CompactionJob::GetSubcompactionsLimit() {
Expand Down Expand Up @@ -2118,7 +2139,6 @@ void CompactionJob::UpdateCompactionJobStats(
void CompactionJob::LogCompaction() {
Compaction* compaction = compact_->compaction;
ColumnFamilyData* cfd = compaction->column_family_data();

// Let's check if anything will get logged. Don't prepare all the info if
// we're not logging
if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
Expand Down
12 changes: 11 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,13 @@ class CompactionJob {

// REQUIRED: mutex held
// Prepare for the compaction by setting up boundaries for each subcompaction
void Prepare();
// and organizing seqno <-> time info. `known_single_subcompact` is non-null
// if we already have a known single subcompaction, with optional key bounds
// (currently for executing a remote compaction).
void Prepare(
std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
known_single_subcompact);

// REQUIRED mutex not held
// Launch threads for each subcompaction and wait for them to finish. After
// that, verify table is usable and finally do bookkeeping to unify
Expand Down Expand Up @@ -496,6 +502,10 @@ class CompactionServiceCompactionJob : private CompactionJob {
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result);

// REQUIRED: mutex held
// Like CompactionJob::Prepare()
void Prepare();

// Run the compaction in current thread and return the result
Status Run();

Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ class CompactionJobTestBase : public testing::Test {
full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

compaction_job.Prepare();
compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);
mutex_.Unlock();
Status s = compaction_job.Run();
ASSERT_OK(s);
Expand Down
33 changes: 16 additions & 17 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

FileMetaData meta;
uint64_t file_size;
// FIXME: file_size should be part of CompactionServiceOutputFile so that
// we don't get DB corruption if the full file size has not been propagated
// back to the caller through the file system (which could have metadata
// lag or caching bugs).
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
Expand Down Expand Up @@ -280,28 +284,23 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
compaction_input_(compaction_service_input),
compaction_result_(compaction_service_result) {}

// Prepare for executing a remote (service-side) compaction.
// REQUIRED: mutex held (same contract as CompactionJob::Prepare()).
// Unlike a local compaction, the single subcompaction and its optional key
// bounds are already fixed by the serialized compaction input, so we pass
// them down instead of letting the base class compute subcompaction
// boundaries.
void CompactionServiceCompactionJob::Prepare() {
  const auto& in = compaction_input_;
  // Absent bounds (has_begin/has_end false) mean an unbounded range on that
  // side, represented as an empty optional.
  std::optional<Slice> start =
      in.has_begin ? std::optional<Slice>{in.begin} : std::nullopt;
  std::optional<Slice> limit =
      in.has_end ? std::optional<Slice>{in.end} : std::nullopt;
  CompactionJob::Prepare(std::make_pair(start, limit));
}

Status CompactionServiceCompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);

auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());

bottommost_level_ = c->bottommost_level();

Slice begin = compaction_input_.begin;
Slice end = compaction_input_.end;
compact_->sub_compact_states.emplace_back(
c,
compaction_input_.has_begin ? std::optional<Slice>(begin)
: std::optional<Slice>(),
compaction_input_.has_end ? std::optional<Slice>(end)
: std::optional<Slice>(),
/*sub_job_id*/ 0);

log_buffer_->FlushBufferToLog();
LogCompaction();
Expand Down
57 changes: 57 additions & 0 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,63 @@ TEST_F(CompactionServiceTest, Snapshot) {
db_->ReleaseSnapshot(s1);
}

// End-to-end check that a remote (compaction service) compaction splits its
// outputs when per-key placement / preclude-last-level is forced on via sync
// points. NOTE(review): per the FIXMEs below, remote compaction does not yet
// assign split outputs to the correct levels; this test intentionally pins
// the current (wrong but expected) behavior as a baseline for future fixes.
TEST_F(CompactionServiceTest, PrecludeLastLevel) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.last_level_temperature = Temperature::kCold;
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 10;
  options.num_levels = kNumLevels;

  ReopenWithCompactionService(&options);
  // Alternate for comparison: DestroyAndReopen(options);

  // This is simpler than setting up mock time to make the user option work,
  // but is not as direct as testing with preclude option itself.
  // Force per-key placement support on, and treat all seqnos >= 100 as
  // "too recent" for the last level.
  SyncPoint::GetInstance()->SetCallBack(
      "Compaction::SupportsPerKeyPlacement:Enabled",
      [&](void* arg) { *static_cast<bool*>(arg) = true; });
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) { *static_cast<SequenceNumber*>(arg) = 100; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Write kNumTrigger L0 files to trigger a universal compaction.
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      // FIXME: need to assign outputs to levels to allow overlapping ranges:
      // ASSERT_OK(Put(Key(j * kNumTrigger + i), "v" + std::to_string(i)));
      // instead of this (too easy):
      ASSERT_OK(Put(Key(i * kNumKeys + j), "v" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Data split between penultimate (kUnknown) and last (kCold) levels
  // FIXME: need to assign outputs to levels to get this:
  // ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  // ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  // ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  // instead of this (WRONG but currently expected):
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
  // Check manifest temperatures
  // NOTE(review): last_level_temperature is kCold, yet kCold size is 0 here —
  // presumably the temperature is not yet propagated back from the remote
  // worker; confirm when fixing level assignment.
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  // TODO: Check FileSystem temperatures with FileTemperatureTestFS

  // Verify all data is still readable with the expected values.
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      // FIXME
      // ASSERT_EQ(Get(Key(j * kNumTrigger + i)), "v" + std::to_string(i));
      ASSERT_EQ(Get(Key(i * kNumKeys + j)), "v" + std::to_string(i));
    }
  }
}

TEST_F(CompactionServiceTest, ConcurrentCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 100;
Expand Down
81 changes: 58 additions & 23 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,41 @@ class TieredCompactionTest : public DBTestBase {
}
};

// Local-compaction counterpart to the remote PrecludeLastLevel test: with
// preclude forced via sync point, a full CompactRange must split data between
// the penultimate (kUnknown) and last (kCold) levels.
// NOTE(review): "Blah" in the test name reads like a placeholder — consider
// renaming in a follow-up (renaming changes the gtest filter identifier, so
// it is not done here).
TEST_F(TieredCompactionTest, BlahPrecludeLastLevel) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.last_level_temperature = Temperature::kCold;
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 10;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);
  // ReopenWithCompactionService(&options);

  // This is simpler than setting up mock time to make the user option work.
  // Treat all seqnos >= 100 as too recent for the last level.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) { *static_cast<SequenceNumber*>(arg) = 100; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Write kNumTrigger - 1 L0 files (not enough to auto-trigger compaction;
  // CompactRange below drives it). NOTE(review): key stride is i * 10 with
  // kNumKeys = 100, so ranges overlap across files — presumably intentional
  // to exercise overwrites; confirm.
  for (int i = 0; i < kNumTrigger - 1; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
  }
  // ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Data split between penultimate (kUnknown) and last (kCold) levels
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}

TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
Expand All @@ -195,10 +230,9 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
std::vector<SequenceNumber> seq_history;

SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -527,13 +561,13 @@ TEST_F(TieredCompactionTest, LevelColdRangeDelete) {
options.max_subcompactions = 10;
DestroyAndReopen(options);

std::atomic_uint64_t latest_cold_seq = 0;
// Initially let everything into cold
std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;

SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -634,13 +668,13 @@ TEST_F(TieredCompactionTest, LevelOutofBoundaryRangeDelete) {
options.max_subcompactions = 10;
DestroyAndReopen(options);

std::atomic_uint64_t latest_cold_seq = 0;
// Initially let everything into cold
std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;

SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->EnableProcessing();

Expand All @@ -654,6 +688,8 @@ TEST_F(TieredCompactionTest, LevelOutofBoundaryRangeDelete) {
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
ASSERT_EQ("0,0,10", FilesPerLevel());

// Stop admitting to cold tier
latest_cold_seq = dbfull()->GetLatestSequenceNumber();
auto snap = db_->GetSnapshot();

// only range delete
Expand Down Expand Up @@ -767,10 +803,9 @@ TEST_F(TieredCompactionTest, UniversalRangeDelete) {
std::atomic_uint64_t latest_cold_seq = 0;

SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -894,14 +929,13 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
options.max_subcompactions = 10;
DestroyAndReopen(options);

std::atomic_uint64_t latest_cold_seq = 0;
std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;
std::vector<SequenceNumber> seq_history;

SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -952,6 +986,7 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
VerifyCompactionStats(expect_stats, expect_pl_stats);

// Add new data, which is all hot and overriding all existing data
latest_cold_seq = dbfull()->GetLatestSequenceNumber();
for (int i = 0; i < kNumTrigger; i++) {
for (int j = 0; j < kNumKeys; j++) {
ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ Status DBImpl::CompactFilesImpl(
version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
*c->mutable_cf_options());

compaction_job.Prepare();
compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);

std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
Expand Down Expand Up @@ -3936,7 +3936,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_,
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare();
compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);

std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
Expand Down
Loading

0 comments on commit 54b614d

Please sign in to comment.