Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup: add command line flag to override replication factor in restore command #988

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,20 @@ def _handle_restore_topic(
instruction: RestoreTopic,
config: Config,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
if skip_topic_creation:
return
repl_factor = instruction.replication_factor
if override_replication_factor is not None:
LOG.info(
"Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
)
repl_factor = override_replication_factor
if not _maybe_create_topic(
config=config,
name=instruction.topic_name,
replication_factor=instruction.replication_factor,
replication_factor=repl_factor,
topic_configs=instruction.topic_configs,
):
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
Expand Down Expand Up @@ -426,6 +433,7 @@ def restore_backup(
backup_location: ExistingFile,
topic_name: TopicName,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
"""Restores a backup from the specified location into the configured topic.

Expand Down Expand Up @@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, RestoreTopic):
_handle_restore_topic(instruction, config, skip_topic_creation)
_handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, ProducerSend):
if producer is None:
Expand Down
10 changes: 10 additions & 0 deletions src/karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
),
)

# Optional override for the replication factor recorded in the backup.
# When the flag is omitted argparse leaves the attribute as None.
parser_restore.add_argument(
    "--override-replication-factor",
    help=(
        # Adjacent string literals are concatenated verbatim, so the first
        # fragment must end with a space to keep the words separated.
        "Override the replication factor that is saved in the backup. This is needed when restoring a backup from a "
        "downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups."
    ),
    type=int,
)

return parser.parse_args()


Expand Down Expand Up @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
override_replication_factor=args.override_replication_factor,
)
except BackupDataRestorationError:
traceback.print_exc()
Expand Down
52 changes: 51 additions & 1 deletion tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from aiokafka.errors import UnknownTopicOrPartitionError
from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
from collections.abc import Iterator
from confluent_kafka import Message, TopicPartition
from confluent_kafka.admin import NewTopic
Expand Down Expand Up @@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_override_replication_factor(
    admin_client: KafkaAdminClient,
    kafka_servers: KafkaServers,
    producer: KafkaProducer,
    new_topic: NewTopic,
) -> None:
    """Restoring a V3 backup honours ``override_replication_factor``.

    A backup is created with a recorded replication factor of 6. Restoring it
    as-is is expected to raise ``InvalidReplicationFactorError``, while
    restoring it with the replication factor overridden to 1 is expected to
    complete without raising.
    """
    topic = new_topic.topic
    topic_name = TopicName(topic)
    test_data_root = Path(__file__).parent.parent.resolve() / "test_data"
    backup_directory = test_data_root / "backup_v3_single_partition" / topic
    metadata_path = backup_directory / f"{topic}.metadata"
    config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers})

    # Populate the topic with a handful of records, then back it up.
    padding = "X" * 1000
    for index in range(10):
        producer.send(
            topic,
            key=f"message-key-{index}",
            value=f"message-value-{index}-" + padding,
        )
    producer.flush()
    api.create_backup(
        config=config,
        backup_location=backup_directory,
        topic_name=topic_name,
        version=BackupVersion.V3,
        replication_factor=6,
    )

    # The topic must not exist when the restore tries to create it.
    _delete_topic(admin_client, topic)

    # Without the override, the replication factor stored in the backup is
    # used and the restore is expected to be rejected.
    with pytest.raises(InvalidReplicationFactorError):
        api.restore_backup(
            config=config,
            backup_location=metadata_path,
            topic_name=topic_name,
        )

    # With the override in place the same backup restores successfully.
    api.restore_backup(
        config=config,
        backup_location=metadata_path,
        topic_name=topic_name,
        override_replication_factor=1,
    )


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
Expand Down
Loading