Skip to content

Commit

Permalink
getting kafka creds from the api
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Nov 21, 2023
1 parent 087bd44 commit eb7f9cc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ helm repo update

helm install my-port-agent port-labs/port-agent \
--create-namespace --namespace port-agent \
--set env.normal=YOUR_PORT_CLIENT_ID \
--set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \
--set env.normal.PORT_ORG_ID=YOUR_ORG_ID \
--set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \
--set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \
--set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD \
--set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \
--set env.normal.STREAMER_NAME=KAFKA \
--set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \
Expand Down Expand Up @@ -365,10 +365,10 @@ helm repo update

helm install my-port-agent port-labs/port-agent \
--create-namespace --namespace port-agent \
--set env.normal=YOUR_PORT_CLIENT_ID \
--set env.secret.PORT_CLIENT_SECRET=YOUR_PORT_CLIENT_SECRET \
--set env.normal.PORT_ORG_ID=YOUR_ORG_ID \
--set env.normal.KAFKA_CONSUMER_GROUP_ID=YOUR_KAFKA_CONSUMER_GROUP \
--set env.secret.KAFKA_CONSUMER_USERNAME=YOUR_KAFKA_USERNAME \
--set env.secret.KAFKA_CONSUMER_PASSWORD=YOUR_KAFKA_PASSWORD
--set env.normal.KAFKA_CONSUMER_BROKERS=PORT_KAFKA_BROKERS \
--set env.normal.STREAMER_NAME=KAFKA \
--set env.normal.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM=SCRAM-SHA-512 \
Expand Down
7 changes: 5 additions & 2 deletions app/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from consumers.base_consumer import BaseConsumer
from core.config import settings
from core.consts import consts
from port_client import get_kafka_credentials

logging.basicConfig(level=settings.LOG_LEVEL)
logger = logging.getLogger(__name__)
Expand All @@ -24,13 +25,15 @@ def __init__(
if consumer:
self.consumer = consumer
else:
logger.info("Getting Kafka credentials")
username, password = get_kafka_credentials()
conf = {
"bootstrap.servers": settings.KAFKA_CONSUMER_BROKERS,
"client.id": consts.KAFKA_CONSUMER_CLIENT_ID,
"security.protocol": settings.KAFKA_CONSUMER_SECURITY_PROTOCOL,
"sasl.mechanism": settings.KAFKA_CONSUMER_AUTHENTICATION_MECHANISM,
"sasl.username": settings.KAFKA_CONSUMER_USERNAME,
"sasl.password": settings.KAFKA_CONSUMER_PASSWORD,
"sasl.username": username,
"sasl.password": password,
"group.id": settings.KAFKA_CONSUMER_GROUP_ID,
"session.timeout.ms": settings.KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
"auto.offset.reset": settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
Expand Down
2 changes: 0 additions & 2 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class Settings(BaseSettings):
KAFKA_CONSUMER_BROKERS: str = "localhost:9092"
KAFKA_CONSUMER_SECURITY_PROTOCOL: str = "plaintext"
KAFKA_CONSUMER_AUTHENTICATION_MECHANISM: str = "none"
KAFKA_CONSUMER_USERNAME: str = "local"
KAFKA_CONSUMER_PASSWORD: str = ""
KAFKA_CONSUMER_SESSION_TIMEOUT_MS: int = 45000
KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest"
KAFKA_CONSUMER_GROUP_ID: str = ""
Expand Down
10 changes: 10 additions & 0 deletions app/port_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,13 @@ def report_run_response(run_id: str, response: dict | str) -> Response:
headers=headers,
)
return res


def get_kafka_credentials() -> tuple[str, str]:
headers = get_port_api_headers()
res = requests.get(
f"{settings.PORT_API_BASE_URL}/v1/kafka-credentials", headers=headers
)
res.raise_for_status()
data = res.json()["credentials"]
return data["username"], data["password"]
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def test_invocation_method_synchronized(
json={"status": "SUCCESS"},
headers={},
),
call().ok.__bool__()
call().ok.__bool__(),
]
)

Expand Down

0 comments on commit eb7f9cc

Please sign in to comment.