Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace sync Kafka consumers with confluent_kafka one #794

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from concurrent.futures import Future
from confluent_kafka import Message, TopicPartition
from enum import Enum
from functools import partial
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar
from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar

import contextlib
import datetime
Expand Down Expand Up @@ -282,9 +282,8 @@ def _consume_records(
consumer: KafkaConsumer,
topic_partition: TopicPartition,
poll_timeout: PollTimeout,
) -> Iterator[ConsumerRecord]:
start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition]
end_offset: int = consumer.end_offsets([topic_partition])[topic_partition]
) -> Iterator[Message]:
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
last_offset = start_offset

LOG.info(
Expand All @@ -301,12 +300,15 @@ def _consume_records(
end_offset -= 1 # high watermark to actual end offset

while True:
records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, [])
if len(records) == 0:
record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
if record is None:
raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout)
for record in records:
yield record
last_offset = record.offset # pylint: disable=undefined-loop-variable
error = record.error()
if error is not None:
raise translate_from_kafkaerror(error)

yield record
last_offset = record.offset()
if last_offset >= end_offset:
break

Expand Down
22 changes: 13 additions & 9 deletions karapace/backup/backends/v3/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from .readers import read_metadata, read_records
from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
from .writers import write_metadata, write_record
from confluent_kafka import Message
from dataclasses import dataclass
from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic
from karapace.backup.backends.writer import BytesBackupWriter, StdOut
from karapace.backup.safe_writer import bytes_writer, staging_directory
Expand Down Expand Up @@ -334,27 +334,31 @@ def store_metadata(
def store_record(
self,
buffer: IO[bytes],
record: ConsumerRecord,
record: Message,
) -> None:
stats: Final = self._partition_stats[record.partition]
stats: Final = self._partition_stats[record.partition()]
checksum_checkpoint: Final = stats.get_checkpoint(
records_threshold=self._max_records_per_checkpoint,
bytes_threshold=self._max_bytes_per_checkpoint,
)
offset_start: Final = buffer.tell()

record_key = record.key()
record_value = record.value()

write_record(
buffer,
record=Record(
key=record.key,
value=record.value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers),
offset=record.offset,
timestamp=record.timestamp,
key=record_key.encode() if isinstance(record_key, str) else record_key,
value=record_value.encode() if isinstance(record_value, str) else record_value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []),
offset=record.offset(),
timestamp=record.timestamp()[1],
checksum_checkpoint=checksum_checkpoint,
),
running_checksum=stats.running_checksum,
)
stats.update(
bytes_offset=buffer.tell() - offset_start,
record_offset=record.offset,
record_offset=record.offset(),
)
15 changes: 11 additions & 4 deletions karapace/backup/backends/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from kafka.consumer.fetcher import ConsumerRecord
from confluent_kafka import Message
from karapace.backup.safe_writer import bytes_writer, str_writer
from pathlib import Path
from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar
Expand Down Expand Up @@ -98,7 +98,7 @@ def store_metadata(
def store_record(
self,
buffer: IO[B],
record: ConsumerRecord,
record: Message,
) -> None:
"""
Called in order for each record read from a topic to be backed up. It's safe to
Expand Down Expand Up @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC):
def store_record(
self,
buffer: IO[str],
record: ConsumerRecord,
record: Message,
) -> None:
buffer.write(self.serialize_record(record.key, record.value))
record_key = record.key()
record_value = record.value()
buffer.write(
self.serialize_record(
record_key.encode() if isinstance(record_key, str) else record_key,
record_value.encode() if isinstance(record_value, str) else record_value,
)
)

@staticmethod
@abc.abstractmethod
Expand Down
9 changes: 9 additions & 0 deletions karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from . import api
from .errors import BackupDataRestorationError, StaleConsumerError
from .poll_timeout import PollTimeout
from kafka.errors import BrokerResponseError
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config
from typing import Iterator
Expand Down Expand Up @@ -163,6 +164,14 @@ def main() -> None:
"Try increasing --poll-timeout to give the broker more time.",
)
raise SystemExit(1) from e
except BrokerResponseError as exc:
logger.exception(
"An unexpected Kafka error occurred during consuming messages with error code %s: %s - %s",
exc.errno,
exc.message,
exc.description,
)
raise SystemExit(1) from exc


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.structs import TopicPartition
from confluent_kafka import TopicPartition
from karapace.backup.poll_timeout import PollTimeout

__all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"]
Expand Down
5 changes: 5 additions & 0 deletions karapace/backup/poll_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ def __repr__(self) -> str:
def milliseconds(self) -> int:
"""Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding)."""
return self.__value // timedelta(milliseconds=1)

@cached_property
def seconds(self) -> float:
    """Returns this poll timeout as a (possibly fractional) number of seconds."""
    # `timedelta.total_seconds()` is documented as equivalent to dividing by
    # `timedelta(seconds=1)`.
    return self.__value.total_seconds()
3 changes: 2 additions & 1 deletion karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"""
from __future__ import annotations

from karapace.kafka.admin import ConfigSource, KafkaAdminClient
from confluent_kafka.admin import ConfigSource
from karapace.kafka.admin import KafkaAdminClient
from typing import Container, Final

ALL_CONFIG_SOURCES: Final = ConfigSource
Expand Down
14 changes: 13 additions & 1 deletion karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

import logging
Expand Down Expand Up @@ -85,6 +85,12 @@ class KafkaClientParams(TypedDict, total=False):
ssl_certfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
enable_auto_commit: bool
fetch_max_wait_ms: int
group_id: str
session_timeout_ms: int


class _KafkaConfigMixin:
Expand Down Expand Up @@ -128,6 +134,12 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"enable.auto.commit": params.get("enable_auto_commit"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"group.id": params.get("group_id"),
"session.timeout.ms": params.get("session_timeout_ms"),
}
config = {key: value for key, value in config.items() if value is not None}

Expand Down
68 changes: 68 additions & 0 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from __future__ import annotations

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from kafka.errors import KafkaTimeoutError
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Iterable
from typing_extensions import Unpack

import secrets


class KafkaConsumer(_KafkaConfigMixin, Consumer):
    """A `confluent_kafka.Consumer` that is configured through `KafkaClientParams`.

    The consumer subscribes to a single topic as part of its construction.
    """

    def __init__(
        self,
        topic: str,
        bootstrap_servers: Iterable[str] | str,
        verify_connection: bool = True,
        **params: Unpack[KafkaClientParams],
    ) -> None:
        """Create the consumer and subscribe it to `topic`.

        :param topic: the topic to subscribe to.
        :param bootstrap_servers: passed through to the parent initializer.
        :param verify_connection: passed through to the parent initializer.
        :param params: client settings; a `group_id` is generated when the
            caller does not supply one.
        """
        # The `confluent_kafka.Consumer` does not allow for a missing group id.
        # If the client of this class does not provide one, we generate a unique
        # group id to achieve the groupless behaviour. An explicit `None` is
        # treated the same as an absent key: `None` values never make it into
        # the client configuration, so the group id would be missing anyway.
        if params.get("group_id") is None:
            params["group_id"] = self._create_group_id()

        super().__init__(bootstrap_servers, verify_connection, **params)

        self.subscribe([topic])

    @staticmethod
    def _create_group_id() -> str:
        """Return a random group id, unique enough to act as a private group."""
        return f"karapace-autogenerated-{secrets.token_hex(6)}"

    def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]:
        """Returns all partition metadata for the given topic.

        A `KafkaException` raised while listing the topic is handed to
        `raise_from_kafkaexception` for translation.
        """
        try:
            return self.list_topics(topic).topics[topic].partitions
        except KafkaException as exc:
            raise_from_kafkaexception(exc)

    def get_watermark_offsets(
        self, partition: TopicPartition, timeout: float | None = None, cached: bool = False
    ) -> tuple[int, int]:
        """Wrapper around `Consumer.get_watermark_offsets` to handle error cases and exceptions.

        confluent-kafka is somewhat inconsistent with error-related behaviours,
        `get_watermark_offsets` returns `None` on timeouts, so we are translating it to an
        exception.

        :param partition: the topic partition to query.
        :param timeout: forwarded to the underlying call only when given, so that
            the library's own default applies otherwise.
        :param cached: forwarded to the underlying call.
        :returns: the pair of offsets reported by the underlying call.
        :raises KafkaTimeoutError: when the underlying call returns `None`.
        """
        try:
            if timeout is not None:
                result = super().get_watermark_offsets(partition, timeout, cached)
            else:
                result = super().get_watermark_offsets(partition, cached=cached)

            if result is None:
                raise KafkaTimeoutError()

            return result
        except KafkaException as exc:
            raise_from_kafkaexception(exc)
19 changes: 19 additions & 0 deletions karapace/kafka/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from confluent_kafka import TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME, TIMESTAMP_NOT_AVAILABLE
from typing import Final

import enum

# Default request timeout in milliseconds. Corresponds to the default value of
# `request.timeout.ms` in the librdkafka C library.
DEFAULT_REQUEST_TIMEOUT_MS: Final = 30000


class Timestamp(enum.IntEnum):
    """Integer enum wrapping the `TIMESTAMP_*` constants exported by `confluent_kafka`."""

    # NOTE(review): presumably these are the timestamp-type values found in the
    # first element of `Message.timestamp()` - confirm against the
    # confluent_kafka documentation.
    NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE
    CREATE_TIME = TIMESTAMP_CREATE_TIME
    LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME
3 changes: 2 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from aiokafka.errors import KafkaConnectionError
from binascii import Error as B64DecodeError
from collections import namedtuple
from confluent_kafka.error import KafkaException
from contextlib import AsyncExitStack
from http import HTTPStatus
from kafka.errors import (
Expand All @@ -14,7 +15,7 @@
)
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient, KafkaException
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
Expand Down
4 changes: 1 addition & 3 deletions karapace/kafka_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
See LICENSE for details
"""
from .config import Config
from .utils import KarapaceKafkaClient
from kafka import KafkaConsumer
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from typing import Iterator

Expand Down Expand Up @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
sasl_plain_password=config["sasl_plain_password"],
auto_offset_reset="earliest",
metadata_max_age_ms=config["metadata_max_age_ms"],
kafka_client=KarapaceKafkaClient,
)
try:
yield consumer
Expand Down
4 changes: 2 additions & 2 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
See LICENSE for details
"""
from dataclasses import dataclass
from kafka import KafkaConsumer
from kafka.coordinator.base import BaseCoordinator
from kafka.errors import NoBrokersAvailable, NodeNotReadyError
from kafka.metrics import MetricConfig, Metrics
from karapace import constants
from karapace.config import Config
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import JsonData, JsonObject
from karapace.utils import json_decode, json_encode, KarapaceKafkaClient
from karapace.version import __version__
Expand Down Expand Up @@ -238,7 +238,7 @@ def init_schema_coordinator(self) -> None:
election_strategy=self.config.get("master_election_strategy", "lowest"),
group_id=self.config["group_id"],
session_timeout_ms=session_timeout_ms,
request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]),
request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS),
)
self.schema_coordinator_ready.set()

Expand Down
Loading