diff --git a/kapitan/refs/vault_resources.py b/kapitan/refs/vault_resources.py index 47f64cc68..6720d9cc9 100644 --- a/kapitan/refs/vault_resources.py +++ b/kapitan/refs/vault_resources.py @@ -26,9 +26,8 @@ class VaultClient(hvac.Client): def __init__(self, vault_parameters): self.vault_parameters = vault_parameters - self.env = get_env(vault_parameters) - - super().__init__(**{k: v for k, v in self.env.items() if k != "auth"}) + client_parameters = get_env(vault_parameters) + super().__init__(**client_parameters) self.authenticate() @@ -45,7 +44,7 @@ def get_auth_token(self): token_file = os.path.join(os.path.expanduser("~"), ".vault-token") token = self.read_token_from_file(token_file) - self.env["token"] = token + return token def read_token_from_file(self, token_file): try: @@ -64,77 +63,108 @@ def read_token_from_file(self, token_file): return token def authenticate(self): - # DIFFERENT LOGIN METHOD BASED ON AUTHENTICATION TYPE + # different login method based on authentication type auth_type = self.vault_parameters["auth"] - self.get_auth_token() + + token = self.get_auth_token() username = os.getenv("VAULT_USERNAME") password = os.getenv("VAULT_PASSWORD") + # token if auth_type == "token": - self.token = self.env["token"] + if token: + self.token = token + else: + raise VaultError( + "token authentication failed: VAULT_TOKEN is empty and '~/.vaulttoken not found" + ) + # github + elif auth_type == "github": + if token: + self.auth.github.login = token + else: + raise VaultError( + "token authentication failed: VAULT_TOKEN is empty and '~/.vaulttoken not found" + ) + # ldap elif auth_type == "ldap": - self.auth.ldap.login(username=username, password=password) + if username and password: + self.auth.ldap.login(username=username, password=password) + else: + raise VaultError("ldap authentication failed: VAULT_USERNAME or VAULT_PASSWORD is empty") + # userpass elif auth_type == "userpass": - self.auth.userpass.login(username=username, password=password) + if username and password: + self.auth.userpass.login(username=username, password=password) + else: + raise VaultError("userpass authentication failed: VAULT_USERNAME or VAULT_PASSWORD is empty") + # approle elif auth_type == "approle": - self.auth.approle.login(os.getenv("VAULT_ROLE_ID"), secret_id=os.getenv("VAULT_SECRET_ID")) - elif auth_type == "github": - self.auth.github.login(token=self.env["token"]) + role_id = os.getenv("VAULT_ROLE_ID") + secret_id = os.getenv("VAULT_SECRET_ID") + if role_id and secret_id: + self.auth.approle.login(role_id, secret_id=secret_id) + else: + raise VaultError("approle authentication failed: VAULT_ROLE_ID or VAULT_SECRET_ID is empty") else: - raise VaultError("Authentication type '{}' not supported".format(auth_type)) + raise VaultError(f"Authentication type '{auth_type}' not supported") if not self.is_authenticated(): self.adapter.close() raise VaultError("Vault Authentication Error, Environment Variables defined?") -def get_env(parameter): +def get_env(vault_parameters): """ The following variables need to be exported to the environment or defined in inventory. * VAULT_ADDR: url for vault - * VAULT_SKIP_VERIFY=true: if set, do not verify presented TLS certificate before communicating with Vault server. + * VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server. * VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate * VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server * VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate * VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate * VAULT_NAMESPACE: specify the Vault Namespace, if you have one - Following keys are used to creates a new hvac client instance. - :param url: Base URL for the Vault instance being addressed. - :type url: str - :param cert: Certificates for use in requests sent to the Vault instance. This should be a tuple with the - certificate and then key. - :type cert: tuple - :param verify: Either a boolean to indicate whether TLS verification should be performed when sending requests to Vault, - or a string pointing at the CA bundle to use for verification. - ### ssl-cert-verification. - See http://docs.python-requests.org/en/master/user/advanced/ - :type verify: Union[bool,str] - :param namespace: Optional Vault Namespace. - :type namespace: str """ client_parameters = {} - client_parameters["url"] = parameter.get("VAULT_ADDR", os.getenv("VAULT_ADDR", default="")) - client_parameters["namespace"] = parameter.get( - "VAULT_NAMESPACE", os.getenv("VAULT_NAMESPACE", default="") - ) - # VERIFY VAULT SERVER TLS CERTIFICATE - skip_verify = str(parameter.get("VAULT_SKIP_VERIFY", os.getenv("VAULT_SKIP_VERIFY", default=""))) - - if skip_verify.lower() == "false": - cert = parameter.get("VAULT_CACERT", os.getenv("VAULT_CACERT", default="")) - if not cert: - cert_path = parameter.get("VAULT_CAPATH", os.getenv("VAULT_CAPATH", default="")) - if not cert_path: - raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified") - client_parameters["verify"] = cert_path - else: - client_parameters["verify"] = cert + + # fetch missing values from env + variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"] + for var in variables: + var = "VAULT_" + var + if vault_parameters.get(var) is None and os.getenv(var) is not None: + vault_parameters[var] = os.getenv(var) + + # set vault adress + vault_address = vault_parameters.get("VAULT_ADDR") + if not vault_address: + raise VaultError("VAULT_ADDR has to be specified in inventory or env") else: + client_parameters["url"] = vault_address + + # set vault namespace + vault_namespace = vault_parameters.get("VAULT_NAMESPACE") + if vault_namespace: + client_parameters["namespace"] = vault_namespace + + # set ca cert/path or skip verification + skip_verify = not (str(vault_parameters.get("VAULT_SKIP_VERIFY", False)).lower() == "false") + if skip_verify: + # TODO: surpress ssl warning client_parameters["verify"] = False + else: + cert = vault_parameters.get("VAULT_CACERT") + cert_path = vault_parameters.get("VAULT_CAPATH") + if cert and os.path.isfile(cert): + client_parameters["verify"] = cert + elif cert_path and os.path.isdir(cert_path): + client_parameters["verify"] = cert_path + else: + raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified or exist") - # CLIENT CERTIFICATE FOR TLS AUTHENTICATION - client_key = parameter.get("VAULT_CLIENT_KEY", os.getenv("VAULT_CLIENT_KEY", default="")) - client_cert = parameter.get("VAULT_CLIENT_CERT", os.getenv("VAULT_CLIENT_CERT", default="")) - if client_key != "" and client_cert != "": + # set client cert for tls authentication + client_key = vault_parameters.get("VAULT_CLIENT_KEY") + client_cert = vault_parameters.get("VAULT_CLIENT_CERT") + if client_key and client_cert: client_parameters["cert"] = (client_cert, client_key) + return client_parameters