diff --git a/src/v/storage/disk_log_appender.cc b/src/v/storage/disk_log_appender.cc
index 98f133c3dd62..dd736e9588bf 100644
--- a/src/v/storage/disk_log_appender.cc
+++ b/src/v/storage/disk_log_appender.cc
@@ -50,7 +50,7 @@ ss::future<> disk_log_appender::initialize() {
 }
 
 bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
-    if (!_seg || !_seg->has_appender()) {
+    if (!_seg || !_seg->has_appender() || _seg->is_tombstone()) {
         // The latest segment with which this log_appender has called
         // initialize() has been rolled and no longer has a segment appender
         // (e.g. because segment.ms rolled onto a new segment). There is likely
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 5f3a3887640d..da822ea9a24d 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1605,6 +1605,12 @@ log_appender disk_log_impl::make_appender(log_append_config cfg) {
             next_offset = model::offset(0);
         }
     }
+    vlog(
+      stlog.trace,
+      "creating log appender for: {}, next offset: {}, log offsets: {}",
+      config().ntp(),
+      next_offset,
+      ofs);
     return log_appender(
       std::make_unique<disk_log_appender>(*this, cfg, now, next_offset));
 }
@@ -1713,7 +1719,7 @@ ss::future<> disk_log_impl::maybe_roll_unlocked(
         co_return co_await new_segment(next_offset, t, iopc);
     }
     auto ptr = _segs.back();
-    if (!ptr->has_appender()) {
+    if (!ptr->has_appender() || ptr->is_tombstone()) {
         co_return co_await new_segment(next_offset, t, iopc);
     }
     bool size_should_roll = false;
@@ -2483,12 +2489,6 @@ ss::future<> disk_log_impl::remove_segment_permanently(
     _probe->delete_segment(*s);
     // background close
     s->tombstone();
-    if (s->has_outstanding_locks()) {
-        vlog(
-          stlog.info,
-          "Segment has outstanding locks. Might take a while to close:{}",
-          s->reader().filename());
-    }
 
     return _readers_cache->evict_segment_readers(s)
       .then([s](readers_cache::range_lock_holder cache_lock) {
@@ -2529,9 +2529,29 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) {
       },
       [this] {
           auto ptr = _segs.front();
-          _segs.pop_front();
-          _probe->add_bytes_prefix_truncated(ptr->file_size());
-          return remove_segment_permanently(ptr, "remove_prefix_full_segments");
+          // evict readers before trying to grab a write lock to prevent
+          // contention
+          return _readers_cache->evict_segment_readers(ptr).then(
+            [this, ptr](readers_cache::range_lock_holder cache_lock) {
+                /**
+                 * If the segment has outstanding locks, wait for it to be
+                 * unlocked before prefix truncating it; this way the prefix
+                 * truncation will not overlap with appends.
+                 */
+                return ptr->write_lock().then(
+                  [this, ptr, cache_lock = std::move(cache_lock)](
+                    ss::rwlock::holder lock_holder) {
+                      _segs.pop_front();
+                      _probe->add_bytes_prefix_truncated(ptr->file_size());
+                      // call remove_segment_permanently first, then release
+                      // the lock before waiting for the future to finish
+                      auto f = remove_segment_permanently(
+                        ptr, "remove_prefix_full_segments");
+                      lock_holder.return_all();
+
+                      return f;
+                  });
+            });
       });
 }
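
The disk_log_impl changes above are the heart of the fix: prefix truncation now evicts cached readers and then acquires the segment's write lock before removing the segment, so removal can no longer interleave with an append that already holds the segment. The tombstone checks added to segment_is_appendable() and maybe_roll_unlocked() ensure an appender never reuses a segment that truncation has already marked for deletion. A minimal standalone sketch of the same lock-ordering idea, with std::shared_mutex standing in for ss::rwlock and toy_segment as a purely illustrative stand-in for storage::segment:

```cpp
// Sketch only: appends hold the shared side of a reader/writer lock,
// truncation takes the exclusive side before tombstoning the segment.
#include <cstdio>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

struct toy_segment {
    std::shared_mutex lock;   // stands in for segment read/write locks
    std::vector<int> batches; // appended data
    bool tombstoned = false;  // set once the segment is removed
};

int main() {
    toy_segment seg;
    // Appender: takes the shared (read) side per append and skips the
    // segment once it is tombstoned, mirroring segment_is_appendable().
    std::thread appender([&] {
        for (int i = 0; i < 1000; ++i) {
            std::shared_lock r(seg.lock);
            if (!seg.tombstoned) {
                seg.batches.push_back(i);
            }
        }
    });
    // Truncator: takes the exclusive (write) side first, so removal can
    // never interleave with an append that is mid-flight on this segment.
    std::thread truncator([&] {
        std::unique_lock w(seg.lock);
        seg.tombstoned = true; // the s->tombstone() analogue
        seg.batches.clear();   // the remove_segment_permanently() analogue
    });
    appender.join();
    truncator.join();
    std::printf("done, %zu batches survived\n", seg.batches.size());
}
```

Because the exclusive side waits for all shared holders, truncation serializes behind in-flight appends instead of racing them, which is exactly why the patch releases the lock holder only after remove_segment_permanently() has been kicked off.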
diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc
index e4aef420752f..a1cd08f82402 100644
--- a/src/v/storage/tests/storage_e2e_test.cc
+++ b/src/v/storage/tests/storage_e2e_test.cc
@@ -50,6 +50,7 @@
 
 #include <...>
 #include <...>
+#include <seastar/core/sleep.hh>
 #include <...>
 #include <...>
 #include <...>
@@ -886,7 +887,8 @@ ss::future<storage::append_result> append_exactly(
   ss::shared_ptr<storage::log> log,
   size_t batch_count,
   size_t batch_sz,
-  std::optional<bytes> key = std::nullopt) {
+  std::optional<bytes> key = std::nullopt,
+  model::record_batch_type batch_type = model::record_batch_type::raft_data) {
     vassert(
       batch_sz > model::packed_record_batch_header_size,
       "Batch size must be greater than {}, requested {}",
@@ -924,8 +926,7 @@ ss::future<storage::append_result> append_exactly(
     val_sz -= real_batch_size;
 
     for (int i = 0; i < batch_count; ++i) {
-        storage::record_batch_builder builder(
-          model::record_batch_type::raft_data, model::offset{});
+        storage::record_batch_builder builder(batch_type, model::offset{});
         iobuf value = bytes_to_iobuf(random_generators::get_bytes(val_sz));
         builder.add_raw_kv(key_buf.copy(), std::move(value));
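
append_exactly() gains a defaulted batch_type parameter, so every existing call site compiles unchanged while the new test below can randomize the batch type per append. A reduced, self-contained sketch of that defaulted-parameter pattern; record_kind and this append_exactly are hypothetical stand-ins for model::record_batch_type and the real test helper:

```cpp
// Sketch: a defaulted trailing parameter extends a helper without
// touching its existing callers.
#include <cstdio>
#include <random>
#include <vector>

enum class record_kind { data, configuration };

// Defaulting the new parameter keeps old call sites intact while new
// tests can pick the kind per call.
void append_exactly(int batch_count, record_kind kind = record_kind::data) {
    std::printf(
      "appending %d batch(es) of kind %d\n",
      batch_count,
      static_cast<int>(kind));
}

int main() {
    std::vector<record_kind> kinds{
      record_kind::data, record_kind::configuration};
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<size_t> pick(0, kinds.size() - 1);

    append_exactly(1);                   // old call site, default kind
    append_exactly(1, kinds[pick(rng)]); // mirrors random_choice(types)
}
```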
@@ -1006,6 +1007,112 @@ FIXTURE_TEST(write_concurrently_with_gc, storage_test_fixture) {
       model::offset(9 + appends * batches_per_append));
 };
 
+/**
+ * This test executes operations which may be executed by the Raft layer
+ * without synchronization, i.e. appends, reads, flushes and prefix
+ * truncations. The test validates that offsets are assigned correctly,
+ * i.e. that no batch is assigned the same offset twice.
+ */
+FIXTURE_TEST(append_concurrent_with_prefix_truncate, storage_test_fixture) {
+    auto cfg = default_log_config(test_dir);
+
+    ss::abort_source as;
+    storage::log_manager mgr = make_log_manager(cfg);
+    model::offset last_append_base_offset{};
+    model::offset last_append_end_offset{};
+    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
+    auto ntp = model::controller_ntp;
+
+    storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
+    auto log = mgr.manage(std::move(ntp_cfg)).get0();
+
+    bool stop = false;
+    int cnt = 0;
+    std::vector<model::record_batch_type> types{
+      model::record_batch_type::raft_data,
+      model::record_batch_type::raft_configuration};
+#ifndef NDEBUG
+    static constexpr size_t stop_after = 10;
+#else
+    static constexpr size_t stop_after = 200;
+#endif
+    auto append = [&] {
+        return append_exactly(
+                 log,
+                 1,
+                 random_generators::get_int(75, 237),
+                 std::nullopt,
+                 random_generators::random_choice(types))
+          .then([&](storage::append_result result) {
+              info("append result: {}", result);
+              vassert(
+                result.base_offset > last_append_base_offset,
+                "Invalid append result base offset: {}. The same base offset "
+                "was already assigned (last assigned: {}).",
+                result.base_offset,
+                last_append_base_offset);
+              vassert(
+                result.last_offset > last_append_end_offset,
+                "Invalid append result last offset: {}. The same last offset "
+                "was already assigned (last assigned: {}).",
+                result.last_offset,
+                last_append_end_offset);
+              last_append_base_offset = result.base_offset;
+              last_append_end_offset = result.last_offset;
+              cnt++;
+              if (cnt >= stop_after) {
+                  stop = true;
+              }
+              return ss::sleep(std::chrono::milliseconds(
+                random_generators::get_int(10, 100)));
+          });
+    };
+
+    auto flush = [&] { return log->flush().discard_result(); };
+
+    auto read = [&] {
+        auto lstats = log->offsets();
+        return log
+          ->make_reader(storage::log_reader_config(
+            lstats.start_offset,
+            model::offset::max(),
+            ss::default_priority_class()))
+          .then([](auto reader) {
+              return ss::sleep(std::chrono::milliseconds(
+                       random_generators::get_int(15, 30)))
+                .then([r = std::move(reader)]() mutable {
+                    return model::consume_reader_to_memory(
+                      std::move(r), model::no_timeout);
+                })
+                .discard_result();
+          });
+    };
+
+    auto prefix_truncate = [&] {
+        auto offset = model::next_offset(log->offsets().dirty_offset);
+
+        return log
+          ->truncate_prefix(storage::truncate_prefix_config(
+            offset, ss::default_priority_class()))
+          .then([offset] {
+              info("prefix truncate at: {}", offset);
+              return ss::sleep(
+                std::chrono::milliseconds(random_generators::get_int(5, 20)));
+          });
+    };
+    /**
+     * Execute all operations concurrently.
+     */
+    auto f_1 = ss::do_until([&] { return stop; }, [&] { return append(); });
+    auto f_2 = ss::do_until(
+      [&] { return stop; }, [&] { return prefix_truncate(); });
+    auto f_3 = ss::do_until([&] { return stop; }, [&] { return read(); });
+    auto f_4 = ss::do_until([&] { return stop; }, [&] { return flush(); });
+
+    ss::when_all(std::move(f_1), std::move(f_2), std::move(f_3), std::move(f_4))
+      .get();
+};
+
 FIXTURE_TEST(empty_segment_recovery, storage_test_fixture) {
     auto cfg = default_log_config(test_dir);
     auto ntp = model::ntp("default", "test", 0);
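
The new fixture test drives four unsynchronized loops (append, prefix truncate, read, flush) until the append counter trips a shared stop flag, then joins them all with ss::when_all. The shape of that driver, sketched with plain std::thread and std::atomic in place of ss::do_until/ss::when_all; the loop bodies are placeholders, not the real operations:

```cpp
// Sketch: several loops run until a stop flag flips, then are joined.
#include <atomic>
#include <thread>
#include <vector>

int main() {
    std::atomic<bool> stop{false};
    std::atomic<int> appended{0};

    // Runs op() repeatedly until stop is set, like ss::do_until.
    auto until_stop = [&](auto op) {
        return std::thread([&stop, op] {
            while (!stop.load()) {
                op();
            }
        });
    };

    std::vector<std::thread> workers;
    workers.push_back(until_stop([&] {
        // "append": trip the stop flag after a fixed number of iterations
        if (appended.fetch_add(1) + 1 >= 200) {
            stop = true;
        }
    }));
    workers.push_back(until_stop([] { /* "prefix truncate" */ }));
    workers.push_back(until_stop([] { /* "read" */ }));
    workers.push_back(until_stop([] { /* "flush" */ }));

    // The ss::when_all(...).get() analogue: wait for every loop to finish.
    for (auto& t : workers) {
        t.join();
    }
}
```

Sharing one stop flag across all four loops is what lets a single bound on the number of appends terminate the whole test, while the randomized sleeps in the real test shuffle the interleavings the race depends on.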