Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract Secret Management Support #703

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/pull_request_push_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ jobs:
COSMOS1_KEY: ${{secrets.COSMOS1_KEY}}
SQL1_USER: ${{secrets.SQL1_USER}}
SQL1_PASSWORD: ${{secrets.SQL1_PASSWORD}}
AWS_ACCESS_KEY_ID: ${{secrets.AWS_ACCESS_KEY_ID}}
AWS_SECRET_ACCESS_KEY: ${{secrets.AWS_SECRET_ACCESS_KEY}}
run: |
# run only test with databricks. run in 6 parallel jobs
pytest -n 6 --cov-report term-missing --cov=feathr_project/feathr feathr_project/test --cov-config=.github/workflows/.coveragerc_db --cov-fail-under=75
Expand Down Expand Up @@ -196,6 +198,8 @@ jobs:
COSMOS1_KEY: ${{secrets.COSMOS1_KEY}}
SQL1_USER: ${{secrets.SQL1_USER}}
SQL1_PASSWORD: ${{secrets.SQL1_PASSWORD}}
AWS_ACCESS_KEY_ID: ${{secrets.AWS_ACCESS_KEY_ID}}
AWS_SECRET_ACCESS_KEY: ${{secrets.AWS_SECRET_ACCESS_KEY}}
run: |
# skip databricks related test as we just ran the test; also seperate databricks and synapse test to make sure there's no write conflict
# run in 6 parallel jobs to make the time shorter
Expand Down
56 changes: 54 additions & 2 deletions docs/how-to-guides/feathr-configuration-and-env.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,60 @@ This allows end users to store the configuration in a secure way, say in Kuberne
Feathr will get the configurations in the following order:

1. If the key is set in the environment variable, Feathr will use the value of that environment variable
2. If it's not set in the environment, then a value is retrieved from the feathr_config.yaml file with the same config key.
3. If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from a key vault service. Currently only Azure Key Vault is supported.
2. If it's not set in the environment, then a value is retrieved from the `feathr_config.yaml` file with the same config key.
3. If it's not available in the `feathr_config.yaml` file, Feathr will try to retrieve the value from a key vault service. Currently both Azure Key Vault and AWS Secrets Manager are supported.

# Using Secret Management Service in Feathr

Feathr supports using a Secret Management service for all the credentials and environment variables. Currently the supported secret management services are Azure Key Vault and AWS Secrets Manager.

In order to use those secret management service, there are two steps:

Step 1: Tell Feathr which secret management service to use, and what is the corresponding namespace.

If using Azure Key Vault:
```yaml
secrets:
azure_key_vault:
name: feathrazuretest3-kv
```

If using AWS Secret Manager, users should put the corresponding secret_id in the `feathr_config.yaml` section, like below, so that Feathr knows which secret_id to use to retrieve the required credentials.
```yaml
secrets:
aws_secrets_manager:
secret_id: feathrsecret_namespace
```

Step 2: Initialize a secret management client and pass it to Feathr.

For Azure Key Vault:
```python
from azure.keyvault.secrets import SecretClient
secret_client = SecretClient(
vault_url = f"https://<replace_with_key_vault_name>.vault.azure.net",
credential=DefaultAzureCredential()
)
feathr_client = FeathrClient(..., secret_manager_client = secret_client)
```

For AWS Secrets Manager, users need to create a SecretCache object and pass it to Feathr client, like below:
```python
import botocore
import botocore.session
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig

client = botocore.session.get_session().create_client(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative, we can include in the documentation, that IRSA based authentication is possible as well if the users are running feathr in k8s. More documentation here: https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
I don't believe any code changes are required for this auth to be supported.

service_name='secretsmanager',
aws_access_key_id = '<replace_your_aws_access_key_id>',
aws_secret_access_key= '<replace_your_aws_secret_access_key>',
region_name=region_name
)
cache_config = SecretCacheConfig()
cache = SecretCache( config = cache_config, client = client)
feathr_client = FeathrClient(..., secret_manager_client = cache)

```

# A list of environment variables that Feathr uses when running Spark job

Expand Down
20 changes: 6 additions & 14 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
local_workspace_dir: str = None,
credential: Any = None,
project_registry_tag: Dict[str, str] = None,
secret_manager_client = None
):
"""Initialize Feathr Client.
Configuration values used by the Feathr are evaluated in the following precedence, with items higher on the list taking priority.
Expand All @@ -77,12 +78,13 @@ def __init__(
local_workspace_dir (optional): Set where is the local work space dir. If not set, Feathr will create a temporary folder to store local workspace related files.
credential (optional): Azure credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials.
project_registry_tag (optional): Adding tags for project in Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations on project level. Default is empty
secret_manager_client: the secret manager client initialized outside of Feathr. End users need to initialize the secret manager outside of Feathr and pass it to Feathr so Feathr can use it to get required secrets.
"""
self.logger = logging.getLogger(__name__)
# Redis key separator
self._KEY_SEPARATOR = ':'
self._COMPOSITE_KEY_SEPARATOR = '#'
self.env_config = EnvConfigReader(config_path=config_path)
self.env_config = EnvConfigReader(config_path=config_path, secret_manager_client=None)
if local_workspace_dir:
self.local_workspace_dir = local_workspace_dir
else:
Expand Down Expand Up @@ -215,17 +217,6 @@ def __init__(

logger.info(f"Feathr client {get_version()} initialized successfully.")

def _check_required_environment_variables_exist(self):
"""Checks if the required environment variables(form feathr_config.yaml) is set.

Some required information has to be set via environment variables so the client can work.
"""
props = self.secret_names
for required_field in (self.required_fields + props):
if required_field not in os.environ:
raise RuntimeError(f'{required_field} is not set in environment variable. All required environment '
f'variables are: {self.required_fields}.')

def register_features(self, from_context: bool = True):
"""Registers features based on the current workspace

Expand Down Expand Up @@ -487,7 +478,8 @@ def _construct_redis_client(self):
host = self.redis_host
port = self.redis_port
ssl_enabled = self.redis_ssl_enabled
self.redis_client = redis.Redis(

self.self.redis_client = redis.Redis(
host=host,
port=port,
password=password,
Expand Down Expand Up @@ -676,7 +668,7 @@ def monitor_features(self, settings: MonitoringSettings, execution_configuration
# Should search in both 'derived_feature_list' and 'anchor_list'
# Return related keys(key_column list) or None if cannot find the feature
def _get_feature_key(self, feature_name: str):
features = []
features: List[FeatureBase] = []
if 'derived_feature_list' in dir(self):
features += self.derived_feature_list
if 'anchor_list' in dir(self):
Expand Down
30 changes: 30 additions & 0 deletions feathr_project/feathr/secrets/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod

from typing import Any, Dict, List, Optional, Tuple


class FeathrSecretsManagementClient(ABC):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the file should not be named as abc.py?

"""This is the abstract class for all the secrets management service, which are used to store the credentials that Feathr might use.
"""

@abstractmethod
def __init__(self, secret_namespace: str, secret_client) -> None:
"""Initialize the FeathrSecretsManagementClient class.

Args:
secret_namespace (str): a namespace that Feathr needs to get secrets from.
For Azure Key Vault, it is something like the key vault name.
For AWS secrets manager, it is something like a secret name.

secret_client: A client that will be used to retrieve Feathr secrets.
"""
pass

@abstractmethod
def get_feathr_secret(self, secret_name: str) -> str:
"""Get Feathr Secrets from a certain secret management service, such as Azure Key Vault or AWS Secrets Manager.

Returns:
str: returned secret from secret management service
"""
pass
40 changes: 23 additions & 17 deletions feathr_project/feathr/secrets/akv_client.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from loguru import logger
from azure.core.exceptions import ResourceNotFoundError
from feathr.secrets.abc import FeathrSecretsManagementClient

class AzureKeyVaultClient:
def __init__(self, akv_name: str):
self.akv_name = akv_name
self.secret_client = None

def get_feathr_akv_secret(self, secret_name: str):
class AzureKeyVaultClient(FeathrSecretsManagementClient):
def __init__(self, secret_namespace: str, secret_client: SecretClient = None):
"""Initializes the AzureKeyVaultClient. Note that `secret_namespace` is not used, since the namespace information will be included in secret_client.
"""
self.secret_client = secret_client
if self.secret_client is not None and not isinstance(secret_client, SecretClient):
raise RuntimeError(
"You need to pass an azure.keyvault.secrets.SecretClient instance.")

def get_feathr_secret(self, secret_name: str) -> str:
"""Get Feathr Secrets from Azure Key Vault. Note that this function will replace '_' in `secret_name` with '-' since Azure Key Vault doesn't support it

Returns:
_type_: _description_
str: returned secret from secret management service
"""
if self.secret_client is None:
self.secret_client = SecretClient(
vault_url = f"https://{self.akv_name}.vault.azure.net",
credential=DefaultAzureCredential()
)
raise RuntimeError("You need to pass an azure.keyvault.secrets.SecretClient instance when initializing FeathrClient.")

try:
# replace '_' with '-' since Azure Key Vault doesn't support it
variable_replaced = secret_name.replace('_','-') #.upper()
logger.info('Fetching the secret {} from Key Vault {}.', variable_replaced, self.akv_name)
variable_replaced = secret_name.replace('_', '-') # .upper()
logger.info('Fetching the secret {} from Key Vault {}.',
variable_replaced, self.secret_client.vault_url)
secret = self.secret_client.get_secret(variable_replaced)
logger.info('Secret {} fetched from Key Vault {}.', variable_replaced, self.akv_name)
logger.info('Secret {} fetched from Key Vault {}.',
variable_replaced, self.secret_client.vault_url)
return secret.value
except ResourceNotFoundError as e:
logger.error(f"Secret {secret_name} cannot be found in Key Vault {self.akv_name}.")
raise
except ResourceNotFoundError:
logger.error(
f"Secret {secret_name} cannot be found in Key Vault {self.secret_client.vault_url}.")
raise
33 changes: 33 additions & 0 deletions feathr_project/feathr/secrets/aws_secretmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from loguru import logger
import json
from feathr.secrets.abc import FeathrSecretsManagementClient
from aws_secretsmanager_caching.secret_cache import SecretCache


class AWSSecretManagerClient(FeathrSecretsManagementClient):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the file name: aws_secret_manager

def __init__(self, secret_namespace: str = None, secret_client: SecretCache = None):
self.secret_id = secret_namespace
self.secret_client = secret_client
# make sure secret_client is a SecretCache type
if secret_client is not None and not isinstance(secret_client, SecretCache):
raise RuntimeError(
"You need to pass a aws_secretsmanager_caching.secret_cache.SecretCache instance. Please refer to https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_cache-python.html for more details.")

def get_feathr_secret(self, secret_name: str):
"""Get Feathr Secrets from AWS Secrets manager. It's also recommended that the client passes a cache objects to reduce cost.
See more details here: https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_cache-python.html
"""
if self.secret_client is None:
raise RuntimeError(
"You need to pass a aws_secretsmanager_caching.secret_cache.SecretCache instance when initializing FeathrClient.")

try:
get_secret_value_response = self.secret_client.get_secret_string(
self.secret_id)
# result is in str format, so we need to load it as a dict
secret = json.loads(get_secret_value_response)
return secret[secret_name]
except KeyError as e:
logger.error(
f"Secret {secret_name} cannot be found in secretsmanager {self.secret_id}.")
raise e
4 changes: 1 addition & 3 deletions feathr_project/feathr/spark_provider/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List, Optional, Tuple



class SparkJobLauncher(ABC):
"""This is the abstract class for all the spark launchers. All the Spark launcher should implement those interfaces
"""
Expand Down Expand Up @@ -52,8 +53,5 @@ def get_status(self) -> str:

Returns:
str: Status of the current job

Returns:
str: _description_
"""
pass
111 changes: 111 additions & 0 deletions feathr_project/feathr/utils/_envvariableutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import os
import yaml
from loguru import logger
from feathr.secrets.akv_client import AzureKeyVaultClient
from azure.core.exceptions import ResourceNotFoundError
from feathr.secrets.aws_secretmanager import AWSSecretManagerClient

class _EnvVaraibleUtil(object):
def __init__(self, config_path: str, secret_manager_client = None):
"""Initialize the environment variable utils client

Args:
config_path (str): configuration path, if users want to use YAML to load all the configs
secret_manager_client: the secret manager client type. currently only Azure key vault and AWS secret manager is supported.
"""
self.config_path = config_path
# Set to none first to avoid invalid reference
self.secret_manager_client = None
if secret_manager_client and self.get_environment_variable_with_default('secrets', 'azure_key_vault', 'name'):
self.secret_manager_client = AzureKeyVaultClient(
secret_namespace=self.get_environment_variable_with_default('secrets', 'azure_key_vault', 'name'),
secret_client=secret_manager_client)
elif secret_manager_client and self.get_environment_variable_with_default('secrets', 'aws_secrets_manager', 'secret_id'):
self.secret_manager_client = AWSSecretManagerClient(
secret_namespace=self.get_environment_variable_with_default('secrets', 'aws_secrets_manager', 'secret_id'),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this introduce an else and error out in the case if the secrets manager client is not supported by feathr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. I'll update

secret_client=secret_manager_client)

def get_environment_variable_with_default(self, *args):
"""Gets the environment variable for the variable key.
Args:
*args: list of keys in feathr_config.yaml file
Return:
A environment variable for the variable key. It will retrieve the value of the environment variables in the following order:
If the key is set in the environment variable, Feathr will use the value of that environment variable
If it's not set in the environment, then a default is retrieved from the feathr_config.yaml file with the same config key.
If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault
If not found, an empty string will be returned with a warning error message.
"""

# if envs exist, just return the existing env variable without reading the file
env_keyword = "__".join(args)
upper_env_keyword = env_keyword.upper()
# make it work for lower case and upper case.
env_variable = os.environ.get(
env_keyword, os.environ.get(upper_env_keyword))

# If the key is set in the environment variable, Feathr will use the value of that environment variable
if env_variable:
return env_variable

# If it's not set in the environment, then a default is retrieved from the feathr_config.yaml file with the same config key.
if os.path.exists(os.path.abspath(self.config_path)):
with open(os.path.abspath(self.config_path), 'r') as stream:
try:
yaml_config = yaml.safe_load(stream)
# concat all layers and check in environment variable
yaml_layer = yaml_config

# resolve one layer after another
for arg in args:
yaml_layer = yaml_layer[arg]
return yaml_layer
except KeyError as exc:
logger.info(
"{} not found in the config file.", env_keyword)
except yaml.YAMLError as exc:
logger.warning(exc)

# If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault
if self.secret_manager_client:
try:
return self.secret_manager_client.get_feathr_secret(env_keyword)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this check be case insensitive (upper case and lower case)?

except ResourceNotFoundError:
# print out warning message if cannot find the env variable in all the resources
logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', env_keyword)
return None
except KeyError:
# print out warning message if cannot find the env variable in all the resources
logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', env_keyword)
return None

def get_environment_variable(self, variable_key):
"""Gets the environment variable for the variable key.

Args:
variable_key: environment variable key that is used to retrieve the environment variable
Return:
A environment variable for the variable key. It will retrieve the value of the environment variables in the following order:
If the key is set in the environment variable, Feathr will use the value of that environment variable
If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault
If not found, an empty string will be returned with a warning error message.
"""
env_var_value = os.environ.get(variable_key)

if env_var_value:
return env_var_value

# If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault
logger.info(variable_key + ' is not set in the environment variables.')

if self.secret_manager_client:
try:
return self.secret_manager_client.get_feathr_secret(variable_key)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

except ResourceNotFoundError:
# print out warning message if cannot find the env variable in all the resources
logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', variable_key)
return None
except KeyError:
# print out warning message if cannot find the env variable in all the resources
logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', variable_key)
return None
Loading