Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] Fixed race condition in disk_log_impl::truncate_prefix #24284

Open
wants to merge 4 commits into
base: v24.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/storage/disk_log_appender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ ss::future<> disk_log_appender::initialize() {
}

bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
if (!_seg || !_seg->has_appender()) {
if (!_seg || !_seg->has_appender() || _seg->is_tombstone()) {
// The latest segment with which this log_appender has called
// initialize() has been rolled and no longer has a segment appender
// (e.g. because segment.ms rolled onto a new segment). There is likely
Expand Down
40 changes: 30 additions & 10 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,12 @@ log_appender disk_log_impl::make_appender(log_append_config cfg) {
next_offset = model::offset(0);
}
}
vlog(
stlog.trace,
"creating log appender for: {}, next offset: {}, log offsets: {}",
config().ntp(),
next_offset,
ofs);
return log_appender(
std::make_unique<disk_log_appender>(*this, cfg, now, next_offset));
}
Expand Down Expand Up @@ -1713,7 +1719,7 @@ ss::future<> disk_log_impl::maybe_roll_unlocked(
co_return co_await new_segment(next_offset, t, iopc);
}
auto ptr = _segs.back();
if (!ptr->has_appender()) {
if (!ptr->has_appender() || ptr->is_tombstone()) {
co_return co_await new_segment(next_offset, t, iopc);
}
bool size_should_roll = false;
Expand Down Expand Up @@ -2483,12 +2489,6 @@ ss::future<> disk_log_impl::remove_segment_permanently(
_probe->delete_segment(*s);
// background close
s->tombstone();
if (s->has_outstanding_locks()) {
vlog(
stlog.info,
"Segment has outstanding locks. Might take a while to close:{}",
s->reader().filename());
}

return _readers_cache->evict_segment_readers(s)
.then([s](readers_cache::range_lock_holder cache_lock) {
Expand Down Expand Up @@ -2529,9 +2529,29 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) {
},
[this] {
auto ptr = _segs.front();
_segs.pop_front();
_probe->add_bytes_prefix_truncated(ptr->file_size());
return remove_segment_permanently(ptr, "remove_prefix_full_segments");
// evict readers before trying to grab a write lock to prevent
// contention
return _readers_cache->evict_segment_readers(ptr).then(
[this, ptr](readers_cache::range_lock_holder cache_lock) {
/**
* If segment has outstanding locks wait for it to be unlocked
* before prefix-truncating it; this way the prefix
* truncation will not overlap with appends.
*/
return ptr->write_lock().then(
[this, ptr, cache_lock = std::move(cache_lock)](
ss::rwlock::holder lock_holder) {
_segs.pop_front();
_probe->add_bytes_prefix_truncated(ptr->file_size());
// first start the segment removal, then release the lock
// before waiting for the removal future to finish
auto f = remove_segment_permanently(
ptr, "remove_prefix_full_segments");
lock_holder.return_all();

return f;
});
});
});
}

Expand Down
113 changes: 110 additions & 3 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include <fmt/chrono.h>

#include <algorithm>
#include <chrono>
#include <iterator>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -886,7 +887,8 @@ ss::future<storage::append_result> append_exactly(
ss::shared_ptr<storage::log> log,
size_t batch_count,
size_t batch_sz,
std::optional<bytes> key = std::nullopt) {
std::optional<bytes> key = std::nullopt,
model::record_batch_type batch_type = model::record_batch_type::raft_data) {
vassert(
batch_sz > model::packed_record_batch_header_size,
"Batch size must be greater than {}, requested {}",
Expand Down Expand Up @@ -924,8 +926,7 @@ ss::future<storage::append_result> append_exactly(
val_sz -= real_batch_size;

for (int i = 0; i < batch_count; ++i) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset{});
storage::record_batch_builder builder(batch_type, model::offset{});
iobuf value = bytes_to_iobuf(random_generators::get_bytes(val_sz));
builder.add_raw_kv(key_buf.copy(), std::move(value));

Expand Down Expand Up @@ -1006,6 +1007,112 @@ FIXTURE_TEST(write_concurrently_with_gc, storage_test_fixture) {
model::offset(9 + appends * batches_per_append));
};

/**
 * This test executes operations which may be executed by the Raft layer
 * without synchronization, i.e. appends, reads, flushes and prefix
 * truncations. The test validates that offsets are correctly assigned,
 * i.e. that no batch gets the same offset assigned twice.
 */
FIXTURE_TEST(append_concurrent_with_prefix_truncate, storage_test_fixture) {
    auto cfg = default_log_config(test_dir);

    ss::abort_source as;
    storage::log_manager mgr = make_log_manager(cfg);
    // Offsets returned by the most recent successful append. Every subsequent
    // append must return strictly greater offsets; anything else means an
    // offset was assigned twice (the race this test guards against).
    model::offset last_append_base_offset{};
    model::offset last_append_end_offset{};
    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
    auto ntp = model::controller_ntp;

    storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
    auto log = mgr.manage(std::move(ntp_cfg)).get0();

    bool stop = false;
    size_t cnt = 0;
    std::vector<model::record_batch_type> types{
      model::record_batch_type::raft_data,
      model::record_batch_type::raft_configuration};
    // debug builds are much slower, run fewer iterations there
#ifndef NDEBUG
    static constexpr size_t stop_after = 10;
#else
    static constexpr size_t stop_after = 200;
#endif
    auto append = [&] {
        return append_exactly(
                 log,
                 1,
                 random_generators::get_int(75, 237),
                 std::nullopt,
                 random_generators::random_choice(types))
          .then([&](storage::append_result result) {
              info("append result: {}", result);
              // NOTE: both offsets from the result are included in the
              // message so a failure shows the conflicting pair.
              vassert(
                result.base_offset > last_append_base_offset,
                "Invalid append result base offset: {}. The same base offset "
                "was already assigned, last assigned base offset: {}",
                result.base_offset,
                last_append_base_offset);
              vassert(
                result.last_offset > last_append_end_offset,
                "Invalid append result last offset: {}. The same last offset "
                "was already assigned, last assigned last offset: {}",
                result.last_offset,
                last_append_end_offset);
              last_append_base_offset = result.base_offset;
              last_append_end_offset = result.last_offset;
              cnt++;
              if (cnt >= stop_after) {
                  stop = true;
              }
              return ss::sleep(
                std::chrono::milliseconds(random_generators::get_int(10, 100)));
          });
    };

    auto flush = [&] { return log->flush().discard_result(); };

    auto read = [&] {
        auto lstats = log->offsets();
        return log
          ->make_reader(storage::log_reader_config(
            lstats.start_offset,
            model::offset::max(),
            ss::default_priority_class()))
          .then([](auto reader) {
              // hold the reader open for a while so reads overlap with the
              // concurrent prefix truncations
              return ss::sleep(std::chrono::milliseconds(
                                 random_generators::get_int(15, 30)))
                .then([r = std::move(reader)]() mutable {
                    return model::consume_reader_to_memory(
                      std::move(r), model::no_timeout);
                })
                .discard_result();
          });
    };

    auto prefix_truncate = [&] {
        auto offset = model::next_offset(log->offsets().dirty_offset);

        return log
          ->truncate_prefix(storage::truncate_prefix_config(
            offset, ss::default_priority_class()))
          .then([offset] {
              info("prefix truncate at: {}", offset);
              return ss::sleep(
                std::chrono::milliseconds(random_generators::get_int(5, 20)));
          });
    };
    /**
     * Execute all operations concurrently
     */
    auto f_1 = ss::do_until([&] { return stop; }, [&] { return append(); });
    auto f_2 = ss::do_until(
      [&] { return stop; }, [&] { return prefix_truncate(); });
    auto f_3 = ss::do_until([&] { return stop; }, [&] { return read(); });
    auto f_4 = ss::do_until([&] { return stop; }, [&] { return flush(); });

    ss::when_all(std::move(f_1), std::move(f_2), std::move(f_3), std::move(f_4))
      .get();
};

FIXTURE_TEST(empty_segment_recovery, storage_test_fixture) {
auto cfg = default_log_config(test_dir);
auto ntp = model::ntp("default", "test", 0);
Expand Down
Loading