build(RND-7271): Migrate to confluent-kafka lib (#71)
Christos-Hadjinikolis authored May 28, 2024
1 parent 69a44a2 commit 22017cd
Showing 13 changed files with 480 additions and 226 deletions.
11 changes: 0 additions & 11 deletions .pre-commit-config.yaml
@@ -58,17 +58,6 @@ repos:
types: [file, yaml]
stages: [commit]

- repo: local
hooks:
- id: mypy
name: mypy
description: Static type checking
entry: mypy dynamicio/
language: system
types: [python]
stages: [commit]
pass_filenames: false

- repo: local
hooks:
- id: pytest-check
251 changes: 212 additions & 39 deletions dynamicio/mixins/with_kafka.py
@@ -1,16 +1,13 @@
# pylint: disable=no-member, protected-access, too-few-public-methods

"""This module provides mixins that are providing Kafka I/O support."""
# pylint: disable=no-member, protected-access, too-few-public-methods


from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Callable, Mapping, MutableMapping, Optional

import pandas as pd # type: ignore
import simplejson
from kafka import KafkaProducer # type: ignore
from confluent_kafka import Producer
from magic_logger import logger


from dynamicio.config.pydantic import DataframeSchema, KafkaDataEnvironment
from dynamicio.mixins import utils

@@ -76,9 +73,174 @@ class WithKafka:
schema: DataframeSchema
options: MutableMapping[str, Any]
__kafka_config: Optional[Mapping] = None
__producer: Optional[KafkaProducer] = None
__producer: Optional[Producer] = None
__key_generator: Optional[Callable[[Any, Mapping[Any, Any]], Optional[str]]] = None
__document_transformer: Optional[Callable[[Mapping[Any, Any]], Mapping[Any, Any]]] = None
__key_serializer: Optional[Callable[[Optional[str]], Optional[bytes]]] = None
__value_serializer: Optional[Callable[[Mapping[Any, Any]], bytes]] = None

# N.B.: Please refer to https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md and update this config in case of a major release change.
VALID_CONFIG_KEYS = {
"acks",
"allow.auto.create.topics",
"api.version.fallback.ms",
"api.version.request.timeout.ms",
"api.version.request",
"auto.commit.enable",
"auto.commit.interval.ms",
"auto.offset.reset",
"background_event_cb",
"batch.size",
"batch.num.messages",
"bootstrap.servers",
"broker.address.family",
"broker.address.ttl",
"broker.version.fallback",
"builtin.features",
"check.crcs",
"client.dns.lookup",
"client.id",
"client.rack",
"closesocket_cb",
"compression.codec",
"compression.level",
"compression.type",
"connect_cb",
"connections.max.idle.ms",
"consume_cb",
"consume.callback.max.messages",
"coordinator.query.interval.ms",
"debug",
"default_topic_conf",
"delivery.report.only.error",
"dr_cb",
"dr_msg_cb",
"enable.auto.commit",
"enable.auto.offset.store",
"enable.gapless.guarantee",
"enable.idempotence",
"enable.partition.eof",
"enable.random.seed",
"enable.sasl.oauthbearer.unsecure.jwt",
"enable.ssl.certificate.verification",
"enabled_events",
"error_cb",
"fetch.error.backoff.ms",
"fetch.max.bytes",
"fetch.message.max.bytes",
"fetch.min.bytes",
"fetch.queue.backoff.ms",
"fetch.wait.max.ms",
"group.id",
"group.instance.id",
"group.protocol.type",
"group.protocol",
"group.remote.assignor",
"heartbeat.interval.ms",
"interceptors",
"internal.termination.signal",
"isolation.level",
"linger.ms",
"log_cb",
"log_level",
"log.connection.close",
"log.queue",
"log.thread.name",
"max.block.ms",
"max.in.flight.requests.per.connection",
"max.in.flight",
"max.partition.fetch.bytes",
"max.poll.interval.ms",
"max.request.size",
"message.copy.max.bytes",
"message.max.bytes",
"message.send.max.retries",
"metadata.broker.list",
"metadata.max.age.ms",
"msg_order_cmp",
"oauth_cb",
"oauthbearer_token_refresh_cb",
"offset_commit_cb",
"offset.store.method",
"offset.store.path",
"offset.store.sync.interval.ms",
"on_delivery",
"opaque",
"open_cb",
"partition.assignment.strategy",
"partitioner_cb",
"partitioner",
"plugin.library.paths",
"produce.offset.report",
"queue.buffering.backpressure.threshold",
"queue.buffering.max.kbytes",
"queue.buffering.max.messages",
"queue.buffering.max.ms",
"queued.max.messages.kbytes",
"queued.min.messages",
"rebalance_cb",
"receive.buffer.bytes",
"receive.message.max.bytes",
"reconnect.backoff.jitter.ms",
"reconnect.backoff.max.ms",
"reconnect.backoff.ms",
"request.timeout.ms",
"resolve_cb",
"retry.backoff.ms",
"sasl.kerberos.keytab",
"sasl.kerberos.kinit.cmd",
"sasl.kerberos.min.time.before.relogin",
"sasl.kerberos.principal",
"sasl.kerberos.service.name",
"sasl.mechanisms",
"sasl.oauthbearer.client.id",
"sasl.oauthbearer.client.secret",
"sasl.oauthbearer.config",
"sasl.oauthbearer.extensions",
"sasl.oauthbearer.method",
"sasl.oauthbearer.scope",
"sasl.oauthbearer.token.endpoint.url",
"sasl.password",
"sasl.username",
"security.protocol",
"send.buffer.bytes",
"session.timeout.ms",
"socket_cb",
"socket.blocking.max.ms",
"socket.connection.setup.timeout.ms",
"socket.keepalive.enable",
"socket.max.fails",
"socket.nagle.disable",
"socket.timeout.ms",
"ssl_engine_callback_data",
"ssl.ca.certificate.stores",
"ssl.ca.location",
"ssl.certificate.location",
"ssl.certificate.verify_cb",
"ssl.cipher.suites",
"ssl.crl.location",
"ssl.curves.list",
"ssl.endpoint.identification.algorithm",
"ssl.engine.id",
"ssl.engine.location",
"ssl.key.location",
"ssl.key.password",
"ssl.keystore.location",
"ssl.keystore.password",
"ssl.providers",
"ssl.sigalgs.list",
"statistics.interval.ms",
"sticky.partitioning.linger.ms",
"throttle_cb",
"topic.blacklist",
"topic.metadata.propagation.max.ms",
"topic.metadata.refresh.fast.cnt",
"topic.metadata.refresh.fast.interval.ms",
"topic.metadata.refresh.interval.ms",
"topic.metadata.refresh.sparse",
"transaction.timeout.ms",
"transactional.id",
}

def _write_to_kafka(self, df: pd.DataFrame) -> None:
"""Given a dataframe where each row is a message to be sent to a Kafka Topic, iterate through all rows and send them to a Kafka topic.
@@ -89,28 +251,39 @@ def _write_to_kafka(self, df: pd.DataFrame) -> None:
Args:
df: A dataframe where each row is a message to be sent to a Kafka Topic.
"""
self.populate_cls_attributes()

if self.__producer is None:
self.__producer = self._get_producer(self.sources_config.kafka.kafka_server, **self.options)

self._send_messages(df=df, topic=self.sources_config.kafka.kafka_topic)

def populate_cls_attributes(self):
"""Pop dynamicio options (key_generator, document_transformer, key_serializer, value_serializer) from kafka config options."""
if self.__key_generator is None:
self.__key_generator = lambda idx, __: idx # default key generator uses the dataframe's index
if self.options.get("key_generator") is not None:
self.__key_generator = self.options.pop("key_generator")

if self.__document_transformer is None:
self.__document_transformer = lambda value: value
if self.options.get("document_transformer") is not None:
self.__document_transformer = self.options.pop("document_transformer")

if self.__producer is None:
self.__producer = self._get_producer(self.sources_config.kafka.kafka_server, **self.options)

self._send_messages(df=df, topic=self.sources_config.kafka.kafka_topic)

@utils.allow_options(KafkaProducer.DEFAULT_CONFIG.keys())
def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> KafkaProducer:
if self.__key_serializer is None:
self.__key_serializer = self._default_key_serializer
if self.options.get("key_serializer") is not None:
self.__key_serializer = self.options.pop("key_serializer")
if self.__value_serializer is None:
self.__value_serializer = self._default_value_serializer
if self.options.get("value_serializer") is not None:
self.__value_serializer = self.options.pop("value_serializer")

@utils.allow_options(VALID_CONFIG_KEYS)
def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> Producer:
"""Generate and return a Kafka Producer.
Default options are used to generate the producer. Specifically:
- `bootstrap_servers`: Passed on through the source_config
- `value_serializer`: Uses a default_value_serializer defined in this mixin
- `bootstrap.servers`: Passed on through the source_config
- `compression.type`: Uses snappy compression
More options can be added to the producer by passing them as keyword arguments, through valid options.
@@ -124,39 +297,39 @@ def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> Kaf
A Kafka producer instance.
"""
self.__kafka_config = {
**{
"bootstrap_servers": server,
"compression_type": "snappy",
"key_serializer": self._default_key_serializer,
"value_serializer": self._default_value_serializer,
},
"bootstrap.servers": server,
"compression.type": "snappy",
**options,
}
return KafkaProducer(**self.__kafka_config)
return Producer(**self.__kafka_config)

def _send_messages(self, df: pd.DataFrame, topic: str) -> None:
logger.info(f"Sending {len(df)} messages to Kafka topic:{topic}.")
logger.info(f"Sending {len(df)} messages to Kafka topic: {topic}.")

messages = df.reset_index(drop=True).to_dict("records")

for idx, message in zip(df.index.values, messages):
self.__producer.send(topic, key=self.__key_generator(idx, message), value=self.__document_transformer(message)) # type: ignore
key = self.__key_generator(idx, message)
transformed_message = self.__document_transformer(message)
serialized_key = self.__key_serializer(key)
serialized_value = self.__value_serializer(transformed_message)

self.__producer.produce(topic=topic, key=serialized_key, value=serialized_value, on_delivery=self._on_delivery)

self.__producer.flush() # type: ignore
self.__producer.flush()

@staticmethod
def _default_key_serializer(key: Optional[str]) -> Optional[bytes]:
if key:
return key.encode("utf-8")
def _on_delivery(err, msg):
"""Callback for message delivery."""
if err is not None:
raise Exception(f"Message delivery failed: {err}, for message: {msg}")

@staticmethod
def _default_key_serializer(key: Optional[Any]) -> Optional[bytes]:
if key is not None:
return str(key).encode("utf-8")
return None

@staticmethod
def _default_value_serializer(value: Mapping) -> bytes:
return simplejson.dumps(value, ignore_nan=True).encode("utf-8")

def _read_from_kafka(self) -> Iterable[Mapping]: # type: ignore
"""Read messages from a Kafka Topic and convert them to separate dataframes.
Returns:
Multiple dataframes, one per message read from the Kafka topic of interest.
"""
# TODO: Implement kafka reader
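
For reference, the produce/flush pattern the migrated mixin follows can be sketched in isolation as below. This is a minimal, hedged example rather than dynamicio's public API: the broker address and topic name are placeholders, and it only mirrors the defaults visible in the diff (UTF-8-encoded keys, NaN-safe JSON values, snappy compression, a delivery callback that raises on failure).

"""Standalone sketch of the confluent-kafka flow used by WithKafka._send_messages.

Assumptions (illustrative only): a broker at localhost:9092 and a topic
named "example-topic".
"""
from typing import Any, Mapping, Optional

import pandas as pd
import simplejson
from confluent_kafka import Producer


def on_delivery(err, msg) -> None:
    # confluent-kafka reports per-message success/failure through this callback,
    # which is served while poll() or flush() runs.
    if err is not None:
        raise RuntimeError(f"Message delivery failed: {err}")


def serialize_key(key: Optional[Any]) -> Optional[bytes]:
    # Mirrors _default_key_serializer: stringify, then UTF-8 encode.
    return str(key).encode("utf-8") if key is not None else None


def serialize_value(value: Mapping) -> bytes:
    # Mirrors _default_value_serializer: NaN-safe JSON, then UTF-8 encode.
    return simplejson.dumps(value, ignore_nan=True).encode("utf-8")


df = pd.DataFrame({"id": ["a", "b"], "price": [1.0, None]})

# confluent_kafka.Producer takes a librdkafka-style config dict with dotted keys,
# unlike kafka-python's KafkaProducer keyword arguments.
producer = Producer({"bootstrap.servers": "localhost:9092", "compression.type": "snappy"})

for idx, message in zip(df.index.values, df.to_dict("records")):
    producer.produce(
        topic="example-topic",
        key=serialize_key(idx),          # default key: the dataframe index
        value=serialize_value(message),  # default value: the row as a dict
        on_delivery=on_delivery,
    )

producer.flush()  # blocks until queued messages are delivered (or fail)

The key behavioural difference from kafka-python is that serialization and delivery reporting are now explicit: produce() expects bytes and signals the outcome through the callback served by flush(), rather than KafkaProducer applying key_serializer/value_serializer itself and send() returning a future. This is why the mixin now pops the serializer options out of self.options before building the producer and flushes once after the loop.
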
1 change: 0 additions & 1 deletion requirements-dev.txt
@@ -6,7 +6,6 @@ flake8-tidy-imports>=4.8.0
flake8==5.0.4
gitlint==0.17.0
mock==4.0.3
mypy==0.990
pandas-stubs==2.0.3.230814
pre-commit==2.20.0
pydocstyle==6.1.1
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,7 +2,7 @@ awscli>=1.22.24
boto3>=1.20.24
fastparquet>=0.8.0
fsspec==2022.3.0
kafka-python~=2.0.2
confluent-kafka~=2.4.0
logzero>=1.7.0
magic-logger>=1.0.2
no_implicit_optional==1.4.0
10 changes: 8 additions & 2 deletions tests/mocking/io.py
@@ -1,4 +1,6 @@
# pylint: disable=missing-class-docstring, missing-module-docstring, missing-function-docstring
from typing import Callable, Optional
from unittest.mock import MagicMock

from dynamicio import UnifiedIO
from dynamicio.core import SCHEMA_FROM_FILE
@@ -117,11 +119,15 @@ class WriteKeyedKafkaIO(UnifiedIO):


class MockKafkaProducer:
def __init__(self):
def __init__(self, **kwargs):
self.my_stream = []
self.config = kwargs # Store the config to ensure it can accept any options
self.produce_call_count = 0

def send(self, topic: str, value: dict, key: str = None): # pylint: disable=unused-argument
def produce(self, topic: str, key: Optional[bytes], value: bytes, on_delivery: Callable): # pylint: disable=unused-argument
self.my_stream.append({"key": key, "value": value})
on_delivery(None, MagicMock())
self.produce_call_count += 1

def flush(self):
pass
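
As a quick illustration (not the repository's actual test code), the fake above can be exercised directly. The class is restated so the snippet runs on its own; the topic name and config values are made up:

from typing import Callable, Optional
from unittest.mock import MagicMock


class MockKafkaProducer:
    def __init__(self, **kwargs):
        self.my_stream = []
        self.config = kwargs  # accepts any producer options, like the real client
        self.produce_call_count = 0

    def produce(self, topic: str, key: Optional[bytes], value: bytes, on_delivery: Callable):  # pylint: disable=unused-argument
        self.my_stream.append({"key": key, "value": value})
        on_delivery(None, MagicMock())  # simulate a successful delivery report
        self.produce_call_count += 1

    def flush(self):
        pass


producer = MockKafkaProducer(**{"bootstrap.servers": "fake:9092"})
producer.produce("some-topic", key=b"0", value=b'{"id": "a"}', on_delivery=lambda err, msg: None)
producer.flush()

assert producer.produce_call_count == 1
assert producer.my_stream == [{"key": b"0", "value": b'{"id": "a"}'}]
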
14 changes: 14 additions & 0 deletions tests/resources/definitions/input.yaml
@@ -293,3 +293,17 @@ WRITE_TO_S3_PATH_PREFIX_PARQUET:
bucket: "[[ MOCK_BUCKET ]]"
path_prefix: "[[ MOCK_PREFIX ]]"
file_type: "parquet"

PRODUCTS:
LOCAL:
type: "local"
local:
file_path: "[[ TEST_RESOURCES ]]/data/input/some_csv_to_read.csv"
file_type: "csv"
schema:
name: products
columns:
id:
type: "object"
validations: {}
metrics: []