tests: enable datalake in random node operation tests
Added a parameter that enables Iceberg in the RandomNodeOperations test to
validate datalake behavior under node operations and failures.

Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Nov 21, 2024
1 parent 33c5027 commit 8c8cb91
Showing 2 changed files with 76 additions and 23 deletions.
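
In outline: with_iceberg joins the test matrix, the fixture provisions an IcebergRESTCatalog service, setUp points the cluster at it, and every topic the test creates can opt into Iceberg translation. A minimal sketch of the per-topic opt-in, written here as a free function so it reads in isolation (in the commit it is a method on the test class; names are taken from the diff below):

from rptest.clients.default import DefaultClient
from rptest.clients.types import TopicSpec

def maybe_enable_iceberg_for_topic(redpanda, topic_spec: TopicSpec,
                                   iceberg_enabled: bool) -> None:
    # Flip the per-topic Iceberg property only when the matrix parameter
    # asks for it; topics in non-Iceberg runs are left untouched.
    if iceberg_enabled:
        client = DefaultClient(redpanda)
        client.alter_topic_config(topic_spec.name,
                                  TopicSpec.PROPERTY_ICEBERG_ENABLED, 'true')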
88 changes: 69 additions & 19 deletions tests/rptest/tests/random_node_operations_test.py
@@ -13,6 +13,7 @@
import threading
from rptest.clients.rpk import RpkTool
from rptest.services.admin import Admin
from rptest.services.apache_iceberg_catalog import IcebergRESTCatalog
from rptest.tests.prealloc_nodes import PreallocNodesTest

from ducktape.mark import matrix
@@ -22,7 +23,7 @@
from rptest.clients.types import TopicSpec
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, PandaproxyConfig, SISettings, SchemaRegistryConfig
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode, skip_fips_mode
from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload
@@ -59,12 +60,29 @@ def __init__(self, test_context, *args, **kwargs):
},
# 2 nodes for kgo producer/consumer workloads
node_prealloc_count=3,
schema_registry_config=SchemaRegistryConfig(),
pandaproxy_config=PandaproxyConfig(),
*args,
**kwargs)
self.nodes_with_prev_version = []
self.installer = self.redpanda._installer
self.previous_version = self.installer.highest_from_prior_feature_version(
RedpandaInstaller.HEAD)
self.previous_version = None
self._si_settings = SISettings(self.test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True,
fast_uploads=True)
self.catalog_service = IcebergRESTCatalog(
test_context,
cloud_storage_bucket=self._si_settings.cloud_storage_bucket,
cloud_storage_access_key=self._si_settings.
cloud_storage_access_key,
cloud_storage_secret_key=self._si_settings.
cloud_storage_secret_key,
cloud_storage_region=self._si_settings.cloud_storage_region,
cloud_storage_api_endpoint=self._si_settings.endpoint_url,
filesystem_wrapper_mode=False)

def min_producer_records(self):
return 20 * self.producer_throughput
Expand Down Expand Up @@ -95,8 +113,17 @@ def early_exit_hook(self):
self.redpanda.set_skip_if_no_redpanda_log(True)

def setUp(self):
# defer starting redpanda to test body
pass
self.catalog_service.start()
self.redpanda.add_extra_rp_conf({
"iceberg_catalog_type":
"rest",
"iceberg_rest_catalog_endpoint":
self.catalog_service.catalog_url,
"iceberg_rest_catalog_client_id":
"panda-user",
"iceberg_rest_catalog_client_secret":
"panda-secret",
})

def _setup_test_scale(self):
# test setup
@@ -132,17 +159,17 @@ def _setup_test_scale(self):
f"running test with: [message_size {self.msg_size}, total_bytes: {self.total_data}, message_count: {self.msg_count}, rate_limit: {self.rate_limit}, cluster_operations: {self.node_operations}]"
)

def _start_redpanda(self, mixed_versions, with_tiered_storage):
def _start_redpanda(self, mixed_versions, with_tiered_storage,
with_iceberg):

if with_tiered_storage:
si_settings = SISettings(self.test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True,
fast_uploads=True)
if with_tiered_storage or with_iceberg:
# since this test is deleting topics we must tolerate missing manifests
si_settings.set_expected_damage(
self._si_settings.set_expected_damage(
{"ntr_no_topic_manifest", "ntpr_no_manifest"})
self.redpanda.set_si_settings(si_settings)
self.redpanda.set_si_settings(self._si_settings)

if with_iceberg:
self.redpanda.add_extra_rp_conf({"iceberg_enabled": "true"})

self.redpanda.set_seed_servers(self.redpanda.nodes)
if mixed_versions:
@@ -276,17 +303,26 @@ def verify(self):

assert self.consumer.consumer_status.validator.invalid_reads == 0, f"Invalid reads in topic: {self.topic}, invalid reads count: {self.consumer.consumer_status.validator.invalid_reads}"

def maybe_enable_iceberg_for_topic(self, topic_spec: TopicSpec,
iceberg_enabled: bool):
if iceberg_enabled:
client = DefaultClient(self.redpanda)
client.alter_topic_config(topic_spec.name,
TopicSpec.PROPERTY_ICEBERG_ENABLED,
'true')

# before v24.2, DNS queries to the S3 endpoint did not include the bucket name, which is required for AWS S3 FIPS endpoints
@skip_fips_mode
@skip_debug_mode
@cluster(num_nodes=8,
@cluster(num_nodes=9,
log_allow_list=CHAOS_LOG_ALLOW_LIST +
PREV_VERSION_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
@matrix(enable_failures=[True, False],
mixed_versions=[True, False],
with_tiered_storage=[True, False])
@matrix(enable_failures=[False],
mixed_versions=[False],
with_tiered_storage=[False],
with_iceberg=[False])
def test_node_operations(self, enable_failures, mixed_versions,
with_tiered_storage):
with_tiered_storage, with_iceberg):
# In order to reduce the number of parameters and at the same time cover
# as many use cases as possible this test uses 3 topics with 3 separate
# producer/consumer pairs:
@@ -328,11 +364,20 @@ def enable_write_caching_testing():

# start redpanda process
self._start_redpanda(mixed_versions,
with_tiered_storage=with_tiered_storage)
with_tiered_storage=with_tiered_storage,
with_iceberg=with_iceberg)

self.redpanda.set_cluster_config(
{"controller_snapshot_max_age_sec": 1})

if with_iceberg:
self.redpanda.set_cluster_config({
"iceberg_translation_interval_ms_default":
5000,
"iceberg_catalog_commit_interval_ms":
10000
})

client = DefaultClient(self.redpanda)

# create some initial topics
@@ -345,6 +390,7 @@ def enable_write_caching_testing():
redpanda_remote_read=with_tiered_storage,
redpanda_remote_write=with_tiered_storage)
client.create_topic(regular_topic)
self.maybe_enable_iceberg_for_topic(regular_topic, with_iceberg)

if with_tiered_storage:
# change local retention policy so that some local segments will be deleted during the test
Expand All @@ -370,6 +416,7 @@ def enable_write_caching_testing():
redpanda_remote_read=with_tiered_storage,
redpanda_remote_write=with_tiered_storage)
client.create_topic(compacted_topic)
self.maybe_enable_iceberg_for_topic(compacted_topic, with_iceberg)

compacted_producer_consumer = RandomNodeOperationsTest.producer_consumer(
test_context=self.test_context,
@@ -404,7 +451,7 @@ def enable_write_caching_testing():
default_segment_size)
self._alter_local_topic_retention_bytes(fast_topic.name,
8 * default_segment_size)

self.maybe_enable_iceberg_for_topic(fast_topic, with_iceberg)
fast_producer_consumer = RandomNodeOperationsTest.producer_consumer(
test_context=self.test_context,
logger=self.logger,
@@ -433,7 +480,8 @@ def enable_write_caching_testing():
client.create_topic(write_caching_topic)
client.alter_topic_config(write_caching_topic.name,
TopicSpec.PROPERTY_WRITE_CACHING, "true")

self.maybe_enable_iceberg_for_topic(write_caching_topic,
with_iceberg)
write_caching_producer_consumer = RandomNodeOperationsTest.producer_consumer(
test_context=self.test_context,
logger=self.logger,
@@ -473,6 +521,8 @@ def enable_write_caching_testing():
self.logger,
lock,
progress_timeout=120 if enable_failures else 60)
if with_iceberg:
executor.override_config_params = {"iceberg_enabled": True}
for i, op in enumerate(
generate_random_workload(
available_nodes=self.active_node_idxs)):
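
Two details of this file are worth reading together: tiered storage and Iceberg now share one SISettings object, since Iceberg data lands in the same object store, and the Iceberg runs shorten the translation and catalog-commit intervals so that translation overlaps the node operations instead of trailing them. A condensed sketch of that control flow under those assumptions (a reading aid, not the verbatim test code; the mixed-versions branch is elided):

def start_redpanda_for_variant(redpanda, si_settings,
                               with_tiered_storage: bool,
                               with_iceberg: bool) -> None:
    if with_tiered_storage or with_iceberg:
        # The test deletes topics, so missing manifests are expected damage.
        si_settings.set_expected_damage(
            {"ntr_no_topic_manifest", "ntpr_no_manifest"})
        redpanda.set_si_settings(si_settings)
    if with_iceberg:
        redpanda.add_extra_rp_conf({"iceberg_enabled": "true"})
    redpanda.set_seed_servers(redpanda.nodes)
    redpanda.start()  # version selection for mixed_versions elided
    if with_iceberg:
        # Same values as the diff: translate every 5s and commit to the REST
        # catalog every 10s, so Iceberg activity races the node operations.
        redpanda.set_cluster_config({
            "iceberg_translation_interval_ms_default": 5000,
            "iceberg_catalog_commit_interval_ms": 10000,
        })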
11 changes: 7 additions & 4 deletions tests/rptest/utils/node_operations.py
@@ -221,6 +221,7 @@ def __init__(self,
self.timeout = 360
self.lock = lock
self.progress_timeout = progress_timeout
self.override_config_params: None | dict = None

def node_id(self, idx):
return self.redpanda.node_id(self.redpanda.get_node(idx),
@@ -373,10 +374,12 @@ def add(self, idx: int):

node = self.redpanda.get_node(idx)

self.redpanda.start_node(node,
timeout=self.timeout,
auto_assign_node_id=True,
omit_seeds_on_idx_one=False)
self.redpanda.start_node(
node,
timeout=self.timeout,
auto_assign_node_id=True,
omit_seeds_on_idx_one=False,
override_cfg_params=self.override_config_params)

self.logger.info(
f"added node: {idx} with new node id: {self.node_id(idx)}")
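
This second file closes the loop for re-added nodes: NodeOpsExecutor grows an optional override_config_params dict that is forwarded to start_node, so a node added back mid-test boots with the same node-level overrides as the rest of the cluster. A sketch of the wiring as it appears in the test body (the executor's first constructor argument is truncated in the rendered hunk above; self.redpanda is assumed here):

executor = NodeOpsExecutor(self.redpanda,
                           self.logger,
                           lock,
                           progress_timeout=120 if enable_failures else 60)
if with_iceberg:
    # Without this, a node re-added during the test would start without
    # iceberg_enabled and disagree with the rest of the cluster.
    executor.override_config_params = {"iceberg_enabled": True}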
