diff --git a/app/consumers/kafka_consumer.py b/app/consumers/kafka_consumer.py index afd0683..f802920 100644 --- a/app/consumers/kafka_consumer.py +++ b/app/consumers/kafka_consumer.py @@ -26,7 +26,6 @@ def __init__( self.consumer = consumer else: conf = { - "bootstrap.servers": settings.KAFKA_CONSUMER_BROKERS, "client.id": consts.KAFKA_CONSUMER_CLIENT_ID, "security.protocol": settings.KAFKA_CONSUMER_SECURITY_PROTOCOL, "sasl.mechanism": settings.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM, @@ -37,9 +36,10 @@ def __init__( } if not settings.USING_LOCAL_PORT_INSTANCE: logger.info("Getting Kafka credentials") - username, password = get_kafka_credentials() + brokers, username, password = get_kafka_credentials() conf["sasl.username"] = username conf["sasl.password"] = password + conf["bootstrap.servers"] = ','.join(brokers) self.consumer = Consumer(conf) diff --git a/app/core/config.py b/app/core/config.py index 377d287..2385265 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -40,7 +40,6 @@ class Settings(BaseSettings): PORT_API_BASE_URL: AnyHttpUrl = parse_obj_as(AnyHttpUrl, "https://api.getport.io") PORT_CLIENT_ID: str PORT_CLIENT_SECRET: str - KAFKA_CONSUMER_BROKERS: str = "localhost:9092" KAFKA_CONSUMER_SECURITY_PROTOCOL: str = "plaintext" KAFKA_CONSUMER_AUTHENTICATION_MECHANISM: str = "none" KAFKA_CONSUMER_SESSION_TIMEOUT_MS: int = 45000 diff --git a/app/port_client.py b/app/port_client.py index 1496b8b..d196f32 100644 --- a/app/port_client.py +++ b/app/port_client.py @@ -66,11 +66,11 @@ def report_run_response(run_id: str, response: dict | str | None) -> Response: return res -def get_kafka_credentials() -> tuple[str, str]: +def get_kafka_credentials() -> tuple[list[str], str, str]: headers = get_port_api_headers() res = requests.get( f"{settings.PORT_API_BASE_URL}/v1/kafka-credentials", headers=headers ) res.raise_for_status() data = res.json()["credentials"] - return data["username"], data["password"] + return data['brokers'], data["username"], data["password"]