diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 04cc8f989..dfa2f545b 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -24,24 +24,23 @@ from concurrent.futures import Future from enum import Enum from functools import partial -from kafka import KafkaConsumer -from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import TopicPartition from karapace import constants from karapace.backup.backends.v1 import SchemaBackupV1Reader from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess from karapace.config import Config from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import Message, TopicPartition from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter from karapace.utils import assert_never from pathlib import Path from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed -from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar +from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar import contextlib import datetime @@ -282,9 +281,8 @@ def _consume_records( consumer: KafkaConsumer, topic_partition: TopicPartition, poll_timeout: PollTimeout, -) -> Iterator[ConsumerRecord]: - start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition] - end_offset: int = consumer.end_offsets([topic_partition])[topic_partition] +) -> Iterator[Message]: + start_offset, end_offset = consumer.get_watermark_offsets(topic_partition) last_offset = start_offset LOG.info( @@ -301,12 +299,11 @@ def _consume_records( end_offset -= 1 # high watermark to actual end offset while True: - records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, []) - if len(records) == 0: + record: Message | None = consumer.poll(timeout=poll_timeout.seconds) + if record is None: raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout) - for record in records: - yield record - last_offset = record.offset # pylint: disable=undefined-loop-variable + yield record + last_offset = record.offset() if last_offset >= end_offset: break @@ -528,6 +525,7 @@ def create_backup( with _consumer(config, topic_name) as consumer: (partition,) = consumer.partitions_for_topic(topic_name) topic_partition = TopicPartition(topic_name, partition) + consumer.assign([topic_partition]) try: data_file = _write_partition( diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index bd9b35dbd..a670287c3 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -10,11 +10,11 @@ from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record from .writers import write_metadata, write_record from dataclasses import dataclass -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic from karapace.backup.backends.writer import BytesBackupWriter, StdOut from karapace.backup.safe_writer import bytes_writer, staging_directory from karapace.dataclasses import default_dataclass +from karapace.kafka.types import Message from karapace.utils import assert_never from karapace.version import __version__ from pathlib import Path @@ -334,27 +334,31 @@ def store_metadata( def store_record( self, buffer: IO[bytes], - record: ConsumerRecord, + record: Message, ) -> None: - stats: Final = self._partition_stats[record.partition] + stats: Final = self._partition_stats[record.partition()] checksum_checkpoint: Final = stats.get_checkpoint( records_threshold=self._max_records_per_checkpoint, bytes_threshold=self._max_bytes_per_checkpoint, ) offset_start: Final = buffer.tell() + + record_key = record.key() + record_value = record.value() + write_record( buffer, record=Record( - key=record.key, - value=record.value, - headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers), - offset=record.offset, - timestamp=record.timestamp, + key=record_key.encode() if isinstance(record_key, str) else record_key, + value=record_value.encode() if isinstance(record_value, str) else record_value, + headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []), + offset=record.offset(), + timestamp=record.timestamp()[1], checksum_checkpoint=checksum_checkpoint, ), running_checksum=stats.running_checksum, ) stats.update( bytes_offset=buffer.tell() - offset_start, - record_offset=record.offset, + record_offset=record.offset(), ) diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py index c2079eb04..70b1ee6c4 100644 --- a/karapace/backup/backends/writer.py +++ b/karapace/backup/backends/writer.py @@ -4,8 +4,8 @@ """ from __future__ import annotations -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.safe_writer import bytes_writer, str_writer +from karapace.kafka.types import Message from pathlib import Path from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar from typing_extensions import TypeAlias @@ -98,7 +98,7 @@ def store_metadata( def store_record( self, buffer: IO[B], - record: ConsumerRecord, + record: Message, ) -> None: """ Called in order for each record read from a topic to be backed up. It's safe to @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC): def store_record( self, buffer: IO[str], - record: ConsumerRecord, + record: Message, ) -> None: - buffer.write(self.serialize_record(record.key, record.value)) + record_key = record.key() + record_value = record.value() + buffer.write( + self.serialize_record( + record_key.encode() if isinstance(record_key, str) else record_key, + record_value.encode() if isinstance(record_value, str) else record_value, + ) + ) @staticmethod @abc.abstractmethod diff --git a/karapace/backup/errors.py b/karapace/backup/errors.py index 364052cc5..cfa50ceb0 100644 --- a/karapace/backup/errors.py +++ b/karapace/backup/errors.py @@ -2,8 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka.structs import TopicPartition from karapace.backup.poll_timeout import PollTimeout +from karapace.kafka.types import TopicPartition __all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"] diff --git a/karapace/backup/poll_timeout.py b/karapace/backup/poll_timeout.py index 0a1b9e157..91d5871f1 100644 --- a/karapace/backup/poll_timeout.py +++ b/karapace/backup/poll_timeout.py @@ -49,3 +49,8 @@ def __repr__(self) -> str: def milliseconds(self) -> int: """Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding).""" return self.__value // timedelta(milliseconds=1) + + @cached_property + def seconds(self) -> float: + """Returns this poll timeout in seconds.""" + return self.__value / timedelta(seconds=1) diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py index 93b9ceacf..f9916565f 100644 --- a/karapace/backup/topic_configurations.py +++ b/karapace/backup/topic_configurations.py @@ -4,7 +4,8 @@ """ from __future__ import annotations -from karapace.kafka.admin import ConfigSource, KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import ConfigSource from typing import Container, Final ALL_CONFIG_SOURCES: Final = ConfigSource diff --git a/karapace/kafka/admin.py b/karapace/kafka/admin.py index 5b9d9e5ad..46323a61b 100644 --- a/karapace/kafka/admin.py +++ b/karapace/kafka/admin.py @@ -7,25 +7,25 @@ from collections.abc import Iterable from concurrent.futures import Future -from confluent_kafka import TopicPartition -from confluent_kafka.admin import ( - AdminClient, +from confluent_kafka.admin import AdminClient +from karapace.constants import TOPIC_CREATION_TIMEOUT_S +from karapace.kafka.common import ( + _KafkaConfigMixin, + raise_from_kafkaexception, + single_futmap_result, + UnknownTopicOrPartitionError, +) +from karapace.kafka.types import ( BrokerMetadata, ClusterMetadata, ConfigResource, ConfigSource, + KafkaException, NewTopic, OffsetSpec, ResourceType, TopicMetadata, -) -from confluent_kafka.error import KafkaException -from karapace.constants import TOPIC_CREATION_TIMEOUT_S -from karapace.kafka.common import ( - _KafkaConfigMixin, - raise_from_kafkaexception, - single_futmap_result, - UnknownTopicOrPartitionError, + TopicPartition, ) from typing import Container diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index cb38165c8..04e0d33b7 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -7,9 +7,9 @@ from collections.abc import Iterable from concurrent.futures import Future -from confluent_kafka.error import KafkaError, KafkaException from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError -from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar +from karapace.kafka.types import KafkaError, KafkaException +from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack import logging @@ -85,6 +85,13 @@ class KafkaClientParams(TypedDict, total=False): ssl_certfile: str | None ssl_keyfile: str | None sasl_oauth_token_provider: TokenWithExpiryProvider + # Consumer-only + auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] + enable_auto_commit: bool + fetch_max_wait_ms: int + group_id: str + request_timeout_ms: int + session_timeout_ms: int class _KafkaConfigMixin: @@ -128,6 +135,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para "ssl.certificate.location": params.get("ssl_certfile"), "ssl.key.location": params.get("ssl_keyfile"), "error_cb": self._error_callback, + # Consumer-only + "auto.offset.reset": params.get("auto_offset_reset"), + "enable.auto.commit": params.get("enable_auto_commit"), + "fetch.wait.max.ms": params.get("fetch_max_wait_ms"), + "group.id": params.get("group_id"), + "request.timeout.ms": params.get("request_timeout_ms"), + "session.timeout.ms": params.get("session_timeout_ms"), } config = {key: value for key, value in config.items() if value is not None} diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py new file mode 100644 index 000000000..2b67871fd --- /dev/null +++ b/karapace/kafka/consumer.py @@ -0,0 +1,48 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from confluent_kafka import Consumer +from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception +from karapace.kafka.types import KafkaException, PartitionMetadata +from typing import Iterable +from typing_extensions import Unpack + +import secrets + +# A constant that corresponds to the default value of request.timeout.ms in +# the librdkafka C library +DEFAULT_REQUEST_TIMEOUT_MS = 30000 + + +class KafkaConsumer(_KafkaConfigMixin, Consumer): + def __init__( + self, + topic: str, + bootstrap_servers: Iterable[str] | str, + verify_connection: bool = True, + **params: Unpack[KafkaClientParams], + ) -> None: + # The `confluent_kafka.Consumer` does not allow for a missing group id + # if the client of this class does not provide one, we'll generate a + # unique group id to achieve the groupless behaviour + if "group_id" not in params: + params["group_id"] = self._create_group_id() + + super().__init__(bootstrap_servers, verify_connection, **params) + + self.subscribe([topic]) + + @staticmethod + def _create_group_id() -> str: + return f"karapace-{secrets.token_hex(6)}" + + def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]: + """Returns all partition metadata for the given topic.""" + try: + return self.list_topics(topic).topics[topic].partitions + except KafkaException as exc: + raise_from_kafkaexception(exc) diff --git a/karapace/kafka/producer.py b/karapace/kafka/producer.py index 0caecb4c2..0b7f7ac83 100644 --- a/karapace/kafka/producer.py +++ b/karapace/kafka/producer.py @@ -6,11 +6,10 @@ from __future__ import annotations from concurrent.futures import Future -from confluent_kafka import Message, Producer -from confluent_kafka.admin import PartitionMetadata -from confluent_kafka.error import KafkaError, KafkaException +from confluent_kafka import Producer from functools import partial from karapace.kafka.common import _KafkaConfigMixin, raise_from_kafkaexception, translate_from_kafkaerror +from karapace.kafka.types import KafkaError, KafkaException, Message, PartitionMetadata from typing import cast, TypedDict from typing_extensions import Unpack diff --git a/karapace/kafka/types.py b/karapace/kafka/types.py new file mode 100644 index 000000000..3769bec19 --- /dev/null +++ b/karapace/kafka/types.py @@ -0,0 +1,49 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from confluent_kafka import ( + Message, + TIMESTAMP_CREATE_TIME, + TIMESTAMP_LOG_APPEND_TIME, + TIMESTAMP_NOT_AVAILABLE, + TopicPartition, +) +from confluent_kafka.admin import ( + BrokerMetadata, + ClusterMetadata, + ConfigResource, + ConfigSource, + NewTopic, + OffsetSpec, + PartitionMetadata, + ResourceType, + TopicMetadata, +) +from confluent_kafka.error import KafkaError, KafkaException + +import enum + +__all__ = ( + "BrokerMetadata", + "ClusterMetadata", + "ConfigResource", + "ConfigSource", + "ConfigSource", + "KafkaError", + "KafkaException", + "Message", + "NewTopic", + "OffsetSpec", + "PartitionMetadata", + "ResourceType", + "TopicMetadata", + "TopicPartition", +) + + +class Timestamp(enum.IntEnum): + NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE + CREATE_TIME = TIMESTAMP_CREATE_TIME + LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index b9b7b28aa..e55c0d945 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -14,7 +14,8 @@ ) from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema -from karapace.kafka.admin import KafkaAdminClient, KafkaException +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import KafkaException from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index c70cd530c..0e88026d7 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -3,9 +3,8 @@ See LICENSE for details """ from .config import Config -from .utils import KarapaceKafkaClient -from kafka import KafkaConsumer from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.kafka.producer import KafkaProducer from typing import Iterator @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", metadata_max_age_ms=config["metadata_max_age_ms"], - kafka_client=KarapaceKafkaClient, ) try: yield consumer diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 45497daa6..41ddd3e1d 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -5,12 +5,12 @@ See LICENSE for details """ from dataclasses import dataclass -from kafka import KafkaConsumer from kafka.coordinator.base import BaseCoordinator from kafka.errors import NoBrokersAvailable, NodeNotReadyError from kafka.metrics import MetricConfig, Metrics from karapace import constants from karapace.config import Config +from karapace.kafka.consumer import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import JsonData, JsonObject from karapace.utils import json_decode, json_encode, KarapaceKafkaClient from karapace.version import __version__ @@ -238,7 +238,7 @@ def init_schema_coordinator(self) -> None: election_strategy=self.config.get("master_election_strategy", "lowest"), group_id=self.config["group_id"], session_timeout_ms=session_timeout_ms, - request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]), + request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS), ) self.schema_coordinator_ready.set() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b2fde9bbf..3df4ed8da 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -10,7 +10,6 @@ from contextlib import closing, ExitStack from enum import Enum from jsonschema.validators import Draft7Validator -from kafka import KafkaConsumer, TopicPartition from kafka.errors import ( InvalidReplicationFactorError, KafkaConfigurationError, @@ -25,6 +24,8 @@ from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import DEFAULT_REQUEST_TIMEOUT_MS, KafkaConsumer +from karapace.kafka.types import Message, TopicPartition from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher @@ -33,7 +34,7 @@ from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject -from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient +from karapace.utils import json_decode, JSONDecodeError from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -69,11 +70,10 @@ class MessageType(Enum): def _create_consumer_from_config(config: Config) -> KafkaConsumer: # Group not set on purpose, all consumers read the same data session_timeout_ms = config["session_timeout_ms"] - request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) + request_timeout_ms = max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS) return KafkaConsumer( config["topic_name"], enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=config["bootstrap_uri"], client_id=config["client_id"], fetch_max_wait_ms=50, @@ -87,7 +87,6 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer: auto_offset_reset="earliest", session_timeout_ms=session_timeout_ms, request_timeout_ms=request_timeout_ms, - kafka_client=KarapaceKafkaClient, metadata_max_age_ms=config["metadata_max_age_ms"], ) @@ -117,7 +116,7 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.timeout_ms = 200 + self.timeout_s = 0.2 self.config = config self.database = database @@ -234,10 +233,7 @@ def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)]) - # Offset in the response is the offset for last offset. - # Reduce by one for matching on startup. - beginning_offset = list(offsets.values())[0] - 1 + beginning_offset, _ = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) return beginning_offset except KafkaTimeoutError: LOG.exception("Reading begin offsets timed out.") @@ -253,7 +249,7 @@ def _is_ready(self) -> bool: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)]) + _, end_offset = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) except KafkaTimeoutError: LOG.exception("Reading end offsets timed out.") return False @@ -263,7 +259,7 @@ def _is_ready(self) -> bool: return False # Offset in the response is the offset for the next upcoming message. # Reduce by one for actual highest offset. - self._highest_offset = list(offsets.values())[0] - 1 + self._highest_offset = end_offset - 1 cur_time = time.monotonic() time_from_last_check = cur_time - self.last_check progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2) @@ -281,7 +277,7 @@ def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) @staticmethod - def _parse_message_value(raw_value: str) -> JsonObject | None: + def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) if isinstance(value, dict): return value @@ -292,7 +288,7 @@ def _parse_message_value(raw_value: str) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) + msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s) if self.ready is False: self.ready = self._is_ready() @@ -306,49 +302,52 @@ def handle_messages(self) -> None: if are_we_master is True: watch_offsets = True - for _, msgs in raw_msgs.items(): - schema_records_processed_keymode_canonical = 0 - schema_records_processed_keymode_deprecated_karapace = 0 - for msg in msgs: + schema_records_processed_keymode_canonical = 0 + schema_records_processed_keymode_deprecated_karapace = 0 + for msg in msgs: + try: + message_key = msg.key() + if message_key is None: + continue + key = json_decode(message_key) + except JSONDecodeError: + LOG.exception("Invalid JSON in msg.key()") + continue + + assert isinstance(key, dict) + msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE + # Key mode detection happens on startup. + # Default keymode is CANONICAL and preferred unless any data consumed + # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE + # the subsequent keys are omitted from detection. + if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: + if msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + + value = None + message_value = msg.value() + if message_value: try: - key = json_decode(msg.key) + value = self._parse_message_value(message_value) except JSONDecodeError: - LOG.exception("Invalid JSON in msg.key") + LOG.exception("Invalid JSON in msg.value()") continue - assert isinstance(key, dict) - msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE - # Key mode detection happens on startup. - # Default keymode is CANONICAL and preferred unless any data consumed - # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE - # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) - - value = None - if msg.value: - try: - value = self._parse_message_value(msg.value) - except JSONDecodeError: - LOG.exception("Invalid JSON in msg.value") - continue - - self.handle_msg(key, value) - self.offset = msg.offset - - if msg_keymode == KeyMode.CANONICAL: - schema_records_processed_keymode_canonical += 1 - else: - schema_records_processed_keymode_deprecated_karapace += 1 - - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) - - self._report_schema_metrics( - schema_records_processed_keymode_canonical, - schema_records_processed_keymode_deprecated_karapace, - ) + self.handle_msg(key, value) + self.offset = msg.offset() + + if msg_keymode == KeyMode.CANONICAL: + schema_records_processed_keymode_canonical += 1 + else: + schema_records_processed_keymode_deprecated_karapace += 1 + + if self.ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) + + self._report_schema_metrics( + schema_records_processed_keymode_canonical, + schema_records_processed_keymode_deprecated_karapace, + ) def _report_schema_metrics( self, diff --git a/pytest.ini b/pytest.ini index 0f19813c2..8ed1116d2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] addopts = -ra --numprocesses auto --import-mode=importlib -timeout = 60 +timeout = 90 timeout_func_only = true diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 5762cb52a..3d26b0393 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -1,4 +1,21 @@ from ._model import IsolationLevel -from .cimpl import Message, Producer, TopicPartition +from .cimpl import ( + Consumer, + Message, + Producer, + TIMESTAMP_CREATE_TIME, + TIMESTAMP_LOG_APPEND_TIME, + TIMESTAMP_NOT_AVAILABLE, + TopicPartition, +) -__all__ = ("IsolationLevel", "Message", "Producer", "TopicPartition") +__all__ = ( + "Consumer", + "IsolationLevel", + "Message", + "Producer", + "TIMESTAMP_CREATE_TIME", + "TIMESTAMP_LOG_APPEND_TIME", + "TIMESTAMP_NOT_AVAILABLE", + "TopicPartition", +) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 9b573c5b9..a794f10ca 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -31,8 +31,13 @@ class TopicPartition: partition: int = -1, offset: int = -1001, metadata: str | None = None, - leader_epoc: int | None = None, - ) -> None: ... + leader_epoch: int | None = None, + ) -> None: + self.topic: str + self.partition: int + self.offset: int + self.metadata: str | None + self.leader_epoch: int | None class Message: def offset(self) -> int: ... @@ -41,6 +46,7 @@ class Message: def value(self) -> str | bytes | None: ... def topic(self) -> str: ... def partition(self) -> int: ... + def headers(self) -> list[tuple[str, bytes]] | None: ... class Producer: def produce( @@ -56,3 +62,18 @@ class Producer: def flush(self, timeout: float = -1) -> None: ... def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... def poll(self, timeout: float = -1) -> int: ... + +class Consumer: + def subscribe(self, topics: list[str]) -> None: ... + def get_watermark_offsets( + self, partition: TopicPartition, timeout: float | None = None, cached: bool = False + ) -> tuple[int, int]: ... + def close(self) -> None: ... + def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... + def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ... + def poll(self, timeout: float = -1) -> Message | None: ... + def assign(self, partitions: list[TopicPartition]) -> None: ... + +TIMESTAMP_CREATE_TIME = ... +TIMESTAMP_NOT_AVAILABLE = ... +TIMESTAMP_LOG_APPEND_TIME = ... diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py index 16592aed3..587ae65a4 100644 --- a/tests/integration/backup/test_get_topic_configurations.py +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -5,7 +5,8 @@ from __future__ import annotations from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import NewTopic import pytest diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 3d732bd58..6b7cf8ae1 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -4,8 +4,6 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from datetime import timedelta -from kafka import KafkaConsumer from karapace.backup import api from karapace.backup.api import BackupVersion from karapace.backup.errors import StaleConsumerError @@ -13,6 +11,7 @@ from karapace.client import Client from karapace.config import set_config_defaults from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format from karapace.utils import Expiration from pathlib import Path @@ -131,18 +130,16 @@ def _assert_canonical_key_format( schemas_topic, group_id="assert-canonical-key-format-consumer", enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=bootstrap_servers, auto_offset_reset="earliest", ) - raw_msgs = consumer.poll(timeout_ms=2000) - while raw_msgs: - for _, messages in raw_msgs.items(): - for message in messages: - key = json.loads(message.key) - assert is_key_in_canonical_format(key), f"Not in canonical format: {key}" - raw_msgs = consumer.poll() + messages = consumer.poll(timeout=2) + while messages: + for message in messages: + key = json.loads(message.key()) + assert is_key_in_canonical_format(key), f"Not in canonical format: {key}" + messages = consumer.poll(timeout=2) consumer.close() @@ -174,7 +171,7 @@ async def test_backup_restore( # The restored karapace should have the previously created subject all_subjects = [] - expiration = Expiration.from_timeout(timeout=10) + expiration = Expiration.from_timeout(timeout=30) while subject not in all_subjects: expiration.raise_timeout_if_expired( msg_format="{subject} not in {all_subjects}", @@ -184,7 +181,7 @@ async def test_backup_restore( res = await registry_async_client.get("subjects") assert res.status_code == 200 all_subjects = res.json() - time.sleep(0.1) + time.sleep(1) # Test a few exotic scenarios @@ -260,13 +257,13 @@ async def test_stale_consumer( # The proper way to test this would be with quotas by throttling our client to death while using a very short # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not # implement the necessary APIs. - with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}._poll_once") as poll_once_mock: - poll_once_mock.return_value = {} + with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock: + poll_mock.return_value = None api.create_backup( config=config, backup_location=tmp_path / "backup", topic_name=api.normalize_topic_name(None, config), version=BackupVersion.V2, - poll_timeout=PollTimeout(timedelta(seconds=1)), + poll_timeout=PollTimeout.of(seconds=1), ) assert str(e.value) == f"{registry_cluster.schemas_topic}:0#0 (0,0) after PT1S" diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index be336afcf..cf2d993a4 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -5,8 +5,6 @@ from __future__ import annotations from dataclasses import fields -from kafka import TopicPartition -from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api from karapace.backup.api import _consume_records, TopicName @@ -16,8 +14,9 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import Message, NewTopic, Timestamp, TopicPartition from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config from karapace.version import __version__ from pathlib import Path @@ -182,28 +181,28 @@ def test_roundtrip_from_kafka_state( ) # First record. - assert isinstance(first_record, ConsumerRecord) - assert first_record.topic == new_topic.topic - assert first_record.partition == partition + assert isinstance(first_record, Message) + assert first_record.topic() == new_topic.topic + assert first_record.partition() == partition # Note: This might be unreliable due to not using idempotent producer, i.e. we have # no guarantee against duplicates currently. - assert first_record.offset == 0 - assert first_record.timestamp == 1683474641 - assert first_record.timestamp_type == 0 - assert first_record.key == b"bar" - assert first_record.value == b"foo" - assert first_record.headers == [] + assert first_record.offset() == 0 + assert first_record.timestamp()[1] == 1683474641 + assert first_record.timestamp()[0] == Timestamp.CREATE_TIME + assert first_record.key() == b"bar" + assert first_record.value() == b"foo" + assert first_record.headers() is None # Second record. - assert isinstance(second_record, ConsumerRecord) - assert second_record.topic == new_topic.topic - assert second_record.partition == partition - assert second_record.offset == 1 - assert second_record.timestamp == 1683474657 - assert second_record.timestamp_type == 0 - assert second_record.key == b"foo" - assert second_record.value == b"bar" - assert second_record.headers == [ + assert isinstance(second_record, Message) + assert second_record.topic() == new_topic.topic + assert second_record.partition() == partition + assert second_record.offset() == 1 + assert second_record.timestamp()[1] == 1683474657 + assert second_record.timestamp()[0] == Timestamp.CREATE_TIME + assert second_record.key() == b"foo" + assert second_record.value() == b"bar" + assert second_record.headers() == [ ("some-header", b"some header value"), ("other-header", b"some other header value"), ] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d9ab70d40..071077dd3 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -12,8 +12,9 @@ from filelock import FileLock from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import NewTopic from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION diff --git a/tests/integration/kafka/test_admin.py b/tests/integration/kafka/test_admin.py index d6d586a6c..0c9079c9c 100644 --- a/tests/integration/kafka/test_admin.py +++ b/tests/integration/kafka/test_admin.py @@ -6,8 +6,9 @@ from __future__ import annotations from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError -from karapace.kafka.admin import ConfigSource, KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import ConfigSource, NewTopic from tests.utils import new_topic as create_new_topic import pytest diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py new file mode 100644 index 000000000..9ca5b9320 --- /dev/null +++ b/tests/integration/kafka/test_consumer.py @@ -0,0 +1,26 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka.types import NewTopic +from tests.integration.utils.kafka_server import KafkaServers + + +class TestPartitionsForTopic: + def test_partitions_for_returns_empty_for_unknown_topic(self, kafka_servers: KafkaServers) -> None: + consumer = KafkaConsumer(bootstrap_servers=kafka_servers.bootstrap_servers, topic="nonexistent") + + assert consumer.partitions_for_topic("nonexistent") == {} + + def test_partitions_for(self, kafka_servers: KafkaServers, new_topic: NewTopic) -> None: + consumer = KafkaConsumer(bootstrap_servers=kafka_servers.bootstrap_servers, topic=new_topic.topic) + + partitions = consumer.partitions_for_topic(new_topic.topic) + + assert len(partitions) == 1 + assert partitions[0].id == 0 + assert partitions[0].replicas == [1] + assert partitions[0].isrs == [1] diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 69e06ea08..6faa42216 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -6,8 +6,8 @@ from __future__ import annotations from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError -from karapace.kafka.admin import NewTopic from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import NewTopic, Timestamp import pytest import time @@ -37,6 +37,7 @@ def test_send(self, producer: KafkaProducer, new_topic: NewTopic) -> None: assert message.topic() == new_topic.topic assert message.key() == key assert message.value() == value + assert message.timestamp()[0] == Timestamp.CREATE_TIME assert message.timestamp()[1] == timestamp def test_send_raises_for_unknown_topic(self, producer: KafkaProducer) -> None: diff --git a/tests/unit/backup/backends/test_v2.py b/tests/unit/backup/backends/test_v2.py index b38f83469..b2ad273b2 100644 --- a/tests/unit/backup/backends/test_v2.py +++ b/tests/unit/backup/backends/test_v2.py @@ -5,12 +5,13 @@ from __future__ import annotations from functools import partial -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopicLegacy from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer from karapace.backup.encoders import encode_key, encode_value +from karapace.kafka.types import Timestamp from karapace.key_format import KeyFormatter from pathlib import Path +from tests.utils import StubMessage import datetime import json @@ -29,7 +30,7 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -50,15 +51,10 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -79,13 +75,8 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) @@ -166,7 +157,7 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -194,15 +185,10 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -230,13 +216,8 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) diff --git a/tests/unit/backup/backends/v3/test_backend.py b/tests/unit/backup/backends/v3/test_backend.py index 4f8562311..dd4b47afd 100644 --- a/tests/unit/backup/backends/v3/test_backend.py +++ b/tests/unit/backup/backends/v3/test_backend.py @@ -3,7 +3,6 @@ See LICENSE for details """ from dataclasses import replace -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopic from karapace.backup.backends.v3.backend import _PartitionStats, SchemaBackupV3Reader, SchemaBackupV3Writer from karapace.backup.backends.v3.errors import ( @@ -17,6 +16,7 @@ from karapace.backup.backends.v3.readers import read_records from karapace.backup.backends.v3.schema import ChecksumAlgorithm, DataFile from pathlib import Path +from tests.utils import StubMessage from unittest import mock import datetime @@ -33,33 +33,23 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: finished_at = datetime.datetime.now(datetime.timezone.utc) records = ( - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=10, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(1, round(time.time())), # TODO + headers=None, ), - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=14, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(1, round(time.time())), # TODO + headers=[("some-key", b"some-value")], ), ) topic_configurations = {"max.message.bytes": "1024"} @@ -124,36 +114,31 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[0].key, - value=records[0].value, + key=records[0].key(), + value=records[0].value(), headers=(), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[1].key, - value=records[1].value, + key=records[1].key(), + value=records[1].value(), headers=((b"some-key", b"some-value"),), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ) -def make_record(topic_name: str, partition_index: int, offset: int) -> ConsumerRecord: - return ConsumerRecord( +def make_record(topic_name: str, partition_index: int, offset: int) -> StubMessage: + return StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=offset, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(1, round(time.time())), # TODO + headers=[("some-key", b"some-value")], ) diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index c112d5ffc..820287cc3 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -4,9 +4,7 @@ """ from __future__ import annotations -from kafka import KafkaConsumer from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import PartitionMetadata from karapace import config from karapace.backup.api import ( _admin, @@ -24,6 +22,7 @@ from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.kafka.consumer import KafkaConsumer, PartitionMetadata from karapace.kafka.producer import KafkaProducer from pathlib import Path from types import FunctionType @@ -148,7 +147,16 @@ def test_skip_topic_creation( class TestClients: @staticmethod def _partition_metadata(c: int = 1) -> set[PartitionMetadata]: - return {PartitionMetadata("topic", i, 0, tuple(), tuple(), None) for i in range(0, c)} + def create(partition) -> PartitionMetadata: + metadata = PartitionMetadata() + metadata.id = partition + metadata.leader = 1 + metadata.replicas = () + metadata.isrs = () + + return metadata + + return {create(i) for i in range(c)} @pytest.mark.parametrize( "ctx_mng,client_class,partitions_method,close_method_name", diff --git a/tests/unit/backup/test_poll_timeout.py b/tests/unit/backup/test_poll_timeout.py index 9a0614038..ecd9bfce4 100644 --- a/tests/unit/backup/test_poll_timeout.py +++ b/tests/unit/backup/test_poll_timeout.py @@ -37,3 +37,6 @@ def test__repr__(self) -> None: def test_milliseconds(self) -> None: assert PollTimeout(timedelta(milliseconds=1000.5)).milliseconds == 1000 + + def test_seconds(self) -> None: + assert PollTimeout(timedelta(milliseconds=1500)).seconds == 1.5 diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index b6566e927..01bc2ab54 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -137,9 +137,9 @@ class ReadinessTestCase(BaseTestCase): def test_readiness_check(testcase: ReadinessTestCase) -> None: key_formatter_mock = Mock() consumer_mock = Mock() - consumer_mock.poll.return_value = {} + consumer_mock.consume.return_value = [] # Return dict {partition: offsets}, end offset is the next upcoming record offset - consumer_mock.end_offsets.return_value = {0: testcase.end_offset} + consumer_mock.get_watermark_offsets.return_value = (0, testcase.end_offset) offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( diff --git a/tests/utils.py b/tests/utils.py index 12e544408..7f31bb6e1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ from karapace.utils import Expiration from pathlib import Path from subprocess import Popen -from typing import Callable, IO, List, Union +from typing import Any, Callable, IO, List, Union from urllib.parse import quote import asyncio @@ -307,3 +307,15 @@ def popen_karapace_all(config_path: Union[Path, str], stdout: IO, stderr: IO, ** kwargs["stdout"] = stdout kwargs["stderr"] = stderr return Popen([python_exe(), "-m", "karapace.karapace_all", str(config_path)], **kwargs) + + +# A stub to stand-in for `confluent_kafka.Message` in unittests, since that +# class cannot be instantiated, thus this is a liberal simulation of its +# behaviour ie. its attributes are accessible via getter functions: +# `message.offset()` +class StubMessage: + def __init__(self, **attrs: Any) -> None: + self._attrs = attrs + + def __getattr__(self, key: str) -> None: + return lambda: self._attrs[key]