From eb7f9cce77add700246796d8884930623648ce36 Mon Sep 17 00:00:00 2001 From: yair Date: Tue, 21 Nov 2023 15:59:30 +0200 Subject: [PATCH] getting kafka creds from the api --- README.md | 8 ++++---- app/consumers/kafka_consumer.py | 7 +++++-- app/core/config.py | 2 -- app/port_client.py | 10 ++++++++++ .../kafka/test_kafka_to_webhook_processor.py | 2 +- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 230c540..092e0b8 100644 --- a/README.md +++ b/README.md @@ -265,10 +265,10 @@ helm repo update helm install my-port-agent port-labs/port-agent \ --create-namespace --namespace port-agent \ + --set env.normal=YOUR_PORT_CLIENT_ID \ + --set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \ --set env.normal.PORT_ORG_ID=YOUR_ORG_ID \ --set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \ - --set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \ - --set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD \ --set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \ --set env.normal.STREAMER_NAME=KAFKA \ --set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \ @@ -365,10 +365,10 @@ helm repo update helm install my-port-agent port-labs/port-agent \ --create-namespace --namespace port-agent \ + --set env.normal=YOUR_PORT_CLIENT_ID \ + --set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \ --set env.normal.PORT_ORG_ID=YOUR_ORG_ID \ --set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \ - --set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \ - --set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD --set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \ --set env.normal.STREAMER_NAME=KAFKA \ --set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \ diff --git a/app/consumers/kafka_consumer.py b/app/consumers/kafka_consumer.py index ecc2bc6..978dfd2 100644 --- a/app/consumers/kafka_consumer.py +++ b/app/consumers/kafka_consumer.py @@ -6,6 +6,7 @@ from consumers.base_consumer import BaseConsumer from core.config import settings from core.consts import consts +from port_client import get_kafka_credentials logging.basicConfig(level=settings.LOG_LEVEL) logger = logging.getLogger(__name__) @@ -24,13 +25,15 @@ def __init__( if consumer: self.consumer = consumer else: + logger.info("Getting Kafka credentials") + username, password = get_kafka_credentials() conf = { "bootstrap.servers": settings.KAFKA_CONSUMER_BROKERS, "client.id": consts.KAFKA_CONSUMER_CLIENT_ID, "security.protocol": settings.KAFKA_CONSUMER_SECURITY_PROTOCOL, "sasl.mechanism": settings.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM, - "sasl.username": settings.KAFKA_CONSUMER_USERNAME, - "sasl.password": settings.KAFKA_CONSUMER_PASSWORD, + "sasl.username": username, + "sasl.password": password, "group.id": settings.KAFKA_CONSUMER_GROUP_ID, "session.timeout.ms": settings.KAFKA_CONSUMER_SESSION_TIMEOUT_MS, "auto.offset.reset": settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET, diff --git a/app/core/config.py b/app/core/config.py index fc138e9..2abf4e2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -41,8 +41,6 @@ class Settings(BaseSettings): KAFKA_CONSUMER_BROKERS: str = "localhost:9092" KAFKA_CONSUMER_SECURITY_PROTOCOL: str = "plaintext" KAFKA_CONSUMER_AUTHENTICATION_MECHANISM: str = "none" - KAFKA_CONSUMER_USERNAME: str = "local" - KAFKA_CONSUMER_PASSWORD: str = "" KAFKA_CONSUMER_SESSION_TIMEOUT_MS: int = 45000 KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest" KAFKA_CONSUMER_GROUP_ID: str = "" diff --git a/app/port_client.py b/app/port_client.py index 423937c..66eedb6 100644 --- a/app/port_client.py +++ b/app/port_client.py @@ -64,3 +64,13 @@ def report_run_response(run_id: str, response: dict | str) -> Response: headers=headers, ) return res + + +def get_kafka_credentials() -> tuple[str, str]: + headers = get_port_api_headers() + res = requests.get( + f"{settings.PORT_API_BASE_URL}/v1/kafka-credentials", headers=headers + ) + res.raise_for_status() + data = res.json()["credentials"] + return data["username"], data["password"] diff --git a/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py b/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py index 391f1ea..7a8a059 100644 --- a/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py +++ b/tests/unit/processors/kafka/test_kafka_to_webhook_processor.py @@ -177,7 +177,7 @@ def test_invocation_method_synchronized( json={"status": "SUCCESS"}, headers={}, ), - call().ok.__bool__() + call().ok.__bool__(), ] )