Commit 379a1e1 (1 parent: 564a424)
Showing 1 changed file with 243 additions and 0 deletions.
@@ -0,0 +1,243 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import time
import threading

from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import (
    KgoVerifierProducer, KgoVerifierConsumerGroupConsumer,
    KgoVerifierSeqConsumer)
from rptest.services.redpanda import MetricsEndpoint
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import skip_debug_mode


class LogCompactionTest(PreallocNodesTest, PartitionMovementMixin):
    def __init__(self, test_context):
        self.test_context = test_context
        # Run with small segments, a low retention value, and a very
        # frequent compaction interval.
        self.extra_rp_conf = {
            'log_compaction_interval_ms': 4000,
            'log_segment_size': 2 * 1024**2,  # 2 MiB
            'retention_bytes': 25 * 1024**2,  # 25 MiB
            'compacted_log_segment_size': 1024**2  # 1 MiB
        }
        super().__init__(test_context=test_context,
                         num_brokers=3,
                         node_prealloc_count=1,
                         extra_rp_conf=self.extra_rp_conf)

    def prologue(self, cleanup_policy, key_set_cardinality):
        """
        Sets test parameters and creates the topic.
        """
        self.msg_size = 1024  # 1 KiB
        self.rate_limit = 50 * 1024**2  # 50 MiB/s
        self.total_data = 100 * 1024**2  # 100 MiB
        self.msg_count = int(self.total_data / self.msg_size)
        self.tombstone_probability = 0.4
        self.partition_count = 10
        self.consumer_count = 1
        self.cleanup_policy = cleanup_policy
        self.key_set_cardinality = key_set_cardinality

        # A value below log_compaction_interval_ms, so that tombstones which
        # would otherwise be compacted away during deduplication are instead
        # visibly removed once delete.retention.ms expires.
        self.delete_retention_ms = 3000
        self.topic_spec = TopicSpec(
            name="tapioca",
            delete_retention_ms=self.delete_retention_ms,
            partition_count=self.partition_count,
            cleanup_policy=self.cleanup_policy)
        self.client().create_topic(self.topic_spec)

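    # Judging by the parameter names used below, the stress fibers spin
    # inside the reactor at scheduling points, adding CPU and scheduling
    # pressure while compaction runs in the background.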
    def _start_stress_fibers(self):
        """
        Start stress fibers across the redpanda nodes, retrying until all
        nodes accept the request or the timeout is reached.
        """
        def try_start_stress_fiber():
            results = [
                self.redpanda._admin.stress_fiber_start(
                    node,
                    num_fibers=10,
                    min_spins_per_scheduling_point=10,
                    max_spins_per_scheduling_point=100)
                for node in self.redpanda.nodes
            ]
            return all(r.status_code == 200 for r in results)

        wait_until(try_start_stress_fiber,
                   timeout_sec=30,
                   backoff_sec=2,
                   err_msg="Unable to start stress fibers")

    def _stop_stress_fibers(self):
        """
        Best-effort attempt to stop the stress fibers. Failures are ignored,
        since this is only cleanup.
        """
        try:
            for node in self.redpanda.nodes:
                self.redpanda._admin.stress_fiber_stop(node)
        except Exception:
            pass

    def get_removed_tombstones(self):
        return self.redpanda.metric_sum(
            metric_name="vectorized_storage_log_tombstones_removed_total",
            metrics_endpoint=MetricsEndpoint.METRICS)

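    # A hedged sketch, not used by the test as written: the tombstone removal
    # metric above could gate progress via wait_until() rather than a fixed
    # sleep. `min_removed` is an illustrative parameter, not from the
    # original code.
    def _wait_for_removed_tombstones(self, min_removed, timeout_sec=60):
        wait_until(lambda: self.get_removed_tombstones() >= min_removed,
                   timeout_sec=timeout_sec,
                   backoff_sec=2,
                   err_msg=f"Fewer than {min_removed} tombstones removed")
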
    def part_one(self):
        """
        Creates a producer and consumer, a partition movement thread, and
        triggers stress fibers on the brokers. After producing and consuming,
        asserts on the latest key-value pairs produced and consumed.
        """
        class PartitionMoveExceptionReporter:
            exc = None

        def background_test_loop(reporter,
                                 fn,
                                 iterations=10,
                                 sleep_sec=1,
                                 allowable_retries=3):
            try:
                while iterations > 0:
                    try:
                        fn()
                    except Exception as e:
                        # Only consume a retry when fn() actually fails.
                        if allowable_retries == 0:
                            raise e
                        allowable_retries -= 1
                    time.sleep(sleep_sec)
                    iterations -= 1
            except Exception as e:
                reporter.exc = e

        def issue_partition_move():
            # Let any failure propagate to background_test_loop, which
            # retries it and reports the final exception.
            self._dispatch_random_partition_move(self.topic_spec.name, 0)
            self._wait_for_move_in_progress(self.topic_spec.name,
                                            0,
                                            timeout=5)

        partition_move_thread = threading.Thread(
            target=background_test_loop,
            args=(PartitionMoveExceptionReporter, issue_partition_move),
            kwargs={
                'iterations': 5,
                'sleep_sec': 1
            })

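        # The producer writes ~100 MiB of 1 KiB records over a bounded key
        # set, producing a tombstone for a given key with probability 0.4,
        # and tracks the latest value written per key for later validation.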
        producer = KgoVerifierProducer(
            context=self.test_context,
            redpanda=self.redpanda,
            topic=self.topic_spec.name,
            msg_size=self.msg_size,
            msg_count=self.msg_count,
            rate_limit_bps=self.rate_limit,
            key_set_cardinality=self.key_set_cardinality,
            tolerate_data_loss=False,
            tombstone_probability=self.tombstone_probability,
            validate_latest_values=True,
            custom_node=self.preallocated_nodes)

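        # The group consumer reads with compacted=True, i.e. it tolerates
        # the offset gaps left behind by compaction while still validating
        # the data it reads.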
        consumer = KgoVerifierConsumerGroupConsumer(
            self.test_context,
            self.redpanda,
            self.topic_spec.name,
            self.msg_size,
            readers=self.consumer_count,
            compacted=True,
            nodes=self.preallocated_nodes)

        # Start partition movement thread
        partition_move_thread.start()

        # Start stress fibers
        self._start_stress_fibers()

        # Produce and wait
        producer.start()
        producer.wait_for_latest_value_map()
        producer.wait(timeout_sec=60)

        assert producer.produce_status.tombstones_produced > 0
        assert producer.produce_status.bad_offsets == 0

        # Consume and wait. clean=False so that the latest-value map is not
        # accidentally removed.
        consumer.start(clean=False)
        consumer.wait(timeout_sec=60)

        partition_move_thread.join()
        self._stop_stress_fibers()

        # Clean up
        producer.stop()
        consumer.stop()

        if PartitionMoveExceptionReporter.exc is not None:
            raise PartitionMoveExceptionReporter.exc

        assert consumer.consumer_status.validator.tombstones_consumed > 0
        assert consumer.consumer_status.validator.invalid_reads == 0

    def part_two(self):
        """
        After several rounds of compaction, restart the brokers, create a
        consumer, and assert that no tombstones are consumed.
        """
        # Restart each redpanda broker to force segment rolls
        self.redpanda.restart_nodes(self.redpanda.nodes)
        self.redpanda.wait_for_membership(first_start=False)

        # Sleep for ten compaction intervals, until the log has definitely
        # been fully compacted.
        timeout = self.extra_rp_conf['log_compaction_interval_ms'] / 1000 * 10
        time.sleep(timeout)
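        # Alternatively, a metric-based wait (see the hedged
        # _wait_for_removed_tombstones sketch above) could bound this delay.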
        consumer = KgoVerifierSeqConsumer(self.test_context,
                                          self.redpanda,
                                          self.topic_spec.name,
                                          self.msg_size,
                                          compacted=True,
                                          loop=False,
                                          validate_latest_values=True,
                                          nodes=self.preallocated_nodes)

        # Consume and wait. clean=False so that the latest-value map is not
        # accidentally removed.
        consumer.start(clean=False)
        consumer.wait(timeout_sec=60)

        # Expect to see 0 tombstones consumed, since delete.retention.ms has
        # long since elapsed.
        assert consumer.consumer_status.validator.tombstones_consumed == 0

        consumer.stop()

    @skip_debug_mode
    @cluster(num_nodes=4)
    @matrix(
        cleanup_policy=[
            TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
        ],
        key_set_cardinality=[100, 1000],
    )
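    # The matrix expands to four parameterized runs: two cleanup policies
    # ('compact' and 'compact,delete') times two key cardinalities.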
    def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
        """
        Uses partition movement, stress fibers, and frequent compaction and
        garbage collection to validate tombstone removal and general
        compaction behavior.
        """
        self.prologue(cleanup_policy, key_set_cardinality)

        self.part_one()

        self.part_two()