Merge pull request #583 from aiven/aiven-anton/refactor-schema-backup
Refactor schema backups to separate API and backends
jjaakola-aiven authored May 3, 2023
2 parents 2b2586c + 6bfadbb commit 4ddbd93
Showing 11 changed files with 433 additions and 209 deletions.
1 change: 1 addition & 0 deletions .pylintrc
@@ -29,6 +29,7 @@ disable=
no-name-in-module,
use-list-literal,
use-dict-literal,
no-value-for-parameter,


[FORMAT]
313 changes: 112 additions & 201 deletions karapace/schema_backup.py → karapace/backup/api.py

Large diffs are not rendered by default.
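
Since this file move's diff is not rendered, the sketch below is a rough, non-authoritative outline of the public surface of karapace/backup/api.py as implied by its callers later in this diff (cli.py and the tests): SchemaBackup, BackupVersion.V2, BackupVersion.ANONYMIZE_AVRO, create, and restore_backup all appear in those callers, while the parameter names, defaults, and bodies here are assumptions.

# Hedged sketch only, inferred from cli.py and the tests further down; not the rendered module.
import enum


class BackupVersion(enum.Enum):
    V2 = enum.auto()              # the regular schema backup format (see v2.py)
    ANONYMIZE_AVRO = enum.auto()  # export variant with anonymized Avro schemas
    # Other members (e.g. a legacy V1, see v1.py) may exist in the real module.


class SchemaBackup:
    def __init__(self, config, location, topic_option=None) -> None:
        ...  # store the config, backup file location, and optional topic override

    def create(self, version, *, poll_timeout=None, overwrite=False) -> None:
        ...  # consume the schema topic and write it out via the selected writer backend

    def restore_backup(self) -> None:
        ...  # read the backup via the matching reader backend and republish the records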

104 changes: 104 additions & 0 deletions karapace/backup/backend.py
@@ -0,0 +1,104 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from kafka.consumer.fetcher import ConsumerRecord
from karapace.typing import JsonData, JsonObject
from typing import Callable, Final, Generator, Generic, IO, Iterator, Optional, Sequence, TypeVar, Union
from typing_extensions import TypeAlias

import abc
import dataclasses
import logging

logger = logging.getLogger(__name__)


# The schema topic has a single partition. Passing this to `producer.send` bypasses
# the partitioner, so it never has to calculate which partition the data is sent to.
PARTITION_ZERO: Final = 0


@dataclasses.dataclass(frozen=True)
class ProducerSend:
topic_name: str
value: bytes | None
key: bytes | None
headers: Sequence[tuple[bytes | None, bytes | None]] | None = None
partition: int | None = None
timestamp_ms: int | None = None


KeyEncoder: TypeAlias = Callable[[Union[JsonObject, str]], Optional[bytes]]
ValueEncoder: TypeAlias = Callable[[JsonData], Optional[bytes]]
B = TypeVar("B", bound="IO[bytes] | IO[str]")


class BaseBackupReader(abc.ABC, Generic[B]):
@abc.abstractmethod
def read(
self,
topic_name: str,
buffer: B,
) -> Iterator[ProducerSend]:
...


class BaseItemsBackupReader(BaseBackupReader[IO[str]]):
def __init__(
self,
key_encoder: KeyEncoder,
value_encoder: ValueEncoder,
) -> None:
self.key_encoder: Final = key_encoder
self.value_encoder: Final = value_encoder

@staticmethod
@abc.abstractmethod
def items_from_file(fp: IO[str]) -> Iterator[tuple[str, str]]:
...

def read(
self,
topic_name: str,
buffer: IO[str],
) -> Generator[ProducerSend, None, None]:
for item in self.items_from_file(buffer):
key, value = item
yield ProducerSend(
topic_name=topic_name,
key=self.key_encoder(key),
value=self.value_encoder(value),
partition=PARTITION_ZERO,
)


class BaseBackupWriter(abc.ABC, Generic[B]):
@classmethod
@abc.abstractmethod
def store_record(
cls,
buffer: B,
record: ConsumerRecord,
) -> None:
...


class BaseKVBackupWriter(BaseBackupWriter[IO[str]]):
@classmethod
def store_record(
cls,
buffer: IO[str],
record: ConsumerRecord,
) -> None:
buffer.write(cls.serialize_record(record.key, record.value))

@staticmethod
@abc.abstractmethod
def serialize_record(
key_bytes: bytes | None,
value_bytes: bytes | None,
) -> str:
...
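
The ProducerSend fields above mirror the keyword arguments of kafka-python's KafkaProducer.send, so a restore presumably iterates over a reader and forwards each record to the producer. The snippet below is a minimal sketch of that wiring under those assumptions; the actual loop lives in the unrendered api.py.

# Minimal sketch, not the actual api.py code: replaying a reader's ProducerSend
# stream with kafka-python's KafkaProducer.
from typing import IO

from kafka import KafkaProducer
from karapace.backup.backend import BaseBackupReader


def replay(reader: BaseBackupReader, topic_name: str, buffer: IO[str], producer: KafkaProducer) -> None:
    for send in reader.read(topic_name, buffer):
        producer.send(
            send.topic_name,
            key=send.key,
            value=send.value,
            partition=send.partition,
            timestamp_ms=send.timestamp_ms,
        )
    producer.flush()  # block until every buffered record has been sent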
83 changes: 83 additions & 0 deletions karapace/backup/cli.py
@@ -0,0 +1,83 @@
"""
karapace - schema backup cli
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from .api import BackupVersion, SchemaBackup
from .consumer import PollTimeout
from .errors import StaleConsumerError
from karapace.config import read_config

import argparse
import sys


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Karapace schema backup tool")
subparsers = parser.add_subparsers(help="Schema backup command", dest="command", required=True)

parser_get = subparsers.add_parser("get", help="Store the schema backup into a file")
parser_restore = subparsers.add_parser("restore", help="Restore the schema backup from a file")
parser_export_anonymized_avro_schemas = subparsers.add_parser(
"export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file"
)
for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas):
p.add_argument("--config", help="Configuration file path", required=True)
p.add_argument("--location", default="", help="File path for the backup file")
p.add_argument("--topic", help="Kafka topic name to be used", required=False)

for p in (parser_get, parser_export_anonymized_avro_schemas):
p.add_argument("--overwrite", action="store_true", help="Overwrite --location even if it exists.")
p.add_argument("--poll-timeout", help=PollTimeout.__doc__, type=PollTimeout)

return parser.parse_args()


def main() -> None:
try:
args = parse_args()

with open(args.config, encoding="utf8") as handler:
config = read_config(handler)

sb = SchemaBackup(config, args.location, args.topic)

try:
if args.command == "get":
sb.create(
BackupVersion.V2,
poll_timeout=args.poll_timeout,
overwrite=args.overwrite,
)
elif args.command == "restore":
sb.restore_backup()
elif args.command == "export-anonymized-avro-schemas":
sb.create(
BackupVersion.ANONYMIZE_AVRO,
poll_timeout=args.poll_timeout,
overwrite=args.overwrite,
)
else:
# Only reachable if a new subcommand was added that is not mapped above. There are other ways with
# argparse to handle this, but all rely on the programmer doing exactly the right thing. Only switching
# to another CLI framework would provide the ability to not handle this situation manually while
# ensuring that it is not possible to add a new subcommand without also providing a handler for it.
raise SystemExit(f"Entered unreachable code, unknown command: {args.command!r}")
except StaleConsumerError as e:
print(
f"The Kafka consumer did not receive any records for partition {e.partition} of topic {e.topic!r} "
f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} "
f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n"
"\n"
"Try increasing --poll-timeout to give the broker more time.",
file=sys.stderr,
)
raise SystemExit(1) from e
except KeyboardInterrupt as e:
# Not an error -- user choice -- and thus should not end up in a Python stacktrace.
raise SystemExit(2) from e


if __name__ == "__main__":
main()
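
With the console_scripts change in setup.py below, the tool would typically be invoked along the lines of karapace_schema_backup get --config karapace.config.json --location schemas.log, and restored with karapace_schema_backup restore and the same flags; the flag names come from parse_args above, while the config and backup file names are only placeholders.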
32 changes: 32 additions & 0 deletions karapace/backup/encoders.py
@@ -0,0 +1,32 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.key_format import KeyFormatter
from karapace.typing import JsonData, JsonObject
from karapace.utils import json_decode, json_encode


def encode_key(
key: JsonObject | str,
key_formatter: KeyFormatter | None,
) -> bytes | None:
if key == "null":
return None
if not key_formatter:
if isinstance(key, str):
return key.encode("utf8")
return json_encode(key, sort_keys=False, binary=True, compact=False)
if isinstance(key, str):
key = json_decode(key, JsonObject)
return key_formatter.format_key(key)


def encode_value(value: JsonData) -> bytes | None:
if value == "null":
return None
if isinstance(value, str):
return value.encode("utf8")
return json_encode(value, compact=True, sort_keys=False, binary=True)
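
A quick illustration of the behaviour defined above, assuming no KeyFormatter is supplied: the literal string "null" maps to None, and plain strings are encoded as UTF-8.

# Illustrative only: behaviour of the encoders above when no KeyFormatter is used.
from karapace.backup.encoders import encode_key, encode_value

assert encode_key("null", key_formatter=None) is None             # "null" marks a missing key
assert encode_key("some-key", key_formatter=None) == b"some-key"  # plain strings become UTF-8 bytes
assert encode_value("null") is None                               # same convention for values
assert encode_value("plain value") == b"plain value"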
22 changes: 22 additions & 0 deletions karapace/backup/v1.py
@@ -0,0 +1,22 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from .backend import BaseItemsBackupReader
from karapace.utils import json_decode
from typing import Generator, IO, List, Tuple


class SchemaBackupV1Reader(BaseItemsBackupReader):
@staticmethod
def items_from_file(fp: IO[str]) -> Generator[tuple[str, str], None, None]:
raw_msg = fp.read()
# json_decode cannot really produce tuples. Typing was added in hindsight here,
# and it looks like _handle_restore_message has been lying about the type of
# item for some time already.
values = json_decode(raw_msg, List[Tuple[str, str]])
if not values:
return
yield from values
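
The V1 on-disk format is therefore a single JSON document holding a list of [key, value] string pairs. Below is a tiny, made-up parsing example using the static method above; since json_decode may yield the pairs as lists rather than tuples (as the comment notes), the check normalizes them.

# Illustrative only: a V1 backup body is one JSON list of [key, value] string pairs.
import io

from karapace.backup.v1 import SchemaBackupV1Reader

sample = io.StringIO('[["key-1", "value-1"], ["key-2", "value-2"]]')
items = [tuple(item) for item in SchemaBackupV1Reader.items_from_file(sample)]
assert items == [("key-1", "value-1"), ("key-2", "value-2")]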
69 changes: 69 additions & 0 deletions karapace/backup/v2.py
@@ -0,0 +1,69 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from .backend import BaseItemsBackupReader, BaseKVBackupWriter
from karapace.anonymize_schemas import anonymize_avro
from karapace.utils import json_decode, json_encode
from typing import Any, Dict, Generator, IO

import base64


def serialize_record(key_bytes: bytes | None, value_bytes: bytes | None) -> str:
key = base64.b16encode(key_bytes).decode("utf8") if key_bytes is not None else "null"
value = base64.b16encode(value_bytes).decode("utf8") if value_bytes is not None else "null"
return f"{key}\t{value}\n"


class SchemaBackupV2Writer(BaseKVBackupWriter):
@staticmethod
def serialize_record(
key_bytes: bytes | None,
value_bytes: bytes | None,
) -> str:
return serialize_record(key_bytes, value_bytes)


class AnonymizeAvroWriter(BaseKVBackupWriter):
@staticmethod
def serialize_record(
key_bytes: bytes | None,
value_bytes: bytes | None,
) -> str:
if key_bytes is None:
raise RuntimeError("Cannot Avro-encode message with key_bytes=None")
if value_bytes is None:
raise RuntimeError("Cannot Avro-encode message with value_bytes=None")
        # Check that the message has a `schema` key and that it holds an Avro schema.
        # Schemas may carry a `schemaType` key; if it is absent, the schema is Avro.

key = json_decode(key_bytes, Dict[str, str])
value = json_decode(value_bytes, Dict[str, str])

if value and "schema" in value and value.get("schemaType", "AVRO") == "AVRO":
original_schema: Any = json_decode(value["schema"])
anonymized_schema = anonymize_avro.anonymize(original_schema)
if anonymized_schema:
value["schema"] = json_encode(anonymized_schema, compact=True, sort_keys=False)
if value and "subject" in value:
value["subject"] = anonymize_avro.anonymize_name(value["subject"])
        # The schemas topic contains all changes to schema metadata.
if key.get("subject", None):
key["subject"] = anonymize_avro.anonymize_name(key["subject"])
return serialize_record(
json_encode(key, compact=True, binary=True),
json_encode(value, compact=True, binary=True),
)


class SchemaBackupV2Reader(BaseItemsBackupReader):
@staticmethod
def items_from_file(fp: IO[str]) -> Generator[tuple[str, str], None, None]:
for line in fp:
hex_key, hex_value = (val.strip() for val in line.split("\t"))
key = base64.b16decode(hex_key).decode("utf8") if hex_key != "null" else hex_key
value = base64.b16decode(hex_value).decode("utf8") if hex_value != "null" else hex_value
yield key, value
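
In the V2 format each record is a single line: the uppercase base16 (hex) encoded key and value separated by a tab, with the literal string null standing in for a missing side. A small, made-up round trip through the helpers above:

# Illustrative only: a V2 line is "<hex key>\t<hex value>\n"; "null" marks a missing side.
import io

from karapace.backup.v2 import SchemaBackupV2Reader, serialize_record

line = serialize_record(b'{"x": 1}', None)
assert line == "7B2278223A20317D\tnull\n"

items = list(SchemaBackupV2Reader.items_from_file(io.StringIO(line)))
assert items == [('{"x": 1}', "null")]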
2 changes: 1 addition & 1 deletion setup.py
@@ -46,7 +46,7 @@
entry_points={
"console_scripts": [
"karapace = karapace.karapace_all:main",
"karapace_schema_backup = karapace.schema_backup:main",
"karapace_schema_backup = karapace.backup.cli:main",
"karapace_mkpasswd = karapace.auth:main",
],
},
9 changes: 5 additions & 4 deletions tests/integration/test_schema_backup.py
@@ -6,12 +6,13 @@
"""
from datetime import timedelta
from kafka import KafkaConsumer
from karapace.backup.api import BackupVersion, SchemaBackup
from karapace.backup.consumer import PollTimeout
from karapace.backup.errors import StaleConsumerError
from karapace.client import Client
from karapace.config import set_config_defaults
from karapace.kafka_rest_apis import KafkaRestAdminClient
from karapace.key_format import is_key_in_canonical_format
from karapace.schema_backup import PollTimeout, SchemaBackup, serialize_record
from karapace.utils import Expiration
from pathlib import Path
from tests.integration.utils.cluster import RegistryDescription
@@ -55,7 +56,7 @@ async def test_backup_get(
}
)
sb = SchemaBackup(config, str(backup_location))
sb.create(serialize_record)
sb.create(BackupVersion.V2)

# The backup file has been created
assert os.path.exists(backup_location)
@@ -93,7 +94,7 @@ async def test_backup_restore_and_get_non_schema_topic(
# Get the backup
backup_location = tmp_path / "non_schemas_topic.log"
sb = SchemaBackup(config, str(backup_location), topic_option=test_topic_name)
sb.create(serialize_record)
sb.create(BackupVersion.V2)
# The backup file has been created
assert os.path.exists(backup_location)

@@ -237,7 +238,7 @@ async def test_stale_consumer(
with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}._poll_once") as poll_once_mock:
poll_once_mock.return_value = {}
SchemaBackup(config, str(tmp_path / "backup")).create(
serialize_record,
BackupVersion.V2,
poll_timeout=PollTimeout(timedelta(seconds=1)),
)
assert str(e.value) == f"{registry_cluster.schemas_topic}:0#0 (0,0) after PT1S"
4 changes: 2 additions & 2 deletions tests/integration/test_schema_backup_avro_export.py
@@ -4,9 +4,9 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.backup.api import BackupVersion, SchemaBackup
from karapace.client import Client
from karapace.config import set_config_defaults
from karapace.schema_backup import anonymize_avro_schema_message, SchemaBackup
from karapace.utils import json_encode
from pathlib import Path
from tests.integration.utils.cluster import RegistryDescription
@@ -116,7 +116,7 @@ async def test_export_anonymized_avro_schemas(
}
)
sb = SchemaBackup(config, str(export_location))
sb.create(anonymize_avro_schema_message)
sb.create(BackupVersion.ANONYMIZE_AVRO)

# The export file has been created
assert os.path.exists(export_location)