diff --git a/fedcloudclient/decorators.py b/fedcloudclient/decorators.py index 54416f8..832464c 100644 --- a/fedcloudclient/decorators.py +++ b/fedcloudclient/decorators.py @@ -34,6 +34,15 @@ metavar="site-name", ) +# Output format for secret module +secret_output_params = click.option( + "--output-format", + "-f", + required=False, + help="Output format", + type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False), +) + def all_site_params(func): """ @@ -332,3 +341,71 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper + + +def secret_token_params(func): + """ + Decorator for secret token. + If locker token is not defined, get access token from oidc-* parameters + and replace them in the wrapper function + """ + + @optgroup.group("Token options", help="Choose one of options for providing token") + @optgroup.option( + "--locker-token", + help="Locker token", + envvar="FEDCLOUD_LOCKER_TOKEN", + metavar="locker_token", + ) + @optgroup.option( + "--oidc-agent-account", + help="Account name in oidc-agent", + envvar="OIDC_AGENT_ACCOUNT", + metavar="account", + ) + @optgroup.option( + "--oidc-access-token", + help="OIDC access token", + envvar="OIDC_ACCESS_TOKEN", + metavar="token", + ) + @optgroup.option( + "--mytoken", + help="Mytoken string", + envvar="FEDCLOUD_MYTOKEN", + metavar="mytoken", + ) + @optgroup.option( + "--mytoken-server", + help="Mytoken sever", + envvar="FEDCLOUD_MYTOKEN_SERVER", + default=DEFAULT_MYTOKEN_SERVER, + show_default=True, + metavar="mytoken-server", + ) + @wraps(func) + def wrapper(*args, **kwargs): + from fedcloudclient.checkin import get_access_token + + # If locker token is given, ignore OIDC token options + locker_token = kwargs.pop("locker_token") + if locker_token: + kwargs.pop("oidc_access_token") + kwargs.pop("oidc_agent_account") + kwargs.pop("mytoken") + kwargs.pop("mytoken_server") + kwargs["access_token"] = None + kwargs["locker_token"] = locker_token + return func(*args, **kwargs) + + access_token = get_access_token( + kwargs.pop("oidc_access_token"), + kwargs.pop("oidc_agent_account"), + kwargs.pop("mytoken"), + kwargs.pop("mytoken_server"), + ) + kwargs["access_token"] = access_token + kwargs["locker_token"] = None + return func(*args, **kwargs) + + return wrapper diff --git a/fedcloudclient/secret.py b/fedcloudclient/secret.py index a961ed0..113b501 100644 --- a/fedcloudclient/secret.py +++ b/fedcloudclient/secret.py @@ -8,8 +8,8 @@ import click import hvac +import requests import yaml - from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC @@ -18,12 +18,17 @@ from yaml import YAMLError from fedcloudclient.checkin import get_checkin_id -from fedcloudclient.decorators import oidc_params +from fedcloudclient.decorators import ( + oidc_params, + secret_output_params, + secret_token_params, +) VAULT_ADDR = "https://vault.services.fedcloud.eu:8200" VAULT_ROLE = "" -VAULT_MOUNT_POINT = "/secrets" +VAULT_MOUNT_POINT = "/secrets/" VAULT_SALT = "fedcloud_salt" +VAULT_LOCKER_MOUNT_POINT = "/v1/cubbyhole/" def secret_client(access_token, command, path, data): @@ -64,6 +69,41 @@ def secret_client(access_token, command, path, data): ) +def locker_client(locker_token, command, path, data): + """ + Client function for accessing secrets + :param path: path to secret + :param command: the command to perform + :param data: input data + :param locker_token: locker token + :return: Output data from the service + """ + + try: + headers = {"X-Vault-Token": locker_token} + url = VAULT_ADDR + VAULT_LOCKER_MOUNT_POINT + path + if command == "list_secrets": + response = requests.get(url, headers=headers, params={"list": "true"}) + elif command == "read_secret": + response = requests.get(url, headers=headers) + elif command == "delete_secret": + response = requests.delete(url, headers=headers) + elif command == "put": + response = requests.post(url, headers=headers, data=data) + else: + raise SystemExit(f"Invalid command {command}") + response.raise_for_status() + if command in ["list_secrets", "read_secret"]: + response_json = response.json() + return dict(response_json) + else: + return None + except requests.exceptions.HTTPError as exception: + raise SystemExit( + f"Error: Error when accessing secrets on server. Server response: {type(exception).__name__}: {exception}" + ) + + def read_data_from_file(input_format, input_file): """ Read data from file. Format may be text, yaml, json or auto-detect according to file extension @@ -243,14 +283,8 @@ def secret(): @secret.command() -@oidc_params -@click.option( - "--output-format", - "-f", - required=False, - help="Output format", - type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False), -) +@secret_token_params +@secret_output_params @click.argument("short_path", metavar="[secret path]") @click.argument("key", metavar="[key]", required=False) @click.option( @@ -276,6 +310,7 @@ def secret(): ) def get( access_token, + locker_token, short_path, key, output_format, @@ -286,8 +321,10 @@ def get( """ Get the secret object in the path. If a key is given, print only the value of the key """ - - response = secret_client(access_token, "read_secret", short_path, None) + if locker_token: + response = locker_client(locker_token, "read_secret", short_path, None) + else: + response = secret_client(access_token, "read_secret", short_path, None) if decrypt_key: decrypt_data(decrypt_key, response["data"]) if not key: @@ -300,40 +337,42 @@ def get( @secret.command("list") -@oidc_params +@secret_token_params @click.argument("short_path", metavar="[secret path]", required=False, default="") def list_( access_token, + locker_token, short_path, ): """ List secret objects in the path """ - - response = secret_client(access_token, "list_secrets", short_path, None) + if locker_token: + response = locker_client(locker_token, "list_secrets", short_path, None) + else: + response = secret_client(access_token, "list_secrets", short_path, None) print("\n".join(map(str, response["data"]["keys"]))) @secret.command() -@oidc_params +@secret_token_params @click.argument("short_path", metavar="[secret path]") @click.argument("secrets", nargs=-1, metavar="[key=value...]") @click.option( "--encrypt-key", "-e", metavar="[key]", - required=False, help="Encryption key or passphrase", ) @click.option( "--binary-file", "-b", - required=False, is_flag=True, help="True for reading secrets from binary files", ) def put( access_token, + locker_token, short_path, secrets, encrypt_key, @@ -346,18 +385,97 @@ def put( secret_dict = secret_params_to_dict(secrets, binary_file) if encrypt_key: encrypt_data(encrypt_key, secret_dict) - secret_client(access_token, "put", short_path, secret_dict) + if locker_token: + locker_client(locker_token, "put", short_path, secret_dict) + else: + secret_client(access_token, "put", short_path, secret_dict) @secret.command() -@oidc_params +@secret_token_params @click.argument("short_path", metavar="[secret path]") def delete( access_token, + locker_token, short_path, ): """ Delete the secret object in the path """ + if locker_token: + locker_client(locker_token, "delete_secret", short_path, None) + else: + secret_client(access_token, "delete_secret", short_path, None) + + +@secret.group() +def locker(): + """ + Commands for creating and accessing locker objects + """ - secret_client(access_token, "delete_secret", short_path, None) + +@locker.command() +@oidc_params +@secret_output_params +@click.option("--ttl", default="24h", help="Time-to-live for the new locker") +@click.option("--num-uses", default=10, help="Max number of uses") +@click.option("--verbose", is_flag=True, help="Print token details") +def create(access_token, ttl, num_uses, output_format, verbose): + """ + Create a locker and return the locker token + """ + try: + client = hvac.Client(url=VAULT_ADDR) + client.auth.jwt.jwt_login(role=VAULT_ROLE, jwt=access_token) + client.auth.token.renew_self(increment=ttl) + locker_token = client.auth.token.create( + policies=["default"], ttl=ttl, num_uses=num_uses, renewable=False + ) + if not verbose: + print(locker_token["auth"]["client_token"]) + else: + print_secrets(None, output_format, locker_token["auth"]) + except VaultError as e: + raise SystemExit( + f"Error: Error when accessing secrets on server. Server response: {type(e).__name__}: {e}" + ) + + +@locker.command() +@secret_output_params +@click.argument( + "locker_token", metavar="[locker_token]", envvar="FEDCLOUD_LOCKER_TOKEN" +) +def check(locker_token, output_format): + """ + Check status of locker token + """ + + try: + client = hvac.Client(url=VAULT_ADDR) + client.token = locker_token + locker_info = client.auth.token.lookup_self() + print_secrets(None, output_format, locker_info["data"]) + except VaultError as e: + raise SystemExit( + f"Error: Error when accessing secrets on server. Server response: {type(e).__name__}: {e}" + ) + + +@locker.command() +@click.argument( + "locker_token", metavar="[locker_token]", envvar="FEDCLOUD_LOCKER_TOKEN" +) +def revoke(locker_token): + """ + Revoke the locker token and delete all data in the locker + """ + try: + client = hvac.Client(url=VAULT_ADDR) + client.token = locker_token + client.auth.token.revoke_self() + except VaultError as e: + raise SystemExit( + f"Error: Error when accessing secrets on server. Server response: {type(e).__name__}: {e}" + )