-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-7961] rptest: Add ducktape test for partition movement in RRR cluster #24159
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
from rptest.services.cluster import cluster | ||
|
||
from rptest.clients.default import DefaultClient | ||
from rptest.services.admin import Admin | ||
from rptest.services.redpanda import SISettings | ||
from rptest.clients.rpk import RpkTool, RpkException | ||
from rptest.clients.types import TopicSpec | ||
|
@@ -159,12 +160,12 @@ def __init__(self, test_context: TestContext): | |
cloud_storage_housekeeping_interval_ms=10) | ||
self.second_cluster = None | ||
|
||
def start_second_cluster(self) -> None: | ||
def start_second_cluster(self, num_brokers=3) -> None: | ||
# NOTE: the RRR cluster won't have a bucket, so don't upload. | ||
extra_rp_conf = dict(enable_cluster_metadata_upload_loop=False) | ||
self.second_cluster = make_redpanda_service( | ||
self.test_context, | ||
num_brokers=3, | ||
num_brokers=num_brokers, | ||
si_settings=self.rr_settings, | ||
extra_rp_conf=extra_rp_conf) | ||
self.second_cluster.start(start_si=False) | ||
|
@@ -210,15 +211,17 @@ def create_read_replica_topic_success(self) -> bool: | |
def _setup_read_replica(self, | ||
num_messages=0, | ||
partition_count=3, | ||
producer_timeout=None) -> None: | ||
producer_timeout=None, | ||
num_source_brokers=3, | ||
num_rrr_brokers=3) -> None: | ||
if producer_timeout is None: | ||
producer_timeout = 30 | ||
|
||
self.logger.info(f"Setup read replica \"{self.topic_name}\", : " | ||
f"{num_messages} msg, {partition_count} " | ||
"partitions.") | ||
# Create original topic | ||
self.start_redpanda(3, si_settings=self.si_settings) | ||
self.start_redpanda(num_source_brokers, si_settings=self.si_settings) | ||
spec = TopicSpec(name=self.topic_name, | ||
partition_count=partition_count, | ||
replication_factor=3) | ||
|
@@ -237,7 +240,7 @@ def _setup_read_replica(self, | |
str(self.producer.last_acked_offsets)) | ||
self.producer.stop() | ||
|
||
self.start_second_cluster() | ||
self.start_second_cluster(num_rrr_brokers) | ||
|
||
# wait until the read replica topic creation succeeds | ||
wait_until( | ||
|
@@ -457,6 +460,82 @@ def test_simple_end_to_end( | |
m = f"S3 Bucket usage changed during read replica test: {delta}" | ||
assert False, m | ||
|
||
def _get_node_assignments(self, admin, topic, partition): | ||
def try_get_partitions(): | ||
try: | ||
res = admin.get_partitions(topic, partition) | ||
return True, res | ||
except: | ||
return False, None | ||
|
||
res = wait_until_result(try_get_partitions, | ||
timeout_sec=30, | ||
backoff_sec=1) | ||
|
||
return [dict(node_id=a["node_id"]) for a in res["replicas"]] | ||
|
||
def _set_partition_assignments(self, topic, partition, assignments, | ||
admin: Admin): | ||
self.logger.info( | ||
f"setting assignments for {topic}/{partition} to {assignments}") | ||
|
||
admin.set_partition_replicas(topic, partition, | ||
[{ | ||
"core": 0, | ||
"node_id": a["node_id"], | ||
} for a in assignments]) | ||
|
||
# fips on S3 is not compatible with path-style urls. TODO remove this once get_cloud_storage_type_and_url_style is fips aware | ||
@skip_fips_mode | ||
@cluster(num_nodes=10, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST) | ||
@matrix(partition_count=[10]) | ||
def test_partition_movement(self, partition_count: int) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would have been helpful to have the pr description as method doc here |
||
data_timeout = 300 | ||
num_messages = 100000 | ||
self._setup_read_replica(num_messages=num_messages, | ||
partition_count=partition_count, | ||
producer_timeout=300, | ||
num_rrr_brokers=4) | ||
|
||
# Consume from read replica topic and validate | ||
self.start_consumer() | ||
self.run_validation( | ||
min_records=num_messages, | ||
consumer_timeout_sec=data_timeout) # calls self.consumer.stop() | ||
|
||
# Initiate partition movement in RRR cluster | ||
admin = Admin(self.second_cluster) | ||
brokers = admin.get_brokers() | ||
for part_id in range(0, partition_count): | ||
assignments = self._get_node_assignments(admin, self.topic_name, | ||
part_id) | ||
self.logger.info( | ||
f"initial assignments for {self.topic_name}/{part_id}: {assignments}" | ||
) | ||
replicas = set([r['node_id'] for r in assignments]) | ||
for b in brokers: | ||
if b['node_id'] not in replicas: | ||
assignments[0] = {"node_id": b['node_id']} | ||
break | ||
self.logger.info( | ||
f"new assignments for {self.topic_name}/{part_id}: {assignments}" | ||
) | ||
self._set_partition_assignments(self.topic_name, part_id, | ||
assignments, admin) | ||
Comment on lines +509 to +524
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the goal to run partition movement concurrently with data arriving into the destination cluster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary, just to make sure that fast partition movement is not broken for RRR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we verify that fast partition movement was used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mistakenly mentioned fast partition movement here. RRR shouldn't use it. But the partition can still be moved and we need to check that it's not breaking RRR. |
||
|
||
# Wait until reconfigurations are started and then completed | ||
wait_until(lambda: len(admin.list_reconfigurations()) > 0, | ||
30, | ||
err_msg="Reconfigurations are not started") | ||
wait_until(lambda: len(admin.list_reconfigurations()) == 0, | ||
30, | ||
err_msg="Reconfiguration are not completed in time") | ||
|
||
# Consume all messages | ||
self.start_consumer() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we start the consumer and then immediately end the test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the goal here was to check that we can still consume from the partition |
||
self.run_validation(min_records=num_messages, | ||
consumer_timeout_sec=data_timeout) | ||
|
||
|
||
class ReadReplicasUpgradeTest(EndToEndTest): | ||
log_segment_size = 1024 * 1024 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accidental copy-paste error? I don't see where you force path-style urls in this test so it should run just fine in FIPS mode.