Skip to content

Commit

Permalink
Replace sync Kafka consumers with confluent_kafka one
Browse files Browse the repository at this point in the history
This change replaces all synchronous Kafka consumers (from the
kafka-python library) with a new implementation based on
confluent-kafka-python's `Consumer`, keeping the same interface as much
as possible.

The PyTest timeout is raised from 60s to 90s to accomodate for the
default poll timeout for backups consumers (otherwise the tests would
time out while still waiting for messages to arrive).o

Since the `conluent_kafka.Consumer` implementation does not allow for
consumers to be without a group ID, if the new `KafkaConsumer` client is
not given one, we'll generate one on the fly to mimic a groupless
behaviour.

Resources:
* confluent-kafka-python documentation:
  https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation:
  https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
  • Loading branch information
Mátyás Kuti committed Dec 6, 2023
1 parent 83157fa commit 981fa69
Show file tree
Hide file tree
Showing 31 changed files with 396 additions and 220 deletions.
22 changes: 10 additions & 12 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,23 @@
from concurrent.futures import Future
from enum import Enum
from functools import partial
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from karapace.kafka.types import Message, TopicPartition
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar
from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar

import contextlib
import datetime
Expand Down Expand Up @@ -282,9 +281,8 @@ def _consume_records(
consumer: KafkaConsumer,
topic_partition: TopicPartition,
poll_timeout: PollTimeout,
) -> Iterator[ConsumerRecord]:
start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition]
end_offset: int = consumer.end_offsets([topic_partition])[topic_partition]
) -> Iterator[Message]:
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
last_offset = start_offset

LOG.info(
Expand All @@ -301,12 +299,11 @@ def _consume_records(
end_offset -= 1 # high watermark to actual end offset

while True:
records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, [])
if len(records) == 0:
record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
if record is None:
raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout)
for record in records:
yield record
last_offset = record.offset # pylint: disable=undefined-loop-variable
yield record
last_offset = record.offset()
if last_offset >= end_offset:
break

Expand Down Expand Up @@ -528,6 +525,7 @@ def create_backup(
with _consumer(config, topic_name) as consumer:
(partition,) = consumer.partitions_for_topic(topic_name)
topic_partition = TopicPartition(topic_name, partition)
consumer.assign([topic_partition])

try:
data_file = _write_partition(
Expand Down
22 changes: 13 additions & 9 deletions karapace/backup/backends/v3/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
from .writers import write_metadata, write_record
from dataclasses import dataclass
from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic
from karapace.backup.backends.writer import BytesBackupWriter, StdOut
from karapace.backup.safe_writer import bytes_writer, staging_directory
from karapace.dataclasses import default_dataclass
from karapace.kafka.types import Message
from karapace.utils import assert_never
from karapace.version import __version__
from pathlib import Path
Expand Down Expand Up @@ -334,27 +334,31 @@ def store_metadata(
def store_record(
self,
buffer: IO[bytes],
record: ConsumerRecord,
record: Message,
) -> None:
stats: Final = self._partition_stats[record.partition]
stats: Final = self._partition_stats[record.partition()]
checksum_checkpoint: Final = stats.get_checkpoint(
records_threshold=self._max_records_per_checkpoint,
bytes_threshold=self._max_bytes_per_checkpoint,
)
offset_start: Final = buffer.tell()

record_key = record.key()
record_value = record.value()

write_record(
buffer,
record=Record(
key=record.key,
value=record.value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers),
offset=record.offset,
timestamp=record.timestamp,
key=record_key.encode() if isinstance(record_key, str) else record_key,
value=record_value.encode() if isinstance(record_value, str) else record_value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []),
offset=record.offset(),
timestamp=record.timestamp()[1],
checksum_checkpoint=checksum_checkpoint,
),
running_checksum=stats.running_checksum,
)
stats.update(
bytes_offset=buffer.tell() - offset_start,
record_offset=record.offset,
record_offset=record.offset(),
)
15 changes: 11 additions & 4 deletions karapace/backup/backends/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"""
from __future__ import annotations

from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.safe_writer import bytes_writer, str_writer
from karapace.kafka.types import Message
from pathlib import Path
from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -98,7 +98,7 @@ def store_metadata(
def store_record(
self,
buffer: IO[B],
record: ConsumerRecord,
record: Message,
) -> None:
"""
Called in order for each record read from a topic to be backed up. It's safe to
Expand Down Expand Up @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC):
def store_record(
self,
buffer: IO[str],
record: ConsumerRecord,
record: Message,
) -> None:
buffer.write(self.serialize_record(record.key, record.value))
record_key = record.key()
record_value = record.value()
buffer.write(
self.serialize_record(
record_key.encode() if isinstance(record_key, str) else record_key,
record_value.encode() if isinstance(record_value, str) else record_value,
)
)

@staticmethod
@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.structs import TopicPartition
from karapace.backup.poll_timeout import PollTimeout
from karapace.kafka.types import TopicPartition

__all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"]

Expand Down
5 changes: 5 additions & 0 deletions karapace/backup/poll_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ def __repr__(self) -> str:
def milliseconds(self) -> int:
"""Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding)."""
return self.__value // timedelta(milliseconds=1)

@cached_property
def seconds(self) -> float:
"""Returns this poll timeout in seconds."""
return self.__value / timedelta(seconds=1)
3 changes: 2 additions & 1 deletion karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"""
from __future__ import annotations

from karapace.kafka.admin import ConfigSource, KafkaAdminClient
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.types import ConfigSource
from typing import Container, Final

ALL_CONFIG_SOURCES: Final = ConfigSource
Expand Down
22 changes: 11 additions & 11 deletions karapace/kafka/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@

from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka import TopicPartition
from confluent_kafka.admin import (
AdminClient,
from confluent_kafka.admin import AdminClient
from karapace.constants import TOPIC_CREATION_TIMEOUT_S
from karapace.kafka.common import (
_KafkaConfigMixin,
raise_from_kafkaexception,
single_futmap_result,
UnknownTopicOrPartitionError,
)
from karapace.kafka.types import (
BrokerMetadata,
ClusterMetadata,
ConfigResource,
ConfigSource,
KafkaException,
NewTopic,
OffsetSpec,
ResourceType,
TopicMetadata,
)
from confluent_kafka.error import KafkaException
from karapace.constants import TOPIC_CREATION_TIMEOUT_S
from karapace.kafka.common import (
_KafkaConfigMixin,
raise_from_kafkaexception,
single_futmap_result,
UnknownTopicOrPartitionError,
TopicPartition,
)
from typing import Container

Expand Down
16 changes: 14 additions & 2 deletions karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar
from karapace.kafka.types import KafkaError, KafkaException
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

import logging
Expand Down Expand Up @@ -85,6 +85,12 @@ class KafkaClientParams(TypedDict, total=False):
ssl_certfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
enable_auto_commit: bool
fetch_max_wait_ms: int
group_id: str
session_timeout_ms: int


class _KafkaConfigMixin:
Expand Down Expand Up @@ -128,6 +134,12 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"enable.auto.commit": params.get("enable_auto_commit"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"group.id": params.get("group_id"),
"session.timeout.ms": params.get("session_timeout_ms"),
}
config = {key: value for key, value in config.items() if value is not None}

Expand Down
44 changes: 44 additions & 0 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from __future__ import annotations

from confluent_kafka import Consumer
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from karapace.kafka.types import KafkaException, PartitionMetadata
from typing import Iterable
from typing_extensions import Unpack

import secrets


class KafkaConsumer(_KafkaConfigMixin, Consumer):
def __init__(
self,
topic: str,
bootstrap_servers: Iterable[str] | str,
verify_connection: bool = True,
**params: Unpack[KafkaClientParams],
) -> None:
# The `confluent_kafka.Consumer` does not allow for a missing group id
# if the client of this class does not provide one, we'll generate a
# unique group id to achieve the groupless behaviour
if "group_id" not in params:
params["group_id"] = self._create_group_id()

super().__init__(bootstrap_servers, verify_connection, **params)

self.subscribe([topic])

@staticmethod
def _create_group_id() -> str:
return f"karapace-{secrets.token_hex(6)}"

def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]:
"""Returns all partition metadata for the given topic."""
try:
return self.list_topics(topic).topics[topic].partitions
except KafkaException as exc:
raise_from_kafkaexception(exc)
5 changes: 2 additions & 3 deletions karapace/kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
from __future__ import annotations

from concurrent.futures import Future
from confluent_kafka import Message, Producer
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaError, KafkaException
from confluent_kafka import Producer
from functools import partial
from karapace.kafka.common import _KafkaConfigMixin, raise_from_kafkaexception, translate_from_kafkaerror
from karapace.kafka.types import KafkaError, KafkaException, Message, PartitionMetadata
from typing import cast, TypedDict
from typing_extensions import Unpack

Expand Down
55 changes: 55 additions & 0 deletions karapace/kafka/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from confluent_kafka import (
Message,
TIMESTAMP_CREATE_TIME,
TIMESTAMP_LOG_APPEND_TIME,
TIMESTAMP_NOT_AVAILABLE,
TopicPartition,
)
from confluent_kafka.admin import (
BrokerMetadata,
ClusterMetadata,
ConfigResource,
ConfigSource,
NewTopic,
OffsetSpec,
PartitionMetadata,
ResourceType,
TopicMetadata,
)
from confluent_kafka.error import KafkaError, KafkaException

import enum

__all__ = (
"BrokerMetadata",
"ClusterMetadata",
"ConfigResource",
"ConfigSource",
"ConfigSource",
"DEFAULT_REQUEST_TIMEOUT_MS",
"KafkaError",
"KafkaException",
"Message",
"NewTopic",
"OffsetSpec",
"PartitionMetadata",
"ResourceType",
"Timestamp",
"TopicMetadata",
"TopicPartition",
)

# A constant that corresponds to the default value of request.timeout.ms in
# the librdkafka C library
DEFAULT_REQUEST_TIMEOUT_MS = 30000


class Timestamp(enum.IntEnum):
NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE
CREATE_TIME = TIMESTAMP_CREATE_TIME
LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME
3 changes: 2 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
)
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient, KafkaException
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.types import KafkaException
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
Expand Down
Loading

0 comments on commit 981fa69

Please sign in to comment.