Datalake in Random Node Operations test #24179

Merged 2 commits on Dec 5, 2024
20 changes: 15 additions & 5 deletions src/v/datalake/datalake_manager.cc
@@ -93,12 +93,22 @@ datalake_manager::datalake_manager(
   , _sg(sg)
   , _effective_max_translator_buffered_data(
       std::min(memory_limit, max_translator_buffered_data))
-  , _parallel_translations(std::make_unique<ssx::semaphore>(
-      size_t(
-        std::floor(memory_limit / _effective_max_translator_buffered_data)),
-      "datalake_parallel_translations"))
   , _iceberg_commit_interval(
-      config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind()) {}
+      config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind()) {
+    vassert(memory_limit > 0, "Memory limit must be greater than 0");
+    auto max_parallel = static_cast<size_t>(
+      std::floor(memory_limit / _effective_max_translator_buffered_data));
+    vlog(
+      datalake_log.debug,
+      "Creating datalake manager with memory limit: {}, effective max "
+      "translator buffered data: {} and max parallel translations: {}",
+      memory_limit,
+      _effective_max_translator_buffered_data,
+      max_parallel);
+
+    _parallel_translations = std::make_unique<ssx::semaphore>(
+      size_t(max_parallel), "datalake_parallel_translations");
+}
 datalake_manager::~datalake_manager() = default;
 
 ss::future<> datalake_manager::start() {
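Note on the change above: the semaphore that throttles parallel translations is now sized in the constructor body from the shard memory budget. A minimal sketch of the same sizing arithmetic, in Python with illustrative numbers (not the Seastar implementation):

import asyncio
import math

# Illustrative numbers only: a 128 MiB shard budget with a 32 MiB
# per-translation buffer admits floor(128 / 32) = 4 translations.
memory_limit = 128 * 1024 * 1024
max_translator_buffered_data = 32 * 1024 * 1024

# Mirrors _effective_max_translator_buffered_data: a single translation
# never buffers more than the whole budget.
effective_buffered = min(memory_limit, max_translator_buffered_data)
assert memory_limit > 0, "Memory limit must be greater than 0"
max_parallel = math.floor(memory_limit / effective_buffered)  # -> 4

# Each translation acquires one unit before buffering, the role played
# by the "datalake_parallel_translations" ssx::semaphore.
parallel_translations = asyncio.Semaphore(max_parallel)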
83 changes: 65 additions & 18 deletions tests/rptest/tests/random_node_operations_test.py
@@ -13,16 +13,17 @@
 import threading
 from rptest.clients.rpk import RpkTool
 from rptest.services.admin import Admin
+from rptest.services.apache_iceberg_catalog import IcebergRESTCatalog
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 
-from ducktape.mark import matrix
+from ducktape.mark import matrix, ignore
 from ducktape.utils.util import wait_until
 from rptest.services.admin_ops_fuzzer import AdminOperationsFuzzer
 from rptest.services.cluster import cluster
 from rptest.clients.types import TopicSpec
 from rptest.clients.default import DefaultClient
 from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
-from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
+from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, CloudStorageType, PandaproxyConfig, SISettings, SchemaRegistryConfig
 from rptest.services.redpanda_installer import RedpandaInstaller
 from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode, skip_fips_mode
 from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload
@@ -59,12 +60,22 @@ def __init__(self, test_context, *args, **kwargs):
             },
             # 2 nodes for kgo producer/consumer workloads
             node_prealloc_count=3,
+            schema_registry_config=SchemaRegistryConfig(),
+            pandaproxy_config=PandaproxyConfig(),
             *args,
             **kwargs)
         self.nodes_with_prev_version = []
         self.installer = self.redpanda._installer
         self.previous_version = self.installer.highest_from_prior_feature_version(
             RedpandaInstaller.HEAD)
+        self._si_settings = SISettings(self.test_context,
+                                       cloud_storage_enable_remote_read=True,
+                                       cloud_storage_enable_remote_write=True,
+                                       fast_uploads=True)
+        self.catalog_service = IcebergRESTCatalog(
+            test_context,
+            cloud_storage_bucket=self._si_settings.cloud_storage_bucket,
+            filesystem_wrapper_mode=False)
 
     def min_producer_records(self):
         return 20 * self.producer_throughput
Expand Down Expand Up @@ -95,8 +106,7 @@ def early_exit_hook(self):
         self.redpanda.set_skip_if_no_redpanda_log(True)
 
     def setUp(self):
-        # defer starting redpanda to test body
-        pass
+        self.catalog_service.start()
 
     def _setup_test_scale(self):
         # test setup
@@ -132,17 +142,28 @@ def _setup_test_scale(self):
f"running test with: [message_size {self.msg_size}, total_bytes: {self.total_data}, message_count: {self.msg_count}, rate_limit: {self.rate_limit}, cluster_operations: {self.node_operations}]"
)

def _start_redpanda(self, mixed_versions, with_tiered_storage):
def _start_redpanda(self, mixed_versions, with_tiered_storage,
with_iceberg):

if with_tiered_storage:
si_settings = SISettings(self.test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True,
fast_uploads=True)
if with_tiered_storage or with_iceberg:
# since this test is deleting topics we must tolerate missing manifests
si_settings.set_expected_damage(
self._si_settings.set_expected_damage(
{"ntr_no_topic_manifest", "ntpr_no_manifest"})
self.redpanda.set_si_settings(si_settings)
self.redpanda.set_si_settings(self._si_settings)

if with_iceberg:
self.redpanda.add_extra_rp_conf({
"iceberg_enabled":
"true",
"iceberg_catalog_type":
"rest",
"iceberg_rest_catalog_endpoint":
self.catalog_service.catalog_url,
"iceberg_rest_catalog_client_id":
"panda-user",
"iceberg_rest_catalog_client_secret":
"panda-secret",
})

self.redpanda.set_seed_servers(self.redpanda.nodes)
if mixed_versions:
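Aside on the three iceberg_rest_catalog_* settings above: they follow the Iceberg REST catalog protocol, in which the client exchanges its credentials for a bearer token and then fetches catalog configuration. A hedged sketch of that exchange (endpoint paths per the Iceberg REST catalog spec; the URL is a placeholder for catalog_service.catalog_url and the requests usage is illustrative):

import requests

catalog_url = "http://localhost:8181"  # placeholder endpoint
token = requests.post(
    f"{catalog_url}/v1/oauth/tokens",
    data={
        "grant_type": "client_credentials",
        "client_id": "panda-user",
        "client_secret": "panda-secret",
    },
).json()["access_token"]
# With a token in hand, the client reads catalog defaults and overrides.
config = requests.get(
    f"{catalog_url}/v1/config",
    headers={"Authorization": f"Bearer {token}"},
).json()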
@@ -276,24 +297,40 @@ def verify(self):
 
         assert self.consumer.consumer_status.validator.invalid_reads == 0, f"Invalid reads in topic: {self.topic}, invalid reads count: {self.consumer.consumer_status.validator.invalid_reads}"
 
+    def maybe_enable_iceberg_for_topic(self, topic_spec: TopicSpec,
+                                       iceberg_enabled: bool):
+        if iceberg_enabled:
+            client = DefaultClient(self.redpanda)
+            client.alter_topic_config(topic_spec.name,
+                                      TopicSpec.PROPERTY_ICEBERG_MODE,
+                                      "key_value")
+
     # before v24.2, DNS queries to the S3 endpoint do not include the bucket name, which is required for AWS S3 FIPS endpoints
     @skip_fips_mode
     @skip_debug_mode
-    @cluster(num_nodes=8,
+    @cluster(num_nodes=9,
              log_allow_list=CHAOS_LOG_ALLOW_LIST +
              PREV_VERSION_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
     @matrix(enable_failures=[True, False],
             mixed_versions=[True, False],
-            with_tiered_storage=[True, False])
+            with_tiered_storage=[True, False],
+            with_iceberg=[True, False],
+            cloud_storage_type=[CloudStorageType.S3])
     def test_node_operations(self, enable_failures, mixed_versions,
-                             with_tiered_storage):
+                             with_tiered_storage, with_iceberg,
+                             cloud_storage_type):
         # In order to reduce the number of parameters and at the same time cover
         # as many use cases as possible this test uses 3 topics with 3 separate
         # producer/consumer pairs:
         #
         # tp-workload-deletion - topic with delete cleanup policy
         # tp-workload-compaction - topic with compaction
         # tp-workload-fast - topic with fast partition movements enabled
+        if with_iceberg and mixed_versions:
+            self.should_skip = True
+            self.logger.info(
+                "Skipping test with iceberg and mixed versions as it is not supported"
+            )
 
         def enable_fast_partition_movement():
             if not with_tiered_storage:
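Aside on the expanded matrix above: the two new parameters double the test surface twice over. A quick illustrative sketch (not rptest code) of how many cases ducktape now generates, and how many self-skip at runtime:

from itertools import product

params = {
    "enable_failures": [True, False],
    "mixed_versions": [True, False],
    "with_tiered_storage": [True, False],
    "with_iceberg": [True, False],
    "cloud_storage_type": ["S3"],
}
combos = list(product(*params.values()))
# 2 * 2 * 2 * 2 * 1 = 16 parameterizations, of which the 4 combining
# with_iceberg=True and mixed_versions=True skip themselves via should_skip.
assert len(combos) == 16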
@@ -328,11 +365,16 @@ def enable_write_caching_testing():
 
         # start redpanda process
         self._start_redpanda(mixed_versions,
-                             with_tiered_storage=with_tiered_storage)
+                             with_tiered_storage=with_tiered_storage,
+                             with_iceberg=with_iceberg)
 
         self.redpanda.set_cluster_config(
             {"controller_snapshot_max_age_sec": 1})
 
+        if with_iceberg:
+            self.redpanda.set_cluster_config(
+                {"iceberg_catalog_commit_interval_ms": 10000})
+
         client = DefaultClient(self.redpanda)
 
         # create some initial topics
@@ -345,6 +387,7 @@
             redpanda_remote_read=with_tiered_storage,
             redpanda_remote_write=with_tiered_storage)
         client.create_topic(regular_topic)
+        self.maybe_enable_iceberg_for_topic(regular_topic, with_iceberg)
 
         if with_tiered_storage:
             # change local retention policy so that some local segments will be deleted during the test
@@ -370,6 +413,7 @@
             redpanda_remote_read=with_tiered_storage,
             redpanda_remote_write=with_tiered_storage)
         client.create_topic(compacted_topic)
+        self.maybe_enable_iceberg_for_topic(compacted_topic, with_iceberg)
 
         compacted_producer_consumer = RandomNodeOperationsTest.producer_consumer(
             test_context=self.test_context,
@@ -404,7 +448,7 @@ def enable_write_caching_testing():
                                                 default_segment_size)
         self._alter_local_topic_retention_bytes(fast_topic.name,
                                                 8 * default_segment_size)
-
+        self.maybe_enable_iceberg_for_topic(fast_topic, with_iceberg)
         fast_producer_consumer = RandomNodeOperationsTest.producer_consumer(
             test_context=self.test_context,
             logger=self.logger,
@@ -433,7 +477,8 @@ def enable_write_caching_testing():
         client.create_topic(write_caching_topic)
         client.alter_topic_config(write_caching_topic.name,
                                   TopicSpec.PROPERTY_WRITE_CACHING, "true")
-
+        self.maybe_enable_iceberg_for_topic(write_caching_topic,
+                                            with_iceberg)
         write_caching_producer_consumer = RandomNodeOperationsTest.producer_consumer(
             test_context=self.test_context,
             logger=self.logger,
@@ -473,6 +518,8 @@
             self.logger,
             lock,
             progress_timeout=120 if enable_failures else 60)
+        if with_iceberg:
+            executor.override_config_params = {"iceberg_enabled": True}
         for i, op in enumerate(
                 generate_random_workload(
                     available_nodes=self.active_node_idxs)):
11 changes: 7 additions & 4 deletions tests/rptest/utils/node_operations.py
@@ -221,6 +221,7 @@ def __init__(self,
         self.timeout = 360
         self.lock = lock
         self.progress_timeout = progress_timeout
+        self.override_config_params: None | dict = None
 
     def node_id(self, idx):
         return self.redpanda.node_id(self.redpanda.get_node(idx),
@@ -373,10 +374,12 @@ def add(self, idx: int):
 
         node = self.redpanda.get_node(idx)
 
-        self.redpanda.start_node(node,
-                                 timeout=self.timeout,
-                                 auto_assign_node_id=True,
-                                 omit_seeds_on_idx_one=False)
+        self.redpanda.start_node(
+            node,
+            timeout=self.timeout,
+            auto_assign_node_id=True,
+            omit_seeds_on_idx_one=False,
+            override_cfg_params=self.override_config_params)
 
         self.logger.info(
             f"added node: {idx} with new node id: {self.node_id(idx)}")