Skip to content

Commit

Permalink
refactor: rework vault client auth
Browse files Browse the repository at this point in the history
  • Loading branch information
MatteoVoges committed Oct 4, 2023
1 parent a92a90f commit 9e12800
Showing 1 changed file with 78 additions and 48 deletions.
126 changes: 78 additions & 48 deletions kapitan/refs/vault_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ class VaultClient(hvac.Client):

def __init__(self, vault_parameters):
self.vault_parameters = vault_parameters
self.env = get_env(vault_parameters)

super().__init__(**{k: v for k, v in self.env.items() if k != "auth"})
client_parameters = get_env(vault_parameters)
super().__init__(**client_parameters)

self.authenticate()

Expand All @@ -45,7 +44,7 @@ def get_auth_token(self):
token_file = os.path.join(os.path.expanduser("~"), ".vault-token")
token = self.read_token_from_file(token_file)

self.env["token"] = token
return token

def read_token_from_file(self, token_file):
try:
Expand All @@ -64,77 +63,108 @@ def read_token_from_file(self, token_file):
return token

def authenticate(self):
# DIFFERENT LOGIN METHOD BASED ON AUTHENTICATION TYPE
# different login method based on authentication type
auth_type = self.vault_parameters["auth"]
self.get_auth_token()

token = self.get_auth_token()
username = os.getenv("VAULT_USERNAME")
password = os.getenv("VAULT_PASSWORD")

# token
if auth_type == "token":
self.token = self.env["token"]
if token:
self.token = token
else:
raise VaultError(
"token authentication failed: VAULT_TOKEN is empty and '~/.vaulttoken not found"
)
# github
elif auth_type == "github":
if token:
self.auth.github.login = token
else:
raise VaultError(
"token authentication failed: VAULT_TOKEN is empty and '~/.vaulttoken not found"
)
# ldap
elif auth_type == "ldap":
self.auth.ldap.login(username=username, password=password)
if username and password:
self.auth.ldap.login(username=username, password=password)
else:
raise VaultError("ldap authentication failed: VAULT_USERNAME or VAULT_PASSWORD is empty")
# userpass
elif auth_type == "userpass":
self.auth.userpass.login(username=username, password=password)
if username and password:
self.auth.userpass.login(username=username, password=password)
else:
raise VaultError("userpass authentication failed: VAULT_USERNAME or VAULT_PASSWORD is empty")
# approle
elif auth_type == "approle":
self.auth.approle.login(os.getenv("VAULT_ROLE_ID"), secret_id=os.getenv("VAULT_SECRET_ID"))
elif auth_type == "github":
self.auth.github.login(token=self.env["token"])
role_id = os.getenv("VAULT_ROLE_ID")
secret_id = os.getenv("VAULT_SECRET_ID")
if role_id and secret_id:
self.auth.approle.login(role_id, secret_id=secret_id)
else:
raise VaultError("approle authentication failed: VAULT_ROLE_ID or VAULT_SECRET_ID is empty")
else:
raise VaultError("Authentication type '{}' not supported".format(auth_type))
raise VaultError(f"Authentication type '{auth_type}' not supported")

if not self.is_authenticated():
self.adapter.close()
raise VaultError("Vault Authentication Error, Environment Variables defined?")


def get_env(parameter):
def get_env(vault_parameters):
"""
The following variables need to be exported to the environment or defined in inventory.
* VAULT_ADDR: url for vault
* VAULT_SKIP_VERIFY=true: if set, do not verify presented TLS certificate before communicating with Vault server.
* VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server.
* VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate
* VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server
* VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate
* VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate
* VAULT_NAMESPACE: specify the Vault Namespace, if you have one
Following keys are used to creates a new hvac client instance.
:param url: Base URL for the Vault instance being addressed.
:type url: str
:param cert: Certificates for use in requests sent to the Vault instance. This should be a tuple with the
certificate and then key.
:type cert: tuple
:param verify: Either a boolean to indicate whether TLS verification should be performed when sending requests to Vault,
or a string pointing at the CA bundle to use for verification.
### ssl-cert-verification.
See http://docs.python-requests.org/en/master/user/advanced/
:type verify: Union[bool,str]
:param namespace: Optional Vault Namespace.
:type namespace: str
"""
client_parameters = {}
client_parameters["url"] = parameter.get("VAULT_ADDR", os.getenv("VAULT_ADDR", default=""))
client_parameters["namespace"] = parameter.get(
"VAULT_NAMESPACE", os.getenv("VAULT_NAMESPACE", default="")
)
# VERIFY VAULT SERVER TLS CERTIFICATE
skip_verify = str(parameter.get("VAULT_SKIP_VERIFY", os.getenv("VAULT_SKIP_VERIFY", default="")))

if skip_verify.lower() == "false":
cert = parameter.get("VAULT_CACERT", os.getenv("VAULT_CACERT", default=""))
if not cert:
cert_path = parameter.get("VAULT_CAPATH", os.getenv("VAULT_CAPATH", default=""))
if not cert_path:
raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified")
client_parameters["verify"] = cert_path
else:
client_parameters["verify"] = cert

# fetch missing values from env
variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"]
for var in variables:
var = "VAULT_" + var
if vault_parameters.get(var) is None and os.getenv(var) is not None:
vault_parameters[var] = os.getenv(var)

# set vault adress
vault_address = vault_parameters.get("VAULT_ADDR")
if not vault_address:
raise VaultError("VAULT_ADDR has to be specified in inventory or env")
else:
client_parameters["url"] = vault_address

# set vault namespace
vault_namespace = vault_parameters.get("VAULT_NAMESPACE")
if vault_namespace:
client_parameters["namespace"] = vault_namespace

# set ca cert/path or skip verification
skip_verify = not (str(vault_parameters.get("VAULT_SKIP_VERIFY", False)).lower() == "false")
if skip_verify:
# TODO: surpress ssl warning
client_parameters["verify"] = False
else:
cert = vault_parameters.get("VAULT_CACERT")
cert_path = vault_parameters.get("VAULT_CAPATH")
if cert and os.path.isfile(cert):
client_parameters["verify"] = cert
elif cert_path and os.path.isdir(cert_path):
client_parameters["verify"] = cert_path
else:
raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified or exist")

# CLIENT CERTIFICATE FOR TLS AUTHENTICATION
client_key = parameter.get("VAULT_CLIENT_KEY", os.getenv("VAULT_CLIENT_KEY", default=""))
client_cert = parameter.get("VAULT_CLIENT_CERT", os.getenv("VAULT_CLIENT_CERT", default=""))
if client_key != "" and client_cert != "":
# set client cert for tls authentication
client_key = vault_parameters.get("VAULT_CLIENT_KEY")
client_cert = vault_parameters.get("VAULT_CLIENT_CERT")
if client_key and client_cert:
client_parameters["cert"] = (client_cert, client_key)

return client_parameters

0 comments on commit 9e12800

Please sign in to comment.