From 981fa698920f170e26bbffae1ead46331bfa51b3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?=
Date: Wed, 29 Nov 2023 16:30:39 +0100
Subject: [PATCH] Replace sync Kafka consumers with confluent_kafka one

This change replaces all synchronous Kafka consumers (from the kafka-python
library) with a new implementation based on confluent-kafka-python's
`Consumer`, keeping the same interface as much as possible.

The PyTest timeout is raised from 60s to 90s to accommodate the default poll
timeout for backup consumers (otherwise the tests would time out while still
waiting for messages to arrive).

Since the `confluent_kafka.Consumer` implementation does not allow consumers
to be without a group ID, if the new `KafkaConsumer` client is not given one,
we'll generate one on the fly to mimic groupless behaviour.

Resources:
* confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
---
 karapace/backup/api.py | 22 ++--
 karapace/backup/backends/v3/backend.py | 22 ++--
 karapace/backup/backends/writer.py | 15 ++-
 karapace/backup/errors.py | 2 +-
 karapace/backup/poll_timeout.py | 5 +
 karapace/backup/topic_configurations.py | 3 +-
 karapace/kafka/admin.py | 22 ++--
 karapace/kafka/common.py | 16 ++-
 karapace/kafka/consumer.py | 44 ++++++++
 karapace/kafka/producer.py | 5 +-
 karapace/kafka/types.py | 55 +++++++++
 karapace/kafka_rest_apis/__init__.py | 3 +-
 karapace/kafka_utils.py | 4 +-
 karapace/master_coordinator.py | 4 +-
 karapace/schema_reader.py | 105 +++++++++---------
 pytest.ini | 2 +-
 stubs/confluent_kafka/__init__.pyi | 21 +++-
 stubs/confluent_kafka/cimpl.pyi | 25 ++++-
 .../backup/test_get_topic_configurations.py | 3 +-
 .../integration/backup/test_legacy_backup.py | 27 ++---
 tests/integration/backup/test_v3_backup.py | 41 ++++---
 tests/integration/conftest.py | 3 +-
 tests/integration/kafka/test_admin.py | 3 +-
 tests/integration/kafka/test_consumer.py | 26 +++++
 tests/integration/kafka/test_producer.py | 3 +-
 tests/unit/backup/backends/test_v2.py | 47 +++-----
 tests/unit/backup/backends/v3/test_backend.py | 49 +++-----
 tests/unit/backup/test_api.py | 14 ++-
 tests/unit/backup/test_poll_timeout.py | 3 +
 tests/unit/test_schema_reader.py | 6 +-
 tests/utils.py | 16 ++-
 31 files changed, 396 insertions(+), 220 deletions(-)
 create mode 100644 karapace/kafka/consumer.py
 create mode 100644 karapace/kafka/types.py
 create mode 100644 tests/integration/kafka/test_consumer.py

diff --git a/karapace/backup/api.py b/karapace/backup/api.py
index 04cc8f989..dfa2f545b 100644
--- a/karapace/backup/api.py
+++ b/karapace/backup/api.py
@@ -24,24 +24,23 @@
 from concurrent.futures import Future
 from enum import Enum
 from functools import partial
-from kafka import KafkaConsumer
-from kafka.consumer.fetcher import ConsumerRecord
 from kafka.errors import KafkaError, TopicAlreadyExistsError
-from kafka.structs import TopicPartition
 from karapace import constants
 from karapace.backup.backends.v1 import SchemaBackupV1Reader
 from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
 from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
 from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import 
KafkaProducer +from karapace.kafka.types import Message, TopicPartition from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter from karapace.utils import assert_never from pathlib import Path from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed -from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar +from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar import contextlib import datetime @@ -282,9 +281,8 @@ def _consume_records( consumer: KafkaConsumer, topic_partition: TopicPartition, poll_timeout: PollTimeout, -) -> Iterator[ConsumerRecord]: - start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition] - end_offset: int = consumer.end_offsets([topic_partition])[topic_partition] +) -> Iterator[Message]: + start_offset, end_offset = consumer.get_watermark_offsets(topic_partition) last_offset = start_offset LOG.info( @@ -301,12 +299,11 @@ def _consume_records( end_offset -= 1 # high watermark to actual end offset while True: - records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, []) - if len(records) == 0: + record: Message | None = consumer.poll(timeout=poll_timeout.seconds) + if record is None: raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout) - for record in records: - yield record - last_offset = record.offset # pylint: disable=undefined-loop-variable + yield record + last_offset = record.offset() if last_offset >= end_offset: break @@ -528,6 +525,7 @@ def create_backup( with _consumer(config, topic_name) as consumer: (partition,) = consumer.partitions_for_topic(topic_name) topic_partition = TopicPartition(topic_name, partition) + consumer.assign([topic_partition]) try: data_file = _write_partition( diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index bd9b35dbd..a670287c3 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -10,11 +10,11 @@ from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record from .writers import write_metadata, write_record from dataclasses import dataclass -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic from karapace.backup.backends.writer import BytesBackupWriter, StdOut from karapace.backup.safe_writer import bytes_writer, staging_directory from karapace.dataclasses import default_dataclass +from karapace.kafka.types import Message from karapace.utils import assert_never from karapace.version import __version__ from pathlib import Path @@ -334,27 +334,31 @@ def store_metadata( def store_record( self, buffer: IO[bytes], - record: ConsumerRecord, + record: Message, ) -> None: - stats: Final = self._partition_stats[record.partition] + stats: Final = self._partition_stats[record.partition()] checksum_checkpoint: Final = stats.get_checkpoint( records_threshold=self._max_records_per_checkpoint, bytes_threshold=self._max_bytes_per_checkpoint, ) offset_start: Final = buffer.tell() + + record_key = record.key() + record_value = record.value() + write_record( buffer, record=Record( - key=record.key, - value=record.value, - headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers), 
- offset=record.offset, - timestamp=record.timestamp, + key=record_key.encode() if isinstance(record_key, str) else record_key, + value=record_value.encode() if isinstance(record_value, str) else record_value, + headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []), + offset=record.offset(), + timestamp=record.timestamp()[1], checksum_checkpoint=checksum_checkpoint, ), running_checksum=stats.running_checksum, ) stats.update( bytes_offset=buffer.tell() - offset_start, - record_offset=record.offset, + record_offset=record.offset(), ) diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py index c2079eb04..70b1ee6c4 100644 --- a/karapace/backup/backends/writer.py +++ b/karapace/backup/backends/writer.py @@ -4,8 +4,8 @@ """ from __future__ import annotations -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.safe_writer import bytes_writer, str_writer +from karapace.kafka.types import Message from pathlib import Path from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar from typing_extensions import TypeAlias @@ -98,7 +98,7 @@ def store_metadata( def store_record( self, buffer: IO[B], - record: ConsumerRecord, + record: Message, ) -> None: """ Called in order for each record read from a topic to be backed up. It's safe to @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC): def store_record( self, buffer: IO[str], - record: ConsumerRecord, + record: Message, ) -> None: - buffer.write(self.serialize_record(record.key, record.value)) + record_key = record.key() + record_value = record.value() + buffer.write( + self.serialize_record( + record_key.encode() if isinstance(record_key, str) else record_key, + record_value.encode() if isinstance(record_value, str) else record_value, + ) + ) @staticmethod @abc.abstractmethod diff --git a/karapace/backup/errors.py b/karapace/backup/errors.py index 364052cc5..cfa50ceb0 100644 --- a/karapace/backup/errors.py +++ b/karapace/backup/errors.py @@ -2,8 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka.structs import TopicPartition from karapace.backup.poll_timeout import PollTimeout +from karapace.kafka.types import TopicPartition __all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"] diff --git a/karapace/backup/poll_timeout.py b/karapace/backup/poll_timeout.py index 0a1b9e157..91d5871f1 100644 --- a/karapace/backup/poll_timeout.py +++ b/karapace/backup/poll_timeout.py @@ -49,3 +49,8 @@ def __repr__(self) -> str: def milliseconds(self) -> int: """Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding).""" return self.__value // timedelta(milliseconds=1) + + @cached_property + def seconds(self) -> float: + """Returns this poll timeout in seconds.""" + return self.__value / timedelta(seconds=1) diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py index 93b9ceacf..f9916565f 100644 --- a/karapace/backup/topic_configurations.py +++ b/karapace/backup/topic_configurations.py @@ -4,7 +4,8 @@ """ from __future__ import annotations -from karapace.kafka.admin import ConfigSource, KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import ConfigSource from typing import Container, Final ALL_CONFIG_SOURCES: Final = ConfigSource diff --git a/karapace/kafka/admin.py b/karapace/kafka/admin.py index 
5b9d9e5ad..46323a61b 100644 --- a/karapace/kafka/admin.py +++ b/karapace/kafka/admin.py @@ -7,25 +7,25 @@ from collections.abc import Iterable from concurrent.futures import Future -from confluent_kafka import TopicPartition -from confluent_kafka.admin import ( - AdminClient, +from confluent_kafka.admin import AdminClient +from karapace.constants import TOPIC_CREATION_TIMEOUT_S +from karapace.kafka.common import ( + _KafkaConfigMixin, + raise_from_kafkaexception, + single_futmap_result, + UnknownTopicOrPartitionError, +) +from karapace.kafka.types import ( BrokerMetadata, ClusterMetadata, ConfigResource, ConfigSource, + KafkaException, NewTopic, OffsetSpec, ResourceType, TopicMetadata, -) -from confluent_kafka.error import KafkaException -from karapace.constants import TOPIC_CREATION_TIMEOUT_S -from karapace.kafka.common import ( - _KafkaConfigMixin, - raise_from_kafkaexception, - single_futmap_result, - UnknownTopicOrPartitionError, + TopicPartition, ) from typing import Container diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index cb38165c8..42c5115e6 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -7,9 +7,9 @@ from collections.abc import Iterable from concurrent.futures import Future -from confluent_kafka.error import KafkaError, KafkaException from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError -from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar +from karapace.kafka.types import KafkaError, KafkaException +from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack import logging @@ -85,6 +85,12 @@ class KafkaClientParams(TypedDict, total=False): ssl_certfile: str | None ssl_keyfile: str | None sasl_oauth_token_provider: TokenWithExpiryProvider + # Consumer-only + auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] + enable_auto_commit: bool + fetch_max_wait_ms: int + group_id: str + session_timeout_ms: int class _KafkaConfigMixin: @@ -128,6 +134,12 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para "ssl.certificate.location": params.get("ssl_certfile"), "ssl.key.location": params.get("ssl_keyfile"), "error_cb": self._error_callback, + # Consumer-only + "auto.offset.reset": params.get("auto_offset_reset"), + "enable.auto.commit": params.get("enable_auto_commit"), + "fetch.wait.max.ms": params.get("fetch_max_wait_ms"), + "group.id": params.get("group_id"), + "session.timeout.ms": params.get("session_timeout_ms"), } config = {key: value for key, value in config.items() if value is not None} diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py new file mode 100644 index 000000000..deb60e125 --- /dev/null +++ b/karapace/kafka/consumer.py @@ -0,0 +1,44 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from confluent_kafka import Consumer +from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception +from karapace.kafka.types import KafkaException, PartitionMetadata +from typing import Iterable +from typing_extensions import Unpack + +import secrets + + +class KafkaConsumer(_KafkaConfigMixin, Consumer): + def __init__( + self, + topic: str, + bootstrap_servers: Iterable[str] | str, + verify_connection: bool = True, + **params: Unpack[KafkaClientParams], + ) -> None: + # The `confluent_kafka.Consumer` does not allow for a 
missing group id + # if the client of this class does not provide one, we'll generate a + # unique group id to achieve the groupless behaviour + if "group_id" not in params: + params["group_id"] = self._create_group_id() + + super().__init__(bootstrap_servers, verify_connection, **params) + + self.subscribe([topic]) + + @staticmethod + def _create_group_id() -> str: + return f"karapace-{secrets.token_hex(6)}" + + def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]: + """Returns all partition metadata for the given topic.""" + try: + return self.list_topics(topic).topics[topic].partitions + except KafkaException as exc: + raise_from_kafkaexception(exc) diff --git a/karapace/kafka/producer.py b/karapace/kafka/producer.py index 0caecb4c2..0b7f7ac83 100644 --- a/karapace/kafka/producer.py +++ b/karapace/kafka/producer.py @@ -6,11 +6,10 @@ from __future__ import annotations from concurrent.futures import Future -from confluent_kafka import Message, Producer -from confluent_kafka.admin import PartitionMetadata -from confluent_kafka.error import KafkaError, KafkaException +from confluent_kafka import Producer from functools import partial from karapace.kafka.common import _KafkaConfigMixin, raise_from_kafkaexception, translate_from_kafkaerror +from karapace.kafka.types import KafkaError, KafkaException, Message, PartitionMetadata from typing import cast, TypedDict from typing_extensions import Unpack diff --git a/karapace/kafka/types.py b/karapace/kafka/types.py new file mode 100644 index 000000000..d1e755422 --- /dev/null +++ b/karapace/kafka/types.py @@ -0,0 +1,55 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from confluent_kafka import ( + Message, + TIMESTAMP_CREATE_TIME, + TIMESTAMP_LOG_APPEND_TIME, + TIMESTAMP_NOT_AVAILABLE, + TopicPartition, +) +from confluent_kafka.admin import ( + BrokerMetadata, + ClusterMetadata, + ConfigResource, + ConfigSource, + NewTopic, + OffsetSpec, + PartitionMetadata, + ResourceType, + TopicMetadata, +) +from confluent_kafka.error import KafkaError, KafkaException + +import enum + +__all__ = ( + "BrokerMetadata", + "ClusterMetadata", + "ConfigResource", + "ConfigSource", + "ConfigSource", + "DEFAULT_REQUEST_TIMEOUT_MS", + "KafkaError", + "KafkaException", + "Message", + "NewTopic", + "OffsetSpec", + "PartitionMetadata", + "ResourceType", + "Timestamp", + "TopicMetadata", + "TopicPartition", +) + +# A constant that corresponds to the default value of request.timeout.ms in +# the librdkafka C library +DEFAULT_REQUEST_TIMEOUT_MS = 30000 + + +class Timestamp(enum.IntEnum): + NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE + CREATE_TIME = TIMESTAMP_CREATE_TIME + LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index b9b7b28aa..e55c0d945 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -14,7 +14,8 @@ ) from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema -from karapace.kafka.admin import KafkaAdminClient, KafkaException +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import KafkaException from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index c70cd530c..0e88026d7 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -3,9 +3,8 @@ See LICENSE for details """ from .config import 
Config -from .utils import KarapaceKafkaClient -from kafka import KafkaConsumer from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.kafka.producer import KafkaProducer from typing import Iterator @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", metadata_max_age_ms=config["metadata_max_age_ms"], - kafka_client=KarapaceKafkaClient, ) try: yield consumer diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 45497daa6..9ff823290 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -5,12 +5,12 @@ See LICENSE for details """ from dataclasses import dataclass -from kafka import KafkaConsumer from kafka.coordinator.base import BaseCoordinator from kafka.errors import NoBrokersAvailable, NodeNotReadyError from kafka.metrics import MetricConfig, Metrics from karapace import constants from karapace.config import Config +from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import JsonData, JsonObject from karapace.utils import json_decode, json_encode, KarapaceKafkaClient from karapace.version import __version__ @@ -238,7 +238,7 @@ def init_schema_coordinator(self) -> None: election_strategy=self.config.get("master_election_strategy", "lowest"), group_id=self.config["group_id"], session_timeout_ms=session_timeout_ms, - request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]), + request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS), ) self.schema_coordinator_ready.set() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b2fde9bbf..d5e73de15 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -10,7 +10,6 @@ from contextlib import closing, ExitStack from enum import Enum from jsonschema.validators import Draft7Validator -from kafka import KafkaConsumer, TopicPartition from kafka.errors import ( InvalidReplicationFactorError, KafkaConfigurationError, @@ -25,6 +24,8 @@ from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka.types import Message, TopicPartition from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher @@ -33,7 +34,7 @@ from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject -from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient +from karapace.utils import json_decode, JSONDecodeError from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -69,11 +70,9 @@ class MessageType(Enum): def _create_consumer_from_config(config: Config) -> KafkaConsumer: # Group not set on purpose, all consumers read the same data session_timeout_ms = config["session_timeout_ms"] - request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) return KafkaConsumer( config["topic_name"], enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=config["bootstrap_uri"], 
client_id=config["client_id"], fetch_max_wait_ms=50, @@ -86,8 +85,6 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer: sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", session_timeout_ms=session_timeout_ms, - request_timeout_ms=request_timeout_ms, - kafka_client=KarapaceKafkaClient, metadata_max_age_ms=config["metadata_max_age_ms"], ) @@ -117,7 +114,7 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.timeout_ms = 200 + self.timeout_s = 0.2 self.config = config self.database = database @@ -234,10 +231,7 @@ def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)]) - # Offset in the response is the offset for last offset. - # Reduce by one for matching on startup. - beginning_offset = list(offsets.values())[0] - 1 + beginning_offset, _ = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) return beginning_offset except KafkaTimeoutError: LOG.exception("Reading begin offsets timed out.") @@ -253,7 +247,7 @@ def _is_ready(self) -> bool: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)]) + _, end_offset = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) except KafkaTimeoutError: LOG.exception("Reading end offsets timed out.") return False @@ -263,7 +257,7 @@ def _is_ready(self) -> bool: return False # Offset in the response is the offset for the next upcoming message. # Reduce by one for actual highest offset. - self._highest_offset = list(offsets.values())[0] - 1 + self._highest_offset = end_offset - 1 cur_time = time.monotonic() time_from_last_check = cur_time - self.last_check progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2) @@ -281,7 +275,7 @@ def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) @staticmethod - def _parse_message_value(raw_value: str) -> JsonObject | None: + def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) if isinstance(value, dict): return value @@ -292,7 +286,7 @@ def _parse_message_value(raw_value: str) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) + msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s) if self.ready is False: self.ready = self._is_ready() @@ -306,49 +300,52 @@ def handle_messages(self) -> None: if are_we_master is True: watch_offsets = True - for _, msgs in raw_msgs.items(): - schema_records_processed_keymode_canonical = 0 - schema_records_processed_keymode_deprecated_karapace = 0 - for msg in msgs: + schema_records_processed_keymode_canonical = 0 + schema_records_processed_keymode_deprecated_karapace = 0 + for msg in msgs: + try: + message_key = msg.key() + if message_key is None: + continue + key = json_decode(message_key) + except JSONDecodeError: + LOG.exception("Invalid JSON in msg.key()") + continue + + assert isinstance(key, dict) + msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE + # Key mode detection happens on startup. 
+ # Default keymode is CANONICAL and preferred unless any data consumed + # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE + # the subsequent keys are omitted from detection. + if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: + if msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + + value = None + message_value = msg.value() + if message_value: try: - key = json_decode(msg.key) + value = self._parse_message_value(message_value) except JSONDecodeError: - LOG.exception("Invalid JSON in msg.key") + LOG.exception("Invalid JSON in msg.value()") continue - assert isinstance(key, dict) - msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE - # Key mode detection happens on startup. - # Default keymode is CANONICAL and preferred unless any data consumed - # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE - # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) - - value = None - if msg.value: - try: - value = self._parse_message_value(msg.value) - except JSONDecodeError: - LOG.exception("Invalid JSON in msg.value") - continue - - self.handle_msg(key, value) - self.offset = msg.offset - - if msg_keymode == KeyMode.CANONICAL: - schema_records_processed_keymode_canonical += 1 - else: - schema_records_processed_keymode_deprecated_karapace += 1 - - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) - - self._report_schema_metrics( - schema_records_processed_keymode_canonical, - schema_records_processed_keymode_deprecated_karapace, - ) + self.handle_msg(key, value) + self.offset = msg.offset() + + if msg_keymode == KeyMode.CANONICAL: + schema_records_processed_keymode_canonical += 1 + else: + schema_records_processed_keymode_deprecated_karapace += 1 + + if self.ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) + + self._report_schema_metrics( + schema_records_processed_keymode_canonical, + schema_records_processed_keymode_deprecated_karapace, + ) def _report_schema_metrics( self, diff --git a/pytest.ini b/pytest.ini index 0f19813c2..8ed1116d2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] addopts = -ra --numprocesses auto --import-mode=importlib -timeout = 60 +timeout = 90 timeout_func_only = true diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 5762cb52a..3d26b0393 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -1,4 +1,21 @@ from ._model import IsolationLevel -from .cimpl import Message, Producer, TopicPartition +from .cimpl import ( + Consumer, + Message, + Producer, + TIMESTAMP_CREATE_TIME, + TIMESTAMP_LOG_APPEND_TIME, + TIMESTAMP_NOT_AVAILABLE, + TopicPartition, +) -__all__ = ("IsolationLevel", "Message", "Producer", "TopicPartition") +__all__ = ( + "Consumer", + "IsolationLevel", + "Message", + "Producer", + "TIMESTAMP_CREATE_TIME", + "TIMESTAMP_LOG_APPEND_TIME", + "TIMESTAMP_NOT_AVAILABLE", + "TopicPartition", +) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 9b573c5b9..a794f10ca 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -31,8 +31,13 @@ class TopicPartition: partition: int = -1, offset: int = 
-1001, metadata: str | None = None, - leader_epoc: int | None = None, - ) -> None: ... + leader_epoch: int | None = None, + ) -> None: + self.topic: str + self.partition: int + self.offset: int + self.metadata: str | None + self.leader_epoch: int | None class Message: def offset(self) -> int: ... @@ -41,6 +46,7 @@ class Message: def value(self) -> str | bytes | None: ... def topic(self) -> str: ... def partition(self) -> int: ... + def headers(self) -> list[tuple[str, bytes]] | None: ... class Producer: def produce( @@ -56,3 +62,18 @@ class Producer: def flush(self, timeout: float = -1) -> None: ... def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... def poll(self, timeout: float = -1) -> int: ... + +class Consumer: + def subscribe(self, topics: list[str]) -> None: ... + def get_watermark_offsets( + self, partition: TopicPartition, timeout: float | None = None, cached: bool = False + ) -> tuple[int, int]: ... + def close(self) -> None: ... + def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... + def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ... + def poll(self, timeout: float = -1) -> Message | None: ... + def assign(self, partitions: list[TopicPartition]) -> None: ... + +TIMESTAMP_CREATE_TIME = ... +TIMESTAMP_NOT_AVAILABLE = ... +TIMESTAMP_LOG_APPEND_TIME = ... diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py index 16592aed3..587ae65a4 100644 --- a/tests/integration/backup/test_get_topic_configurations.py +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -5,7 +5,8 @@ from __future__ import annotations from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.types import NewTopic import pytest diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 3d732bd58..c9fed2e73 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -4,8 +4,6 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from datetime import timedelta -from kafka import KafkaConsumer from karapace.backup import api from karapace.backup.api import BackupVersion from karapace.backup.errors import StaleConsumerError @@ -13,6 +11,7 @@ from karapace.client import Client from karapace.config import set_config_defaults from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format from karapace.utils import Expiration from pathlib import Path @@ -131,18 +130,16 @@ def _assert_canonical_key_format( schemas_topic, group_id="assert-canonical-key-format-consumer", enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=bootstrap_servers, auto_offset_reset="earliest", ) - raw_msgs = consumer.poll(timeout_ms=2000) - while raw_msgs: - for _, messages in raw_msgs.items(): - for message in messages: - key = json.loads(message.key) - assert is_key_in_canonical_format(key), f"Not in canonical format: {key}" - raw_msgs = consumer.poll() + messages = consumer.consume(timeout=30) + while messages: + for message in messages: + key = json.loads(message.key()) + assert is_key_in_canonical_format(key), f"Not 
in canonical format: {key}" + messages = consumer.consume(timeout=30) consumer.close() @@ -174,7 +171,7 @@ async def test_backup_restore( # The restored karapace should have the previously created subject all_subjects = [] - expiration = Expiration.from_timeout(timeout=10) + expiration = Expiration.from_timeout(timeout=30) while subject not in all_subjects: expiration.raise_timeout_if_expired( msg_format="{subject} not in {all_subjects}", @@ -184,7 +181,7 @@ async def test_backup_restore( res = await registry_async_client.get("subjects") assert res.status_code == 200 all_subjects = res.json() - time.sleep(0.1) + time.sleep(1) # Test a few exotic scenarios @@ -260,13 +257,13 @@ async def test_stale_consumer( # The proper way to test this would be with quotas by throttling our client to death while using a very short # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not # implement the necessary APIs. - with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}._poll_once") as poll_once_mock: - poll_once_mock.return_value = {} + with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock: + poll_mock.return_value = None api.create_backup( config=config, backup_location=tmp_path / "backup", topic_name=api.normalize_topic_name(None, config), version=BackupVersion.V2, - poll_timeout=PollTimeout(timedelta(seconds=1)), + poll_timeout=PollTimeout.of(seconds=1), ) assert str(e.value) == f"{registry_cluster.schemas_topic}:0#0 (0,0) after PT1S" diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index be336afcf..cf2d993a4 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -5,8 +5,6 @@ from __future__ import annotations from dataclasses import fields -from kafka import TopicPartition -from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api from karapace.backup.api import _consume_records, TopicName @@ -16,8 +14,9 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import Message, NewTopic, Timestamp, TopicPartition from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config from karapace.version import __version__ from pathlib import Path @@ -182,28 +181,28 @@ def test_roundtrip_from_kafka_state( ) # First record. - assert isinstance(first_record, ConsumerRecord) - assert first_record.topic == new_topic.topic - assert first_record.partition == partition + assert isinstance(first_record, Message) + assert first_record.topic() == new_topic.topic + assert first_record.partition() == partition # Note: This might be unreliable due to not using idempotent producer, i.e. we have # no guarantee against duplicates currently. 
- assert first_record.offset == 0 - assert first_record.timestamp == 1683474641 - assert first_record.timestamp_type == 0 - assert first_record.key == b"bar" - assert first_record.value == b"foo" - assert first_record.headers == [] + assert first_record.offset() == 0 + assert first_record.timestamp()[1] == 1683474641 + assert first_record.timestamp()[0] == Timestamp.CREATE_TIME + assert first_record.key() == b"bar" + assert first_record.value() == b"foo" + assert first_record.headers() is None # Second record. - assert isinstance(second_record, ConsumerRecord) - assert second_record.topic == new_topic.topic - assert second_record.partition == partition - assert second_record.offset == 1 - assert second_record.timestamp == 1683474657 - assert second_record.timestamp_type == 0 - assert second_record.key == b"foo" - assert second_record.value == b"bar" - assert second_record.headers == [ + assert isinstance(second_record, Message) + assert second_record.topic() == new_topic.topic + assert second_record.partition() == partition + assert second_record.offset() == 1 + assert second_record.timestamp()[1] == 1683474657 + assert second_record.timestamp()[0] == Timestamp.CREATE_TIME + assert second_record.key() == b"foo" + assert second_record.value() == b"bar" + assert second_record.headers() == [ ("some-header", b"some header value"), ("other-header", b"some other header value"), ] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d9ab70d40..071077dd3 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -12,8 +12,9 @@ from filelock import FileLock from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import NewTopic from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION diff --git a/tests/integration/kafka/test_admin.py b/tests/integration/kafka/test_admin.py index d6d586a6c..0c9079c9c 100644 --- a/tests/integration/kafka/test_admin.py +++ b/tests/integration/kafka/test_admin.py @@ -6,8 +6,9 @@ from __future__ import annotations from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError -from karapace.kafka.admin import ConfigSource, KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import ConfigSource, NewTopic from tests.utils import new_topic as create_new_topic import pytest diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py new file mode 100644 index 000000000..9ca5b9320 --- /dev/null +++ b/tests/integration/kafka/test_consumer.py @@ -0,0 +1,26 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka.types import NewTopic +from tests.integration.utils.kafka_server import KafkaServers + + +class TestPartitionsForTopic: + def test_partitions_for_returns_empty_for_unknown_topic(self, kafka_servers: KafkaServers) -> None: + consumer = KafkaConsumer(bootstrap_servers=kafka_servers.bootstrap_servers, topic="nonexistent") + + assert consumer.partitions_for_topic("nonexistent") == {} + + def test_partitions_for(self, kafka_servers: 
KafkaServers, new_topic: NewTopic) -> None: + consumer = KafkaConsumer(bootstrap_servers=kafka_servers.bootstrap_servers, topic=new_topic.topic) + + partitions = consumer.partitions_for_topic(new_topic.topic) + + assert len(partitions) == 1 + assert partitions[0].id == 0 + assert partitions[0].replicas == [1] + assert partitions[0].isrs == [1] diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 69e06ea08..6faa42216 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -6,8 +6,8 @@ from __future__ import annotations from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError -from karapace.kafka.admin import NewTopic from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import NewTopic, Timestamp import pytest import time @@ -37,6 +37,7 @@ def test_send(self, producer: KafkaProducer, new_topic: NewTopic) -> None: assert message.topic() == new_topic.topic assert message.key() == key assert message.value() == value + assert message.timestamp()[0] == Timestamp.CREATE_TIME assert message.timestamp()[1] == timestamp def test_send_raises_for_unknown_topic(self, producer: KafkaProducer) -> None: diff --git a/tests/unit/backup/backends/test_v2.py b/tests/unit/backup/backends/test_v2.py index b38f83469..b2ad273b2 100644 --- a/tests/unit/backup/backends/test_v2.py +++ b/tests/unit/backup/backends/test_v2.py @@ -5,12 +5,13 @@ from __future__ import annotations from functools import partial -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopicLegacy from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer from karapace.backup.encoders import encode_key, encode_value +from karapace.kafka.types import Timestamp from karapace.key_format import KeyFormatter from pathlib import Path +from tests.utils import StubMessage import datetime import json @@ -29,7 +30,7 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -50,15 +51,10 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -79,13 +75,8 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) @@ -166,7 +157,7 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -194,15 +185,10 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - 
serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -230,13 +216,8 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) diff --git a/tests/unit/backup/backends/v3/test_backend.py b/tests/unit/backup/backends/v3/test_backend.py index 4f8562311..dd4b47afd 100644 --- a/tests/unit/backup/backends/v3/test_backend.py +++ b/tests/unit/backup/backends/v3/test_backend.py @@ -3,7 +3,6 @@ See LICENSE for details """ from dataclasses import replace -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopic from karapace.backup.backends.v3.backend import _PartitionStats, SchemaBackupV3Reader, SchemaBackupV3Writer from karapace.backup.backends.v3.errors import ( @@ -17,6 +16,7 @@ from karapace.backup.backends.v3.readers import read_records from karapace.backup.backends.v3.schema import ChecksumAlgorithm, DataFile from pathlib import Path +from tests.utils import StubMessage from unittest import mock import datetime @@ -33,33 +33,23 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: finished_at = datetime.datetime.now(datetime.timezone.utc) records = ( - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=10, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(1, round(time.time())), # TODO + headers=None, ), - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=14, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(1, round(time.time())), # TODO + headers=[("some-key", b"some-value")], ), ) topic_configurations = {"max.message.bytes": "1024"} @@ -124,36 +114,31 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[0].key, - value=records[0].value, + key=records[0].key(), + value=records[0].value(), headers=(), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[1].key, - value=records[1].value, + key=records[1].key(), + value=records[1].value(), headers=((b"some-key", b"some-value"),), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ) -def make_record(topic_name: str, partition_index: int, offset: int) -> ConsumerRecord: - return ConsumerRecord( +def make_record(topic_name: str, partition_index: int, offset: int) -> StubMessage: + return StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=offset, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + 
timestamp=(1, round(time.time())), # TODO + headers=[("some-key", b"some-value")], ) diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index c112d5ffc..820287cc3 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -4,9 +4,7 @@ """ from __future__ import annotations -from kafka import KafkaConsumer from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import PartitionMetadata from karapace import config from karapace.backup.api import ( _admin, @@ -24,6 +22,7 @@ from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.kafka.consumer import KafkaConsumer, PartitionMetadata from karapace.kafka.producer import KafkaProducer from pathlib import Path from types import FunctionType @@ -148,7 +147,16 @@ def test_skip_topic_creation( class TestClients: @staticmethod def _partition_metadata(c: int = 1) -> set[PartitionMetadata]: - return {PartitionMetadata("topic", i, 0, tuple(), tuple(), None) for i in range(0, c)} + def create(partition) -> PartitionMetadata: + metadata = PartitionMetadata() + metadata.id = partition + metadata.leader = 1 + metadata.replicas = () + metadata.isrs = () + + return metadata + + return {create(i) for i in range(c)} @pytest.mark.parametrize( "ctx_mng,client_class,partitions_method,close_method_name", diff --git a/tests/unit/backup/test_poll_timeout.py b/tests/unit/backup/test_poll_timeout.py index 9a0614038..ecd9bfce4 100644 --- a/tests/unit/backup/test_poll_timeout.py +++ b/tests/unit/backup/test_poll_timeout.py @@ -37,3 +37,6 @@ def test__repr__(self) -> None: def test_milliseconds(self) -> None: assert PollTimeout(timedelta(milliseconds=1000.5)).milliseconds == 1000 + + def test_seconds(self) -> None: + assert PollTimeout(timedelta(milliseconds=1500)).seconds == 1.5 diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index b6566e927..12a404ac5 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -137,9 +137,9 @@ class ReadinessTestCase(BaseTestCase): def test_readiness_check(testcase: ReadinessTestCase) -> None: key_formatter_mock = Mock() consumer_mock = Mock() - consumer_mock.poll.return_value = {} - # Return dict {partition: offsets}, end offset is the next upcoming record offset - consumer_mock.end_offsets.return_value = {0: testcase.end_offset} + consumer_mock.consume.return_value = [] + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, testcase.end_offset) offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( diff --git a/tests/utils.py b/tests/utils.py index 12e544408..c1e636286 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ from karapace.utils import Expiration from pathlib import Path from subprocess import Popen -from typing import Callable, IO, List, Union +from typing import Any, Callable, IO, List, Union from urllib.parse import quote import asyncio @@ -307,3 +307,17 @@ def popen_karapace_all(config_path: Union[Path, str], stdout: IO, stderr: IO, ** kwargs["stdout"] = stdout kwargs["stderr"] = stderr return Popen([python_exe(), "-m", "karapace.karapace_all", str(config_path)], **kwargs) + + +class StubMessage: + """A stub to stand-in for `confluent_kafka.Message` in unittests. + + Since that class cannot be instantiated, thus this is a liberal simulation + of its behaviour ie. 
its attributes are accessible via getter functions:
+    `message.offset()`."""
+
+    def __init__(self, **attrs: Any) -> None:
+        self._attrs = attrs
+
+    def __getattr__(self, key: str) -> Callable[[], Any]:
+        return lambda: self._attrs[key]
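
The rewritten `_consume_records` in karapace/backup/api.py replaces kafka-python's
batched `poll()` (a dict keyed by `TopicPartition`) with confluent-kafka's
single-message `poll()` driven by `get_watermark_offsets()`. A minimal standalone
sketch of the same pattern, assuming a reachable broker at localhost:9092 and an
illustrative topic name (both placeholders, not part of the patch):

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # placeholder broker address
            "group.id": "example-backup-reader",    # confluent_kafka always needs a group id
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
        }
    )
    partition = TopicPartition("example-topic", 0)  # placeholder topic
    consumer.assign([partition])

    # One call returns both offsets; the high watermark points one past the last record.
    start_offset, end_offset = consumer.get_watermark_offsets(partition)
    if start_offset < end_offset:  # skip empty partitions
        end_offset -= 1  # high watermark -> offset of the last record
        while True:
            message = consumer.poll(timeout=5.0)  # a single Message or None, never a batch
            if message is None:
                break  # the backup code raises StaleConsumerError here instead
            last_offset = message.offset()  # Message accessors are methods, not attributes
            if last_offset >= end_offset:
                break
    consumer.close()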
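
The new `KafkaConsumer` wrapper keeps kafka-python-style keyword arguments, which
`_KafkaConfigMixin` translates into librdkafka keys such as `auto.offset.reset` and
`session.timeout.ms`, and it generates a throwaway `karapace-<hex>` group id when
none is given. A rough usage sketch, assuming a running broker and an existing
topic (names and values are placeholders):

    from karapace.kafka.consumer import KafkaConsumer

    consumer = KafkaConsumer(
        "a-topic",                           # placeholder topic; the consumer subscribes to it
        bootstrap_servers="localhost:9092",  # placeholder broker address
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        session_timeout_ms=10000,
        # group_id omitted on purpose: a unique "karapace-..." id is generated,
        # mimicking the old groupless kafka-python consumers.
    )
    partitions = consumer.partitions_for_topic("a-topic")  # {partition id: PartitionMetadata}
    consumer.close()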
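
Throughout the backup writers, `ConsumerRecord` attribute access becomes method
calls on `confluent_kafka.Message`, keys and values may arrive as `str` or `bytes`,
and the timestamp is a `(type, value)` tuple. A hedged sketch of the access pattern
`store_record` relies on; `describe` is an illustrative helper, not part of the patch:

    from karapace.kafka.types import Timestamp

    def describe(message) -> dict:
        """Collect the record fields the backup writers care about from a Message."""
        key = message.key()      # str | bytes | None
        value = message.value()  # str | bytes | None
        timestamp_type, timestamp = message.timestamp()
        return {
            "key": key.encode() if isinstance(key, str) else key,
            "value": value.encode() if isinstance(value, str) else value,
            "offset": message.offset(),
            "partition": message.partition(),
            "headers": message.headers() or [],  # None when the record carries no headers
            "timestamp": timestamp,
            "is_create_time": timestamp_type == Timestamp.CREATE_TIME,
        }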
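
`PollTimeout.seconds` is added because `confluent_kafka.Consumer.poll` takes a
float of seconds where kafka-python took milliseconds; both properties are plain
`timedelta` divisions, mirroring poll_timeout.py and its unit test:

    from datetime import timedelta

    value = timedelta(milliseconds=1500)
    assert value // timedelta(milliseconds=1) == 1500  # integer milliseconds (old poll API)
    assert value / timedelta(seconds=1) == 1.5         # float seconds for Consumer.poll(timeout=...)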
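
Because `confluent_kafka.Message` cannot be instantiated from Python, the unit
tests build `StubMessage` objects whose keyword arguments are exposed as
zero-argument getters, mirroring the `Message` API. An illustrative construction
(field values are arbitrary):

    import time

    from karapace.kafka.types import Timestamp
    from tests.utils import StubMessage

    stub = StubMessage(
        key=b"key-bytes",
        value=b"value-bytes",
        topic="a-topic",
        partition=0,
        offset=42,
        timestamp=(Timestamp.CREATE_TIME, round(time.time())),
        headers=None,
    )

    # Every stored field is read back the way confluent_kafka.Message exposes it.
    assert stub.offset() == 42
    assert stub.timestamp()[0] == Timestamp.CREATE_TIME
    assert stub.headers() is None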