diff --git a/src/satosa/backends/saml2.py b/src/satosa/backends/saml2.py index be7a095fb..f1c21c204 100644 --- a/src/satosa/backends/saml2.py +++ b/src/satosa/backends/saml2.py @@ -404,11 +404,18 @@ def authn_response(self, context, binding): del self.outstanding_queries[req_id] # check if the relay_state matches the cookie state - if context.state[self.name]["relay_state"] != context.request["RelayState"]: - msg = "State did not match relay state for state" - logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) - logger.debug(logline) - raise SATOSAAuthenticationError(context.state, "State did not match relay state") + try: + if context.state[self.name]["relay_state"] != context.request["RelayState"]: + msg = "State did not match relay state for state" + logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) + logger.debug(logline) + raise SATOSAAuthenticationError(context.state, "State did not match relay state") + except KeyError: + raise SATOSAAuthenticationError( + context.state, + "Unable to verify the relay state; the request initiation address may not match " + "the ACS address", + ) context.decorate(Context.KEY_METADATA_STORE, self.sp.metadata) if self.config.get(SAMLBackend.KEY_MEMORIZE_IDP): diff --git a/tests/satosa/backends/test_saml2.py b/tests/satosa/backends/test_saml2.py index dcfdb0fa9..7e8a75403 100644 --- a/tests/satosa/backends/test_saml2.py +++ b/tests/satosa/backends/test_saml2.py @@ -21,6 +21,7 @@ from satosa.backends.saml2 import SAMLBackend from satosa.context import Context +from satosa.exception import SATOSAAuthenticationError from satosa.internal import InternalData from tests.users import USERS from tests.util import FakeIdP, create_metadata_from_config_dict, FakeSP @@ -241,13 +242,11 @@ def test_unknown_or_no_hostname_selects_first_acs( def test_authn_response(self, context, idp_conf, sp_conf): response_binding = BINDING_HTTP_REDIRECT - fakesp = FakeSP(SPConfig().load(sp_conf)) - fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf)) - destination, request_params = fakesp.make_auth_req(idp_conf["entityid"]) - url, auth_resp = fakeidp.handle_auth_req(request_params["SAMLRequest"], request_params["RelayState"], - BINDING_HTTP_REDIRECT, - "testuser1", response_binding=response_binding) - + request_params, auth_resp = self._perform_request_response( + idp_conf, + sp_conf, + response_binding + ) context.request = auth_resp context.state[self.samlbackend.name] = {"relay_state": request_params["RelayState"]} self.samlbackend.authn_response(context, response_binding) @@ -256,29 +255,53 @@ def test_authn_response(self, context, idp_conf, sp_conf): assert_authn_response(internal_resp) assert self.samlbackend.name not in context.state + def _perform_request_response( + self, idp_conf, sp_conf, response_binding, receive_nameid=True + ): + fakesp = FakeSP(SPConfig().load(sp_conf)) + fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf)) + destination, request_params = fakesp.make_auth_req(idp_conf["entityid"]) + auth_resp_func = ( + fakeidp.handle_auth_req + if receive_nameid + else fakeidp.handle_auth_req_no_name_id + ) + url, auth_resp = auth_resp_func( + request_params["SAMLRequest"], + request_params["RelayState"], + BINDING_HTTP_REDIRECT, + "testuser1", + response_binding=response_binding, + ) + + return request_params, auth_resp + + def test_no_relay_state_raises_error(self, context, idp_conf, sp_conf): + response_binding = BINDING_HTTP_REDIRECT + request_params, auth_resp = self._perform_request_response( + idp_conf, + sp_conf, + response_binding + ) + context.request = auth_resp + # not setting context.state[self.samlbackend.name] to simulate a request + # without a session id + + with pytest.raises(SATOSAAuthenticationError): + self.samlbackend.authn_response(context, response_binding) + @pytest.mark.skipif( saml2.__version__ < '4.6.1', reason="Optional NameID needs pysaml2 v4.6.1 or higher") def test_authn_response_no_name_id(self, context, idp_conf, sp_conf): response_binding = BINDING_HTTP_REDIRECT - fakesp_conf = SPConfig().load(sp_conf) - fakesp = FakeSP(fakesp_conf) - - fakeidp_conf = IdPConfig().load(idp_conf) - fakeidp = FakeIdP(USERS, config=fakeidp_conf) - - destination, request_params = fakesp.make_auth_req( - idp_conf["entityid"]) - - # Use the fake IdP to mock up an authentication request that has no - # element. - url, auth_resp = fakeidp.handle_auth_req_no_name_id( - request_params["SAMLRequest"], - request_params["RelayState"], - BINDING_HTTP_REDIRECT, - "testuser1", - response_binding=response_binding) + request_params, auth_resp = self._perform_request_response( + idp_conf, + sp_conf, + response_binding, + receive_nameid=False, + ) backend = self.samlbackend