Skip to content

Commit

Permalink
Fix default replication_factor for topics created by Quix apps (#317)
Browse files Browse the repository at this point in the history
Set replication_factor to None by default to let the Quix Cloud figure out the default value for the broker
  • Loading branch information
daniil-quix authored Mar 25, 2024
1 parent fe4c210 commit f0af284
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
3 changes: 2 additions & 1 deletion quixstreams/platforms/quix/topic_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class QuixTopicManager(TopicManager):
"""

_topic_partitions = 1
_topic_replication = 2
# Setting it to None to use defaults defined in Quix Cloud
_topic_replication = None
_max_topic_name_len = 249

_changelog_extra_config_defaults = {"cleanup.policy": "compact"}
Expand Down
11 changes: 8 additions & 3 deletions tests/test_quixstreams/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,6 @@ def factory(workspace_id: Optional[str] = None):
),
)
quix_topic_manager._create_topics = topic_manager._create_topics
patcher = patch.object(quix_topic_manager, "_topic_replication", 1)
patcher.start()
return quix_topic_manager

return factory
Expand Down Expand Up @@ -428,6 +426,13 @@ def factory(
workspace_id: str = "my_ws",
) -> Application:
state_dir = state_dir or (tmp_path / "state").absolute()
topic_manager = quix_topic_manager_factory(workspace_id=workspace_id)
# Patch the topic manager to always set replication_factor to 1
# (normally QuixTopicManager will set it to None which is invalid for
# Kafka Admin API)
patcher = patch.object(topic_manager, "_topic_replication", 1)
patcher.start()

return Application.Quix(
consumer_group=random_consumer_group,
state_dir=state_dir,
Expand All @@ -443,7 +448,7 @@ def factory(
on_message_processed=on_message_processed,
auto_create_topics=auto_create_topics,
use_changelog_topics=use_changelog_topics,
topic_manager=quix_topic_manager_factory(workspace_id=workspace_id),
topic_manager=topic_manager,
)

return factory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from quixstreams.models import TopicConfig


class TestQuixTopicManager:
def test_quix_topic_name(self, quix_topic_manager_factory):
topic_name = "my_topic"
Expand All @@ -8,6 +11,8 @@ def test_quix_topic_name(self, quix_topic_manager_factory):
# should be the same regardless of workspace_id being included
assert topic_manager.topic(topic_name).name == expected
assert topic_manager.topic(expected).name == expected
# Replication factor should be None by default
assert topic_manager.topic(expected).config.replication_factor is None

def test_quix_changelog_topic(self, quix_topic_manager_factory):
"""
Expand Down Expand Up @@ -43,3 +48,14 @@ def test_quix_changelog_topic(self, quix_topic_manager_factory):
assert changelog.name == expected

assert topic_manager.changelog_topics[topic.name][store_name] == changelog

def test_quix_topic_custom_config(self, quix_topic_manager_factory):
topic_name = "my_topic"
workspace_id = "my_wid"
topic_manager = quix_topic_manager_factory(workspace_id=workspace_id)

config = TopicConfig(num_partitions=2, replication_factor=2)

topic = topic_manager.topic(topic_name, config=config)
assert topic.config.replication_factor == config.replication_factor
assert topic.config.num_partitions == config.num_partitions

0 comments on commit f0af284

Please sign in to comment.