diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..6db3a7c
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+max-line-length = 88
+exclude = .git,__pycache__,__init__.py,.mypy_cache,.pytest_cache,venv
diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml
new file mode 100644
index 0000000..df25d4a
--- /dev/null
+++ b/.github/workflows/docker-publish.yml
@@ -0,0 +1,57 @@
+name: Docker
+
+on:
+  push:
+    tags: [ 'v*.*.*' ]
+  pull_request:
+    branches: [ "main" ]
+
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: ${{ github.repository }}
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v3
+
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v2
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+        with:
+          platforms: linux/amd64,linux/arm64
+
+      - name: Log into registry ${{ env.REGISTRY }}
+        if: github.event_name != 'pull_request'
+        uses: docker/login-action@v2
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Extract Docker metadata
+        id: meta
+        uses: docker/metadata-action@v4
+        with:
+          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+
+      - name: Build and push Docker image
+        id: build-and-push
+        uses: docker/build-push-action@v3
+        with:
+          context: .
+          platforms: linux/amd64,linux/arm64
+          push: ${{ github.event_name != 'pull_request' }}
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
new file mode 100644
index 0000000..f8f5456
--- /dev/null
+++ b/.github/workflows/python-app.yml
@@ -0,0 +1,36 @@
+# This workflow will install Python dependencies, run tests and lint with a single version of Python
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
+
+name: Python application
+
+on:
+  push:
+    branches: [ "main" ]
+  pull_request:
+    branches: [ "main" ]
+
+permissions:
+  contents: read
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v3
+        with:
+          python-version: "3.10"
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+          if [ -f requirements_dev.txt ]; then pip install -r requirements_dev.txt; fi
+      - name: Lint
+        run: |
+          ./scripts/lint.sh
+      - name: Test with pytest
+        run: |
+          ./scripts/test.sh
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..872c981
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,131 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+.idea
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..afa770e
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,19 @@
+files: app|tests
+repos:
+  - repo: https://github.com/PyCQA/autoflake
+    rev: v1.7.7
+    hooks:
+      - id: autoflake
+        args: ["--remove-all-unused-imports",
+               "--remove-unused-variables",
+               "--in-place",
+               "--exclude=__init__.py"]
+  - repo: https://github.com/psf/black
+    rev: 22.10.0
+    hooks:
+      - id: black
+  - repo: https://github.com/pycqa/isort
+    rev: 5.10.1
+    hooks:
+      - id: isort
+        args: ["--profile", "black"]
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..999a1dd
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.10-slim-buster
+
+ENV LIBRDKAFKA_VERSION 1.9.2
+
+WORKDIR /app
+
+RUN apt update && apt install -y wget make g++ libssl-dev
+RUN wget https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz && \
+    tar xvzf v${LIBRDKAFKA_VERSION}.tar.gz && \
+    (cd librdkafka-${LIBRDKAFKA_VERSION}/ && ./configure && make && make install && ldconfig)
+
+COPY requirements.txt requirements.txt
+RUN pip3 install -r requirements.txt
+
+COPY ./app .
+
+CMD ["python3", "main.py"]
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/consumers/__init__.py b/app/consumers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/consumers/base_consumer.py b/app/consumers/base_consumer.py
new file mode 100644
index 0000000..56d8709
--- /dev/null
+++ b/app/consumers/base_consumer.py
@@ -0,0 +1,7 @@
+from abc import ABC, abstractmethod
+
+
+class BaseConsumer(ABC):
+    @abstractmethod
+    def start(self) -> None:
+        pass
diff --git a/app/consumers/kafka_consumer.py b/app/consumers/kafka_consumer.py
new file mode 100644
index 0000000..ecc2bc6
--- /dev/null
+++ b/app/consumers/kafka_consumer.py
@@ -0,0 +1,88 @@
+import logging
+import signal
+from typing import Any, Callable, Optional
+
+from confluent_kafka import Consumer, KafkaException, Message
+from consumers.base_consumer import BaseConsumer
+from core.config import settings
+from core.consts import consts
+
+logging.basicConfig(level=settings.LOG_LEVEL)
+logger = logging.getLogger(__name__)
+
+
+class KafkaConsumer(BaseConsumer):
+    def __init__(
+        self,
+        msg_process: Callable[[Message], None],
+        consumer: Optional[Consumer] = None,
+    ) -> None:
+        self.running = False
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+        self.msg_process = msg_process
+
+        if consumer:
+            self.consumer = consumer
+        else:
+            conf = {
+                "bootstrap.servers": settings.KAFKA_CONSUMER_BROKERS,
+                "client.id": consts.KAFKA_CONSUMER_CLIENT_ID,
+                "security.protocol": settings.KAFKA_CONSUMER_SECURITY_PROTOCOL,
+                "sasl.mechanism": settings.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM,
+                "sasl.username": settings.KAFKA_CONSUMER_USERNAME,
+                "sasl.password": settings.KAFKA_CONSUMER_PASSWORD,
+                "group.id": settings.KAFKA_CONSUMER_GROUP_ID,
+                "session.timeout.ms": settings.KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
+                "auto.offset.reset": settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
+                "enable.auto.commit": "false",
+            }
+            self.consumer = Consumer(conf)
+
+    def start(self) -> None:
+        try:
+            self.consumer.subscribe(
+                [settings.KAFKA_RUNS_TOPIC, settings.KAFKA_CHANGE_LOG_TOPIC],
+                on_assign=lambda _, partitions: logger.info(
+                    "Assignment: %s", partitions
+                ),
+            )
+            self.running = True
+            while self.running:
+                msg = self.consumer.poll(timeout=1.0)
+                if msg is None:
+                    continue
+                try:
+                    if msg.error():
+                        raise KafkaException(msg.error())
+                    try:
+                        logger.info(
+                            "Processing message"
+                            " from topic %s, partition %d, offset %d",
+                            msg.topic(),
+                            msg.partition(),
+                            msg.offset(),
+                        )
+                        self.msg_process(msg)
+                    except Exception as process_error:
+                        logger.error(
+                            "Failed to process message"
+                            " from topic %s, partition %d, offset %d: %s",
+                            msg.topic(),
+                            msg.partition(),
+                            msg.offset(),
+                            str(process_error),
+                        )
+                finally:
+                    # Commit only after a message was actually polled, even if
+                    # processing failed, so a poison message is not retried forever.
+                    self.consumer.commit(asynchronous=False)
+        except Exception as message_error:
+            logger.error(str(message_error))
+        finally:
+            self.consumer.close()
+
+    def exit_gracefully(self, *_: Any) -> None:
+        logger.info("Exiting gracefully...")
+        self.running = False
diff --git a/app/core/__init__.py b/app/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/core/config.py b/app/core/config.py
new file mode 100644
index 0000000..832d965
--- /dev/null
+++ b/app/core/config.py
@@ -0,0 +1,44 @@
+from typing import Optional
+
+from pydantic import BaseSettings, validator
+
+
+class Settings(BaseSettings):
+    LOG_LEVEL: str = "INFO"
+
+    STREAMER_NAME: str
+
+    PORT_ORG_ID: str
+
+    KAFKA_CONSUMER_BROKERS: str = "localhost:9092"
+    KAFKA_CONSUMER_SECURITY_PROTOCOL: str = "plaintext"
+    KAFKA_CONSUMER_AUTHENTICATION_MECHANISM: str = "none"
+    KAFKA_CONSUMER_USERNAME: str = "local"
+    KAFKA_CONSUMER_PASSWORD: str = ""
+    KAFKA_CONSUMER_SESSION_TIMEOUT_MS: int = 45000
+    KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest"
+    KAFKA_CONSUMER_GROUP_ID: str = ""
+
+    KAFKA_RUNS_TOPIC: str = ""
+
+    @validator("KAFKA_RUNS_TOPIC", always=True)
+    def set_kafka_runs_topic(cls, v: Optional[str], values: dict) -> str:
+        if isinstance(v, str) and v:
+            return v
+        return f"{values.get('PORT_ORG_ID')}.runs"
+
+    KAFKA_CHANGE_LOG_TOPIC: str = ""
+
+    @validator("KAFKA_CHANGE_LOG_TOPIC", always=True)
+    def set_kafka_change_log_topic(cls, v: Optional[str], values: dict) -> str:
+        if isinstance(v, str) and v:
+            return v
+        return f"{values.get('PORT_ORG_ID')}.change.log"
+
+    class Config:
+        case_sensitive = True
+
+    WEBHOOK_INVOKER_TIMEOUT: int = 5000
+
+
+settings = Settings()
diff --git a/app/core/consts.py b/app/core/consts.py
new file mode 100644
index 0000000..b6a204a
--- /dev/null
+++ b/app/core/consts.py
@@ -0,0 +1,6 @@
+class Consts:
+    INVOCATION_TYPE_WEBHOOK = "WEBHOOK"
+    KAFKA_CONSUMER_CLIENT_ID = "port-agent"
+
+
+consts = Consts()
diff --git a/app/invokers/__init__.py b/app/invokers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/invokers/base_invoker.py b/app/invokers/base_invoker.py
new file mode 100644
index 0000000..f3d1425
--- /dev/null
+++ b/app/invokers/base_invoker.py
@@ -0,0 +1,7 @@
+from abc import ABC, abstractmethod
+
+
+class BaseInvoker(ABC):
+    @abstractmethod
+    def invoke(self, message: dict, destination: dict) -> None:
+        pass
diff --git a/app/invokers/webhook_invoker.py b/app/invokers/webhook_invoker.py
new file mode 100644
index 0000000..7aa54ff
--- /dev/null
+++ b/app/invokers/webhook_invoker.py
@@ -0,0 +1,27 @@
+import logging
+
+import requests
+from core.config import settings
+from invokers.base_invoker import BaseInvoker
+
+logging.basicConfig(level=settings.LOG_LEVEL)
+logger = logging.getLogger(__name__)
+
+
+class WebhookInvoker(BaseInvoker):
+    def invoke(self, body: dict, destination: dict) -> None:
+        logger.info("WebhookInvoker - start - destination: %s", destination)
+        res = requests.post(
+            destination.get("url", ""),
+            json=body,
+            timeout=settings.WEBHOOK_INVOKER_TIMEOUT,
+        )
+        logger.info(
+            "WebhookInvoker - done - destination: %s, status code: %s",
+            destination,
+            res.status_code,
+        )
+        res.raise_for_status()
+
+
+webhook_invoker = WebhookInvoker()
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..b6ba5f2
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,18 @@
+import logging
+
+from core.config import settings
+from streamers.streamer_factory import StreamerFactory
+
+logging.basicConfig(level=settings.LOG_LEVEL)
+logger = logging.getLogger(__name__)
+
+
+def main() -> None:
+    streamer_factory = StreamerFactory()
+    streamer = streamer_factory.get_streamer(settings.STREAMER_NAME)
+    logger.info("Starting streaming with streamer: %s", settings.STREAMER_NAME)
+    streamer.stream()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/app/streamers/__init__.py b/app/streamers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/streamers/base_streamer.py b/app/streamers/base_streamer.py
new file mode 100644
index 0000000..9565ffe
--- /dev/null
+++ b/app/streamers/base_streamer.py
@@ -0,0 +1,7 @@
+from abc import ABC, abstractmethod
+
+
+class BaseStreamer(ABC):
+    @abstractmethod
+    def stream(self) -> None:
+        pass
diff --git a/app/streamers/kafka/__init__.py b/app/streamers/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/streamers/kafka/base_kafka_streamer.py b/app/streamers/kafka/base_kafka_streamer.py
new file mode 100644
index 0000000..4162fd9
--- /dev/null
+++ b/app/streamers/kafka/base_kafka_streamer.py
@@ -0,0 +1,38 @@
+import logging
+from abc import abstractmethod
+from typing import Optional
+
+from confluent_kafka import Consumer, Message
+from consumers.kafka_consumer import KafkaConsumer
+from core.config import settings
+from streamers.base_streamer import BaseStreamer
+
+logging.basicConfig(level=settings.LOG_LEVEL)
+logger = logging.getLogger(__name__)
+
+
+class BaseKafkaStreamer(BaseStreamer):
+    def __init__(self, consumer: Optional[Consumer] = None) -> None:
+        self.kafka_consumer = KafkaConsumer(self.msg_process, consumer)
+
+    @staticmethod
+    @abstractmethod
+    def msg_process(msg: Message) -> None:
+        pass
+
+    @staticmethod
+    def get_invocation_method(msg_value: dict, topic: str) -> dict:
+        if topic == settings.KAFKA_RUNS_TOPIC:
+            return (
+                msg_value.get("payload", {})
+                .get("action", {})
+                .get("invocationMethod", {})
+            )
+
+        if topic == settings.KAFKA_CHANGE_LOG_TOPIC:
+            return msg_value.get("changelogDestination", {})
+
+        return {}
+
+    def stream(self) -> None:
+        self.kafka_consumer.start()
diff --git a/app/streamers/kafka/kafka_to_webhook_streamer.py b/app/streamers/kafka/kafka_to_webhook_streamer.py
new file mode 100644
index 0000000..ded9756
--- /dev/null
+++ b/app/streamers/kafka/kafka_to_webhook_streamer.py
@@ -0,0 +1,48 @@
+import json
+import logging
+
+from confluent_kafka import Message
+from core.config import settings
+from core.consts import consts
+from invokers.webhook_invoker import webhook_invoker
+from streamers.kafka.base_kafka_streamer import BaseKafkaStreamer
+
+logging.basicConfig(level=settings.LOG_LEVEL)
+logger = logging.getLogger(__name__)
+
+
+class KafkaToWebhookStreamer(BaseKafkaStreamer):
+    @staticmethod
+    def msg_process(msg: Message) -> None:
+        logger.info("Raw message value: %s", msg.value())
+        msg_value = json.loads(msg.value().decode())
+        topic = msg.topic()
+        invocation_method = BaseKafkaStreamer.get_invocation_method(msg_value, topic)
+
+        if not invocation_method.pop("agent", False):
+            logger.info(
+                "Skipping message"
+                " from topic %s, partition %d, offset %d: not for agent",
+                topic,
+                msg.partition(),
+                msg.offset(),
+            )
+            return
+
+        if invocation_method.pop("type", "") != consts.INVOCATION_TYPE_WEBHOOK:
+            logger.info(
+                "Skipping message"
+                " from topic %s, partition %d, offset %d: not for webhook invoker",
+                topic,
+                msg.partition(),
+                msg.offset(),
+            )
+            return
+
+        webhook_invoker.invoke(msg_value, invocation_method)
+        logger.info(
+            "Successfully processed message from topic %s, partition %d, offset %d",
+            topic,
+            msg.partition(),
+            msg.offset(),
+        )
diff --git a/app/streamers/streamer_factory.py b/app/streamers/streamer_factory.py
new file mode 100644
index 0000000..2e353b7
--- /dev/null
+++ b/app/streamers/streamer_factory.py
@@ -0,0 +1,11 @@
+from streamers.base_streamer import BaseStreamer
+from streamers.kafka.kafka_to_webhook_streamer import KafkaToWebhookStreamer
+
+
+class StreamerFactory:
+    @staticmethod
+    def get_streamer(name: str) -> BaseStreamer:
+        if name == "KafkaToWebhookStreamer":
+            return KafkaToWebhookStreamer()
+
+        raise Exception("No streamer found for name: %s" % name)
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 0000000..19273b9
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,4 @@
+[mypy]
+plugins = pydantic.mypy
+ignore_missing_imports = True
+disallow_untyped_defs = True
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..4336245
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+confluent-kafka==1.9.2
+pydantic==1.10.2
+requests==2.28.1
\ No newline at end of file
diff --git a/requirements_dev.txt b/requirements_dev.txt
new file mode 100644
index 0000000..d55395b
--- /dev/null
+++ b/requirements_dev.txt
@@ -0,0 +1,9 @@
+pytest==7.2.0
+pytest-cov==4.0.0
+mypy==0.991
+types-requests==2.28.11.5
+isort==5.10.1
+autoflake==1.7.7
+black==22.10.0
+flake8==5.0.4
+pre-commit==2.20.0
diff --git a/scripts/format.sh b/scripts/format.sh
new file mode 100755
index 0000000..4e91608
--- /dev/null
+++ b/scripts/format.sh
@@ -0,0 +1,6 @@
+#!/bin/sh -e
+set -x
+
+autoflake --remove-all-unused-imports --recursive --remove-unused-variables --in-place app tests --exclude=__init__.py
+black app tests
+isort --profile black app tests
diff --git a/scripts/lint.sh b/scripts/lint.sh
new file mode 100755
index 0000000..0d62801
--- /dev/null
+++ b/scripts/lint.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -e
+set -x
+
+mypy app tests
+black app tests --check
+isort --profile black --check-only app tests
+flake8
diff --git a/scripts/test.sh b/scripts/test.sh
new file mode 100755
index 0000000..60ffd65
--- /dev/null
+++ b/scripts/test.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+set -e
+set -x
+
+PYTHONPATH=app STREAMER_NAME=test PORT_ORG_ID=test_org pytest --cov=app --cov-report=term-missing tests "${@}"
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/streamers/__init__.py b/tests/unit/streamers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/streamers/kafka/__init__.py b/tests/unit/streamers/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/streamers/kafka/conftest.py b/tests/unit/streamers/kafka/conftest.py
new file mode 100644
index 0000000..1a06e26
--- /dev/null
+++ b/tests/unit/streamers/kafka/conftest.py
@@ -0,0 +1,198 @@
+import json
+import os
+from signal import SIGINT
+from typing import Any, Callable, Generator, Optional
+
+import pytest
+import requests
+from _pytest.monkeypatch import MonkeyPatch
+from confluent_kafka import Consumer as _Consumer
+
+
+@pytest.fixture
+def mock_requests(monkeypatch: MonkeyPatch, request: Any) -> None:
+    class MockResponse:
+        status_code = request.param.get("status_code")
+
+        def raise_for_status(self) -> None:
+            if 400 <= self.status_code <= 599:
+                raise Exception(
+                    "Webhook Invoker failed with status code: %d" % self.status_code
+                )
+
+    def mock_post(*args: Any, **kwargs: Any) -> MockResponse:
+        return MockResponse()
+
+    monkeypatch.setattr(requests, "post", mock_post)
+
+
+def terminate_consumer() -> None:
+    os.kill(os.getpid(), SIGINT)
+
+
+class Consumer(_Consumer):
+    def __init__(self) -> None:
+        pass
+
+    def subscribe(
+        self, topics: Any, on_assign: Any = None, *args: Any, **kwargs: Any
+    ) -> None:
+        pass
+
+    def poll(self, timeout: Any = None) -> None:
+        pass
+
+    def commit(self, message: Any = None, *args: Any, **kwargs: Any) -> None:
+        pass
+
+    def close(self, *args: Any, **kwargs: Any) -> None:
+        pass
+
+
+@pytest.fixture
+def mock_kafka(monkeypatch: MonkeyPatch, request: Any) -> None:
+    class MockKafkaMessage:
+        def error(self) -> None:
+            return None
+
+        def topic(self, *args: Any, **kwargs: Any) -> str:
+            return request.param[2]
+
+        def partition(self, *args: Any, **kwargs: Any) -> int:
+            return 0
+
+        def offset(self, *args: Any, **kwargs: Any) -> int:
+            return 0
+
+        def value(self) -> bytes:
+            return request.getfixturevalue(request.param[0])(request.param[1])
+
+    def mock_subscribe(
+        self: Any, topics: Any, on_assign: Any = None, *args: Any, **kwargs: Any
+    ) -> None:
+        pass
+
+    def generate_kafka_messages() -> Generator[Optional[MockKafkaMessage], None, None]:
+        yield MockKafkaMessage()
+        while True:
+            yield None
+
+    kafka_messages_generator = generate_kafka_messages()
+
+    def mock_poll(self: Any, timeout: Any = None) -> Optional[MockKafkaMessage]:
+        return next(kafka_messages_generator)
+
+    def mock_commit(self: Any, message: Any = None, *args: Any, **kwargs: Any) -> None:
+        return None
+
+    def mock_close(self: Any, *args: Any, **kwargs: Any) -> None:
+        pass
+
+    monkeypatch.setattr(Consumer, "subscribe", mock_subscribe)
+    monkeypatch.setattr(Consumer, "poll", mock_poll)
+    monkeypatch.setattr(Consumer, "commit", mock_commit)
+    monkeypatch.setattr(Consumer, "close", mock_close)
+
+
+@pytest.fixture(scope="module")
+def mock_change_log_message() -> Callable[[dict], bytes]:
+    change_log_message = {
+        "action": "Create",
+        "resourceType": "run",
+        "status": "TRIGGERED",
+        "trigger": {
+            "by": {"orgId": "test_org", "userId": "test_user"},
+            "origin": "UI",
+            "at": "2022-11-16T16:31:32.447Z",
+        },
+        "context": {
+            "entity": None,
+            "blueprint": "Service",
+            "runId": "r_jE5FhDURh4Uen2Qr",
+        },
+        "diff": {
+            "before": None,
+            "after": {
+                "id": "r_jE5FhDURh4Uen2Qr",
+                "status": "IN_PROGRESS",
+                "blueprint": {"identifier": "Service", "title": "Service"},
+                "action": "Create",
+                "endedAt": None,
+                "source": "UI",
+                "relatedEntityExists": False,
+                "relatedBlueprintExists": True,
+                "properties": {},
+                "createdAt": "2022-11-16T16:31:32.447Z",
+                "updatedAt": "2022-11-16T16:31:32.447Z",
+                "createdBy": "test_user",
+                "updatedBy": "test_user",
+            },
+        },
+        "changelogDestination": {
+            "type": "WEBHOOK",
+            "agent": True,
+            "url": "http://localhost:80/api/test",
+        },
+    }
+
+    def get_change_log_message(invocation_method: dict) -> bytes:
+        if invocation_method is not None:
+            change_log_message["changelogDestination"] = invocation_method
+        return json.dumps(change_log_message).encode()
+
+    return get_change_log_message
+
+
+@pytest.fixture(scope="module")
+def mock_run_message() -> Callable[[dict], bytes]:
+    run_message: dict = {
+        "action": "Create",
+        "resourceType": "run",
+        "status": "TRIGGERED",
+        "trigger": {
+            "by": {"orgId": "test_org", "userId": "test_user"},
+            "origin": "UI",
+            "at": "2022-11-16T16:31:32.447Z",
+        },
+        "context": {
+            "entity": None,
+            "blueprint": "Service",
+            "runId": "r_jE5FhDURh4Uen2Qr",
+        },
+        "payload": {
+            "entity": None,
+            "action": {
+                "id": "action_34aweFQtayw7SCVb",
+                "identifier": "Create",
+                "title": "Create",
+                "icon": "DefaultBlueprint",
+                "userInputs": {
+                    "properties": {
+                        "foo": {"type": "string", "description": "Description"},
+                        "bar": {"type": "number", "description": "Description"},
+                    },
+                    "required": [],
+                },
+                "invocationMethod": {
+                    "type": "WEBHOOK",
+                    "agent": True,
+                    "url": "http://localhost:80/api/test",
+                },
+                "trigger": "CREATE",
+                "description": "",
+                "blueprint": "Service",
+                "createdAt": "2022-11-15T09:58:52.863Z",
+                "createdBy": "test_user",
+                "updatedAt": "2022-11-15T09:58:52.863Z",
+                "updatedBy": "test_user",
+            },
+            "properties": {},
+        },
+    }
+
+    def get_run_message(invocation_method: dict) -> bytes:
+        if invocation_method is not None:
+            run_message["payload"]["action"]["invocationMethod"] = invocation_method
+        return json.dumps(run_message).encode()
+
+    return get_run_message
diff --git a/tests/unit/streamers/kafka/test_kafka_to_webhook_streamer.py b/tests/unit/streamers/kafka/test_kafka_to_webhook_streamer.py
new file mode 100644
index 0000000..e55eec5
--- /dev/null
+++ b/tests/unit/streamers/kafka/test_kafka_to_webhook_streamer.py
@@ -0,0 +1,127 @@
+from threading import Timer
+from unittest import mock
+from unittest.mock import ANY, call
+
+import pytest
+from consumers.kafka_consumer import logger as consumer_logger
+from core.config import settings
+from streamers.kafka.kafka_to_webhook_streamer import KafkaToWebhookStreamer
+from streamers.kafka.kafka_to_webhook_streamer import logger as streamer_logger
+
+from tests.unit.streamers.kafka.conftest import Consumer, terminate_consumer
+
+
+@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True)
+@pytest.mark.parametrize(
+    "mock_kafka",
+    [
+        ("mock_change_log_message", None, settings.KAFKA_CHANGE_LOG_TOPIC),
+        ("mock_run_message", None, settings.KAFKA_RUNS_TOPIC),
+    ],
+    indirect=True,
+)
+def test_single_stream_success(mock_requests: None, mock_kafka: None) -> None:
+    Timer(0.01, terminate_consumer).start()
+
+    with mock.patch.object(consumer_logger, "error") as mock_error:
+        streamer = KafkaToWebhookStreamer(Consumer())
+        streamer.stream()
+
+    mock_error.assert_not_called()
+
+
+@pytest.mark.parametrize("mock_requests", [{"status_code": 500}], indirect=True)
+@pytest.mark.parametrize(
+    "mock_kafka",
+    [
+        ("mock_change_log_message", None, settings.KAFKA_CHANGE_LOG_TOPIC),
+        ("mock_run_message", None, settings.KAFKA_RUNS_TOPIC),
+    ],
+    indirect=True,
+)
+def test_single_stream_failed(mock_requests: None, mock_kafka: None) -> None:
+    Timer(0.01, terminate_consumer).start()
+
+    with mock.patch.object(consumer_logger, "error") as mock_error:
+        streamer = KafkaToWebhookStreamer(Consumer())
+        streamer.stream()
+
+    mock_error.assert_called_once_with(
+        "Failed to process message from topic %s, partition %d, offset %d: %s",
+        ANY,
+        0,
+        0,
+        "Webhook Invoker failed with status code: 500",
+    )
+
+
+@pytest.mark.parametrize(
+    "mock_kafka",
+    [
+        ("mock_change_log_message", {"agent": False}, settings.KAFKA_CHANGE_LOG_TOPIC),
+        ("mock_run_message", {"agent": False}, settings.KAFKA_RUNS_TOPIC),
+    ],
+    indirect=True,
+)
+def test_single_stream_skipped_due_to_agentless(mock_kafka: None) -> None:
+    Timer(0.01, terminate_consumer).start()
+
+    with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object(
+        streamer_logger, "info"
+    ) as mock_info:
+        streamer = KafkaToWebhookStreamer(Consumer())
+        streamer.stream()
+
+    mock_error.assert_not_called()
+    mock_info.assert_has_calls(
+        [
+            call(ANY, ANY),
+            call(
+                "Skipping message"
+                " from topic %s, partition %d, offset %d: not for agent",
+                ANY,
+                0,
+                0,
+            ),
+        ]
+    )
+
+
+@pytest.mark.parametrize(
+    "mock_kafka",
+    [
+        (
+            "mock_change_log_message",
+            {"agent": True, "type": "NOT_WEBHOOK"},
+            settings.KAFKA_CHANGE_LOG_TOPIC,
+        ),
+        (
+            "mock_run_message",
+            {"agent": True, "type": "NOT_WEBHOOK"},
+            settings.KAFKA_RUNS_TOPIC,
+        ),
+    ],
+    indirect=True,
+)
+def test_single_stream_skipped_due_to_not_webhook_invoker(mock_kafka: None) -> None:
+    Timer(0.01, terminate_consumer).start()
+
+    with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object(
+        streamer_logger, "info"
+    ) as mock_info:
+        streamer = KafkaToWebhookStreamer(Consumer())
+        streamer.stream()
+
+    mock_error.assert_not_called()
+    mock_info.assert_has_calls(
+        [
+            call(ANY, ANY),
+            call(
+                "Skipping message"
+                " from topic %s, partition %d, offset %d: not for webhook invoker",
+                ANY,
+                0,
+                0,
+            ),
+        ]
+    )