diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3928c49..9f5b00f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -58,17 +58,6 @@ repos: types: [file, yaml] stages: [commit] - - repo: local - hooks: - - id: mypy - name: mypy - description: Static type checking - entry: mypy dynamicio/ - language: system - types: [python] - stages: [commit] - pass_filenames: false - - repo: local hooks: - id: pytest-check diff --git a/dynamicio/mixins/with_kafka.py b/dynamicio/mixins/with_kafka.py index 13b1019..cdd1273 100644 --- a/dynamicio/mixins/with_kafka.py +++ b/dynamicio/mixins/with_kafka.py @@ -1,16 +1,13 @@ -# pylint: disable=no-member, protected-access, too-few-public-methods - """This module provides mixins that are providing Kafka I/O support.""" +# pylint: disable=no-member, protected-access, too-few-public-methods - -from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional +from typing import Any, Callable, Mapping, MutableMapping, Optional import pandas as pd # type: ignore import simplejson -from kafka import KafkaProducer # type: ignore +from confluent_kafka import Producer from magic_logger import logger - from dynamicio.config.pydantic import DataframeSchema, KafkaDataEnvironment from dynamicio.mixins import utils @@ -76,9 +73,174 @@ class WithKafka: schema: DataframeSchema options: MutableMapping[str, Any] __kafka_config: Optional[Mapping] = None - __producer: Optional[KafkaProducer] = None + __producer: Optional[Producer] = None __key_generator: Optional[Callable[[Any, Mapping[Any, Any]], Optional[str]]] = None __document_transformer: Optional[Callable[[Mapping[Any, Any]], Mapping[Any, Any]]] = None + __key_serializer: Optional[Callable[[Optional[str]], Optional[bytes]]] = None + __value_serializer: Optional[Callable[[Mapping[Any, Any]], bytes]] = None + + # N.B.: Please refer to https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md and update this config in case of a major release change. 
+ VALID_CONFIG_KEYS = { + "acks", + "allow.auto.create.topics", + "api.version.fallback.ms", + "api.version.request.timeout.ms", + "api.version.request", + "auto.commit.enable", + "auto.commit.interval.ms", + "auto.offset.reset", + "background_event_cb", + "batch.size", + "batch.num.messages", + "bootstrap.servers", + "broker.address.family", + "broker.address.ttl", + "broker.version.fallback", + "builtin.features", + "check.crcs", + "client.dns.lookup", + "client.id", + "client.rack", + "closesocket_cb", + "compression.codec", + "compression.level", + "compression.type", + "connect_cb", + "connections.max.idle.ms", + "consume_cb", + "consume.callback.max.messages", + "coordinator.query.interval.ms", + "debug", + "default_topic_conf", + "delivery.report.only.error", + "dr_cb", + "dr_msg_cb", + "enable.auto.commit", + "enable.auto.offset.store", + "enable.gapless.guarantee", + "enable.idempotence", + "enable.partition.eof", + "enable.random.seed", + "enable.sasl.oauthbearer.unsecure.jwt", + "enable.ssl.certificate.verification", + "enabled_events", + "error_cb", + "fetch.error.backoff.ms", + "fetch.max.bytes", + "fetch.message.max.bytes", + "fetch.min.bytes", + "fetch.queue.backoff.ms", + "fetch.wait.max.ms", + "group.id", + "group.instance.id", + "group.protocol.type", + "group.protocol", + "group.remote.assignor", + "heartbeat.interval.ms", + "interceptors", + "internal.termination.signal", + "isolation.level", + "linger.ms", + "log_cb", + "log_level", + "log.connection.close", + "log.queue", + "log.thread.name", + "max.block.ms", + "max.in.flight.requests.per.connection", + "max.in.flight", + "max.partition.fetch.bytes", + "max.poll.interval.ms", + "max.request.size", + "message.copy.max.bytes", + "message.max.bytes", + "message.send.max.retries", + "metadata.broker.list", + "metadata.max.age.ms", + "msg_order_cmp", + "oauth_cb", + "oauthbearer_token_refresh_cb", + "offset_commit_cb", + "offset.store.method", + "offset.store.path", + "offset.store.sync.interval.ms", + "on_delivery", + "opaque", + "open_cb", + "partition.assignment.strategy", + "partitioner_cb", + "partitioner", + "plugin.library.paths", + "produce.offset.report", + "queue.buffering.backpressure.threshold", + "queue.buffering.max.kbytes", + "queue.buffering.max.messages", + "queue.buffering.max.ms", + "queued.max.messages.kbytes", + "queued.min.messages", + "rebalance_cb", + "receive.buffer.bytes", + "receive.message.max.bytes", + "reconnect.backoff.jitter.ms", + "reconnect.backoff.max.ms", + "reconnect.backoff.ms", + "request.timeout.ms", + "resolve_cb", + "retry.backoff.ms", + "sasl.kerberos.keytab", + "sasl.kerberos.kinit.cmd", + "sasl.kerberos.min.time.before.relogin", + "sasl.kerberos.principal", + "sasl.kerberos.service.name", + "sasl.mechanisms", + "sasl.oauthbearer.client.id", + "sasl.oauthbearer.client.secret", + "sasl.oauthbearer.config", + "sasl.oauthbearer.extensions", + "sasl.oauthbearer.method", + "sasl.oauthbearer.scope", + "sasl.oauthbearer.token.endpoint.url", + "sasl.password", + "sasl.username", + "security.protocol", + "send.buffer.bytes", + "session.timeout.ms", + "socket_cb", + "socket.blocking.max.ms", + "socket.connection.setup.timeout.ms", + "socket.keepalive.enable", + "socket.max.fails", + "socket.nagle.disable", + "socket.timeout.ms", + "ssl_engine_callback_data", + "ssl.ca.certificate.stores", + "ssl.ca.location", + "ssl.certificate.location", + "ssl.certificate.verify_cb", + "ssl.cipher.suites", + "ssl.crl.location", + "ssl.curves.list", + "ssl.endpoint.identification.algorithm", + 
"ssl.engine.id", + "ssl.engine.location", + "ssl.key.location", + "ssl.key.password", + "ssl.keystore.location", + "ssl.keystore.password", + "ssl.providers", + "ssl.sigalgs.list", + "statistics.interval.ms", + "sticky.partitioning.linger.ms", + "throttle_cb", + "topic.blacklist", + "topic.metadata.propagation.max.ms", + "topic.metadata.refresh.fast.cnt", + "topic.metadata.refresh.fast.interval.ms", + "topic.metadata.refresh.interval.ms", + "topic.metadata.refresh.sparse", + "transaction.timeout.ms", + "transactional.id", + } def _write_to_kafka(self, df: pd.DataFrame) -> None: """Given a dataframe where each row is a message to be sent to a Kafka Topic, iterate through all rows and send them to a Kafka topic. @@ -89,28 +251,39 @@ def _write_to_kafka(self, df: pd.DataFrame) -> None: Args: df: A dataframe where each row is a message to be sent to a Kafka Topic. """ + self.populate_cls_attributes() + + if self.__producer is None: + self.__producer = self._get_producer(self.sources_config.kafka.kafka_server, **self.options) + + self._send_messages(df=df, topic=self.sources_config.kafka.kafka_topic) + + def populate_cls_attributes(self): + """Pop dynamicio options (key_generator, document_transformer, key_serializer, value_serializer) from kafka config options.""" if self.__key_generator is None: self.__key_generator = lambda idx, __: idx # default key generator uses the dataframe's index if self.options.get("key_generator") is not None: self.__key_generator = self.options.pop("key_generator") - if self.__document_transformer is None: self.__document_transformer = lambda value: value if self.options.get("document_transformer") is not None: self.__document_transformer = self.options.pop("document_transformer") - - if self.__producer is None: - self.__producer = self._get_producer(self.sources_config.kafka.kafka_server, **self.options) - - self._send_messages(df=df, topic=self.sources_config.kafka.kafka_topic) - - @utils.allow_options(KafkaProducer.DEFAULT_CONFIG.keys()) - def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> KafkaProducer: + if self.__key_serializer is None: + self.__key_serializer = self._default_key_serializer + if self.options.get("key_serializer") is not None: + self.__key_serializer = self.options.pop("key_serializer") + if self.__value_serializer is None: + self.__value_serializer = self._default_value_serializer + if self.options.get("value_serializer") is not None: + self.__value_serializer = self.options.pop("value_serializer") + + @utils.allow_options(VALID_CONFIG_KEYS) + def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> Producer: """Generate and return a Kafka Producer. Default options are used to generate the producer. Specifically: - - `bootstrap_servers`: Passed on through the source_config - - `value_serializer`: Uses a default_value_serializer defined in this mixin + - `bootstrap.servers`: Passed on through the source_config + - `compression.type`: Uses snappy compression More options can be added to the producer by passing them as keyword arguments, through valid options. @@ -124,39 +297,39 @@ def _get_producer(self, server: str, **options: MutableMapping[str, Any]) -> Kaf A Kafka producer instance. 
""" self.__kafka_config = { - **{ - "bootstrap_servers": server, - "compression_type": "snappy", - "key_serializer": self._default_key_serializer, - "value_serializer": self._default_value_serializer, - }, + "bootstrap.servers": server, + "compression.type": "snappy", **options, } - return KafkaProducer(**self.__kafka_config) + return Producer(**self.__kafka_config) def _send_messages(self, df: pd.DataFrame, topic: str) -> None: - logger.info(f"Sending {len(df)} messages to Kafka topic:{topic}.") + logger.info(f"Sending {len(df)} messages to Kafka topic: {topic}.") messages = df.reset_index(drop=True).to_dict("records") + for idx, message in zip(df.index.values, messages): - self.__producer.send(topic, key=self.__key_generator(idx, message), value=self.__document_transformer(message)) # type: ignore + key = self.__key_generator(idx, message) + transformed_message = self.__document_transformer(message) + serialized_key = self.__key_serializer(key) + serialized_value = self.__value_serializer(transformed_message) + + self.__producer.produce(topic=topic, key=serialized_key, value=serialized_value, on_delivery=self._on_delivery) - self.__producer.flush() # type: ignore + self.__producer.flush() @staticmethod - def _default_key_serializer(key: Optional[str]) -> Optional[bytes]: - if key: - return key.encode("utf-8") + def _on_delivery(err, msg): + """Callback for message delivery.""" + if err is not None: + raise Exception(f"Message delivery failed: {err}, for message: {msg}") + + @staticmethod + def _default_key_serializer(key: Optional[Any]) -> Optional[bytes]: + if key is not None: + return str(key).encode("utf-8") return None @staticmethod def _default_value_serializer(value: Mapping) -> bytes: return simplejson.dumps(value, ignore_nan=True).encode("utf-8") - - def _read_from_kafka(self) -> Iterable[Mapping]: # type: ignore - """Read messages from a Kafka Topic and convert them to separate dataframes. - - Returns: - Multiple dataframes, one per message read from the Kafka topic of interest. 
- """ - # TODO: Implement kafka reader diff --git a/requirements-dev.txt b/requirements-dev.txt index 1b3dbb8..6b2315b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,7 +6,6 @@ flake8-tidy-imports>=4.8.0 flake8==5.0.4 gitlint==0.17.0 mock==4.0.3 -mypy==0.990 pandas-stubs==2.0.3.230814 pre-commit==2.20.0 pydocstyle==6.1.1 diff --git a/requirements.txt b/requirements.txt index f3fc2a6..c57725b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ awscli>=1.22.24 boto3>=1.20.24 fastparquet>=0.8.0 fsspec==2022.3.0 -kafka-python~=2.0.2 +confluent-kafka~=2.4.0 logzero>=1.7.0 magic-logger>=1.0.2 no_implicit_optional==1.4.0 diff --git a/tests/mocking/io.py b/tests/mocking/io.py index 8ed59fb..7792d62 100644 --- a/tests/mocking/io.py +++ b/tests/mocking/io.py @@ -1,4 +1,6 @@ # pylint: disable=missing-class-docstring, missing-module-docstring, missing-function-docstring +from typing import Callable, Optional +from unittest.mock import MagicMock from dynamicio import UnifiedIO from dynamicio.core import SCHEMA_FROM_FILE @@ -117,11 +119,15 @@ class WriteKeyedKafkaIO(UnifiedIO): class MockKafkaProducer: - def __init__(self): + def __init__(self, **kwargs): self.my_stream = [] + self.config = kwargs # Store the config to ensure it can accept any options + self.produce_call_count = 0 - def send(self, topic: str, value: dict, key: str = None): # pylint: disable=unused-argument + def produce(self, topic: str, key: Optional[bytes], value: bytes, on_delivery: Callable): # pylint: disable=unused-argument self.my_stream.append({"key": key, "value": value}) + on_delivery(None, MagicMock()) + self.produce_call_count += 1 def flush(self): pass diff --git a/tests/resources/definitions/input.yaml b/tests/resources/definitions/input.yaml index 0e1ecf0..03cd6c7 100644 --- a/tests/resources/definitions/input.yaml +++ b/tests/resources/definitions/input.yaml @@ -293,3 +293,17 @@ WRITE_TO_S3_PATH_PREFIX_PARQUET: bucket: "[[ MOCK_BUCKET ]]" path_prefix: "[[ MOCK_PREFIX ]]" file_type: "parquet" + +PRODUCTS: + LOCAL: + type: "local" + local: + file_path: "[[ TEST_RESOURCES ]]/data/input/some_csv_to_read.csv" + file_type: "csv" + schema: + name: products + columns: + id: + type: "object" + validations: {} + metrics: [] diff --git a/tests/resources/definitions/processed.yaml b/tests/resources/definitions/processed.yaml index b54bfa1..7acda24 100644 --- a/tests/resources/definitions/processed.yaml +++ b/tests/resources/definitions/processed.yaml @@ -65,6 +65,22 @@ WRITE_TO_KAFKA_JSON: kafka_server: "[[ KAFKA_SERVER ]]" kafka_topic: "[[ KAFKA_TOPIC ]]" +WRITE_TO_KAFKA_JSON_WITH_OPTIONS: + CLOUD: + type: "kafka" + kafka: + kafka_server: "[[ KAFKA_SERVER ]]" + kafka_topic: "[[ KAFKA_TOPIC ]]" + options: + compression.type: "gzip" + linger.ms: 3000 + buffer.memory: 134217728 # 128MB + message.send.max.retries: 3 # Mapped from `retries` + max.in.flight.requests.per.connection: 10 # Mapped from `max_in_flight_requests_per_connection` + request.timeout.ms: 60000 # Mapped from `request_timeout_ms` + batch.size: 20000000 # Mapped from `batch_size` + retry.backoff.ms: 100 + WRITE_TO_PG_PARQUET: LOCAL: type: "local" diff --git a/tests/test_mixins/__init__.py b/tests/test_mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_mixins/test_kafka_mixins.py b/tests/test_mixins/test_kafka_mixins.py index 9fcb8fe..4455d5d 100644 --- a/tests/test_mixins/test_kafka_mixins.py +++ b/tests/test_mixins/test_kafka_mixins.py @@ -1,31 +1,24 @@ # pylint: disable=no-member, 
missing-module-docstring, missing-class-docstring, missing-function-docstring, too-many-public-methods, too-few-public-methods, protected-access, C0103, C0302, R0801 import os -from unittest.mock import MagicMock, patch +from unittest.mock import patch import pandas as pd import pytest -from kafka import KafkaProducer - -import dynamicio.mixins.with_kafka +import simplejson from dynamicio.config import IOConfig from dynamicio.mixins import WithKafka from tests import constants -from tests.mocking.io import ( - MockKafkaProducer, - WriteKafkaIO, -) +from tests.mocking.io import MockKafkaProducer, WriteKafkaIO class TestKafkaIO: @pytest.mark.unit - @patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") - @patch.object(MockKafkaProducer, "send") - def test_write_to_kafka_is_called_for_writing_an_iterable_of_dicts_with_env_as_cloud_kafka(self, mock__kafka_producer, mock__kafka_producer_send, input_messages_df): + def test_write_to_kafka_is_called_for_writing_an_iterable_of_dicts_with_env_as_cloud_kafka(self, input_messages_df): # Given def rows_generator(_df, chunk_size): _chunk = [] - for _, row in df.iterrows(): + for _, row in _df.iterrows(): _chunk.append(row.to_dict()) if len(_chunk) == chunk_size: yield pd.DataFrame(_chunk) @@ -33,40 +26,36 @@ def rows_generator(_df, chunk_size): df = input_messages_df - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() - kafka_cloud_config = IOConfig( path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + # When - for chunk in rows_generator(_df=df, chunk_size=2): - WriteKafkaIO(kafka_cloud_config).write(chunk) + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + write_kafka_io = WriteKafkaIO(kafka_cloud_config) + for chunk in rows_generator(_df=df, chunk_size=2): + write_kafka_io.write(chunk) + # Then - assert mock__kafka_producer_send.call_count == 1 + assert mock_kafka_producer_instance.produce_call_count == len(input_messages_df) @pytest.mark.unit - @patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") - @patch.object(MockKafkaProducer, "send") - def test_write_to_kafka_is_called_with_document_transformer_if_provided_for_writing_an_iterable_of_dicts_with_env_as_cloud_kafka( - self, mock__kafka_producer, mock__kafka_producer_send, input_messages_df - ): + def test_write_to_kafka_is_called_with_document_transformer_if_provided_for_writing_an_iterable_of_dicts_with_env_as_cloud_kafka(self, input_messages_df): # Given def rows_generator(_df, chunk_size): _chunk = [] - for _, row in df.iterrows(): + for _, row in _df.iterrows(): _chunk.append(row.to_dict()) if len(_chunk) == chunk_size: yield pd.DataFrame(_chunk) _chunk.clear() - df = input_messages_df.iloc[[0]] - - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() + df = input_messages_df kafka_cloud_config = IOConfig( path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), @@ -74,19 +63,22 @@ def rows_generator(_df, chunk_size): dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + # When - for chunk in rows_generator(_df=df, 
chunk_size=2): - WriteKafkaIO(kafka_cloud_config, document_transformer=lambda v: dict(**v, worked=True)).write(chunk) + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + write_kafka_io = WriteKafkaIO(kafka_cloud_config, document_transformer=lambda v: dict(**v, worked=True)) + for chunk in rows_generator(_df=df, chunk_size=2): + write_kafka_io.write(chunk) + + # Debug: Print the contents of my_stream to trace the issue + print("my_stream contents:", mock_kafka_producer_instance.my_stream) + # Then - mock__kafka_producer_send.assert_called_once_with( - { - "id": "message01", - "foo": "xxxxxxxx", - "bar": 0, - "baz": ["a", "b", "c"], - "worked": True, - } - ) + for i in range(len(df)): + assert len(mock_kafka_producer_instance.my_stream) > 0, "No messages were produced" + assert mock_kafka_producer_instance.my_stream[i]["value"] == simplejson.dumps(dict(**df.iloc[i].to_dict(), worked=True), ignore_nan=True).encode("utf-8") @pytest.mark.unit def test_kafka_producer_default_value_serialiser_is_used_unless_alternative_is_given(self, test_df): @@ -96,18 +88,18 @@ def test_kafka_producer_default_value_serialiser_is_used_unless_alternative_is_g env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + write_kafka_io = WriteKafkaIO(kafka_cloud_config) # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer, patch.object(MockKafkaProducer, "send") as mock__kafka_producer_send: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(test_df) - # Then - value_serializer = write_kafka_io._WithKafka__kafka_config.pop("value_serializer") - assert "WithKafka._default_value_serializer" in str(value_serializer) + # Then (excuse me for resorting to private attributes, but it's the only way to test this) + assert write_kafka_io._WithKafka__value_serializer == write_kafka_io._default_value_serializer # pylint: disable=comparison-with-callable @pytest.mark.unit def test_kafka_producer_default_key_serialiser_is_used_unless_alternative_is_given(self, test_df): @@ -117,68 +109,115 @@ def test_kafka_producer_default_key_serialiser_is_used_unless_alternative_is_giv env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + write_kafka_io = WriteKafkaIO(kafka_cloud_config) # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer, patch.object(MockKafkaProducer, "send") as mock__kafka_producer_send: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(test_df) - # Then - key_serializer = write_kafka_io._WithKafka__kafka_config.pop("key_serializer") - assert "WithKafka._default_key_serializer" in str(key_serializer) + # Then (excuse me for resorting to private attributes, but it's the only way to test this) + assert 
write_kafka_io._WithKafka__key_serializer == write_kafka_io._default_key_serializer # pylint: disable=comparison-with-callable @pytest.mark.unit - @patch.object(MockKafkaProducer, "send") - @patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") - def test_kafka_producer_default_compression_type_is_snappy(self, mock__kafka_producer, mock__kafka_producer_send, test_df): + def test_kafka_producer_default_compression_type_is_snappy(self, test_df): # Given - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() kafka_cloud_config = IOConfig( path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + write_kafka_io = WriteKafkaIO(kafka_cloud_config) # When - write_kafka_io.write(test_df) + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + write_kafka_io.write(test_df) # Then - write_kafka_io._WithKafka__kafka_config.pop("value_serializer") # Removed as it returns a unique function identifier - write_kafka_io._WithKafka__kafka_config.pop("key_serializer") # Removed as it returns a unique function identifier - assert write_kafka_io._WithKafka__kafka_config == {"bootstrap_servers": "mock-kafka-server", "compression_type": "snappy"} + # Remove serializers from config for assertion as they are function references + kafka_config = write_kafka_io._WithKafka__kafka_config.copy() + kafka_config.pop("value_serializer", None) # Use .pop with default value to avoid KeyError + kafka_config.pop("key_serializer", None) # Use .pop with default value to avoid KeyError + + # Check that default options are correctly set + assert kafka_config == {"bootstrap.servers": "mock-kafka-server", "compression.type": "snappy"} @pytest.mark.unit - @patch.object(MockKafkaProducer, "send") - @patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") - def test_kafka_producer_options_are_replaced_by_the_user_options(self, mock__kafka_producer, mock__kafka_producer_send, test_df): + def test_kafka_producer_options_are_replaced_by_the_user_options(self, test_df): # Given - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock__kafka_producer.return_value = MockKafkaProducer() - mock__kafka_producer_send.return_value = MagicMock() kafka_cloud_config = IOConfig( path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") - write_kafka_io = WriteKafkaIO(kafka_cloud_config, compression_type="lz4", acks=2) + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + + write_kafka_io = WriteKafkaIO(kafka_cloud_config, **{"compression.type": "lz4", "acks": 2}) # When - write_kafka_io.write(test_df) + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + write_kafka_io.write(test_df) # Then - value_serializer = write_kafka_io._WithKafka__kafka_config.pop("value_serializer") # Removed as it returns a unique function identifier - write_kafka_io._WithKafka__kafka_config.pop("key_serializer") # Removed as it returns a unique function identifier - assert write_kafka_io._WithKafka__kafka_config == { + # 
Remove serializers from config for assertion as they are function references + kafka_config = write_kafka_io._WithKafka__kafka_config.copy() + kafka_config.pop("value_serializer", None) # Use .pop with default value to avoid KeyError + kafka_config.pop("key_serializer", None) # Use .pop with default value to avoid KeyError + + # Check that user options are correctly set + assert kafka_config == { "acks": 2, - "bootstrap_servers": "mock-kafka-server", - "compression_type": "lz4", - } and "WithKafka._default_value_serializer" in str(value_serializer) + "bootstrap.servers": "mock-kafka-server", + "compression.type": "lz4", + } + assert write_kafka_io._WithKafka__kafka_config == kafka_config + + @pytest.mark.unit + def test_kafka_producer_options_are_replaced_by_the_user_options_from_resource_definition(self, test_df): + # Given + kafka_cloud_config = IOConfig( + path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), + env_identifier="CLOUD", + dynamic_vars=constants, + ).get(source_key="WRITE_TO_KAFKA_JSON_WITH_OPTIONS") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + + write_kafka_io = WriteKafkaIO(kafka_cloud_config) + + # When + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + write_kafka_io.write(test_df) + + # Then + # Remove serializers from config for assertion as they are function references + kafka_config = write_kafka_io._WithKafka__kafka_config.copy() + kafka_config.pop("value_serializer", None) # Use .pop with default value to avoid KeyError + kafka_config.pop("key_serializer", None) # Use .pop with default value to avoid KeyError + + # Check that user options are correctly set + assert kafka_config == { + "batch.size": 20000000, + "bootstrap.servers": "mock-kafka-server", + "compression.type": "gzip", + "linger.ms": 3000, + "max.in.flight.requests.per.connection": 10, + "message.send.max.retries": 3, + "request.timeout.ms": 60000, + "retry.backoff.ms": 100, + } + + assert write_kafka_io._WithKafka__kafka_config == kafka_config @pytest.mark.unit def test_producer_send_method_sends_messages_with_index_as_key_by_default_if_a_keygen_is_not_provided(self, test_df): @@ -188,21 +227,30 @@ def test_producer_send_method_sends_messages_with_index_as_key_by_default_if_a_k env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="WRITE_TO_KAFKA_JSON") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + write_kafka_io = WriteKafkaIO(kafka_cloud_config) # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock_producer = MockKafkaProducer() - mock__kafka_producer.return_value = mock_producer + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(test_df) # Then - assert mock_producer.my_stream == [ - {"key": 0, "value": {"bar": 1000, "baz": "ABC", "foo": "id_1", "id": "cm_1"}}, - {"key": 1, "value": {"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2"}}, - {"key": 2, "value": {"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3"}}, + def sort_dict(d): + return {k: d[k] for k in sorted(d)} + + expected_stream = [ + {"key": b"0", "value": simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_1", "id": "cm_1"}), ignore_nan=True).encode("utf-8")}, + {"key": b"1", "value": 
simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2"}), ignore_nan=True).encode("utf-8")}, + {"key": b"2", "value": simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3"}), ignore_nan=True).encode("utf-8")}, ] + actual = [] + for message in mock_kafka_producer_instance.my_stream: + actual.append({"key": message["key"], "value": simplejson.dumps(sort_dict(simplejson.loads(message["value"])), ignore_nan=True).encode("utf-8")}) + + assert actual == expected_stream @pytest.mark.unit def test_producer_send_method_can_send_keyed_messages_using_a_custom_key_generator(self, test_df): @@ -214,19 +262,27 @@ def test_producer_send_method_can_send_keyed_messages_using_a_custom_key_generat ).get(source_key="WRITE_TO_KAFKA_JSON") write_kafka_io = WriteKafkaIO(kafka_cloud_config, key_generator=lambda _, message: "XXX") + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock_producer = MockKafkaProducer() - mock__kafka_producer.return_value = mock_producer + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(test_df) # Then - assert mock_producer.my_stream == [ - {"key": "XXX", "value": {"bar": 1000, "baz": "ABC", "foo": "id_1", "id": "cm_1"}}, - {"key": "XXX", "value": {"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2"}}, - {"key": "XXX", "value": {"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3"}}, + def sort_dict(d): + return {k: d[k] for k in sorted(d)} + + expected_stream = [ + {"key": b"XXX", "value": simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_1", "id": "cm_1"}), ignore_nan=True).encode("utf-8")}, + {"key": b"XXX", "value": simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2"}), ignore_nan=True).encode("utf-8")}, + {"key": b"XXX", "value": simplejson.dumps(sort_dict({"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3"}), ignore_nan=True).encode("utf-8")}, ] + actual = [] + for message in mock_kafka_producer_instance.my_stream: + actual.append({"key": message["key"], "value": simplejson.dumps(sort_dict(simplejson.loads(message["value"])), ignore_nan=True).encode("utf-8")}) + + assert actual == expected_stream @pytest.mark.unit @pytest.mark.parametrize( @@ -271,15 +327,16 @@ def test_default_key_generator_and_transformer_are_used_if_none_are_provided_by_ ).get(source_key="WRITE_TO_KAFKA_JSON") write_kafka_io = WriteKafkaIO(kafka_cloud_config) - # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock_producer = MockKafkaProducer() - mock__kafka_producer.return_value = mock_producer + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() - # When + # When + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(keyed_test_df) - assert (write_kafka_io._WithKafka__key_generator("idx", "value") == "idx") and (write_kafka_io._WithKafka__document_transformer("value") == "value") + + # Then + assert write_kafka_io._WithKafka__key_generator("idx", "value") == "idx" + assert write_kafka_io._WithKafka__document_transformer({"value": "value"}) == {"value": "value"} 
@pytest.mark.unit def test_custom_key_generator_and_transformer_are_used_if_they_are_provided_by_the_user(self): @@ -299,12 +356,51 @@ def test_custom_key_generator_and_transformer_are_used_if_they_are_provided_by_t ).get(source_key="WRITE_TO_KAFKA_JSON") write_kafka_io = WriteKafkaIO(kafka_cloud_config, key_generator=lambda idx, _: "xxx", document_transformer=lambda _: "xxx") - # When - with patch.object(dynamicio.mixins.with_kafka, "KafkaProducer") as mock__kafka_producer: - mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG - mock_producer = MockKafkaProducer() - mock__kafka_producer.return_value = mock_producer + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() - # When + # When + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): write_kafka_io.write(keyed_test_df) - assert (write_kafka_io._WithKafka__key_generator("idx", "value") == "xxx") and (write_kafka_io._WithKafka__document_transformer("value") == "xxx") + + # Then + assert write_kafka_io._WithKafka__key_generator("idx", "value") == "xxx" + assert write_kafka_io._WithKafka__document_transformer("value") == "xxx" + + @pytest.mark.unit + def test_raise_exception_on_single_message_failure(self, input_messages_df): + # Given + def rows_generator(_df, chunk_size): + _chunk = [] + for _, row in _df.iterrows(): + _chunk.append(row.to_dict()) + if len(_chunk) == chunk_size: + yield pd.DataFrame(_chunk) + _chunk.clear() + + df = input_messages_df + + kafka_cloud_config = IOConfig( + path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), + env_identifier="CLOUD", + dynamic_vars=constants, + ).get(source_key="WRITE_TO_KAFKA_JSON") + + # Create the MockKafkaProducer instance before patching + mock_kafka_producer_instance = MockKafkaProducer() + + def mock_produce(*args, **kwargs): # pylint: disable=unused-argument + if mock_kafka_producer_instance.produce_call_count == 1: + raise Exception("Mock message delivery failure") + mock_kafka_producer_instance.produce_call_count += 1 + + # When + with patch("dynamicio.mixins.with_kafka.Producer", return_value=mock_kafka_producer_instance): + with patch.object(mock_kafka_producer_instance, "produce", side_effect=mock_produce): + write_kafka_io = WriteKafkaIO(kafka_cloud_config) + with pytest.raises(Exception, match="Mock message delivery failure"): + for chunk in rows_generator(_df=df, chunk_size=2): + write_kafka_io.write(chunk) + + # Ensure only one message is sent successfully before the failure + assert mock_kafka_producer_instance.produce_call_count == 1 diff --git a/tests/test_regressions/conftest.py b/tests/test_regressions/conftest.py deleted file mode 100644 index 8c192b4..0000000 --- a/tests/test_regressions/conftest.py +++ /dev/null @@ -1,26 +0,0 @@ -import imp -import pathlib - -import pytest - - -@pytest.fixture -def regressions_resources_dir() -> pathlib.Path: - return (pathlib.Path(__file__).parent / "resources").resolve() - - -@pytest.fixture -def tests_resources_dir(regressions_resources_dir): - return regressions_resources_dir.parent.parent / "resources" - - -@pytest.fixture -def regressions_constants_module(regressions_resources_dir, tests_resources_dir): - mod = imp.new_module("regressions_constants_module") - mod.__dict__.update( - { - "REGRESSIONS_RESOURCES_DIR": str(regressions_resources_dir), - "TEST_RESOURCES_DIR": str(tests_resources_dir), - } - ) - return mod diff --git 
a/tests/test_regressions/resources/missing_v430_validations.yaml b/tests/test_regressions/resources/missing_v430_validations.yaml deleted file mode 100644 index 0669e30..0000000 --- a/tests/test_regressions/resources/missing_v430_validations.yaml +++ /dev/null @@ -1,13 +0,0 @@ -PRODUCTS: - LOCAL: - type: "local" - local: - file_path: "[[ TEST_RESOURCES_DIR ]]/data/input/some_csv_to_read.csv" - file_type: "csv" - schema: - name: products - columns: - id: - type: "object" - validations: {} - metrics: [] \ No newline at end of file diff --git a/tests/test_regressions/test_v430.py b/tests/test_regressions/test_v430.py deleted file mode 100644 index e0d1d11..0000000 --- a/tests/test_regressions/test_v430.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Test regressions discovered in v4.3.0 release""" - -from dynamicio import UnifiedIO -from dynamicio.config import IOConfig -from dynamicio.core import SCHEMA_FROM_FILE - - -class IO(UnifiedIO): - schema = SCHEMA_FROM_FILE - - -def test_missing_validations_and_metrics(regressions_resources_dir, regressions_constants_module): - """Dynamicio was refusing to work with schemas that did not have any validations specified.""" - # Given - input_config = IOConfig( - path_to_source_yaml=regressions_resources_dir / "missing_v430_validations.yaml", - env_identifier="LOCAL", - dynamic_vars=regressions_constants_module, - ) - io_instance = IO(source_config=input_config.get(source_key="PRODUCTS"), apply_schema_validations=True, log_schema_metrics=True) - - # When - data = io_instance.read() - - # Then - assert data.to_dict() == {"id": {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7, 7: 8, 8: 9, 9: 10, 10: 11, 11: 12, 12: 13, 13: 14, 14: 15}} diff --git a/tests/test_validations.py b/tests/test_validations.py index 121e55e..c66f935 100644 --- a/tests/test_validations.py +++ b/tests/test_validations.py @@ -1,6 +1,9 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, too-many-public-methods +import os + import pytest +from dynamicio.config import IOConfig from dynamicio.validations import ( has_acceptable_percentage_of_nulls, has_no_null_values, @@ -12,6 +15,8 @@ is_lower_than, is_lower_than_or_equal, ) +from tests import constants +from tests.mocking.io import ReadS3CsvIO class TestHasUniqueValues: @@ -448,3 +453,24 @@ def test_returns_true_if_all_column_values_are_between_upper_and_lower_bounds_ir # Then assert validation.valid is True and validation.value == 0 and validation.message == "All values of TEST[weight_b] is between 4 and 10 thresholds" + + +class TestRegressions: + """Tests for regressions discovered in v4.3.0 release.""" + + @pytest.mark.unit + def test_missing_validations_and_metrics(self): + """Test that dynamicio works with schemas without validations specified.""" + # Given + input_config = IOConfig( + path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/input.yaml")), + env_identifier="LOCAL", + dynamic_vars=constants, + ).get(source_key="PRODUCTS") + io_instance = ReadS3CsvIO(source_config=input_config, apply_schema_validations=True, log_schema_metrics=True) + + # When + data = io_instance.read() + + # Then + assert data.to_dict() == {"id": {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7, 7: 8, 8: 9, 9: 10, 10: 11, 11: 12, 12: 13, 13: 14, 14: 15}}
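The move from kafka-python to confluent-kafka replaces the single KafkaProducer.send() call with a produce()/flush() pair plus a per-message delivery callback, which is why _send_messages and _on_delivery are reshaped above and serialization now happens inside the mixin. A minimal sketch of that flow, assuming a local broker, a placeholder topic, and a hand-rolled payload (none of these values come from this patch):

    from confluent_kafka import Producer

    # Placeholder broker for illustration only; dynamicio takes this from the
    # source config (kafka_server) and passes it as bootstrap.servers.
    producer = Producer({"bootstrap.servers": "localhost:9092", "compression.type": "snappy"})

    def on_delivery(err, msg):
        # Same idea as WithKafka._on_delivery: surface the first delivery failure.
        if err is not None:
            raise Exception(f"Message delivery failed: {err}")

    # produce() only enqueues the message; delivery callbacks run during poll()/flush().
    producer.produce(topic="example-topic", key=b"0", value=b'{"id": "message01"}', on_delivery=on_delivery)
    producer.flush()  # blocks until the internal queue drains and callbacks have fired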