From feb1606d6e89cb8467fb6a8d662ca7de2c2fb82f Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 15 Mar 2024 09:13:09 +0100 Subject: [PATCH 1/2] r/stm_manager: fixed not applying snapshot for new stms State machine manager manages the aggregated Raft snapshot for all the state machines created based on one Raft instance. The managed snapshot is a map containing individual snapshot data for each state machine. When a new STM is created while the managed snapshot was already taken the `state_machine_base::apply_raft_snapshot` should still be called even if the snapshot doesn't exists in the managed snapshot map. This way an STM will be informed that the log doesn't start from 0. Previously when snapshot was not present in the managed snapshot map we skipped calling `apply_snapshot` on the STM and didn't advance it's `_next` offset which lead to stuck background apply fiber loop. Signed-off-by: Michal Maslanka (cherry picked from commit bb3b63eed8eabf6f79f463a9b13164d948919b95) --- src/v/raft/state_machine_manager.cc | 48 ++++++----- src/v/raft/state_machine_manager.h | 5 ++ src/v/raft/tests/persisted_stm_test.cc | 110 ++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 23 deletions(-) diff --git a/src/v/raft/state_machine_manager.cc b/src/v/raft/state_machine_manager.cc index 913caef1be86d..989f78a501b63 100644 --- a/src/v/raft/state_machine_manager.cc +++ b/src/v/raft/state_machine_manager.cc @@ -251,11 +251,11 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot( _log.debug, "applying empty snapshot at offset: {} for backward " "compatibility", - metadata.last_included_index); + last_offset); co_await ss::coroutine::parallel_for_each( - _machines, [metadata, last_offset](auto& pair) { + _machines, [last_offset](auto& pair) { auto stm = pair.second->stm; - if (stm->last_applied_offset() >= metadata.last_included_index) { + if (stm->last_applied_offset() >= last_offset) { return ss::now(); } return stm->apply_raft_snapshot(iobuf{}).then([stm, last_offset] { @@ -269,27 +269,37 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot( auto snap = co_await serde::read_async(parser); co_await ss::coroutine::parallel_for_each( - snap.snapshot_map, - [this, metadata, last_offset](auto& snapshot_pair) { - auto it = _machines.find(snapshot_pair.first); - if ( - it == _machines.end() - || it->second->stm->last_applied_offset() - >= metadata.last_included_index) { - return ss::now(); - } - - return it->second->stm - ->apply_raft_snapshot(std::move(snapshot_pair.second)) - .then([stm = it->second->stm, last_offset] { - stm->set_next( - std::max(model::next_offset(last_offset), stm->next())); - }); + _machines, + [this, snap = std::move(snap), last_offset]( + state_machines_t::value_type& stm_pair) mutable { + return apply_snapshot_to_stm(stm_pair.second, snap, last_offset); }); } _next = model::next_offset(metadata.last_included_index); } +ss::future<> state_machine_manager::apply_snapshot_to_stm( + ss::lw_shared_ptr stm_entry, + const managed_snapshot& snapshot, + model::offset last_offset) { + auto it = snapshot.snapshot_map.find(stm_entry->name); + + if (stm_entry->stm->last_applied_offset() < last_offset) { + if (it != snapshot.snapshot_map.end()) { + co_await stm_entry->stm->apply_raft_snapshot(it->second); + } else { + /** + * In order to hold the stm contract we need to call the + * apply_raft_snapshot with empty data + */ + co_await stm_entry->stm->apply_raft_snapshot(iobuf{}); + } + } + + stm_entry->stm->set_next( + std::max(model::next_offset(last_offset), stm_entry->stm->next())); +} + ss::future<> state_machine_manager::apply() { try { ss::coroutine::switch_to sg_sw(_apply_sg); diff --git a/src/v/raft/state_machine_manager.h b/src/v/raft/state_machine_manager.h index a028867dd4190..26456a6782fee 100644 --- a/src/v/raft/state_machine_manager.h +++ b/src/v/raft/state_machine_manager.h @@ -173,6 +173,11 @@ class state_machine_manager final { auto serde_fields() { return std::tie(snapshot_map); } }; + ss::future<> apply_snapshot_to_stm( + ss::lw_shared_ptr stm_entry, + const managed_snapshot& snapshot, + model::offset last_included_offset); + consensus* _raft; ctx_log _log; mutex _apply_mutex; diff --git a/src/v/raft/tests/persisted_stm_test.cc b/src/v/raft/tests/persisted_stm_test.cc index a7827954832f5..2bf6c34216fb5 100644 --- a/src/v/raft/tests/persisted_stm_test.cc +++ b/src/v/raft/tests/persisted_stm_test.cc @@ -171,12 +171,13 @@ class persisted_kv : public persisted_stm<> { 0, last_applied_offset(), serde::to_iobuf(state)); }; - static void + static std::optional apply_to_state(const model::record_batch& batch, kv_state& state) { if (batch.header().type != model::record_batch_type::raft_data) { - return; + return std::nullopt; } - batch.for_each_record([&state](model::record r) { + kv_operation last_op; + batch.for_each_record([&state, &last_op](model::record r) { auto op = serde::from_iobuf(r.value().copy()); /** * Here we check if validation pre replication is correct. @@ -206,11 +207,16 @@ class persisted_kv : public persisted_stm<> { state.remove(op.key); } } + last_op = op; }); + return last_op; } ss::future<> apply(const model::record_batch& batch) override { - apply_to_state(batch, state); + auto last_op = apply_to_state(batch, state); + if (last_op) { + last_operation = std::move(*last_op); + } co_return; } @@ -265,9 +271,38 @@ class persisted_kv : public persisted_stm<> { } kv_state state; + kv_operation last_operation; raft_node_instance& raft_node; }; +class other_persisted_kv : public persisted_kv { +public: + static constexpr std::string_view name = "other_persited_kv_stm"; + explicit other_persisted_kv(raft_node_instance& rn) + : persisted_kv(rn) {} + ss::future<> apply_raft_snapshot(const iobuf& buffer) override { + if (buffer.empty()) { + co_return; + } + state = serde::from_iobuf(buffer.copy()); + co_return; + }; + /** + * This STM doesn't execute the full apply logic from the base persisted_kv + * as it is going to be started without the full data in the snapshot, hence + * the validation would fail. + */ + ss::future<> apply(const model::record_batch& batch) override { + if (batch.header().type != model::record_batch_type::raft_data) { + co_return; + } + batch.for_each_record([this](model::record r) { + last_operation = serde::from_iobuf(r.value().copy()); + }); + co_return; + } +}; + struct persisted_stm_test_fixture : state_machine_fixture { ss::future<> initialize_state_machines() { create_nodes(); @@ -549,3 +584,70 @@ TEST_F_CORO(persisted_stm_test_fixture, test_raft_and_local_snapshot) { ASSERT_EQ_CORO(stm->state, expected); } } +/** + * Tests the scenario in which an STM is added to the partition after it was + * already alive and Raft snapshot was taken on the partition. + * + * The snapshot doesn't contain data for the newly created stm, however the stm + * next offset should still be updated to make it possible for the STM to catch + * up. + */ +TEST_F_CORO(persisted_stm_test_fixture, test_adding_state_machine) { + co_await initialize_state_machines(); + kv_state expected; + auto ops = random_operations(2000); + for (auto batch : ops) { + co_await apply_operations(expected, std::move(batch)); + } + co_await wait_for_apply(); + for (const auto& [_, stm] : node_stms) { + ASSERT_EQ_CORO(stm->state, expected); + } + + // take local snapshot on every node + co_await take_local_snapshot_on_every_node(); + // update state + auto ops_phase_two = random_operations(50); + for (auto batch : ops_phase_two) { + co_await apply_operations(expected, std::move(batch)); + } + + co_await wait_for_apply(); + for (const auto& [_, stm] : node_stms) { + ASSERT_EQ_CORO(stm->state, expected); + } + + // take Raft snapshot on every node, there are two possibilities here either + // a snapshot will be taken at offset preceding current local snapshot or + // the one following local snapshot. + co_await take_raft_snapshot_all_nodes(); + + auto committed = node(model::node_id(0)).raft()->committed_offset(); + + absl::flat_hash_map data_directories; + for (auto& [id, node] : nodes()) { + data_directories[id] = node->raft()->log()->config().base_directory(); + } + + for (auto& [id, data_dir] : data_directories) { + co_await stop_node(id); + add_node(id, model::revision_id(0), std::move(data_dir)); + } + ss::shared_ptr other_stm; + for (auto& [_, node] : nodes()) { + co_await node->initialise(all_vnodes()); + raft::state_machine_manager_builder builder; + auto stm = builder.create_stm(*node); + other_stm = builder.create_stm(*node); + co_await node->start(std::move(builder)); + node_stms.emplace(node->get_vnode(), std::move(stm)); + } + + co_await wait_for_committed_offset(committed, 30s); + co_await wait_for_apply(); + + for (const auto& [_, stm] : node_stms) { + ASSERT_EQ_CORO(stm->state, expected); + ASSERT_EQ_CORO(stm->last_operation, other_stm->last_operation); + } +} From eacd0a8f0914d46b8b9c538a82cb6128d24d8db6 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 15 Mar 2024 11:31:23 +0100 Subject: [PATCH 2/2] tests: add a test validating enabling tiered storage in running cluster Signed-off-by: Michal Maslanka (cherry picked from commit b700b95562dbfc7469e79f7c0c966d29c01d2f6d) --- .../tests/tiered_storage_enable_test.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/rptest/tests/tiered_storage_enable_test.py diff --git a/tests/rptest/tests/tiered_storage_enable_test.py b/tests/rptest/tests/tiered_storage_enable_test.py new file mode 100644 index 0000000000000..359a072587602 --- /dev/null +++ b/tests/rptest/tests/tiered_storage_enable_test.py @@ -0,0 +1,89 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.clients.rpk import RpkTool +from rptest.services.cluster import cluster +from ducktape.utils.util import wait_until +from rptest.clients.types import TopicSpec +from rptest.services.kgo_verifier_services import KgoVerifierProducer +from rptest.services.redpanda import SISettings + +from rptest.tests.prealloc_nodes import PreallocNodesTest +from rptest.utils.mode_checks import skip_debug_mode + + +class TestEnablingTieredStorage(PreallocNodesTest): + def __init__(self, test_context): + super().__init__(test_context, + num_brokers=3, + node_prealloc_count=1, + si_settings=SISettings(test_context=test_context, + fast_uploads=True)) + + @property + def producer_throughput(self): + return 5 * (1024 * 1024) if not self.debug_mode else 1000 + + @property + def msg_count(self): + return 20 * int(self.producer_throughput / self.msg_size) + + @property + def msg_size(self): + return 128 + + def start_producer(self): + self.logger.info( + f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps" + ) + self.producer = KgoVerifierProducer( + self.test_context, + self.redpanda, + self._topic, + self.msg_size, + self.msg_count, + custom_node=self.preallocated_nodes, + rate_limit_bps=self.producer_throughput) + + self.producer.start(clean=False) + self.producer.wait_for_acks( + 5 * (self.producer_throughput / self.msg_size), 120, 1) + + @cluster(num_nodes=4) + @skip_debug_mode + def test_enabling_tiered_storage_on_old_topic(self): + # disable cloud storage and restart cluster + self.redpanda.set_cluster_config({"cloud_storage_enabled": False}, + expect_restart=True) + # create topic without tiered storage enabled + topic = TopicSpec(partition_count=3, + segment_bytes=1024 * 1024, + retention_bytes=5 * 1024 * 1024) + + self.client().create_topic(topic) + self._topic = topic.name + self.start_producer() + rpk = RpkTool(self.redpanda) + + def _start_offset_updated(): + partitions = rpk.describe_topic(self._topic) + return all([p.start_offset > 0 for p in partitions]) + + wait_until( + _start_offset_updated, + timeout_sec=60, + backoff_sec=1, + err_msg= + "timed out waiting for local retention to clean up some some data") + + # enable cloud storage + self.redpanda.set_cluster_config({"cloud_storage_enabled": True}, + expect_restart=True) + + self.redpanda.wait_for_manifest_uploads()