diff --git a/README.md b/README.md index 6019e57..6bc5a84 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,13 @@ Here is the mapping file schema: "method": JQ, # Optional. default is POST. Should return one of the following string values POST / PUT / DELETE / GET "headers": dict[str, JQ], # Optional. default is {} "body": ".body", # Optional. default is the whole payload incoming from Port. - "query": dict[str, JQ] # Optional. default is {} + "query": dict[str, JQ] # Optional. default is {}, + "report" { # Optional. Used to report the run status back to Port right after the request is sent to the 3rd party application + "status": JQ, # Optional. Should return the wanted runs status + "link": JQ, # Optional. Should return the wanted link or a list of links + "summary": JQ, # Optional. Should return the wanted summary + "externalRunId": JQ # Optional. Should return the wanted external run id + } } ] ``` @@ -128,6 +134,19 @@ Here is the mapping file schema: + +### The report mapping + +After the request is sent to the 3rd party application, the Port agent can report the run status back to Port. +The report mapping is used to construct the report that will be sent to Port. + +The report mapping can use the following fields: + +`.body` - The incoming message as mentioned [Above](#the-incoming-message-to-base-your-mapping-on) +`.request` - The request that was calculated using the control the payload mapping and sent to the 3rd party application +`.response` - The response that was received from the 3rd party application + + ### Examples #### Terraform Cloud @@ -147,10 +166,20 @@ Create the following blueprint, action and mapping to trigger a Terraform Cloud "workspace_id": { "title": "Workspace Id", "type": "string" + }, + "organization_name": { + "title": "Organization Name", + "type": "string" + }, + "workspace_name": { + "title": "Workspace Name", + "type": "string" } }, "required": [ - "workspace_id" + "workspace_id", + "organization_name", + "workspace_name" ] }, "mirrorProperties": {}, @@ -192,7 +221,8 @@ Create the following blueprint, action and mapping to trigger a Terraform Cloud Mapping - (Should be saved as `invocations.json`) ```json -[{ +[ + { "enabled": ".action == \"trigger_tf_run\"", "headers": { "Authorization": "\"Bearer \" + env.TF_TOKEN", @@ -215,8 +245,14 @@ Create the following blueprint, action and mapping to trigger a Terraform Cloud } } } + }, + "report": { + "status": "if .response.statusCode == 201 then \"SUCCESS\" else \"FAILURE\" end", + "link": "\"https://app.terraform.io/app/\" + .body.payload.entity.properties.organization_name + \"/workspaces/\" + .body.payload.entity.properties.workspace_name + \"/runs/\" + .response.json.data.id", + "externalRunId": ".response.json.data.id" } - }] + } +] ``` @@ -229,10 +265,10 @@ helm repo update helm install my-port-agent port-labs/port-agent \ --create-namespace --namespace port-agent \ + --set env.secret.PORT_CLIENT_ID=YOUR_PORT_CLIENT_ID \ + --set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \ --set env.normal.PORT_ORG_ID=YOUR_ORG_ID \ --set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \ - --set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \ - --set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD \ --set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \ --set env.normal.STREAMER_NAME=KAFKA \ --set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \ @@ -329,10 +365,10 @@ helm repo update helm install my-port-agent port-labs/port-agent \ --create-namespace --namespace port-agent \ + --set env.secret.PORT_CLIENT_ID=YOUR_PORT_CLIENT_ID \ + --set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \ --set env.normal.PORT_ORG_ID=YOUR_ORG_ID \ --set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \ - --set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \ - --set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD --set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \ --set env.normal.STREAMER_NAME=KAFKA \ --set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \ diff --git a/app/consumers/kafka_consumer.py b/app/consumers/kafka_consumer.py index ecc2bc6..978dfd2 100644 --- a/app/consumers/kafka_consumer.py +++ b/app/consumers/kafka_consumer.py @@ -6,6 +6,7 @@ from consumers.base_consumer import BaseConsumer from core.config import settings from core.consts import consts +from port_client import get_kafka_credentials logging.basicConfig(level=settings.LOG_LEVEL) logger = logging.getLogger(__name__) @@ -24,13 +25,15 @@ def __init__( if consumer: self.consumer = consumer else: + logger.info("Getting Kafka credentials") + username, password = get_kafka_credentials() conf = { "bootstrap.servers": settings.KAFKA_CONSUMER_BROKERS, "client.id": consts.KAFKA_CONSUMER_CLIENT_ID, "security.protocol": settings.KAFKA_CONSUMER_SECURITY_PROTOCOL, "sasl.mechanism": settings.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM, - "sasl.username": settings.KAFKA_CONSUMER_USERNAME, - "sasl.password": settings.KAFKA_CONSUMER_PASSWORD, + "sasl.username": username, + "sasl.password": password, "group.id": settings.KAFKA_CONSUMER_GROUP_ID, "session.timeout.ms": settings.KAFKA_CONSUMER_SESSION_TIMEOUT_MS, "auto.offset.reset": settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET, diff --git a/app/core/config.py b/app/core/config.py index 1a4c057..2abf4e2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -1,7 +1,22 @@ from pathlib import Path from typing import Any, Literal, Optional -from pydantic import BaseModel, BaseSettings, parse_file_as, validator +from pydantic import ( + AnyHttpUrl, + BaseModel, + BaseSettings, + Field, + parse_file_as, + parse_obj_as, + validator, +) + + +class ActionReport(BaseModel): + status: str | None = None + link: str | None = None + summary: str | None = None + external_run_id: str | None = Field(None, alias="externalRunId") class Mapping(BaseModel): @@ -11,6 +26,7 @@ class Mapping(BaseModel): body: dict[str, Any] | str | None = None headers: dict[str, str] | str | None = None query: dict[str, str] | str | None = None + report: ActionReport | None = None class Settings(BaseSettings): @@ -19,11 +35,12 @@ class Settings(BaseSettings): STREAMER_NAME: str PORT_ORG_ID: str + PORT_API_BASE_URL: AnyHttpUrl = parse_obj_as(AnyHttpUrl, "https://api.getport.io") + PORT_CLIENT_ID: str + PORT_CLIENT_SECRET: str KAFKA_CONSUMER_BROKERS: str = "localhost:9092" KAFKA_CONSUMER_SECURITY_PROTOCOL: str = "plaintext" KAFKA_CONSUMER_AUTHENTICATION_MECHANISM: str = "none" - KAFKA_CONSUMER_USERNAME: str = "local" - KAFKA_CONSUMER_PASSWORD: str = "" KAFKA_CONSUMER_SESSION_TIMEOUT_MS: int = 45000 KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest" KAFKA_CONSUMER_GROUP_ID: str = "" diff --git a/app/invokers/webhook_invoker.py b/app/invokers/webhook_invoker.py index a0b2a88..7dc499d 100644 --- a/app/invokers/webhook_invoker.py +++ b/app/invokers/webhook_invoker.py @@ -1,6 +1,6 @@ +import json import logging -from dataclasses import dataclass -from typing import Any +from typing import Any, Callable import pyjq as jq import requests @@ -8,13 +8,16 @@ from core.consts import consts from flatten_dict import flatten, unflatten from invokers.base_invoker import BaseInvoker +from port_client import report_run_response, report_run_status, run_logger_factory +from pydantic import BaseModel, Field +from requests import Response +from utils import get_invocation_method_object, response_to_dict logging.basicConfig(level=settings.LOG_LEVEL) logger = logging.getLogger(__name__) -@dataclass -class RequestPayload: +class RequestPayload(BaseModel): method: str url: str body: dict @@ -22,6 +25,13 @@ class RequestPayload: query: dict +class ReportPayload(BaseModel): + status: Any | None = None + link: Any | None = None + summary: Any | None = None + external_run_id: Any | None = Field(None, alias="externalRunId") + + class WebhookInvoker(BaseInvoker): def _jq_exec(self, expression: str, context: dict) -> dict | None: try: @@ -43,16 +53,59 @@ def _apply_jq_on_field(self, mapping: dict[str, str] | str, body: dict) -> Any: else: return self._jq_exec(mapping, body) - def _prepare_payload(self, body: dict, invocation_method: dict) -> RequestPayload: + def _prepare_payload( + self, mapping: Mapping | None, body: dict, invocation_method: dict + ) -> RequestPayload: request_payload: RequestPayload = RequestPayload( - consts.DEFAULT_HTTP_METHOD, - invocation_method.get("url", ""), - body, - {}, - {}, + method=invocation_method.get("method", consts.DEFAULT_HTTP_METHOD), + url=invocation_method.get("url", ""), + body=body, + headers={}, + query={}, ) + if not mapping: + return request_payload + + raw_mapping: dict = mapping.dict(exclude_none=True) + raw_mapping.pop("enabled") + raw_mapping.pop("report", None) + for key, value in raw_mapping.items(): + result = self._apply_jq_on_field(value, body) + setattr(request_payload, key, result) - mapping: Mapping | None = next( + return request_payload + + def _prepare_report( + self, + mapping: Mapping | None, + response_context: Response, + request_context: dict, + body_context: dict, + ) -> ReportPayload: + default_status = ( + ("SUCCESS" if response_context.ok else "FAILURE") + if get_invocation_method_object(body_context).get("synchronized") + else None + ) + report_payload: ReportPayload = ReportPayload(status=default_status) + if not mapping or not mapping.report: + return report_payload + + context = { + "body": body_context, + "request": request_context, + "response": response_to_dict(response_context), + } + + raw_mapping: dict = mapping.report.dict(exclude_none=True) + for key, value in raw_mapping.items(): + result = self._apply_jq_on_field(value, context) + setattr(report_payload, key, result) + + return report_payload + + def _find_mapping(self, body: dict) -> Mapping: + return next( ( action_mapping for action_mapping in control_the_payload_config @@ -65,25 +118,18 @@ def _prepare_payload(self, body: dict, invocation_method: dict) -> RequestPayloa None, ) - if not mapping: - return request_payload - - raw_mapping: dict = mapping.dict(exclude_none=True) - raw_mapping.pop("enabled") - for key, value in raw_mapping.items(): - result = self._apply_jq_on_field(value, body) - setattr(request_payload, key, result) - - return request_payload - - def invoke(self, body: dict, invocation_method: dict) -> None: - logger.info("WebhookInvoker - start - destination: %s", invocation_method) - request_payload = self._prepare_payload(body, invocation_method) + @staticmethod + def _request( + request_payload: RequestPayload, run_logger: Callable[[str], None] + ) -> Response: logger.info( - "WebhookInvoker - request - " "method: %s, url: %s", + "WebhookInvoker - request - " "method: %s, url: %s, body: %s", request_payload.method, request_payload.url, + request_payload.body, ) + run_logger("Sending the request") + res = requests.request( request_payload.method, request_payload.url, @@ -92,12 +138,134 @@ def invoke(self, body: dict, invocation_method: dict) -> None: params=request_payload.query, timeout=settings.WEBHOOK_INVOKER_TIMEOUT, ) + + if res.ok: + logger.info( + "WebhookInvoker - request - status_code: %s", + res.status_code, + ) + run_logger( + f"The request was successful with status code: {res.status_code}" + ) + else: + logger.warning( + "WebhookInvoker - request - status_code: %s, response: %s", + res.status_code, + res.text, + ) + run_logger( + f"The request failed with status code: {res.status_code} " + f"and response: {res.text}" + ) + + return res + + @staticmethod + def _report_run_status( + run_id: str, data_to_patch: dict, run_logger: Callable[[str], None] + ) -> Response: + run_logger("Reporting the run status") + res = report_run_status(run_id, data_to_patch) + + if res.ok: + logger.info( + "WebhookInvoker - report run - run_id: %s, status_code: %s", + run_id, + res.status_code, + ) + run_logger("The run status was reported successfully") + else: + logger.warning( + "WebhookInvoker - report run - " + "run_id: %s, status_code: %s, response: %s", + run_id, + res.status_code, + res.text, + ) + run_logger( + f"The run status failed to be reported " + f"with status code: {res.status_code} and response: {res.text}" + ) + + return res + + @staticmethod + def _report_run_response( + run_id: str, response: Response, run_logger: Callable[[str], None] + ) -> Response: + logger.info( + "WebhookInvoker - report run response - run_id: %s, response: %s", + run_id, + response, + ) + run_logger("Reporting the run response") + + try: + response_value = response.json() + except json.JSONDecodeError: + response_value = response.text + + res = report_run_response(run_id, response_value) + + if res.ok: + logger.info( + "WebhookInvoker - report run response - " "run_id: %s, status_code: %s", + run_id, + res.status_code, + ) + run_logger("The run response was reported successfully ") + else: + logger.warning( + "WebhookInvoker - report run response - " + "run_id: %s, status_code: %s, response: %s", + run_id, + res.status_code, + res.text, + ) + run_logger( + f"The run response failed to be reported " + f"with status code: {res.status_code} and response: {res.text}" + ) + + return res + + def invoke(self, body: dict, invocation_method: dict) -> None: + run_id = body["context"]["runId"] + run_logger = run_logger_factory(run_id) + + run_logger("An action message has been received") + logger.info("WebhookInvoker - start - destination: %s", invocation_method) + mapping = self._find_mapping(body) + logger.info( - "WebhookInvoker - done - destination: %s, status code: %s", - invocation_method, - res.status_code, + "WebhookInvoker - mapping - mapping: %s", + mapping.dict() if mapping else None, + ) + run_logger("Preparing the payload for the request") + request_payload = self._prepare_payload(mapping, body, invocation_method) + res = self._request(request_payload, run_logger) + + if invocation_method.get("synchronized"): + self._report_run_response(run_id, res, run_logger) + + report_payload = self._prepare_report( + mapping, res, request_payload.dict(), body ) + if report_dict := report_payload.dict(exclude_none=True, by_alias=True): + logger.info( + "WebhookInvoker - report mapping - report_payload: %s", + report_payload.dict(exclude_none=True, by_alias=True), + ) + self._report_run_status(run_id, report_dict, run_logger) + else: + logger.info( + "WebhookInvoker - report mapping " + "- no report mapping found - run_id: %s", + run_id, + ) + res.raise_for_status() + run_logger("Finished processing the action") webhook_invoker = WebhookInvoker() diff --git a/app/port_client.py b/app/port_client.py new file mode 100644 index 0000000..66eedb6 --- /dev/null +++ b/app/port_client.py @@ -0,0 +1,76 @@ +from logging import getLogger +from typing import Callable + +import requests +from core.config import settings +from requests import Response + +logger = getLogger(__name__) + + +def get_port_api_headers() -> dict[str, str]: + credentials = { + "clientId": settings.PORT_CLIENT_ID, + "clientSecret": settings.PORT_CLIENT_SECRET, + } + + token_response = requests.post( + f"{settings.PORT_API_BASE_URL}/v1/auth/access_token", json=credentials + ) + + if not token_response.ok: + logger.error( + f"Failed to get Port API access token - " + f"status: {token_response.status_code}, " + f"response: {token_response.text}" + ) + + token_response.raise_for_status() + + return { + "Authorization": f"Bearer {token_response.json()['accessToken']}", + "User-Agent": "port-agent", + } + + +def run_logger_factory(run_id: str) -> Callable[[str], None]: + def send_run_log(message: str) -> None: + headers = get_port_api_headers() + + requests.post( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/{run_id}/logs", + json={"message": message}, + headers=headers, + ) + + return send_run_log + + +def report_run_status(run_id: str, data_to_patch: dict) -> Response: + headers = get_port_api_headers() + res = requests.patch( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/{run_id}", + json=data_to_patch, + headers=headers, + ) + return res + + +def report_run_response(run_id: str, response: dict | str) -> Response: + headers = get_port_api_headers() + res = requests.patch( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/{run_id}/response", + json={"response": response}, + headers=headers, + ) + return res + + +def get_kafka_credentials() -> tuple[str, str]: + headers = get_port_api_headers() + res = requests.get( + f"{settings.PORT_API_BASE_URL}/v1/kafka-credentials", headers=headers + ) + res.raise_for_status() + data = res.json()["credentials"] + return data["username"], data["password"] diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..614f61d --- /dev/null +++ b/app/utils.py @@ -0,0 +1,22 @@ +from requests import Response + + +def response_to_dict(response: Response) -> dict: + response_dict = { + "statusCode": response.status_code, + "headers": dict(response.headers), + "text": response.text, + "json": None, + } + + if response.ok: + try: + response_dict["json"] = response.json() + except ValueError: + pass + + return response_dict + + +def get_invocation_method_object(body: dict) -> dict: + return body.get("payload", {}).get("action", {}).get("invocationMethod", {}) diff --git a/scripts/test.sh b/scripts/test.sh index d5dfbfc..4ce9930 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -3,4 +3,4 @@ set -e set -x -cd ./app && PYTHONPATH=./ STREAMER_NAME=test PORT_ORG_ID=test_org pytest --cov=./ --cov-report=term-missing ../tests "${@}" +cd ./app && PYTHONPATH=./ STREAMER_NAME=test PORT_ORG_ID=test_org PORT_CLIENT_ID=test PORT_CLIENT_SECRET=test pytest --cov=./ --cov-report=term-missing ../tests "${@}" diff --git a/tests/unit/processors/kafka/conftest.py b/tests/unit/processors/kafka/conftest.py index 340c39f..5bb0182 100644 --- a/tests/unit/processors/kafka/conftest.py +++ b/tests/unit/processors/kafka/conftest.py @@ -3,6 +3,7 @@ from signal import SIGINT from typing import Any, Callable, Generator, Optional +import port_client import pytest import requests from _pytest.monkeypatch import MonkeyPatch @@ -17,6 +18,13 @@ class MockResponse: status_code = request.param.get("status_code") text = "Invoker failed with status code: %d" % status_code + def json(self) -> dict: + return request.param.get("json") + + @property + def ok(self) -> bool: + return 200 <= self.status_code <= 299 + def raise_for_status(self) -> None: if 400 <= self.status_code <= 599: raise Exception(self.text) @@ -24,6 +32,7 @@ def raise_for_status(self) -> None: def mock_request(*args: Any, **kwargs: Any) -> MockResponse: return MockResponse() + monkeypatch.setattr(port_client, "get_port_api_headers", lambda *args: {}) monkeypatch.setattr(requests, "request", mock_request) monkeypatch.setattr(requests, "get", mock_request) monkeypatch.setattr(requests, "post", mock_request) @@ -208,73 +217,6 @@ def get_run_message(invocation_method: dict) -> bytes: return get_run_message -@pytest.fixture(scope="module") -def mock_gitlab_run_message() -> Callable[[dict], bytes]: - run_message: dict = { - "action": "Create", - "resourceType": "run", - "status": "TRIGGERED", - "trigger": { - "by": {"orgId": "test_org", "userId": "test_user"}, - "origin": "UI", - "at": "2022-11-16T16:31:32.447Z", - }, - "context": { - "entity": None, - "blueprint": "Service", - "runId": "r_jE5FhDURh4Uen2Qr", - }, - "payload": { - "entity": None, - "action": { - "id": "action_34aweFQtayw7SCVb", - "identifier": "Create", - "title": "Create", - "icon": "DefaultBlueprint", - "userInputs": { - "properties": { - "foo": {"type": "string", "description": "Description"}, - "bar": {"type": "number", "description": "Description"}, - }, - "required": [], - }, - "invocationMethod": { - "type": "GITLAB", - "agent": True, - "defaultRef": "main", - "projectName": "project", - "groupName": "group", - }, - "trigger": "CREATE", - "description": "", - "blueprint": "Service", - "createdAt": "2022-11-15T09:58:52.863Z", - "createdBy": "test_user", - "updatedAt": "2022-11-15T09:58:52.863Z", - "updatedBy": "test_user", - }, - "properties": {}, - }, - } - - def get_gitlab_run_message(invocation_method: dict) -> bytes: - if invocation_method is not None: - run_message["payload"]["action"]["invocationMethod"] = invocation_method - return json.dumps(run_message).encode() - - return get_gitlab_run_message - - -@pytest.fixture -def mock_gitlab_token(monkeypatch: MonkeyPatch) -> None: - monkeypatch.setenv("group_project", "token") - - -@pytest.fixture -def mock_gitlab_token_subgroup(monkeypatch: MonkeyPatch) -> None: - monkeypatch.setenv("group_subgroup_sub2_project", "token") - - @pytest.fixture() def mock_control_the_payload_config(monkeypatch: MonkeyPatch) -> list[dict[str, Any]]: mapping = [ @@ -293,6 +235,7 @@ def mock_control_the_payload_config(monkeypatch: MonkeyPatch) -> list[dict[str, "MY-HEADER": ".payload.action.identifier", }, "query": {}, + "report": {"link": '"http://test.com"'}, }, ] control_the_payload_config = parse_obj_as(list[Mapping], mapping) diff --git a/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py b/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py index eef18ca..7a8a059 100644 --- a/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py +++ b/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py @@ -1,6 +1,6 @@ from threading import Timer from unittest import mock -from unittest.mock import ANY +from unittest.mock import ANY, call import pytest from _pytest.monkeypatch import MonkeyPatch @@ -56,7 +56,11 @@ def test_single_stream_failed(mock_requests: None, mock_kafka: None) -> None: ) -@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True) +@pytest.mark.parametrize( + "mock_requests", + [{"status_code": 200}], + indirect=True, +) @pytest.mark.parametrize( "mock_kafka", [ @@ -79,6 +83,72 @@ def test_single_stream_success_control_the_payload( expected_query: dict[str, ANY] = {} Timer(0.01, terminate_consumer).start() request_mock = mocker.patch("requests.request") + request_mock.return_value.headers = {} + request_mock.return_value.text = "test" + request_mock.return_value.status_code = 200 + request_mock.return_value.json.return_value = {} + request_patch_mock = mocker.patch("requests.patch") + mocker.patch("pathlib.Path.is_file", side_effect=(True,)) + + with mock.patch.object(consumer_logger, "error") as mock_error: + streamer = KafkaStreamer(Consumer()) + streamer.stream() + request_mock.assert_called_once_with( + "POST", + ANY, + json=expected_body, + headers=expected_headers, + params=expected_query, + timeout=settings.WEBHOOK_INVOKER_TIMEOUT, + ) + + request_patch_mock.assert_called_once_with( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/" + f"{webhook_run_payload['context']['runId']}", + headers={}, + json={"link": "http://test.com"}, + ) + + mock_error.assert_not_called() + + +@pytest.mark.parametrize( + "mock_requests", + [{"status_code": 200}], + indirect=True, +) +@pytest.mark.parametrize( + "mock_kafka", + [ + ( + "mock_webhook_run_message", + { + "type": "WEBHOOK", + "agent": True, + "url": "http://localhost:80/api/test", + "synchronized": True, + }, + settings.KAFKA_RUNS_TOPIC, + ), + ], + indirect=True, +) +def test_invocation_method_synchronized( + monkeypatch: MonkeyPatch, + mocker: MockFixture, + mock_requests: None, + mock_kafka: None, + mock_control_the_payload_config: list[Mapping], + webhook_run_payload: dict, +) -> None: + expected_body = webhook_run_payload + expected_headers = { + "MY-HEADER": webhook_run_payload["payload"]["action"]["identifier"] + } + expected_query: dict[str, ANY] = {} + Timer(0.01, terminate_consumer).start() + request_mock = mocker.patch("requests.request") + request_patch_mock = mocker.patch("requests.patch") mocker.patch("pathlib.Path.is_file", side_effect=(True,)) with mock.patch.object(consumer_logger, "error") as mock_error: @@ -92,5 +162,72 @@ def test_single_stream_success_control_the_payload( params=expected_query, timeout=settings.WEBHOOK_INVOKER_TIMEOUT, ) + request_patch_mock.assert_has_calls( + calls=[ + call( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/" + f"{webhook_run_payload['context']['runId']}/response", + json=ANY, + headers={}, + ), + call().ok.__bool__(), + call( + f"{settings.PORT_API_BASE_URL}/v1/actions/runs/" + f"{webhook_run_payload['context']['runId']}", + json={"status": "SUCCESS"}, + headers={}, + ), + call().ok.__bool__(), + ] + ) + + mock_error.assert_not_called() + + +@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True) +@pytest.mark.parametrize( + "mock_kafka", + [ + ( + "mock_webhook_run_message", + { + "type": "WEBHOOK", + "agent": True, + "url": "http://localhost:80/api/test", + "method": "GET", + }, + settings.KAFKA_RUNS_TOPIC, + ), + ], + indirect=True, +) +def test_invocation_method_method_override( + monkeypatch: MonkeyPatch, + mocker: MockFixture, + mock_requests: None, + mock_kafka: None, + mock_control_the_payload_config: list[Mapping], + webhook_run_payload: dict, +) -> None: + expected_body = webhook_run_payload + expected_headers = { + "MY-HEADER": webhook_run_payload["payload"]["action"]["identifier"] + } + expected_query: dict[str, ANY] = {} + Timer(0.01, terminate_consumer).start() + request_mock = mocker.patch("requests.request") + mocker.patch("pathlib.Path.is_file", side_effect=(True,)) + + with mock.patch.object(consumer_logger, "error") as mock_error: + streamer = KafkaStreamer(Consumer()) + streamer.stream() + request_mock.assert_called_once_with( + "GET", + ANY, + json=expected_body, + headers=expected_headers, + params=expected_query, + timeout=settings.WEBHOOK_INVOKER_TIMEOUT, + ) mock_error.assert_not_called() diff --git a/tests/unit/streamers/kafka/conftest.py b/tests/unit/streamers/kafka/conftest.py index 36f7185..707d3c1 100644 --- a/tests/unit/streamers/kafka/conftest.py +++ b/tests/unit/streamers/kafka/conftest.py @@ -3,6 +3,7 @@ from signal import SIGINT from typing import Any, Callable, Generator, Optional +import port_client import pytest import requests from _pytest.monkeypatch import MonkeyPatch @@ -15,6 +16,13 @@ class MockResponse: status_code = request.param.get("status_code") text = "Invoker failed with status code: %d" % status_code + def json(self) -> dict: + return request.param.get("json") + + @property + def ok(self) -> bool: + return 200 <= self.status_code <= 299 + def raise_for_status(self) -> None: if 400 <= self.status_code <= 599: raise Exception(self.text) @@ -22,6 +30,7 @@ def raise_for_status(self) -> None: def mock_request(*args: Any, **kwargs: Any) -> MockResponse: return MockResponse() + monkeypatch.setattr(port_client, "get_port_api_headers", lambda *args: {}) monkeypatch.setattr(requests, "request", mock_request) monkeypatch.setattr(requests, "get", mock_request) monkeypatch.setattr(requests, "post", mock_request)