From 6f66eb3e65416978ee97b392a40259a5c3ebc3c8 Mon Sep 17 00:00:00 2001 From: Francesco D'Orlandi Date: Thu, 31 Oct 2024 18:46:52 +0100 Subject: [PATCH] backup: add command line flag to override replication factor in restore command --- src/karapace/backup/api.py | 11 ++++- src/karapace/backup/cli.py | 10 ++++ tests/integration/backup/test_v3_backup.py | 54 +++++++++++++++++++++- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py index d06c99ebe..a9b527e22 100644 --- a/src/karapace/backup/api.py +++ b/src/karapace/backup/api.py @@ -373,13 +373,19 @@ def _handle_restore_topic( instruction: RestoreTopic, config: Config, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: if skip_topic_creation: return + repl_factor = instruction.replication_factor + if override_replication_factor is not None: + LOG.info("Overriding replication factor with: %d (was: %d)", + override_replication_factor, instruction.replication_factor) + repl_factor = override_replication_factor if not _maybe_create_topic( config=config, name=instruction.topic_name, - replication_factor=instruction.replication_factor, + replication_factor=repl_factor, topic_configs=instruction.topic_configs, ): raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists") @@ -426,6 +432,7 @@ def restore_backup( backup_location: ExistingFile, topic_name: TopicName, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: """Restores a backup from the specified location into the configured topic. 
@@ -475,7 +482,7 @@ def _check_producer_exception() -> None: _handle_restore_topic_legacy(instruction, config, skip_topic_creation) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, RestoreTopic): - _handle_restore_topic(instruction, config, skip_topic_creation) + _handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, ProducerSend): if producer is None: diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py index 5e3d72854..7125b1e04 100644 --- a/src/karapace/backup/cli.py +++ b/src/karapace/backup/cli.py @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace: ), ) + parser_restore.add_argument( + "--override-replication-factor", + help=( + "Override the replication factor that is saved in the backup. This is needed when restoring a backup from a " + "downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups." 
+ ), + type=int, + ) + return parser.parse_args() @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None: backup_location=api.locate_backup_file(location), topic_name=api.normalize_topic_name(args.topic, config), skip_topic_creation=args.skip_topic_creation, + override_replication_factor=args.override_replication_factor, ) except BackupDataRestorationError: traceback.print_exc() diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 332b09f0a..83a2028ce 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from aiokafka.errors import UnknownTopicOrPartitionError +from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError from collections.abc import Iterator from confluent_kafka import Message, TopicPartition from confluent_kafka.admin import NewTopic @@ -698,6 +698,58 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) +def test_backup_restoration_override_replication_factor( + admin_client: KafkaAdminClient, + kafka_servers: KafkaServers, + producer: KafkaProducer, + new_topic: NewTopic, +) -> None: + topic_name = new_topic.topic + backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name + metadata_path = backup_directory / f"{topic_name}.metadata" + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "topic_name": new_topic.topic, + } + ) + + # populate the topic and create a backup + for i in range(10): + producer.send( + new_topic.topic, + key=f"message-key-{i}", + value=f"message-value-{i}-" + 1000 * "X", + ) + producer.flush() + api.create_backup( + config=config, + backup_location=backup_directory, + topic_name=api.normalize_topic_name(None, config), + version=BackupVersion.V3, + replication_factor=6, + ) + + # make sure topic doesn't exist beforehand. 
+ _delete_topic(admin_client, topic_name) + + # assert that the restore would fail without the replication factor override + with pytest.raises(InvalidReplicationFactorError): + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(topic_name), + ) + + # finally restore the backup with override + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(topic_name), + override_replication_factor=1, + ) + + def no_color_env() -> dict[str, str]: env = os.environ.copy() try: