From 9efc8dc07d44a0d3bd0afb99bece4e27d821f1eb Mon Sep 17 00:00:00 2001 From: Geoff V Date: Thu, 14 Nov 2019 12:47:08 -0500 Subject: [PATCH] Fix 355 - Don't duplicate ? if qs in redirect url Instead of appending a ? or # (implicit), parse the query string and fragment for parameters. Add our parameters and rebuild the redirect uri This should hopfully be more robust to redirect_uri's containing query parametes / fragment parameters One thing that it will not account for is if the fragment doesn't contain a parameter, but a single argument. That would be lost. But I'm not sure if you could legally combine those in a fragment? eg: https://example.com/#anchor¶m=value This also adds tests for AuthorizeError.create_uri and fixes updates the test_missing_nonce test to account for this change. --- oidc_provider/lib/errors.py | 51 ++++-- .../tests/cases/test_authorize_endpoint.py | 3 +- oidc_provider/tests/cases/test_error.py | 162 ++++++++++++++++++ 3 files changed, 201 insertions(+), 15 deletions(-) create mode 100644 oidc_provider/tests/cases/test_error.py diff --git a/oidc_provider/lib/errors.py b/oidc_provider/lib/errors.py index 318fb969..6ce4df5e 100644 --- a/oidc_provider/lib/errors.py +++ b/oidc_provider/lib/errors.py @@ -1,7 +1,18 @@ try: - from urllib.parse import quote + from urllib.parse import ( + urlparse, + urlunparse, + urlencode, + parse_qsl + ) + except ImportError: - from urllib import quote + from urllib import urlencode + from urlparse import ( + urlparse, + urlunparse, + parse_qsl + ) class RedirectUriError(Exception): @@ -105,21 +116,33 @@ def __init__(self, redirect_uri, error, grant_type): self.grant_type = grant_type def create_uri(self, redirect_uri, state): - description = quote(self.description) + """Return uri with error, error_description and optional state""" # See: # http://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthError - hash_or_question = '#' if self.grant_type == 'implicit' else '?' - - uri = '{0}{1}error={2}&error_description={3}'.format( - redirect_uri, - hash_or_question, - self.error, - description) - - # Add state if present. - uri = uri + ('&state={0}'.format(state) if state else '') - + ours = dict({ + 'error': self.error, 'error_description': self.description}) + if state: + ours['state'] = state + + parsed = list(urlparse(redirect_uri)) + query_params = dict(parse_qsl(parsed[4])) + # This assumes redirect_url would never contain a non-parameter type + # fragment + frag_params = dict(parse_qsl(parsed[5])) + + # Update the appropriate params with ours + if self.grant_type == 'implicit': + frag_params.update(ours) + else: + query_params.update(ours) + + # Rebuild the uri with updated query and fragments + query_params = urlencode(query_params) + frag_params = urlencode(frag_params) + parsed[4] = query_params + parsed[5] = frag_params + uri = urlunparse(parsed) return uri diff --git a/oidc_provider/tests/cases/test_authorize_endpoint.py b/oidc_provider/tests/cases/test_authorize_endpoint.py index 7ccd0a23..f0e54ca4 100644 --- a/oidc_provider/tests/cases/test_authorize_endpoint.py +++ b/oidc_provider/tests/cases/test_authorize_endpoint.py @@ -532,7 +532,8 @@ def test_missing_nonce(self): response = self._auth_request('get', data, is_user_authenticated=True) - self.assertIn('#error=invalid_request', response['Location']) + self.assertIn('#', response['Location']) + self.assertIn('error=invalid_request', response['Location']) def test_idtoken_token_response(self): """ diff --git a/oidc_provider/tests/cases/test_error.py b/oidc_provider/tests/cases/test_error.py new file mode 100644 index 00000000..9ea6ed2b --- /dev/null +++ b/oidc_provider/tests/cases/test_error.py @@ -0,0 +1,162 @@ +from django.test import TestCase +from oidc_provider.lib.errors import AuthorizeError + +try: + from urllib.parse import ( + urlparse, + parse_qsl + ) +except ImportError: + from urlparse import ( + urlparse, + parse_qsl + ) + + +def compare(expected, created): + """Compare expected and created urls""" + ex_parsed = list(urlparse(expected)) + ex_qp = dict(parse_qsl(ex_parsed[4])) + ex_frag = dict(parse_qsl(ex_parsed[5])) + + cr_parsed = list(urlparse(created)) + cr_qp = dict(parse_qsl(cr_parsed[4])) + cr_frag = dict(parse_qsl(cr_parsed[5])) + + # Validate scheme, netloc, path match + assert ex_parsed[:3] == cr_parsed[:3] + # Validate qp and frags match + assert ex_qp == cr_qp + assert ex_frag == cr_frag + + +class TestImplicitAuthorizeErrorNonImplicit(TestCase): + """Tests with grant_type code - all responses in query params""" + redirect_uri = 'https://example.com/' + grant_type = 'code' + error = 'login_required' + desc = 'The+Authorization+Server+requires+End-User+authentication' + + def test_no_params(self): + """Test with a path only and no query/frag params""" + redirect_uri = self.redirect_uri + 'path' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, '') + expected_uri = '{}?error={}&error_description={}'.format( + redirect_uri, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_query_params_only(self): + """Test with query param in redirect uri""" + redirect_uri = self.redirect_uri + "path/?action=something" + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, '') + expected_uri = '{}&error={}&error_description={}'.format( + redirect_uri, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_frag_params_only(self): + """Test with fragment params only""" + redirect_uri = self.redirect_uri + 'path' + frag = '#action=something' + error = AuthorizeError(redirect_uri + frag, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri + frag, '') + expected_uri = '{}path?error={}&error_description={}{}'.format( + self.redirect_uri, self.error, self.desc, frag) + compare(expected_uri, created_uri) + + def test_query_and_frag_params(self): + """Test with both qp's and fragment""" + redirect_uri = self.redirect_uri + 'path?my_qp=test' + frag = '#action=something' + error = AuthorizeError(redirect_uri + frag, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri + frag, '') + expected_uri = '{}path?my_qp=test&error={}&error_description={}{}' \ + .format(self.redirect_uri, self.error, self.desc, frag) + compare(expected_uri, created_uri) + + def test_with_state(self): + """Test with state""" + redirect_uri = self.redirect_uri + 'path' + state = 'my_state' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, state) + expected_uri = '{}path?error={}&error_description={}&state={}' \ + .format(self.redirect_uri, self.error, self.desc, state) + compare(expected_uri, created_uri) + + def test_with_deep_link(self): + """Test with a non-http schema; deep link style (think slack://)""" + redirect_uri = 'slack://example.com/path' + state = 'my_state' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, state) + expected_uri = '{}?error={}&error_description={}&state={}' \ + .format(redirect_uri, self.error, self.desc, state) + compare(expected_uri, created_uri) + + +class TestImplicitAuthorizeErrorImplicit(TestCase): + """Tests with grant_type code - all responses in query params""" + redirect_uri = 'https://example.com/' + grant_type = 'implicit' + error = 'login_required' + desc = 'The+Authorization+Server+requires+End-User+authentication' + + def test_no_params(self): + """Test with a path only and no query/frag params""" + redirect_uri = self.redirect_uri + 'path' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, '') + expected_uri = '{}#error={}&error_description={}'.format( + redirect_uri, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_query_params_only(self): + """Test with query param in redirect uri""" + redirect_uri = self.redirect_uri + "path/?action=something" + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, '') + expected_uri = '{}#error={}&error_description={}'.format( + redirect_uri, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_frag_params_only(self): + """Test with fragment params only""" + redirect_uri = self.redirect_uri + 'path' + frag = '#action=something' + error = AuthorizeError(redirect_uri + frag, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri + frag, '') + expected_uri = '{}path{}&error={}&error_description={}'.format( + self.redirect_uri, frag, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_query_and_frag_params(self): + """Test with both qp's and fragment""" + redirect_uri = self.redirect_uri + 'path?my_qp=test' + frag = '#action=something' + error = AuthorizeError(redirect_uri + frag, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri + frag, '') + expected_uri = '{}path?my_qp=test{}&error={}&error_description={}' \ + .format(self.redirect_uri, frag, self.error, self.desc) + compare(expected_uri, created_uri) + + def test_with_state(self): + """Test with state""" + redirect_uri = self.redirect_uri + 'path' + state = 'my_state' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, state) + expected_uri = '{}path#error={}&error_description={}&state={}' \ + .format(self.redirect_uri, self.error, self.desc, state) + compare(expected_uri, created_uri) + + def test_with_deep_link(self): + """Test with a non-http schema; deep link style (think slack://)""" + redirect_uri = 'slack://example.com/path' + state = 'my_state' + error = AuthorizeError(redirect_uri, self.error, self.grant_type) + created_uri = error.create_uri(redirect_uri, state) + expected_uri = '{}#error={}&error_description={}&state={}' \ + .format(redirect_uri, self.error, self.desc, state) + compare(expected_uri, created_uri)