Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-7961] rptest: Add ducktape test for partition movement in RRR cluster #24159

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 80 additions & 5 deletions tests/rptest/tests/read_replica_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rptest.services.cluster import cluster

from rptest.clients.default import DefaultClient
from rptest.services.admin import Admin
from rptest.services.redpanda import SISettings
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
Expand Down Expand Up @@ -159,12 +160,12 @@ def __init__(self, test_context: TestContext):
cloud_storage_housekeeping_interval_ms=10)
self.second_cluster = None

def start_second_cluster(self) -> None:
def start_second_cluster(self, num_brokers=3) -> None:
# NOTE: the RRR cluster won't have a bucket, so don't upload.
extra_rp_conf = dict(enable_cluster_metadata_upload_loop=False)
self.second_cluster = make_redpanda_service(
self.test_context,
num_brokers=3,
num_brokers=num_brokers,
si_settings=self.rr_settings,
extra_rp_conf=extra_rp_conf)
self.second_cluster.start(start_si=False)
Expand Down Expand Up @@ -210,15 +211,17 @@ def create_read_replica_topic_success(self) -> bool:
def _setup_read_replica(self,
num_messages=0,
partition_count=3,
producer_timeout=None) -> None:
producer_timeout=None,
num_source_brokers=3,
num_rrr_brokers=3) -> None:
if producer_timeout is None:
producer_timeout = 30

self.logger.info(f"Setup read replica \"{self.topic_name}\", : "
f"{num_messages} msg, {partition_count} "
"partitions.")
# Create original topic
self.start_redpanda(3, si_settings=self.si_settings)
self.start_redpanda(num_source_brokers, si_settings=self.si_settings)
spec = TopicSpec(name=self.topic_name,
partition_count=partition_count,
replication_factor=3)
Expand All @@ -237,7 +240,7 @@ def _setup_read_replica(self,
str(self.producer.last_acked_offsets))
self.producer.stop()

self.start_second_cluster()
self.start_second_cluster(num_rrr_brokers)

# wait until the read replica topic creation succeeds
wait_until(
Expand Down Expand Up @@ -457,6 +460,78 @@ def test_simple_end_to_end(
m = f"S3 Bucket usage changed during read replica test: {delta}"
assert False, m

def _get_node_assignments(self, admin, topic, partition):
    """Return the current replica set of ``topic``/``partition``.

    Queries ``admin.get_partitions(topic, partition)`` through
    ``wait_until_result`` (30 second timeout, 1 second backoff) so that a
    failing request is retried instead of failing the test immediately.

    :param admin: admin API client for the cluster to query.
    :param topic: topic name.
    :param partition: partition id within the topic.
    :return: a list of ``{"node_id": <id>}`` dicts, one per entry of the
        response's ``"replicas"`` list; any other replica fields are dropped.
    """
    def try_get_partitions():
        # wait_until_result presumably retries while the first tuple element
        # is False -- confirm against its definition.
        try:
            res = admin.get_partitions(topic, partition)
        except Exception:
            # Only swallow ordinary errors; a bare ``except:`` would also eat
            # KeyboardInterrupt/SystemExit and keep retrying during an abort.
            return False, None
        return True, res

    res = wait_until_result(try_get_partitions,
                            timeout_sec=30,
                            backoff_sec=1)

    return [{"node_id": a["node_id"]} for a in res["replicas"]]

def _set_partition_assignments(self, topic, partition, assignments,
                               admin: Admin):
    """Request a new replica set for ``topic``/``partition`` via ``admin``.

    :param topic: topic name.
    :param partition: partition id within the topic.
    :param assignments: iterable of dicts that each carry a ``"node_id"`` key.
    :param admin: admin API client used to submit the request.
    """
    self.logger.info(
        f"setting assignments for {topic}/{partition} to {assignments}")

    # Every replica is submitted with core 0; only the node id is taken from
    # the caller-supplied assignments.
    replicas = []
    for entry in assignments:
        replicas.append({
            "core": 0,
            "node_id": entry["node_id"],
        })

    admin.set_partition_replicas(topic, partition, replicas)

# FIPS on S3 is not compatible with path-style URLs. TODO: remove this once get_cloud_storage_type_and_url_style is FIPS-aware.
@skip_fips_mode
@cluster(num_nodes=11, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
@matrix(partition_count=[10])
def test_partition_movement(self, partition_count: int) -> None:
data_timeout = 300
num_messages = 100000
self._setup_read_replica(num_messages=num_messages,
partition_count=partition_count,
producer_timeout=300,
num_rrr_brokers=4)

# Consume from read replica topic and validate
self.start_consumer()
self.run_validation(
min_records=num_messages,
consumer_timeout_sec=data_timeout) # calls self.consumer.stop()

# Initiate partition movement in RRR cluster
admin = Admin(self.second_cluster)
brokers = admin.get_brokers()
for part_id in range(0, partition_count):
assignments = self._get_node_assignments(admin, self.topic_name,
part_id)
self.logger.info(
f"initial assignments for {self.topic_name}/{part_id}: {assignments}"
)
replicas = set([r['node_id'] for r in assignments])
for b in brokers:
if b['node_id'] not in replicas:
assignments[0] = {"node_id": b['node_id']}
break
self.logger.info(
f"new assignments for {self.topic_name}/{part_id}: {assignments}"
)
self._set_partition_assignments(self.topic_name, part_id,
assignments, admin)
Comment on lines +509 to +524
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the goal to run partition movement concurrently with data arriving into the destination cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily; the goal is just to make sure that fast partition movement is not broken for RRR.
We have a gap in our testing here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we verify that fast partition movement was used?


# Wait until reconfigurations are started and start consuming in parallel
wait_until(
lambda: len(admin.list_reconfigurations()) == partition_count, 30)
Comment on lines +527 to +528
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add an informative error message parameter to this wait_until

self.start_consumer()

# Wait until reconfigurations are completed and start consuming again
wait_until(lambda: len(admin.list_reconfigurations()) == 0, 30)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add an informative error message parameter to this wait_until

self.start_consumer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we start the consumer and then immediately end the test?



class ReadReplicasUpgradeTest(EndToEndTest):
log_segment_size = 1024 * 1024
Expand Down
Loading