Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: refactor webapp to make use of FastAPI dependecy injection #213

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .github/workflows/kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ jobs:

for secret_name in strict-secret namespace-secret cluster-secret; do
echo -n "Testing ${secret_name} "
test "$(kubectl get secret "${secret_name}" -n e2e \
-o go-template --template '{{ index .data "a-secret" }}')" = "YQ==" ||
{ echo ERR; exit 1; } &&
echo OK
secret_value=$(kubectl get secret "${secret_name}" -n e2e \
-o go-template --template '{{ index .data "a-secret" }}')
if test "$secret_value" = "YQ=="; then
echo OK
else
echo ERR
echo "Unexpected secret value '${secret_value}'" 1>&2
exit 1
fi
done
1 change: 1 addition & 0 deletions Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ENV UVICORN_PORT=5000 \
UVICORN_HOST=0.0.0.0 \
UVICORN_NO_DATE_HEADER=1 \
UVICORN_NO_SERVER_HEADER=1 \
KUBESEAL_CERT=/tmp/cert.pem \
KUBESEAL_BINARY=/tmp/kubeseal

WORKDIR ${APP_PATH}
Expand Down
8 changes: 6 additions & 2 deletions api/kubeseal_webgui_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import fastapi
from fastapi.middleware.cors import CORSMiddleware

from .app_config import fetch_sealed_secrets_cert
from .app_config import settings
from .dependencies import get_kubeseal_client
from .routers import config, kubernetes, kubeseal

LOGGER = logging.getLogger("kubeseal-webgui")
Expand Down Expand Up @@ -36,7 +37,10 @@
@app.on_event("startup")
def startup_event():
LOGGER.info("Running startup tasks...")
fetch_sealed_secrets_cert()
if settings.kubeseal_autofetch:
with open(settings.kubeseal_cert, "w") as file:
LOGGER.info("Saving certificate in '%s'", settings.kubeseal_cert)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information

This expression logs [sensitive data (certificate)](1) as clear text.
file.write(get_kubeseal_client().get_certificate())
Fixed Show fixed Hide fixed
LOGGER.info("Startup tasks complete.")


Expand Down
93 changes: 17 additions & 76 deletions api/kubeseal_webgui_api/app_config.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,26 @@
import logging
import subprocess
from os import environ
from pathlib import Path

from pydantic import BaseSettings

binary = environ.get("KUBESEAL_BINARY", "/bin/false")
mock = environ.get("MOCK_ENABLED", "False").lower() == "true"
autofetch = environ.get("KUBESEAL_AUTOFETCH", "false")
kubeseal_cert = environ.get("KUBESEAL_CERT", "/kubeseal-webgui/cert/kubeseal-cert.pem")


class AppSettings(BaseSettings):
kubeseal_version: str
kubeseal_binary: str = binary
kubeseal_cert: str = environ.get("KUBESEAL_CERT", "/dev/null")
mock_enabled: bool = mock
kubeseal_binary: Path = Path("/bin/false")
kubeseal_cert: Path = Path("/dev/null")
kubeseal_autofetch: bool = False
mock_enabled: bool = False
sealed_secrets_namespace: str = "sealed-secrets"
sealed_secrets_controller_name: str = "sealed-secrets-controller"
mock_namespace_count: int = 120


LOGGER = logging.getLogger("kubeseal-webgui")


def fetch_sealed_secrets_cert():
if mock or autofetch == "false":
return

sealed_secrets_namespace = environ.get(
"KUBESEAL_CONTROLLER_NAMESPACE", "sealed-secrets"
)
sealed_secrets_controller_name = environ.get(
"KUBESEAL_CONTROLLER_NAME", "sealed-secrets-controller"
)

LOGGER.info(
"Fetch certificate from sealed secrets controller '%s' in namespace '%s'",
sealed_secrets_controller_name,
sealed_secrets_namespace,
)
kubeseal_subprocess = subprocess.Popen(
[
binary,
"--fetch-cert",
"--controller-name",
sealed_secrets_controller_name,
"--controller-namespace",
sealed_secrets_namespace,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
output, error = kubeseal_subprocess.communicate()
if error:
error_message = f"Error in run_kubeseal: {error}"
LOGGER.error(error_message)
raise RuntimeError(error_message)
with open(kubeseal_cert, "w") as file:
LOGGER.info("Saving certificate in '%s'", kubeseal_cert)
file.write(output)


def get_kubeseal_version() -> str:
"""Retrieve the kubeseal binary version."""
LOGGER.debug("Retrieving kubeseal binary version.")
kubeseal_subprocess = subprocess.Popen(
[binary, "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
output, error = kubeseal_subprocess.communicate()
if error:
error_message = f"Error in run_kubeseal: {error}"
LOGGER.error(error_message)
raise RuntimeError(error_message)

version = "".join(output.split("\n"))

return str(version).split(":")[1].replace('"', "").lstrip()
class Config:
fields = {
"sealed_secrets_namespace": {
"env": ["sealed_secrets_namespace", "KUBESEAL_CONTROLLER_NAMESPACE"],
},
"sealed_secrets_controller_name": {
"env": ["sealed_secrets_controller_name", "KUBESEAL_CONTROLLER_NAME"],
},
}


settings = AppSettings(
kubeseal_version=get_kubeseal_version(),
)
settings = AppSettings()
26 changes: 26 additions & 0 deletions api/kubeseal_webgui_api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from kubeseal_webgui_api.app_config import settings
from kubeseal_webgui_api.internal import (
KubernetesClient,
KubesealClient,
MockKubernetesClient,
MockKubesealClient,
)


def get_kubeseal_client() -> KubesealClient:
if settings.mock_enabled:
return MockKubesealClient()

return KubesealClient(
cert=settings.kubeseal_cert,
binary=settings.kubeseal_binary,
namespace=settings.sealed_secrets_namespace,
controller=settings.sealed_secrets_controller_name,
)


def get_kubernetes_client() -> KubernetesClient:
if settings.mock_enabled:
return MockKubernetesClient(namespace_count=settings.mock_namespace_count)

return KubernetesClient()
4 changes: 4 additions & 0 deletions api/kubeseal_webgui_api/internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# flake8: noqa

from .kubernetes_client import KubernetesClient, MockKubernetesClient
from .kubeseal_client import DEFAULT_KUBESEAL_CERT, DEFAULT_KUBESEAL_BINARY, KubesealClient, MockKubesealClient
129 changes: 129 additions & 0 deletions api/kubeseal_webgui_api/internal/kubernetes_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import itertools
import logging
import random
from typing import List, Optional

from kubernetes import config
from kubernetes.client import ApiClient, CoreV1Api, V1NamespaceList

LOGGER = logging.getLogger("kubeseal-webgui")


class KubernetesClient:
def __init__(self, api_client: Optional[ApiClient] = None):
self._api_client = api_client

def get_namespaces(self) -> List[str]:
"""Retrieve a list of namespaces from current kubernetes cluster."""
namespaces_list = []

LOGGER.info("Resolving in-cluster Namespaces")
v1 = CoreV1Api(api_client=self.api_client())
namespaces = v1.list_namespace()
if isinstance(namespaces, V1NamespaceList) and namespaces.items:
for ns in namespaces.items:
namespaces_list.append(ns.metadata.name)
else:
LOGGER.warning("No valid namespace list available via %s", namespaces)

LOGGER.debug("Namespaces list %s", namespaces_list)
return namespaces_list

def api_client(self) -> ApiClient:
if self._api_client is not None:
return self._api_client

config.load_incluster_config()
return ApiClient()


adjectives = [
"altered",
"angry",
"big",
"blinking",
"boring",
"broken",
"bubbling",
"calculating",
"cute",
"diffing",
"expensive",
"fresh",
"fierce",
"floating",
"generous",
"golden",
"green",
"growing",
"hidden",
"hideous",
"interesting",
"kubed",
"mumbling",
"rusty",
"singing",
"small",
"sniffing",
"squared",
"talking",
"trusty",
"wise",
"walking",
"zooming",
]
nouns = [
"ant",
"bike",
"bird",
"captain",
"cheese",
"clock",
"digit",
"gorilla",
"kraken",
"number",
"maven",
"monitor",
"moose",
"moon",
"mouse",
"news",
"newt",
"octopus",
"opossum",
"otter",
"paper",
"passenger",
"potato",
"ship",
"spaceship",
"spaghetti",
"spoon",
"store",
"tomcat",
"trombone",
"unicorn",
"vine",
"whale",
]


class MockKubernetesClient(KubernetesClient):
def __init__(self, namespace_count: int = 50):
super().__init__()

self.namespace_count = namespace_count

def get_namespaces(self) -> List[str]:
"""Generate a list of namespaces."""
LOGGER.debug("Generating %d namespaces", self.namespace_count)

return sorted(
{
"-".join(words)
for words in random.choices( # noqa: S311 no security needed here
list(itertools.product(adjectives, nouns)), k=self.namespace_count
)
}
)
Loading