Skip to content

Commit

Permalink
Use new Credential type
Browse files Browse the repository at this point in the history
* ManagedCredentialType custom callback functions now use a proper
  Credential class defined in awx_plugins.interfaces
  • Loading branch information
chrismeyersfsu committed Dec 2, 2024
1 parent 42adccb commit 6a50394
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 15 deletions.
60 changes: 46 additions & 14 deletions src/awx_plugins/credentials/injectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@
from awx_plugins.interfaces._temporary_private_container_api import ( # noqa: WPS436
get_incontainer_path,
)
from awx_plugins.interfaces._temporary_private_credential_api import ( # noqa: WPS436
Credential,
GenericOptionalPrimitiveType,
)
from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436
get_vmware_certificate_validation_setting,
)

import yaml


def aws(cred, env, private_data_dir):
def aws(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
env['AWS_ACCESS_KEY_ID'] = cred.get_input('username', default='')
env['AWS_SECRET_ACCESS_KEY'] = cred.get_input('password', default='')

Expand All @@ -27,7 +35,11 @@ def aws(cred, env, private_data_dir):
env['AWS_SESSION_TOKEN'] = env['AWS_SECURITY_TOKEN']


def gce(cred, env, private_data_dir):
def gce(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> str:
project = cred.get_input('project', default='')
username = cred.get_input('username', default='')

Expand Down Expand Up @@ -66,13 +78,17 @@ def gce(cred, env, private_data_dir):
return path


def azure_rm(cred, env, private_data_dir):
def azure_rm(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
client = cred.get_input('client', default='')
tenant = cred.get_input('tenant', default='')

env['AZURE_SUBSCRIPTION_ID'] = cred.get_input('subscription', default='')

if len(client) and len(tenant):
if client and tenant:
env['AZURE_CLIENT_ID'] = client
env['AZURE_TENANT'] = tenant
env['AZURE_SECRET'] = cred.get_input('secret', default='')
Expand All @@ -84,7 +100,11 @@ def azure_rm(cred, env, private_data_dir):
env['AZURE_CLOUD_ENVIRONMENT'] = cred.get_input('cloud_environment')


def vmware(cred, env, private_data_dir):
def vmware(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
env['VMWARE_USER'] = cred.get_input('username', default='')
env['VMWARE_PASSWORD'] = cred.get_input('password', default='')
env['VMWARE_HOST'] = cred.get_input('host', default='')
Expand All @@ -93,7 +113,7 @@ def vmware(cred, env, private_data_dir):
)


def _openstack_data(cred):
def _openstack_data(cred: Credential):
openstack_auth = dict(
auth_url=cred.get_input('host', default=''),
username=cred.get_input('username', default=''),
Expand Down Expand Up @@ -125,7 +145,11 @@ def _openstack_data(cred):
return openstack_data


def openstack(cred, env, private_data_dir):
def openstack(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
f = os.fdopen(handle, 'w')
openstack_data = _openstack_data(cred)
Expand All @@ -140,40 +164,48 @@ def openstack(cred, env, private_data_dir):
env['OS_CLIENT_CONFIG_FILE'] = get_incontainer_path(path, private_data_dir)


def kubernetes_bearer_token(cred, env, private_data_dir):
def kubernetes_bearer_token(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
env['K8S_AUTH_HOST'] = cred.get_input('host', default='')
env['K8S_AUTH_API_KEY'] = cred.get_input('bearer_token', default='')
if cred.get_input('verify_ssl') and 'ssl_ca_cert' in cred.inputs:
if cred.get_input('verify_ssl') and cred.has_input('ssl_ca_cert'):
env['K8S_AUTH_VERIFY_SSL'] = 'True'
handle, path = tempfile.mkstemp(
dir=os.path.join(private_data_dir, 'env'),
)
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('ssl_ca_cert'))
f.write(str(cred.get_input('ssl_ca_cert')))
env['K8S_AUTH_SSL_CA_CERT'] = get_incontainer_path(
path, private_data_dir,
)
else:
env['K8S_AUTH_VERIFY_SSL'] = 'False'


def terraform(cred, env, private_data_dir):
def terraform(
cred: Credential,
env: dict[str, GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('configuration'))
f.write(str(cred.get_input('configuration')))
env['TF_BACKEND_CONFIG_FILE'] = get_incontainer_path(
path, private_data_dir,
)
# Handle env variables for GCP account credentials
if 'gce_credentials' in cred.inputs:
if cred.has_input('gce_credentials'):
handle, path = tempfile.mkstemp(
dir=os.path.join(private_data_dir, 'env'),
)
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('gce_credentials'))
f.write(str(cred.get_input('gce_credentials')))
env['GOOGLE_BACKEND_CREDENTIALS'] = get_incontainer_path(
path, private_data_dir,
)
2 changes: 1 addition & 1 deletion src/awx_plugins/credentials/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
gettext_noop,
)

from awx_plugins.credentials.injectors import (
from .injectors import (
aws,
azure_rm,
gce,
Expand Down

0 comments on commit 6a50394

Please sign in to comment.