Skip to content

Commit

Permalink
feature: Enable logging in backup tool
Browse files Browse the repository at this point in the history
  • Loading branch information
aiven-anton committed Sep 14, 2023
1 parent a33b4e1 commit 45c913e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 34 deletions.
12 changes: 6 additions & 6 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import json
import logging
import math
import sys
import textwrap

__all__ = (
Expand Down Expand Up @@ -166,7 +165,7 @@ def before_sleep(it: RetryCallState) -> None:
result = f"failed ({outcome.exception()})"
else:
result = f"returned {outcome.result()!r}"
print(f"{description} {result}, retrying... (Ctrl+C to abort)", file=sys.stderr)
LOG.info(f"{description} {result}, retrying... (Ctrl+C to abort)") # noqa: W1203

return before_sleep

Expand Down Expand Up @@ -627,12 +626,14 @@ class VerifyLevel(enum.Enum):


def verify(backup_location: ExistingFile, level: VerifyLevel) -> None:
console = Console()
error_console = Console(stderr=True)

backup_version = BackupVersion.identify(backup_location)

if backup_version is not BackupVersion.V3:
print(
f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}.",
file=sys.stderr,
error_console.print(
f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}."
)
raise SystemExit(1)

Expand All @@ -646,7 +647,6 @@ def verify(backup_location: ExistingFile, level: VerifyLevel) -> None:
else:
assert_never(level)

console = Console()
success = True
verified_files = 0

Expand Down
53 changes: 35 additions & 18 deletions karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@
from .poll_timeout import PollTimeout
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config
from typing import Iterator

import argparse
import contextlib
import logging
import sys
import traceback

logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Karapace schema backup tool")
Expand All @@ -29,8 +34,10 @@ def parse_args() -> argparse.Namespace:
"export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file"
)

# Options shared by all subparsers.
for p in (parser_get, parser_restore, parser_inspect, parser_verify, parser_export_anonymized_avro_schemas):
p.add_argument("--location", default="", help="File path for the backup file")
p.add_argument("--verbose", default=False, action="store_true", help="Enable debug logging.")

for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas):
p.add_argument("--config", help="Configuration file path", required=True)
Expand Down Expand Up @@ -77,6 +84,11 @@ def get_config(args: argparse.Namespace) -> Config:


def dispatch(args: argparse.Namespace) -> None:
logging.basicConfig(
stream=sys.stderr,
level=logging.DEBUG if args.verbose else logging.INFO,
)

location = api.normalize_location(args.location)

if args.command == "get":
Expand Down Expand Up @@ -124,29 +136,34 @@ def dispatch(args: argparse.Namespace) -> None:
raise NotImplementedError(f"Unknown command: {args.command!r}")


def main() -> None:
@contextlib.contextmanager
def handle_keyboard_interrupt() -> Iterator[None]:
try:
args = parse_args()

try:
dispatch(args)
# TODO: This specific treatment of StaleConsumerError looks quite misplaced
# here, and should probably be pushed down into the (internal) API layer.
except StaleConsumerError as e:
print(
f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic "
f"{e.topic_partition.topic!r} "
f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} "
f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n"
"\n"
"Try increasing --poll-timeout to give the broker more time.",
file=sys.stderr,
)
raise SystemExit(1) from e
yield
except KeyboardInterrupt as e:
# Not an error -- user choice -- and thus should not end up in a Python stacktrace.
raise SystemExit(2) from e


@handle_keyboard_interrupt()
def main() -> None:
args = parse_args()

try:
dispatch(args)
# TODO: This specific treatment of StaleConsumerError looks quite misplaced
# here, and should probably be pushed down into the (internal) API layer.
except StaleConsumerError as e:
logger.error( # noqa: W1203
f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic "
f"{e.topic_partition.topic!r} "
f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} "
f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n"
"\n"
"Try increasing --poll-timeout to give the broker more time.",
)
raise SystemExit(1) from e


if __name__ == "__main__":
main()
23 changes: 13 additions & 10 deletions tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@

import datetime
import json
import logging
import os
import pytest
import secrets
import shutil
import subprocess
import textwrap

logger = logging.getLogger(__name__)


@pytest.fixture(scope="function", name="karapace_config")
def config_fixture(
Expand Down Expand Up @@ -350,9 +353,9 @@ def test_exits_with_return_code_3_for_data_restoration_error(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

admin_client.create_topics([NewTopic(topic_name, 1, 1)])
with pytest.raises(subprocess.CalledProcessError) as er:
Expand Down Expand Up @@ -390,9 +393,9 @@ def test_roundtrip_from_file(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

# Execute backup restoration.
subprocess.run(
Expand Down Expand Up @@ -483,9 +486,9 @@ def test_roundtrip_from_file_skipping_topic_creation(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

admin_client.create_topics(
[NewTopic(topic_name, 1, 1)],
Expand Down Expand Up @@ -578,9 +581,9 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

config = set_config_defaults(
{
Expand Down Expand Up @@ -632,9 +635,9 @@ def test_producer_raises_exceptions(
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
logger.info("No previously existing topic.")
else:
print("Deleted topic from previous run.")
logger.info("Deleted topic from previous run.")

config = set_config_defaults(
{
Expand Down

0 comments on commit 45c913e

Please sign in to comment.