Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(token-service): generated password should contain required characters #813

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
import random
import secrets
import string
from dataclasses import dataclass
from typing import List

import boto3

Expand All @@ -13,24 +14,49 @@ class ServiceUserDto:
email: str


SPECIAL_PASSWORD_CHAR_SET = "_-!@#%&."


class CognitoTokenService:

def __init__(self, user_pool_id: str, user_pool_app_client_id: str):
self.client = boto3.client('cognito-idp')
self.user_pool_id: str = user_pool_id
self.user_pool_app_client_id: str = user_pool_app_client_id

@staticmethod
def generate_password(length: int = 32) -> str:
self.password_char_sets: List[str] = [
string.ascii_lowercase,
string.ascii_uppercase,
string.digits,
SPECIAL_PASSWORD_CHAR_SET,
]

def generate_password(self, length: int = 32) -> str:
"""
Must meet the Cognito password requirements policy
https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-policies.html
"""
if length < 8:
raise ValueError('Length must be at least 8 characters or more better')
return ''.join(
random.SystemRandom().choice(string.ascii_letters + string.digits + '_-!@#%&.') for _ in range(length)
)
raise ValueError("Length must be at least 8 characters or more better")

password = ""
while not self.is_password_valid(password):
password = "".join(
secrets.SystemRandom().choice("".join(self.password_char_sets))
for _ in range(length)
)

return password

def is_password_valid(self, password: str) -> bool:
"""
Check if the password meets the Cognito password requirements policy
https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-policies.html
"""
for char_set in self.password_char_sets:
if not any(c in char_set for c in password):
return False

return True

def list_users(self, **kwargs) -> dict:
if 'UserPoolId' in kwargs.keys():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import string
import unittest
from datetime import datetime

import botocore
from botocore.stub import Stubber

from . import CognitoTokenService, ServiceUserDto
from . import CognitoTokenService, ServiceUserDto, SPECIAL_PASSWORD_CHAR_SET

logger = logging.getLogger()
logger.setLevel(logging.INFO)
Expand All @@ -22,7 +23,7 @@ def setUp(self):
self.jjb_dto = ServiceUserDto(
username='jjb',
email='<EMAIL>',
password=CognitoTokenService.generate_password()
password=self.srv.generate_password()
)

self.mock_client = botocore.session.get_session().create_client('cognito-idp')
Expand All @@ -39,11 +40,57 @@ def test_generate_password(self):
"""
python -m unittest token_service.cognitor.tests.CognitorUnitTest.test_generate_password
"""
passwd = CognitoTokenService.generate_password()
passwd = self.srv.generate_password()
self.assertIsNotNone(passwd)
self.assertEqual(len(passwd), 32)
self.assertTrue(any(c.islower() for c in passwd))
self.assertTrue(any(c.isupper() for c in passwd))
self.assertTrue(any(c.isdigit() for c in passwd))
self.assertTrue(any(c in SPECIAL_PASSWORD_CHAR_SET for c in passwd))
# print(passwd)

def test_is_password_valid(self):
"""
python -m unittest token_service.cognitor.tests.CognitorUnitTest.test_is_password_valid
"""
self.assertTrue(
self.srv.is_password_valid(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits
+ SPECIAL_PASSWORD_CHAR_SET
)
)
self.assertFalse(
self.srv.is_password_valid(
string.ascii_lowercase
+ string.ascii_uppercase
+ SPECIAL_PASSWORD_CHAR_SET
)
)
self.assertFalse(
self.srv.is_password_valid(
string.ascii_lowercase + SPECIAL_PASSWORD_CHAR_SET + string.digits
)
)
self.assertFalse(
self.srv.is_password_valid(
SPECIAL_PASSWORD_CHAR_SET + string.ascii_uppercase + string.digits
)
)
self.assertFalse(
self.srv.is_password_valid(
SPECIAL_PASSWORD_CHAR_SET
+ string.ascii_uppercase
+ string.ascii_lowercase
)
)
self.assertFalse(
self.srv.is_password_valid(
string.ascii_lowercase + string.ascii_uppercase + string.digits
)
)

def test_list_users(self):
"""
python -m unittest token_service.cognitor.tests.CognitorUnitTest.test_list_users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def lambda_handler(event, context):
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
- Step: The rotation step (one of createSecret, setSecret, testSecret, finishSecret or resetPendingSecret)

context (LambdaContext): The Lambda runtime information

Expand Down Expand Up @@ -97,6 +97,9 @@ def lambda_handler(event, context):
elif step == "finishSecret":
finish_secret(service_client, arn, token)

elif step == "resetPendingSecret":
reset_pending_secret(service_client, arn, token)

else:
logger.error("lambda_handler: Invalid step parameter %s for secret %s" % (step, arn))
raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))
Expand Down Expand Up @@ -260,6 +263,31 @@ def finish_secret(service_client, arn, token):
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))


def reset_pending_secret(service_client, arn, token):
"""Clears the pending secret so that a new one can be generated.

Args:
service_client (client): The secrets manager service client

arn (string): The secret ARN or other identifier

token (string): The ClientRequestToken associated with the secret version

"""
secret = service_client.get_secret_value(
SecretId=arn, VersionId=token, VersionStage="AWSPENDING"
)
version_id = secret["VersionId"]

service_client.update_secret_version_stage(
SecretId=arn, VersionStage="AWSPENDING", RemoveFromVersionId=version_id
)
logger.info(
"resetPendingSecret: Successfully removed AWSPENDING stage from version %s for secret %s."
% (token, arn)
)


# --- module internal functions


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def lambda_handler(event, context):
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
- Step: The rotation step (one of createSecret, setSecret, testSecret, finishSecret or resetPendingSecret)

context (LambdaContext): The Lambda runtime information

Expand Down Expand Up @@ -91,6 +91,9 @@ def lambda_handler(event, context):
elif step == "finishSecret":
finish_secret(service_client, arn, token)

elif step == "resetPendingSecret":
reset_pending_secret(service_client, arn, token)

else:
logger.error("lambda_handler: Invalid step parameter %s for secret %s" % (step, arn))
raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))
Expand Down Expand Up @@ -124,7 +127,7 @@ def create_secret(service_client, arn, token):
logger.info("createSecret: Successfully retrieved secret for %s." % arn)
except service_client.exceptions.ResourceNotFoundException:
# Generate a random password
current_dict['password'] = CognitoTokenService.generate_password()
current_dict['password'] = token_srv.generate_password()
# Put the secret
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token))
Expand Down Expand Up @@ -254,6 +257,31 @@ def finish_secret(service_client, arn, token):
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))


def reset_pending_secret(service_client, arn, token):
"""Clears the pending secret so that a new one can be generated.

Args:
service_client (client): The secrets manager service client

arn (string): The secret ARN or other identifier

token (string): The ClientRequestToken associated with the secret version

"""
secret = service_client.get_secret_value(
SecretId=arn, VersionId=token, VersionStage="AWSPENDING"
)
version_id = secret["VersionId"]

service_client.update_secret_version_stage(
SecretId=arn, VersionStage="AWSPENDING", RemoveFromVersionId=version_id
)
logger.info(
"resetPendingSecret: Successfully removed AWSPENDING stage from version %s for secret %s."
% (token, arn)
)


# --- module internal functions


Expand Down
Loading