Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Enable logging in backup tool #714

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import json
import logging
import math
import sys
import textwrap

__all__ = (
Expand Down Expand Up @@ -166,7 +165,7 @@ def before_sleep(it: RetryCallState) -> None:
result = f"failed ({outcome.exception()})"
else:
result = f"returned {outcome.result()!r}"
print(f"{description} {result}, retrying... (Ctrl+C to abort)", file=sys.stderr)
LOG.info(f"{description} {result}, retrying... (Ctrl+C to abort)") # pylint: disable=logging-fstring-interpolation

return before_sleep

Expand Down Expand Up @@ -627,12 +626,14 @@ class VerifyLevel(enum.Enum):


def verify(backup_location: ExistingFile, level: VerifyLevel) -> None:
console = Console()
error_console = Console(stderr=True)

backup_version = BackupVersion.identify(backup_location)

if backup_version is not BackupVersion.V3:
print(
f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}.",
file=sys.stderr,
error_console.print(
f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}."
)
raise SystemExit(1)

Expand All @@ -646,7 +647,6 @@ def verify(backup_location: ExistingFile, level: VerifyLevel) -> None:
else:
assert_never(level)

console = Console()
success = True
verified_files = 0

Expand Down
53 changes: 35 additions & 18 deletions karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@
from .poll_timeout import PollTimeout
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config
from typing import Iterator

import argparse
import contextlib
import logging
import sys
import traceback

logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Karapace schema backup tool")
Expand All @@ -29,8 +34,10 @@ def parse_args() -> argparse.Namespace:
"export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file"
)

# Options shared by all subparsers.
for p in (parser_get, parser_restore, parser_inspect, parser_verify, parser_export_anonymized_avro_schemas):
p.add_argument("--location", default="", help="File path for the backup file")
p.add_argument("--verbose", default=False, action="store_true", help="Enable debug logging.")

for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas):
p.add_argument("--config", help="Configuration file path", required=True)
Expand Down Expand Up @@ -77,6 +84,11 @@ def get_config(args: argparse.Namespace) -> Config:


def dispatch(args: argparse.Namespace) -> None:
logging.basicConfig(
stream=sys.stderr,
level=logging.DEBUG if args.verbose else logging.INFO,
)

location = api.normalize_location(args.location)

if args.command == "get":
Expand Down Expand Up @@ -124,29 +136,34 @@ def dispatch(args: argparse.Namespace) -> None:
raise NotImplementedError(f"Unknown command: {args.command!r}")


def main() -> None:
@contextlib.contextmanager
def handle_keyboard_interrupt() -> Iterator[None]:
try:
args = parse_args()

try:
dispatch(args)
# TODO: This specific treatment of StaleConsumerError looks quite misplaced
# here, and should probably be pushed down into the (internal) API layer.
except StaleConsumerError as e:
print(
f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic "
f"{e.topic_partition.topic!r} "
f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} "
f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n"
"\n"
"Try increasing --poll-timeout to give the broker more time.",
file=sys.stderr,
)
raise SystemExit(1) from e
yield
except KeyboardInterrupt as e:
# Not an error -- user choice -- and thus should not end up in a Python stacktrace.
raise SystemExit(2) from e


@handle_keyboard_interrupt()
def main() -> None:
args = parse_args()

try:
dispatch(args)
# TODO: This specific treatment of StaleConsumerError looks quite misplaced
# here, and should probably be pushed down into the (internal) API layer.
except StaleConsumerError as e:
logger.error( # pylint: disable=logging-fstring-interpolation
f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic "
f"{e.topic_partition.topic!r} "
f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} "
f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n"
"\n"
"Try increasing --poll-timeout to give the broker more time.",
)
raise SystemExit(1) from e


if __name__ == "__main__":
main()
23 changes: 13 additions & 10 deletions tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@

import datetime
import json
import logging
import os
import pytest
import secrets
import shutil
import subprocess
import textwrap

logger = logging.getLogger(__name__)


@pytest.fixture(scope="function", name="karapace_config")
def config_fixture(
Expand Down Expand Up @@ -350,9 +353,9 @@ def test_exits_with_return_code_3_for_data_restoration_error(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

admin_client.create_topics([NewTopic(topic_name, 1, 1)])
with pytest.raises(subprocess.CalledProcessError) as er:
Expand Down Expand Up @@ -390,9 +393,9 @@ def test_roundtrip_from_file(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

# Execute backup restoration.
subprocess.run(
Expand Down Expand Up @@ -483,9 +486,9 @@ def test_roundtrip_from_file_skipping_topic_creation(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

admin_client.create_topics(
[NewTopic(topic_name, 1, 1)],
Expand Down Expand Up @@ -578,9 +581,9 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

config = set_config_defaults(
{
Expand Down Expand Up @@ -632,9 +635,9 @@ def test_producer_raises_exceptions(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

config = set_config_defaults(
{
Expand Down