Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/requirements update #126

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
boto3>=1.4.3
envs>=0.3.0
python-jose-cryptodome>=1.3.2
python-jose[pycryptodome]>=3.0.0
requests>=2.13.0
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os

from setuptools import setup, find_packages
from pip.req import parse_requirements

try: # for pip >= 10
from pip._internal.req import parse_requirements
except ImportError: # for pip <= 9.0.3
from pip.req import parse_requirements

install_reqs = parse_requirements('requirements.txt', session=False)
test_reqs = parse_requirements('requirements_test.txt', session=False)
Expand Down
45 changes: 38 additions & 7 deletions warrant/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def check_token(self, renew=True):
expired = True
if renew:
self.renew_access_token()
expired = self.check_token(renew=False)
else:
expired = False
return expired
Expand Down Expand Up @@ -365,6 +366,12 @@ def admin_authenticate(self, password):
AuthParameters=auth_params,
)

if tokens.get('ChallengeName') is not None:
if tokens.get('ChallengeName') == aws_srp.AWSSRP.NEW_PASSWORD_REQUIRED_CHALLENGE:
raise aws_srp.ForceChangePasswordException('Change password before authenticating')
else:
raise NotImplementedError('The %s challenge is not supported' % tokens.get('ChallengeName'))

self.verify_token(tokens['AuthenticationResult']['IdToken'], 'id_token','id')
self.refresh_token = tokens['AuthenticationResult']['RefreshToken']
self.verify_token(tokens['AuthenticationResult']['AccessToken'], 'access_token','access')
Expand Down Expand Up @@ -497,7 +504,7 @@ def admin_get_user(self, attr_map=None):
attribute_list=user.get('UserAttributes'),
metadata=user_metadata,attr_map=attr_map)

def admin_create_user(self, username, temporary_password='', attr_map=None, **kwargs):
def admin_create_user(self, username, temporary_password='', attr_map=None, email_delivery=False, sms_delivery=False, **kwargs):
"""
Create a user using admin super privileges.
:param username: User Pool username
Expand All @@ -507,18 +514,40 @@ def admin_create_user(self, username, temporary_password='', attr_map=None, **kw
:param kwargs: Additional User Pool attributes
:return response: Response from Cognito
"""
response = self.client.admin_create_user(
UserPoolId=self.user_pool_id,
Username=username,
UserAttributes=dict_to_cognito(kwargs, attr_map),
TemporaryPassword=temporary_password,
)
delivery_mediums = []
if email_delivery:
delivery_mediums.append("EMAIL")
if sms_delivery:
delivery_mediums.append("SMS")
args = {
'UserPoolId': self.user_pool_id,
'Username': username,
'UserAttributes': dict_to_cognito(kwargs, attr_map),
'DesiredDeliveryMediums': delivery_mediums
}
if temporary_password is not None:
args['TemporaryPassword'] = temporary_password
response = self.client.admin_create_user(**args)
kwargs.update(username=username)
self._set_attributes(response, kwargs)

response.pop('ResponseMetadata')
return response

def admin_reset_password(self, username):
"""
Reset a password using admin super privileges.
:param username: User Pool username
:return response: Response from Cognito
"""
args = {
'UserPoolId': self.user_pool_id,
'Username': username
}
response = self.client.admin_reset_user_password(**args)
response.pop('ResponseMetadata')
return response

def send_verification(self, attribute='email'):
"""
Sends the user an attribute verification code for the specified attribute name.
Expand Down Expand Up @@ -548,7 +577,9 @@ def renew_access_token(self):
Sets a new access token on the User using the refresh token.
"""
auth_params = {'REFRESH_TOKEN': self.refresh_token}

self._add_secret_hash(auth_params, 'SECRET_HASH')

refresh_response = self.client.initiate_auth(
ClientId=self.client_id,
AuthFlow='REFRESH_TOKEN',
Expand Down
14 changes: 12 additions & 2 deletions warrant/aws_srp.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def process_challenge(self, challenge_parameters):
if self.client_secret is not None:
response.update({
"SECRET_HASH":
self.get_secret_hash(self.username, self.client_id, self.client_secret)})
self.get_secret_hash(user_id_for_srp, self.client_id, self.client_secret)})
return response

def authenticate_user(self, client=None):
Expand Down Expand Up @@ -235,10 +235,20 @@ def set_new_password_challenge(self, new_password, client=None):
ChallengeResponses=challenge_response)

if tokens['ChallengeName'] == self.NEW_PASSWORD_REQUIRED_CHALLENGE:
challenge_parameters = response['ChallengeParameters']
user_id_for_srp = challenge_parameters.get('USER_ID_FOR_SRP')

if user_id_for_srp is None:
user_id_for_srp = self.username

challenge_response = {
'USERNAME': auth_params['USERNAME'],
'USERNAME': user_id_for_srp,
'NEW_PASSWORD': new_password
}

if self.client_secret is not None:
challenge_response['SECRET_HASH'] = self.get_secret_hash(user_id_for_srp, self.client_id, self.client_secret)

new_password_response = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName=self.NEW_PASSWORD_REQUIRED_CHALLENGE,
Expand Down